reth_ipc/client/
mod.rs

1//! [`jsonrpsee`] transport adapter implementation for IPC.
2
3use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6    tokio::{prelude::*, RecvHalf, SendHalf},
7    GenericFilePath,
8};
9use jsonrpsee::{
10    async_client::{Client, ClientBuilder},
11    core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::{io, time::Duration};
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17/// Sending end of IPC transport.
18#[derive(Debug)]
19pub(crate) struct Sender {
20    inner: SendHalf,
21}
22
23#[async_trait::async_trait]
24impl TransportSenderT for Sender {
25    type Error = IpcError;
26
27    /// Sends out a request. Returns a Future that finishes when the request has been successfully
28    /// sent.
29    async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
30        Ok(self.inner.write_all(msg.as_bytes()).await?)
31    }
32
33    async fn send_ping(&mut self) -> Result<(), Self::Error> {
34        tracing::trace!("send ping - not implemented");
35        Err(IpcError::NotSupported)
36    }
37
38    /// Close the connection.
39    async fn close(&mut self) -> Result<(), Self::Error> {
40        Ok(())
41    }
42}
43
44/// Receiving end of IPC transport.
45#[derive(Debug)]
46pub(crate) struct Receiver {
47    pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
48}
49
50#[async_trait::async_trait]
51impl TransportReceiverT for Receiver {
52    type Error = IpcError;
53
54    /// Returns a Future resolving when the server sent us something back.
55    async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
56        self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
57    }
58}
59
60/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
61#[derive(Debug, Clone, Default)]
62#[non_exhaustive]
63pub(crate) struct IpcTransportClientBuilder;
64
65impl IpcTransportClientBuilder {
66    pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
67        let conn = async { path.to_fs_name::<GenericFilePath>() }
68            .and_then(LocalSocketStream::connect)
69            .await
70            .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
71
72        let (recv, send) = conn.split();
73
74        Ok((
75            Sender { inner: send },
76            Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
77        ))
78    }
79}
80
/// Configurable builder that produces an IPC-backed [`Client`].
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct IpcClientBuilder {
    /// Request timeout handed to the underlying client builder.
    request_timeout: Duration,
}

impl Default for IpcClientBuilder {
    /// Creates a builder with a request timeout of 60 seconds.
    fn default() -> Self {
        const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

        Self { request_timeout: DEFAULT_REQUEST_TIMEOUT }
    }
}
93
94impl IpcClientBuilder {
95    /// Connects to an IPC socket
96    ///
97    /// ```
98    /// use jsonrpsee::{core::client::ClientT, rpc_params};
99    /// use reth_ipc::client::IpcClientBuilder;
100    ///
101    /// # async fn run_client() -> Result<(), Box<dyn core::error::Error +  Send + Sync>> {
102    /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
103    /// let response: String = client.request("say_hello", rpc_params![]).await?;
104    /// # Ok(()) }
105    /// ```
106    pub async fn build(self, name: &str) -> Result<Client, IpcError> {
107        let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
108        Ok(self.build_with_tokio(tx, rx))
109    }
110
111    /// Uses the sender and receiver channels to connect to the socket.
112    pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
113    where
114        S: TransportSenderT + Send,
115        R: TransportReceiverT + Send,
116    {
117        ClientBuilder::default()
118            .request_timeout(self.request_timeout)
119            .build_with_tokio(sender, receiver)
120    }
121
122    /// Set request timeout (default is 60 seconds).
123    pub fn request_timeout(mut self, timeout: Duration) -> Self {
124        self.request_timeout = timeout;
125        self
126    }
127}
128
129/// Error variants that can happen in IPC transport.
130#[derive(Debug, thiserror::Error)]
131pub enum IpcError {
132    /// Operation not supported
133    #[error("operation not supported")]
134    NotSupported,
135    /// Stream was closed
136    #[error("stream closed")]
137    Closed,
138    /// Thrown when failed to establish a socket connection.
139    #[error("failed to connect to socket {path}: {err}")]
140    FailedToConnect {
141        /// The path of the socket.
142        #[doc(hidden)]
143        path: String,
144        /// The error occurred while connecting.
145        #[doc(hidden)]
146        err: io::Error,
147    },
148    /// Wrapped IO Error
149    #[error(transparent)]
150    Io(#[from] io::Error),
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::dummy_name;
    use interprocess::local_socket::ListenerOptions;

    /// Smoke test: a client transport can connect to a freshly created local socket listener
    /// and be turned into a client without panicking.
    #[tokio::test]
    async fn test_connect() {
        let socket_path = dummy_name();

        // Bind a listener on the socket path so the client has something to connect to.
        let listener_name = socket_path.as_str().to_fs_name::<GenericFilePath>().unwrap();
        let listener = ListenerOptions::new().name(listener_name).create_tokio().unwrap();

        // Accept a single connection in the background.
        tokio::spawn(async move {
            let _accepted = listener.accept().await;
        });

        let (sender, receiver) =
            IpcTransportClientBuilder::default().build(&socket_path).await.unwrap();
        let _ = IpcClientBuilder::default().build_with_tokio(sender, receiver);
    }
}
174}