use crate::stream_codec::StreamCodec;
use futures::{StreamExt, TryFutureExt};
use interprocess::local_socket::{
tokio::{prelude::*, RecvHalf, SendHalf},
GenericFilePath,
};
use jsonrpsee::{
async_client::{Client, ClientBuilder},
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use std::io;
use tokio::io::AsyncWriteExt;
use tokio_util::codec::FramedRead;
/// Sending side of the IPC transport.
///
/// Wraps the write half of a local socket stream and implements
/// [`TransportSenderT`] so it can back a jsonrpsee client.
#[derive(Debug)]
pub(crate) struct Sender {
// Write half obtained by splitting the connected local socket stream.
inner: SendHalf,
}
#[async_trait::async_trait]
impl TransportSenderT for Sender {
    type Error = IpcError;

    /// Writes the whole message to the socket as raw UTF-8 bytes.
    ///
    /// # Errors
    ///
    /// Any I/O error reported by the write is returned as [`IpcError::Io`].
    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
        let payload = msg.as_bytes();
        self.inner.write_all(payload).await.map_err(IpcError::Io)
    }

    /// Pings are not implemented for this transport.
    ///
    /// # Errors
    ///
    /// Always returns [`IpcError::NotSupported`].
    async fn send_ping(&mut self) -> Result<(), Self::Error> {
        tracing::trace!("send ping - not implemented");
        Err(IpcError::NotSupported)
    }

    /// Closing is a no-op here; this method always succeeds.
    async fn close(&mut self) -> Result<(), Self::Error> {
        Ok(())
    }
}
/// Receiving side of the IPC transport.
///
/// Wraps the read half of a local socket stream in a [`FramedRead`] driven by
/// [`StreamCodec`], and implements [`TransportReceiverT`] for use by a
/// jsonrpsee client.
#[derive(Debug)]
pub(crate) struct Receiver {
// Framed reader yielding decoded messages from the socket's read half.
pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
    type Error = IpcError;

    /// Waits for the next decoded frame and returns it as a text message.
    ///
    /// # Errors
    ///
    /// Returns [`IpcError::Closed`] when the framed stream has ended, or the
    /// decoder's error (converted into [`IpcError`]) when a frame fails to decode.
    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
        match self.inner.next().await {
            Some(frame) => Ok(ReceivedMessage::Text(frame?)),
            // `None` means the underlying stream is exhausted.
            None => Err(IpcError::Closed),
        }
    }
}
/// Builder for the low-level IPC transport.
///
/// Connects to a local socket and produces the [`Sender`] / [`Receiver`] pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;
impl IpcTransportClientBuilder {
    /// Connects to the local socket at `path` and splits the connection into
    /// a [`Sender`] and a [`Receiver`].
    ///
    /// # Errors
    ///
    /// Returns [`IpcError::FailedToConnect`] (carrying `path` and the underlying
    /// I/O error) if `path` cannot be turned into a socket name or if the
    /// connection attempt fails.
    pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
        // The name conversion is wrapped in a future so that a conversion failure
        // and a connect failure flow through the same error mapping below.
        let connecting =
            async { path.to_fs_name::<GenericFilePath>() }.and_then(LocalSocketStream::connect);

        let stream = match connecting.await {
            Ok(stream) => stream,
            Err(err) => return Err(IpcError::FailedToConnect { path: path.to_string(), err }),
        };

        let (read_half, write_half) = stream.split();
        let sender = Sender { inner: write_half };
        let receiver =
            Receiver { inner: FramedRead::new(read_half, StreamCodec::stream_incoming()) };
        Ok((sender, receiver))
    }
}
/// Builder for a jsonrpsee [`Client`] that communicates over an IPC local socket.
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct IpcClientBuilder;
impl IpcClientBuilder {
    /// Connects to the IPC socket identified by `name` and returns a ready [`Client`].
    ///
    /// # Errors
    ///
    /// Propagates any [`IpcError`] produced while establishing the transport.
    pub async fn build(self, name: &str) -> Result<Client, IpcError> {
        let transport = IpcTransportClientBuilder::default().build(name);
        transport.await.map(|(sender, receiver)| self.build_with_tokio(sender, receiver))
    }

    /// Builds a [`Client`] on top of an already established sender/receiver pair,
    /// delegating to the default jsonrpsee [`ClientBuilder`].
    pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
    where
        S: TransportSenderT + Send,
        R: TransportReceiverT + Send,
    {
        let builder = ClientBuilder::default();
        builder.build_with_tokio(sender, receiver)
    }
}
/// Errors produced by the IPC client transport.
#[derive(Debug, thiserror::Error)]
pub enum IpcError {
/// The requested operation is not implemented by this transport
/// (returned by `Sender::send_ping`).
#[error("operation not supported")]
NotSupported,
/// The incoming stream ended; returned by `Receiver::receive` when the
/// framed reader yields no further items.
#[error("stream closed")]
Closed,
/// Resolving the socket name or connecting to it failed.
#[error("failed to connect to socket {path}: {err}")]
FailedToConnect {
// Path that was passed to the transport builder.
#[doc(hidden)]
path: String,
// Underlying I/O error reported while resolving the name or connecting.
#[doc(hidden)]
err: io::Error,
},
/// Any other I/O error, converted automatically from `io::Error`.
#[error(transparent)]
Io(#[from] io::Error),
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::dummy_name;
    use interprocess::local_socket::ListenerOptions;

    /// Smoke test: a transport can connect to a freshly created local socket
    /// listener and be handed to the client builder.
    #[tokio::test]
    async fn test_connect() {
        let endpoint = dummy_name();

        // Bind the listener first so the connect attempt below has a peer.
        let socket_name = endpoint.as_str().to_fs_name::<GenericFilePath>().unwrap();
        let listener = ListenerOptions::new().name(socket_name).create_tokio().unwrap();

        // Accept a single connection in the background; its result is irrelevant here.
        tokio::spawn(async move {
            let _accepted = listener.accept().await;
        });

        let (sender, receiver) =
            IpcTransportClientBuilder::default().build(&endpoint).await.unwrap();
        let _ = IpcClientBuilder::default().build_with_tokio(sender, receiver);
    }
}