reth_ipc/server/
mod.rs

1//! JSON-RPC IPC server implementation
2
3use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7    tokio::prelude::{LocalSocketListener, LocalSocketStream},
8    traits::tokio::{Listener, Stream},
9    GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12    core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13    server::{
14        middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15        RandomIntegerIdProvider, ServerHandle, StopHandle,
16    },
17    BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20    future::Future,
21    io,
22    pin::{pin, Pin},
23    sync::Arc,
24    task::{Context, Poll},
25};
26use tokio::{
27    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28    sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32// re-export so can be used during builder setup
33use crate::{
34    server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35    stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
/// Ipc Server implementation
///
/// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
///
/// `HttpMiddleware` is the layer type held by the [`tower::ServiceBuilder`] and `RpcMiddleware`
/// is the layer type held by the [`RpcServiceBuilder`]; both default to [`Identity`].
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// The endpoint we listen on for incoming connections
    endpoint: String,
    /// Subscription ID provider, cloned into every accepted connection.
    id_provider: Arc<dyn IdProvider>,
    /// Server settings.
    cfg: Settings,
    /// Middleware builder handed (cloned) to every accepted connection for its RPC service.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware builder wrapped around the per-connection [`TowerServiceNoHttp`].
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60    /// Returns the configured endpoint
61    pub fn endpoint(&self) -> String {
62        self.endpoint.clone()
63    }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68    RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69    HttpMiddleware: Layer<
70            TowerServiceNoHttp<RpcMiddleware>,
71            Service: Service<
72                String,
73                Response = Option<String>,
74                Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75                Future: Send + Unpin,
76            > + Send,
77        > + Send
78        + 'static,
79{
80    /// Start responding to connections requests.
81    ///
82    /// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is
83    /// dropped.
84    ///
85    /// ```
86    /// use jsonrpsee::RpcModule;
87    /// use reth_ipc::server::Builder;
88    /// async fn run_server() -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
89    ///     let server = Builder::default().build("/tmp/my-uds".into());
90    ///     let mut module = RpcModule::new(());
91    ///     module.register_method("say_hello", |_, _, _| "lo")?;
92    ///     let handle = server.start(module).await?;
93    ///
94    ///     // In this example we don't care about doing shutdown so let's it run forever.
95    ///     // You may use the `ServerHandle` to shut it down or manage it yourself.
96    ///     let server = tokio::spawn(handle.stopped());
97    ///     server.await.unwrap();
98    ///     Ok(())
99    /// }
100    /// ```
101    pub async fn start(
102        mut self,
103        methods: impl Into<Methods>,
104    ) -> Result<ServerHandle, IpcServerStartError> {
105        let methods = methods.into();
106
107        let (stop_handle, server_handle) = stop_channel();
108
109        // use a signal channel to wait until we're ready to accept connections
110        let (tx, rx) = oneshot::channel();
111
112        match self.cfg.tokio_runtime.take() {
113            Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114            None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115        };
116        rx.await.expect("channel is open")?;
117
118        Ok(server_handle)
119    }
120
121    async fn start_inner(
122        self,
123        methods: Methods,
124        stop_handle: StopHandle,
125        on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126    ) {
127        trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129        if cfg!(unix) {
130            // ensure the file does not exist
131            if std::fs::remove_file(&self.endpoint).is_ok() {
132                debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133            }
134        }
135
136        let listener = match self
137            .endpoint
138            .as_str()
139            .to_fs_name::<GenericFilePath>()
140            .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141        {
142            Ok(listener) => {
143                #[cfg(unix)]
144                {
145                    // set permissions only on unix
146                    use std::os::unix::fs::PermissionsExt;
147                    if let Some(perms_str) = &self.cfg.ipc_socket_permissions {
148                        if let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8) {
149                            let perms = std::fs::Permissions::from_mode(mode);
150                            let _ = std::fs::set_permissions(&self.endpoint, perms);
151                        }
152                    }
153                }
154                listener
155            }
156            Err(err) => {
157                on_ready
158                    .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
159                    .ok();
160                return;
161            }
162        };
163
164        // signal that we're ready to accept connections
165        on_ready.send(Ok(())).ok();
166
167        let mut id: u32 = 0;
168        let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
169
170        let stopped = stop_handle.clone().shutdown();
171        let mut stopped = pin!(stopped);
172
173        let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
174
175        trace!("accepting ipc connections");
176        loop {
177            match try_accept_conn(&listener, stopped).await {
178                AcceptConnection::Established { local_socket_stream, stop } => {
179                    let Some(conn_permit) = connection_guard.try_acquire() else {
180                        let (_reader, mut writer) = local_socket_stream.split();
181                        let _ = writer
182                            .write_all(b"Too many connections. Please try again later.")
183                            .await;
184                        stopped = stop;
185                        continue;
186                    };
187
188                    let max_conns = connection_guard.max_connections();
189                    let curr_conns = max_conns - connection_guard.available_connections();
190                    trace!("Accepting new connection {}/{}", curr_conns, max_conns);
191
192                    let conn_permit = Arc::new(conn_permit);
193
194                    process_connection(ProcessConnection {
195                        http_middleware: &self.http_middleware,
196                        rpc_middleware: self.rpc_middleware.clone(),
197                        conn_permit,
198                        conn_id: id,
199                        server_cfg: self.cfg.clone(),
200                        stop_handle: stop_handle.clone(),
201                        drop_on_completion: drop_on_completion.clone(),
202                        methods: methods.clone(),
203                        id_provider: self.id_provider.clone(),
204                        local_socket_stream,
205                    });
206
207                    id = id.wrapping_add(1);
208                    stopped = stop;
209                }
210                AcceptConnection::Shutdown => {
211                    break;
212                }
213                AcceptConnection::Err((err, stop)) => {
214                    tracing::error!(%err, "Failed accepting a new IPC connection");
215                    stopped = stop;
216                }
217            }
218        }
219
220        // Drop the last Sender
221        drop(drop_on_completion);
222
223        // Once this channel is closed it is safe to assume that all connections have been
224        // gracefully shutdown
225        while process_connection_awaiter.recv().await.is_some() {
226            // Generally, messages should not be sent across this channel,
227            // but we'll loop here to wait for `None` just to be on the safe side
228        }
229    }
230}
231
/// Outcome of a single attempt to accept an incoming connection.
///
/// `S` is the shutdown future; the variants that do not end the accept loop hand it back so the
/// next attempt can keep polling the same future.
enum AcceptConnection<S> {
    /// The shutdown future completed before a connection was accepted.
    Shutdown,
    /// A new connection was accepted.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
237
238async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
239where
240    S: Future + Unpin,
241{
242    match futures_util::future::select(pin!(listener.accept()), stopped).await {
243        Either::Left((res, stop)) => match res {
244            Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
245            Err(e) => AcceptConnection::Err((e, stop)),
246        },
247        Either::Right(_) => AcceptConnection::Shutdown,
248    }
249}
250
251impl std::fmt::Debug for IpcServer {
252    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253        f.debug_struct("IpcServer")
254            .field("endpoint", &self.endpoint)
255            .field("cfg", &self.cfg)
256            .field("id_provider", &self.id_provider)
257            .finish()
258    }
259}
260
/// Error thrown when server couldn't be started.
///
/// Carries the endpoint that could not be listened on together with the underlying I/O error.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// The endpoint the server attempted to listen on.
    endpoint: String,
    /// The I/O error returned while resolving the endpoint name or creating the listener.
    #[source]
    source: io::Error,
}
269
/// Data required by the server to handle requests received via an IPC connection
///
/// One instance is built per accepted connection.
#[derive(Debug, Clone)]
// NOTE(review): some fields (e.g. `stop_handle`) do not appear to be read in this module —
// confirm and narrow this allow if possible.
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// Registered server methods.
    pub(crate) methods: Methods,
    /// Subscription ID provider.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Stop handle.
    pub(crate) stop_handle: StopHandle,
    /// Connection ID
    pub(crate) conn_id: u32,
    /// Connection Permit.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Limits the number of subscriptions for this connection
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink that is used to send back responses to the connection.
    ///
    /// This is used for subscriptions.
    pub(crate) method_sink: MethodSink,
    /// Server settings.
    pub(crate) server_cfg: Settings,
}
293
/// Similar to [`tower::ServiceBuilder`] but doesn't
/// support any tower middleware implementations.
///
/// This is a newtype around a [`tower::ServiceBuilder`] holding the layer stack `L`.
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
298
299impl Default for RpcServiceBuilder<Identity> {
300    fn default() -> Self {
301        Self(tower::ServiceBuilder::new())
302    }
303}
304
305impl RpcServiceBuilder<Identity> {
306    /// Create a new [`RpcServiceBuilder`].
307    pub const fn new() -> Self {
308        Self(tower::ServiceBuilder::new())
309    }
310}
311
312impl<L> RpcServiceBuilder<L> {
313    /// Optionally add a new layer `T` to the [`RpcServiceBuilder`].
314    ///
315    /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details.
316    pub fn option_layer<T>(
317        self,
318        layer: Option<T>,
319    ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
320        let layer = if let Some(layer) = layer {
321            Either::Left(layer)
322        } else {
323            Either::Right(Identity::new())
324        };
325        self.layer(layer)
326    }
327
328    /// Add a new layer `T` to the [`RpcServiceBuilder`].
329    ///
330    /// See the documentation for [`tower::ServiceBuilder::layer`] for more details.
331    pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
332        RpcServiceBuilder(self.0.layer(layer))
333    }
334
335    /// Add a [`tower::Layer`] built from a function that accepts a service and returns another
336    /// service.
337    ///
338    /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details.
339    pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
340        RpcServiceBuilder(self.0.layer_fn(f))
341    }
342
343    /// Add a logging layer to [`RpcServiceBuilder`]
344    ///
345    /// This logs each request and response for every call.
346    pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
347        RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
348    }
349
350    /// Wrap the service `S` with the middleware.
351    pub(crate) fn service<S>(&self, service: S) -> L::Service
352    where
353        L: tower::Layer<S>,
354    {
355        self.0.service(service)
356    }
357}
358
/// `JsonRPSee` service compatible with `tower`.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html).
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Per-connection data (methods, sink, settings, ...) used to serve each request.
    inner: ServiceData,
    /// Middleware builder used to wrap the [`RpcService`] created for each request.
    rpc_middleware: RpcServiceBuilder<L>,
}
368
369impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
370where
371    RpcMiddleware: for<'a> Layer<RpcService>,
372    for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
373        Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
374{
375    /// The response of a handled RPC call
376    ///
377    /// This is an `Option` because subscriptions and call responses are handled differently.
378    /// This will be `Some` for calls, and `None` for subscriptions, because the subscription
379    /// response will be emitted via the `method_sink`.
380    type Response = Option<String>;
381
382    type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
383
384    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
385
386    /// Opens door for back pressure implementation.
387    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388        Poll::Ready(Ok(()))
389    }
390
391    fn call(&mut self, request: String) -> Self::Future {
392        trace!("{:?}", request);
393
394        let cfg = RpcServiceCfg::CallsAndSubscriptions {
395            bounded_subscriptions: BoundedSubscriptions::new(
396                self.inner.server_cfg.max_subscriptions_per_connection,
397            ),
398            id_provider: self.inner.id_provider.clone(),
399            sink: self.inner.method_sink.clone(),
400        };
401
402        let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
403        let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
404        let conn = self.inner.conn_permit.clone();
405        let rpc_service = self.rpc_middleware.service(RpcService::new(
406            self.inner.methods.clone(),
407            max_response_body_size,
408            self.inner.conn_id.into(),
409            cfg,
410        ));
411        // an ipc connection needs to handle read+write concurrently
412        // even if the underlying rpc handler spawns the actual work or is does a lot of async any
413        // additional overhead performed by `handle_request` can result in I/O latencies, for
414        // example tracing calls are relatively CPU expensive on serde::serialize alone, moving this
415        // work to a separate task takes the pressure off the connection so all concurrent responses
416        // are also serialized concurrently and the connection can focus on read+write
417        let f = tokio::task::spawn(async move {
418            ipc::call_with_service(
419                request,
420                rpc_service,
421                max_response_body_size,
422                max_request_body_size,
423                conn,
424            )
425            .await
426        });
427
428        Box::pin(async move { f.await.map_err(|err| err.into()) })
429    }
430}
431
/// Everything [`process_connection`] needs to serve one accepted IPC connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Middleware builder wrapped around the connection's tower service.
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// Middleware builder for the RPC service of this connection.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit acquired from the connection guard for this connection.
    conn_permit: Arc<ConnectionPermit>,
    /// Connection ID, also recorded on the `connection` tracing span.
    conn_id: u32,
    /// Server settings.
    server_cfg: Settings,
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Sender that is dropped once the connection task has finished.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered server methods.
    methods: Methods,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted local socket stream.
    local_socket_stream: LocalSocketStream,
}
444
445/// Spawns the IPC connection onto a new task
446#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
447fn process_connection<RpcMiddleware, HttpMiddleware>(
448    params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
449) where
450    RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
451    for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
452    HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
453    <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
454    + Service<
455        String,
456        Response = Option<String>,
457        Error = Box<dyn core::error::Error + Send + Sync + 'static>,
458    >,
459    <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
460    Send + Unpin,
461 {
462    let ProcessConnection {
463        http_middleware,
464        rpc_middleware,
465        conn_permit,
466        conn_id,
467        server_cfg,
468        stop_handle,
469        drop_on_completion,
470        id_provider,
471        methods,
472        local_socket_stream,
473    } = params;
474
475    let ipc = IpcConn(tokio_util::codec::Decoder::framed(
476        StreamCodec::stream_incoming(),
477        local_socket_stream,
478    ));
479
480    let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
481    let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
482    let tower_service = TowerServiceNoHttp {
483        inner: ServiceData {
484            methods,
485            id_provider,
486            stop_handle: stop_handle.clone(),
487            server_cfg: server_cfg.clone(),
488            conn_id,
489            conn_permit,
490            bounded_subscriptions: BoundedSubscriptions::new(
491                server_cfg.max_subscriptions_per_connection,
492            ),
493            method_sink,
494        },
495        rpc_middleware,
496    };
497
498    let service = http_middleware.service(tower_service);
499    tokio::spawn(async {
500        to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
501        drop(drop_on_completion)
502    });
503}
504
505async fn to_ipc_service<S, T>(
506    ipc: IpcConn<JsonRpcStream<T>>,
507    service: S,
508    stop_handle: StopHandle,
509    rx: mpsc::Receiver<Box<JsonRawValue>>,
510) where
511    S: Service<String, Response = Option<String>> + Send + 'static,
512    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
513    S::Future: Send + Unpin,
514    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
515{
516    let rx_item = ReceiverStream::new(rx);
517    let conn = IpcConnDriver {
518        conn: ipc,
519        service,
520        pending_calls: Default::default(),
521        items: Default::default(),
522    };
523    let stopped = stop_handle.shutdown();
524
525    let mut conn = pin!(conn);
526    let mut rx_item = pin!(rx_item);
527    let mut stopped = pin!(stopped);
528
529    loop {
530        tokio::select! {
531            _ = &mut conn => {
532               break
533            }
534            item = rx_item.next() => {
535                if let Some(item) = item {
536                    conn.push_back(item.to_string());
537                }
538            }
539            _ = &mut stopped => {
540                // shutdown
541                break
542            }
543        }
544    }
545}
546
/// JSON-RPC IPC server settings.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum size in bytes of a request.
    max_request_body_size: u32,
    /// Maximum size in bytes of a response.
    max_response_body_size: u32,
    /// Max length for logging for requests and responses
    ///
    /// Logs bigger than this limit will be truncated.
    max_log_length: u32,
    /// Maximum number of incoming connections allowed.
    max_connections: u32,
    /// Maximum number of subscriptions per connection.
    max_subscriptions_per_connection: u32,
    /// Number of messages the server is allowed to buffer until backpressure kicks in.
    message_buffer_capacity: u32,
    /// Custom tokio runtime to run the server on.
    tokio_runtime: Option<tokio::runtime::Handle>,
    /// The permissions to create the IPC socket with.
    ///
    /// Parsed as an octal mode string when the server starts (unix only).
    ipc_socket_permissions: Option<String>,
}
569
570impl Default for Settings {
571    fn default() -> Self {
572        Self {
573            max_request_body_size: TEN_MB_SIZE_BYTES,
574            max_response_body_size: TEN_MB_SIZE_BYTES,
575            max_log_length: 4096,
576            max_connections: 100,
577            max_subscriptions_per_connection: 1024,
578            message_buffer_capacity: 1024,
579            tokio_runtime: None,
580            ipc_socket_permissions: None,
581        }
582    }
583}
584
/// Builder to configure and create a JSON-RPC server
///
/// `HttpMiddleware` and `RpcMiddleware` are the layer types of the two middleware builders; they
/// change when [`Builder::set_http_middleware`] or [`Builder::set_rpc_middleware`] is called.
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Server settings collected so far.
    settings: Settings,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// Middleware builder applied to the RPC service.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware builder wrapped around the per-connection tower service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
594
595impl Default for Builder<Identity, Identity> {
596    fn default() -> Self {
597        Self {
598            settings: Settings::default(),
599            id_provider: Arc::new(RandomIntegerIdProvider),
600            rpc_middleware: RpcServiceBuilder::new(),
601            http_middleware: tower::ServiceBuilder::new(),
602        }
603    }
604}
605
606impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
607    /// Set the maximum size of a request body in bytes. Default is 10 MiB.
608    pub const fn max_request_body_size(mut self, size: u32) -> Self {
609        self.settings.max_request_body_size = size;
610        self
611    }
612
613    /// Set the maximum size of a response body in bytes. Default is 10 MiB.
614    pub const fn max_response_body_size(mut self, size: u32) -> Self {
615        self.settings.max_response_body_size = size;
616        self
617    }
618
619    /// Set the maximum size of a log
620    pub const fn max_log_length(mut self, size: u32) -> Self {
621        self.settings.max_log_length = size;
622        self
623    }
624
625    /// Set the maximum number of connections allowed. Default is 100.
626    pub const fn max_connections(mut self, max: u32) -> Self {
627        self.settings.max_connections = max;
628        self
629    }
630
631    /// Set the maximum number of subscriptions per connection. Default is 1024.
632    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
633        self.settings.max_subscriptions_per_connection = max;
634        self
635    }
636
637    /// The server enforces backpressure which means that
638    /// `n` messages can be buffered and if the client
639    /// can't keep up with the server.
640    ///
641    /// This `capacity` is applied per connection and
642    /// applies globally on the connection which implies
643    /// all JSON-RPC messages.
644    ///
645    /// For example if a subscription produces plenty of new items
646    /// and the client can't keep up then no new messages are handled.
647    ///
648    /// If this limit is exceeded then the server will "back-off"
649    /// and only accept new messages once the client reads pending messages.
650    ///
651    /// # Panics
652    ///
653    /// Panics if the buffer capacity is 0.
654    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
655        self.settings.message_buffer_capacity = c;
656        self
657    }
658
659    /// Configure a custom [`tokio::runtime::Handle`] to run the server on.
660    ///
661    /// Default: [`tokio::spawn`]
662    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
663        self.settings.tokio_runtime = Some(rt);
664        self
665    }
666
667    /// Sets the permissions for the IPC socket file.
668    pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
669        self.settings.ipc_socket_permissions = permissions;
670        self
671    }
672
673    /// Configure custom `subscription ID` provider for the server to use
674    /// to when getting new subscription calls.
675    ///
676    /// You may choose static dispatch or dynamic dispatch because
677    /// `IdProvider` is implemented for `Box<T>`.
678    ///
679    /// Default: [`RandomIntegerIdProvider`].
680    ///
681    /// # Examples
682    ///
683    /// ```rust
684    /// use jsonrpsee::server::RandomStringIdProvider;
685    /// use reth_ipc::server::Builder;
686    ///
687    /// // static dispatch
688    /// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
689    ///
690    /// // or dynamic dispatch
691    /// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
692    /// ```
693    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
694        self.id_provider = Arc::new(id_provider);
695        self
696    }
697
698    /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
699    /// to the RPC service.
700    ///
701    /// Default: No tower layers are applied to the RPC service.
702    ///
703    /// # Examples
704    ///
705    /// ```rust
706    /// #[tokio::main]
707    /// async fn main() {
708    ///     let builder = tower::ServiceBuilder::new();
709    ///     let server = reth_ipc::server::Builder::default()
710    ///         .set_http_middleware(builder)
711    ///         .build("/tmp/my-uds".into());
712    /// }
713    /// ```
714    pub fn set_http_middleware<T>(
715        self,
716        service_builder: tower::ServiceBuilder<T>,
717    ) -> Builder<T, RpcMiddleware> {
718        Builder {
719            settings: self.settings,
720            id_provider: self.id_provider,
721            http_middleware: service_builder,
722            rpc_middleware: self.rpc_middleware,
723        }
724    }
725
726    /// Enable middleware that is invoked on every JSON-RPC call.
727    ///
728    /// The middleware itself is very similar to the `tower middleware` but
729    /// it has a different service trait which takes &self instead &mut self
730    /// which means that you can't use built-in middleware from tower.
731    ///
732    /// Another consequence of `&self` is that you must wrap any of the middleware state in
733    /// a type which is Send and provides interior mutability such `Arc<Mutex>`.
734    ///
735    /// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
736    /// where it is possible to compose layers to the middleware.
737    pub fn set_rpc_middleware<T>(
738        self,
739        rpc_middleware: RpcServiceBuilder<T>,
740    ) -> Builder<HttpMiddleware, T> {
741        Builder {
742            settings: self.settings,
743            id_provider: self.id_provider,
744            rpc_middleware,
745            http_middleware: self.http_middleware,
746        }
747    }
748
749    /// Finalize the configuration of the server. Consumes the [`Builder`].
750    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
751        IpcServer {
752            endpoint,
753            cfg: self.settings,
754            id_provider: self.id_provider,
755            http_middleware: self.http_middleware,
756            rpc_middleware: self.rpc_middleware,
757        }
758    }
759}
760
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    // random 64-bit suffix appended to a platform-specific prefix:
    // a named pipe path on windows, a path under `/tmp` everywhere else
    let suffix: u64 = rand::rng().random();
    let prefix = if cfg!(windows) { r"\\.\pipe\my-pipe-" } else { r"/tmp/my-uds-" };
    format!("{prefix}{suffix}")
}
772
773#[cfg(test)]
774mod tests {
775    use super::*;
776    use crate::client::IpcClientBuilder;
777    use futures::future::select;
778    use jsonrpsee::{
779        core::{
780            client::{self, ClientT, Error, Subscription, SubscriptionClientT},
781            middleware::{Batch, BatchEntry, Notification},
782            params::BatchRequestBuilder,
783        },
784        rpc_params,
785        types::Request,
786        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
787    };
788    use reth_tracing::init_test_tracing;
789    use std::pin::pin;
790    use tokio::sync::broadcast;
791    use tokio_stream::wrappers::BroadcastStream;
792
793    #[tokio::test]
794    #[cfg(unix)]
795    async fn test_ipc_socket_permissions() {
796        use std::os::unix::fs::PermissionsExt;
797        let endpoint = &dummy_name();
798        let perms = "0777";
799        let server = Builder::default()
800            .set_ipc_socket_permissions(Some(perms.to_string()))
801            .build(endpoint.clone());
802        let module = RpcModule::new(());
803        let handle = server.start(module).await.unwrap();
804        tokio::spawn(handle.stopped());
805
806        let meta = std::fs::metadata(endpoint).unwrap();
807        let perms = meta.permissions();
808        assert_eq!(perms.mode() & 0o777, 0o777);
809    }
810
811    async fn pipe_from_stream_with_bounded_buffer(
812        pending: PendingSubscriptionSink,
813        stream: BroadcastStream<usize>,
814    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
815        let sink = pending.accept().await.unwrap();
816        let closed = sink.closed();
817
818        let mut closed = pin!(closed);
819        let mut stream = pin!(stream);
820
821        loop {
822            match select(closed, stream.next()).await {
823                // subscription closed or stream is closed.
824                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),
825
826                // received new item from the stream.
827                Either::Right((Some(Ok(item)), c)) => {
828                    let raw_value = serde_json::value::to_raw_value(&item)?;
829                    let notif = SubscriptionMessage::from(raw_value);
830
831                    // NOTE: this will block until there a spot in the queue
832                    // and you might want to do something smarter if it's
833                    // critical that "the most recent item" must be sent when it is produced.
834                    if sink.send(notif).await.is_err() {
835                        break Ok(());
836                    }
837
838                    closed = c;
839                }
840
841                // Send back the error.
842                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
843            }
844        }
845    }
846
847    // Naive example that broadcasts the produced values to all active subscribers.
848    fn produce_items(tx: broadcast::Sender<usize>) {
849        for c in 1..=100 {
850            std::thread::sleep(std::time::Duration::from_millis(1));
851            let _ = tx.send(c);
852        }
853    }
854
855    #[tokio::test]
856    async fn can_set_the_max_response_body_size() {
857        // init_test_tracing();
858        let endpoint = &dummy_name();
859        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
860        let mut module = RpcModule::new(());
861        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
862        let handle = server.start(module).await.unwrap();
863        tokio::spawn(handle.stopped());
864
865        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
866        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
867        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
868    }
869
870    #[tokio::test]
871    async fn can_set_the_max_request_body_size() {
872        init_test_tracing();
873        let endpoint = &dummy_name();
874        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
875        let mut module = RpcModule::new(());
876        module.register_method("anything", |_, _, _| "succeed").unwrap();
877        let handle = server.start(module).await.unwrap();
878        tokio::spawn(handle.stopped());
879
880        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
881        let response: Result<String, Error> =
882            client.request("anything", rpc_params!["a".repeat(101)]).await;
883        assert!(response.is_err());
884        let mut batch_request_builder = BatchRequestBuilder::new();
885        let _ = batch_request_builder.insert("anything", rpc_params![]);
886        let _ = batch_request_builder.insert("anything", rpc_params![]);
887        let _ = batch_request_builder.insert("anything", rpc_params![]);
888        // the raw request string is:
889        //  [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \
890        //    "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]"
891        // which is 136 bytes, more than 100 bytes.
892        let response: Result<client::BatchResponse<'_, String>, Error> =
893            client.batch_request(batch_request_builder).await;
894        assert!(response.is_err());
895    }
896
897    #[tokio::test]
898    async fn can_set_max_connections() {
899        init_test_tracing();
900
901        let endpoint = &dummy_name();
902        let server = Builder::default().max_connections(2).build(endpoint.clone());
903        let mut module = RpcModule::new(());
904        module.register_method("anything", |_, _, _| "succeed").unwrap();
905        let handle = server.start(module).await.unwrap();
906        tokio::spawn(handle.stopped());
907
908        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
909        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
910        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
911
912        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
913        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
914        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
915
916        assert!(response1.is_ok());
917        assert!(response2.is_ok());
918        // Third connection is rejected
919        assert!(response3.is_err());
920
921        // Decrement connection count
922        drop(client2);
923        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
924
925        // Can connect again
926        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
927        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
928        assert!(response4.is_ok());
929    }
930
931    #[tokio::test]
932    async fn test_rpc_request() {
933        init_test_tracing();
934        let endpoint = &dummy_name();
935        let server = Builder::default().build(endpoint.clone());
936        let mut module = RpcModule::new(());
937        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
938        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
939        let handle = server.start(module).await.unwrap();
940        tokio::spawn(handle.stopped());
941
942        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
943        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
944        assert_eq!(response, msg);
945    }
946
947    #[tokio::test]
948    async fn test_batch_request() {
949        let endpoint = &dummy_name();
950        let server = Builder::default().build(endpoint.clone());
951        let mut module = RpcModule::new(());
952        module.register_method("anything", |_, _, _| "ok").unwrap();
953        let handle = server.start(module).await.unwrap();
954        tokio::spawn(handle.stopped());
955
956        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
957        let mut batch_request_builder = BatchRequestBuilder::new();
958        let _ = batch_request_builder.insert("anything", rpc_params![]);
959        let _ = batch_request_builder.insert("anything", rpc_params![]);
960        let _ = batch_request_builder.insert("anything", rpc_params![]);
961        let result = client
962            .batch_request(batch_request_builder)
963            .await
964            .unwrap()
965            .into_ok()
966            .unwrap()
967            .collect::<Vec<String>>();
968        assert_eq!(result, vec!["ok", "ok", "ok"]);
969    }
970
971    #[tokio::test]
972    async fn test_ipc_modules() {
973        reth_tracing::init_test_tracing();
974        let endpoint = &dummy_name();
975        let server = Builder::default().build(endpoint.clone());
976        let mut module = RpcModule::new(());
977        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
978        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
979        let handle = server.start(module).await.unwrap();
980        tokio::spawn(handle.stopped());
981
982        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
983        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
984        assert_eq!(response, msg);
985    }
986
987    #[tokio::test(flavor = "multi_thread")]
988    async fn test_rpc_subscription() {
989        let endpoint = &dummy_name();
990        let server = Builder::default().build(endpoint.clone());
991        let (tx, _rx) = broadcast::channel::<usize>(16);
992
993        let mut module = RpcModule::new(tx.clone());
994        std::thread::spawn(move || produce_items(tx));
995
996        module
997            .register_subscription(
998                "subscribe_hello",
999                "s_hello",
1000                "unsubscribe_hello",
1001                |_, pending, tx, _| async move {
1002                    let rx = tx.subscribe();
1003                    let stream = BroadcastStream::new(rx);
1004                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1005                    Ok(())
1006                },
1007            )
1008            .unwrap();
1009
1010        let handle = server.start(module).await.unwrap();
1011        tokio::spawn(handle.stopped());
1012
1013        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1014        let sub: Subscription<usize> =
1015            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1016
1017        let items = sub.take(16).collect::<Vec<_>>().await;
1018        assert_eq!(items.len(), 16);
1019    }
1020
1021    #[tokio::test]
1022    async fn test_rpc_middleware() {
1023        #[derive(Clone)]
1024        struct ModifyRequestIf<S>(S);
1025
1026        impl<S> RpcServiceT for ModifyRequestIf<S>
1027        where
1028            S: Send + Sync + RpcServiceT,
1029        {
1030            type MethodResponse = S::MethodResponse;
1031            type NotificationResponse = S::NotificationResponse;
1032            type BatchResponse = S::BatchResponse;
1033
1034            fn call<'a>(
1035                &self,
1036                mut req: Request<'a>,
1037            ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
1038                // Re-direct all calls that isn't `say_hello` to `say_goodbye`
1039                if req.method == "say_hello" {
1040                    req.method = "say_goodbye".into();
1041                } else if req.method == "say_goodbye" {
1042                    req.method = "say_hello".into();
1043                }
1044
1045                self.0.call(req)
1046            }
1047
1048            fn batch<'a>(
1049                &self,
1050                mut batch: Batch<'a>,
1051            ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1052                for call in batch.iter_mut() {
1053                    match call {
1054                        Ok(BatchEntry::Call(req)) => {
1055                            if req.method == "say_hello" {
1056                                req.method = "say_goodbye".into();
1057                            } else if req.method == "say_goodbye" {
1058                                req.method = "say_hello".into();
1059                            }
1060                        }
1061                        Ok(BatchEntry::Notification(n)) => {
1062                            if n.method == "say_hello" {
1063                                n.method = "say_goodbye".into();
1064                            } else if n.method == "say_goodbye" {
1065                                n.method = "say_hello".into();
1066                            }
1067                        }
1068                        // Invalid request, we don't care about it.
1069                        Err(_err) => {}
1070                    }
1071                }
1072
1073                self.0.batch(batch)
1074            }
1075
1076            fn notification<'a>(
1077                &self,
1078                mut n: Notification<'a>,
1079            ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1080                if n.method == "say_hello" {
1081                    n.method = "say_goodbye".into();
1082                } else if n.method == "say_goodbye" {
1083                    n.method = "say_hello".into();
1084                }
1085                self.0.notification(n)
1086            }
1087        }
1088
1089        reth_tracing::init_test_tracing();
1090        let endpoint = &dummy_name();
1091
1092        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1093        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1094
1095        let mut module = RpcModule::new(());
1096        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1097        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1098        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1099        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1100        let handle = server.start(module).await.unwrap();
1101        tokio::spawn(handle.stopped());
1102
1103        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1104        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1105        let say_goodbye_response: String =
1106            client.request("say_goodbye", rpc_params![]).await.unwrap();
1107
1108        assert_eq!(say_hello_response, goodbye_msg);
1109        assert_eq!(say_goodbye_response, hello_msg);
1110    }
1111}