reth_ipc/client/
mod.rs
1use crate::stream_codec::StreamCodec;
4use futures::{StreamExt, TryFutureExt};
5use interprocess::local_socket::{
6 tokio::{prelude::*, RecvHalf, SendHalf},
7 GenericFilePath,
8};
9use jsonrpsee::{
10 async_client::{Client, ClientBuilder},
11 core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
12};
13use std::{io, time::Duration};
14use tokio::io::AsyncWriteExt;
15use tokio_util::codec::FramedRead;
16
17#[derive(Debug)]
19pub(crate) struct Sender {
20 inner: SendHalf,
21}
22
23#[async_trait::async_trait]
24impl TransportSenderT for Sender {
25 type Error = IpcError;
26
27 async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
30 Ok(self.inner.write_all(msg.as_bytes()).await?)
31 }
32
33 async fn send_ping(&mut self) -> Result<(), Self::Error> {
34 tracing::trace!("send ping - not implemented");
35 Err(IpcError::NotSupported)
36 }
37
38 async fn close(&mut self) -> Result<(), Self::Error> {
40 Ok(())
41 }
42}
43
44#[derive(Debug)]
46pub(crate) struct Receiver {
47 pub(crate) inner: FramedRead<RecvHalf, StreamCodec>,
48}
49
50#[async_trait::async_trait]
51impl TransportReceiverT for Receiver {
52 type Error = IpcError;
53
54 async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
56 self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
57 }
58}
59
/// Builder that establishes the IPC connection and produces the transport halves.
#[derive(Clone, Debug, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;
64
65impl IpcTransportClientBuilder {
66 pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> {
67 let conn = async { path.to_fs_name::<GenericFilePath>() }
68 .and_then(LocalSocketStream::connect)
69 .await
70 .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?;
71
72 let (recv, send) = conn.split();
73
74 Ok((
75 Sender { inner: send },
76 Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) },
77 ))
78 }
79}
80
/// Builder for an IPC-backed JSON-RPC client.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct IpcClientBuilder {
    /// Timeout passed to the underlying client builder as its request timeout.
    request_timeout: Duration,
}

impl Default for IpcClientBuilder {
    /// Creates a builder whose request timeout is 60 seconds.
    fn default() -> Self {
        let request_timeout = Duration::from_secs(60);
        Self { request_timeout }
    }
}
93
94impl IpcClientBuilder {
95 pub async fn build(self, name: &str) -> Result<Client, IpcError> {
107 let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?;
108 Ok(self.build_with_tokio(tx, rx))
109 }
110
111 pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
113 where
114 S: TransportSenderT + Send,
115 R: TransportReceiverT + Send,
116 {
117 ClientBuilder::default()
118 .request_timeout(self.request_timeout)
119 .build_with_tokio(sender, receiver)
120 }
121
122 pub fn request_timeout(mut self, timeout: Duration) -> Self {
124 self.request_timeout = timeout;
125 self
126 }
127}
128
129#[derive(Debug, thiserror::Error)]
131pub enum IpcError {
132 #[error("operation not supported")]
134 NotSupported,
135 #[error("stream closed")]
137 Closed,
138 #[error("failed to connect to socket {path}: {err}")]
140 FailedToConnect {
141 #[doc(hidden)]
143 path: String,
144 #[doc(hidden)]
146 err: io::Error,
147 },
148 #[error(transparent)]
150 Io(#[from] io::Error),
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::server::dummy_name;
    use interprocess::local_socket::ListenerOptions;

    /// Connects the transport to a freshly created listener and builds a client from the
    /// resulting halves; any failure along the way panics via `unwrap`.
    #[tokio::test]
    async fn test_connect() {
        let socket_path = dummy_name();

        let fs_name = socket_path.as_str().to_fs_name::<GenericFilePath>().unwrap();
        let listener = ListenerOptions::new().name(fs_name).create_tokio().unwrap();

        // Accept a single connection in the background so the connect below has a peer.
        tokio::spawn(async move {
            let _accepted = listener.accept().await;
        });

        let (sender, receiver) =
            IpcTransportClientBuilder::default().build(&socket_path).await.unwrap();
        let _ = IpcClientBuilder::default().build_with_tokio(sender, receiver);
    }
}