Skip to main content

reth_ipc/server/
mod.rs

1//! JSON-RPC IPC server implementation
2
3use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7    tokio::prelude::{LocalSocketListener, LocalSocketStream},
8    traits::tokio::{Listener, Stream},
9    GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12    core::{
13        middleware::layer::{Either as RpcEither, RpcLoggerLayer},
14        JsonRawValue, TEN_MB_SIZE_BYTES,
15    },
16    server::{
17        middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
18        RandomIntegerIdProvider, ServerHandle, StopHandle,
19    },
20    BoundedSubscriptions, MethodResponse, MethodSink, Methods,
21};
22use std::{
23    future::Future,
24    io,
25    pin::{pin, Pin},
26    sync::Arc,
27    task::{Context, Poll},
28};
29use tokio::{
30    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
31    sync::oneshot,
32};
33use tower::{layer::util::Identity, Layer, Service};
34use tracing::{debug, instrument, trace, warn, Instrument};
35// re-export so can be used during builder setup
36use crate::{
37    server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
38    stream_codec::StreamCodec,
39};
40use tokio::sync::mpsc;
41use tokio_stream::wrappers::ReceiverStream;
42use tower::layer::{util::Stack, LayerFn};
43
44mod connection;
45mod ipc;
46mod rpc_service;
47
48pub use rpc_service::RpcService;
49
/// Ipc Server implementation
///
/// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
///
/// Instances are created with [`Builder::build`] and started with [`IpcServer::start`].
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// The endpoint (local socket path) on which we listen for incoming connections
    endpoint: String,
    /// Subscription ID provider, cloned into every accepted connection.
    id_provider: Arc<dyn IdProvider>,
    /// Server settings (size limits, connection limits, optional runtime handle, ...).
    cfg: Settings,
    /// Middleware wrapped around the [`RpcService`] that is built for every call.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware wrapped around the per-connection [`TowerServiceNoHttp`].
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
61
62impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
63    /// Returns the configured endpoint
64    pub fn endpoint(&self) -> String {
65        self.endpoint.clone()
66    }
67}
68
69impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
70where
71    RpcMiddleware: Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
72    HttpMiddleware: Layer<
73            TowerServiceNoHttp<RpcMiddleware>,
74            Service: Service<
75                String,
76                Response = Option<String>,
77                Error = Box<dyn core::error::Error + Send + Sync + 'static>,
78                Future: Send + Unpin,
79            > + Send,
80        > + Send
81        + 'static,
82{
83    /// Start responding to connections requests.
84    ///
85    /// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is
86    /// dropped.
87    ///
88    /// ```
89    /// use jsonrpsee::RpcModule;
90    /// use reth_ipc::server::Builder;
91    /// async fn run_server() -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
92    ///     let server = Builder::default().build("/tmp/my-uds".into());
93    ///     let mut module = RpcModule::new(());
94    ///     module.register_method("say_hello", |_, _, _| "lo")?;
95    ///     let handle = server.start(module).await?;
96    ///
97    ///     // In this example we don't care about doing shutdown so let's it run forever.
98    ///     // You may use the `ServerHandle` to shut it down or manage it yourself.
99    ///     let server = tokio::spawn(handle.stopped());
100    ///     server.await.unwrap();
101    ///     Ok(())
102    /// }
103    /// ```
104    pub async fn start(
105        mut self,
106        methods: impl Into<Methods>,
107    ) -> Result<ServerHandle, IpcServerStartError> {
108        let methods = methods.into();
109
110        let (stop_handle, server_handle) = stop_channel();
111
112        // use a signal channel to wait until we're ready to accept connections
113        let (tx, rx) = oneshot::channel();
114
115        match self.cfg.tokio_runtime.take() {
116            Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
117            None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
118        };
119        rx.await.expect("channel is open")?;
120
121        Ok(server_handle)
122    }
123
124    async fn start_inner(
125        self,
126        methods: Methods,
127        stop_handle: StopHandle,
128        on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
129    ) {
130        trace!(endpoint = ?self.endpoint, "starting ipc server");
131
132        if cfg!(unix) {
133            // ensure the file does not exist
134            if std::fs::remove_file(&self.endpoint).is_ok() {
135                debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
136            }
137        }
138
139        let listener = match self
140            .endpoint
141            .as_str()
142            .to_fs_name::<GenericFilePath>()
143            .and_then(|name| ListenerOptions::new().name(name).create_tokio())
144        {
145            Ok(listener) => {
146                #[cfg(unix)]
147                {
148                    // set permissions only on unix
149                    use std::os::unix::fs::PermissionsExt;
150                    if let Some(perms_str) = &self.cfg.ipc_socket_permissions &&
151                        let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8)
152                    {
153                        let perms = std::fs::Permissions::from_mode(mode);
154                        let _ = std::fs::set_permissions(&self.endpoint, perms);
155                    }
156                }
157                listener
158            }
159            Err(err) => {
160                on_ready
161                    .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
162                    .ok();
163                return;
164            }
165        };
166
167        // signal that we're ready to accept connections
168        on_ready.send(Ok(())).ok();
169
170        let mut id: u32 = 0;
171        let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
172
173        let stopped = stop_handle.clone().shutdown();
174        let mut stopped = pin!(stopped);
175
176        let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
177
178        trace!("accepting ipc connections");
179        loop {
180            match try_accept_conn(&listener, stopped).await {
181                AcceptConnection::Established { local_socket_stream, stop } => {
182                    let Some(conn_permit) = connection_guard.try_acquire() else {
183                        let (_reader, mut writer) = local_socket_stream.split();
184                        let _ = writer
185                            .write_all(b"Too many connections. Please try again later.")
186                            .await;
187                        stopped = stop;
188                        continue;
189                    };
190
191                    let max_conns = connection_guard.max_connections();
192                    let curr_conns = max_conns - connection_guard.available_connections();
193                    trace!("Accepting new connection {}/{}", curr_conns, max_conns);
194
195                    let conn_permit = Arc::new(conn_permit);
196
197                    process_connection(ProcessConnection {
198                        http_middleware: &self.http_middleware,
199                        rpc_middleware: self.rpc_middleware.clone(),
200                        conn_permit,
201                        conn_id: id,
202                        server_cfg: self.cfg.clone(),
203                        stop_handle: stop_handle.clone(),
204                        drop_on_completion: drop_on_completion.clone(),
205                        methods: methods.clone(),
206                        id_provider: self.id_provider.clone(),
207                        local_socket_stream,
208                    });
209
210                    id = id.wrapping_add(1);
211                    stopped = stop;
212                }
213                AcceptConnection::Shutdown => {
214                    break;
215                }
216                AcceptConnection::Err((err, stop)) => {
217                    tracing::error!(%err, "Failed accepting a new IPC connection");
218                    stopped = stop;
219                }
220            }
221        }
222
223        // Drop the last Sender
224        drop(drop_on_completion);
225
226        // Once this channel is closed it is safe to assume that all connections have been
227        // gracefully shutdown
228        while process_connection_awaiter.recv().await.is_some() {
229            // Generally, messages should not be sent across this channel,
230            // but we'll loop here to wait for `None` just to be on the safe side
231        }
232    }
233}
234
/// Outcome of a single [`try_accept_conn`] call.
///
/// `S` is the shutdown future that was passed in; it is handed back in every variant except
/// `Shutdown` so the caller can reuse it for the next accept attempt.
enum AcceptConnection<S> {
    /// The shutdown future completed before a connection was accepted.
    Shutdown,
    /// A new connection was accepted.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
240
241async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
242where
243    S: Future + Unpin,
244{
245    match futures_util::future::select(pin!(listener.accept()), stopped).await {
246        Either::Left((res, stop)) => match res {
247            Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
248            Err(e) => AcceptConnection::Err((e, stop)),
249        },
250        Either::Right(_) => AcceptConnection::Shutdown,
251    }
252}
253
254impl<HttpMiddleware, RpcMiddleware> std::fmt::Debug for IpcServer<HttpMiddleware, RpcMiddleware> {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        f.debug_struct("IpcServer")
257            .field("endpoint", &self.endpoint)
258            .field("cfg", &self.cfg)
259            .field("id_provider", &self.id_provider)
260            .finish()
261    }
262}
263
/// Error thrown when server couldn't be started.
///
/// Produced when the listener for the configured endpoint cannot be created.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// The endpoint the server tried to listen on.
    endpoint: String,
    /// The I/O error returned while creating the listener.
    #[source]
    source: io::Error,
}
272
/// Data required by the server to handle requests received via an IPC connection
///
/// One instance is built per accepted connection and stored inside [`TowerServiceNoHttp`].
#[derive(Debug, Clone)]
// NOTE(review): presumably needed because not every field is read by this module — confirm.
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// Registered server methods.
    pub(crate) methods: Methods,
    /// Subscription ID provider.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Stop handle.
    pub(crate) stop_handle: StopHandle,
    /// Connection ID
    pub(crate) conn_id: u32,
    /// Connection Permit.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Limits the number of subscriptions for this connection
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink that is used to send back responses to the connection.
    ///
    /// This is used for subscriptions.
    pub(crate) method_sink: MethodSink,
    /// `ServerConfig`
    pub(crate) server_cfg: Settings,
}
296
/// Similar to [`tower::ServiceBuilder`] but doesn't
/// support any tower middleware implementations.
///
/// This is a thin newtype around [`tower::ServiceBuilder`]; layers are added with
/// [`RpcServiceBuilder::layer`] and related methods.
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
301
302impl Default for RpcServiceBuilder<Identity> {
303    fn default() -> Self {
304        Self(tower::ServiceBuilder::new())
305    }
306}
307
308impl RpcServiceBuilder<Identity> {
309    /// Create a new [`RpcServiceBuilder`].
310    pub const fn new() -> Self {
311        Self(tower::ServiceBuilder::new())
312    }
313}
314
315impl<L> RpcServiceBuilder<L> {
316    /// Optionally add a new layer `T` to the [`RpcServiceBuilder`].
317    ///
318    /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details.
319    pub fn option_layer<T>(
320        self,
321        layer: Option<T>,
322    ) -> RpcServiceBuilder<Stack<RpcEither<T, Identity>, L>> {
323        let layer = if let Some(layer) = layer {
324            RpcEither::Left(layer)
325        } else {
326            RpcEither::Right(Identity::new())
327        };
328        self.layer(layer)
329    }
330
331    /// Add a new layer `T` to the [`RpcServiceBuilder`].
332    ///
333    /// See the documentation for [`tower::ServiceBuilder::layer`] for more details.
334    pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
335        RpcServiceBuilder(self.0.layer(layer))
336    }
337
338    /// Add a [`tower::Layer`] built from a function that accepts a service and returns another
339    /// service.
340    ///
341    /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details.
342    pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
343        RpcServiceBuilder(self.0.layer_fn(f))
344    }
345
346    /// Add a logging layer to [`RpcServiceBuilder`]
347    ///
348    /// This logs each request and response for every call.
349    pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
350        RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
351    }
352
353    /// Wrap the service `S` with the middleware.
354    pub(crate) fn service<S>(&self, service: S) -> L::Service
355    where
356        L: tower::Layer<S>,
357    {
358        self.0.service(service)
359    }
360}
361
/// `JsonRPSee` service compatible with `tower`.
///
/// One instance is created per accepted IPC connection.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html).
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Per-connection data (methods, limits, response sink, ...).
    inner: ServiceData,
    /// Middleware that wraps the [`RpcService`] built for each call.
    rpc_middleware: RpcServiceBuilder<L>,
}
371
372impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
373where
374    RpcMiddleware: Layer<RpcService>,
375    <RpcMiddleware as Layer<RpcService>>::Service:
376        Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
377{
378    /// The response of a handled RPC call
379    ///
380    /// This is an `Option` because subscriptions and call responses are handled differently.
381    /// This will be `Some` for calls, and `None` for subscriptions, because the subscription
382    /// response will be emitted via the `method_sink`.
383    type Response = Option<String>;
384
385    type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
386
387    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
388
389    /// Opens door for back pressure implementation.
390    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
391        Poll::Ready(Ok(()))
392    }
393
394    fn call(&mut self, request: String) -> Self::Future {
395        trace!("{:?}", request);
396
397        let cfg = RpcServiceCfg {
398            bounded_subscriptions: BoundedSubscriptions::new(
399                self.inner.server_cfg.max_subscriptions_per_connection,
400            ),
401            id_provider: self.inner.id_provider.clone(),
402            sink: self.inner.method_sink.clone(),
403        };
404
405        let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
406        let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
407        let conn = self.inner.conn_permit.clone();
408        let rpc_service = self.rpc_middleware.service(RpcService::new(
409            self.inner.methods.clone(),
410            max_response_body_size,
411            self.inner.conn_id.into(),
412            cfg,
413        ));
414        // an ipc connection needs to handle read+write concurrently
415        // even if the underlying rpc handler spawns the actual work or is does a lot of async any
416        // additional overhead performed by `handle_request` can result in I/O latencies, for
417        // example tracing calls are relatively CPU expensive on serde::serialize alone, moving this
418        // work to a separate task takes the pressure off the connection so all concurrent responses
419        // are also serialized concurrently and the connection can focus on read+write
420        let f = tokio::task::spawn(async move {
421            ipc::call_with_service(
422                request,
423                rpc_service,
424                max_response_body_size,
425                max_request_body_size,
426                conn,
427            )
428            .await
429        });
430
431        Box::pin(async move { f.await.map_err(|err| err.into()) })
432    }
433}
434
/// Everything [`process_connection`] needs to serve one accepted IPC connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Middleware applied around the per-connection [`TowerServiceNoHttp`].
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// Middleware applied around the [`RpcService`] of each call.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit acquired from the connection guard for this connection.
    conn_permit: Arc<ConnectionPermit>,
    /// ID assigned to this connection by the accept loop.
    conn_id: u32,
    /// Copy of the server settings.
    server_cfg: Settings,
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Sender that is dropped once the connection task has finished.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered server methods.
    methods: Methods,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted socket stream.
    local_socket_stream: LocalSocketStream,
}
447
448/// Spawns the IPC connection onto a new task
449#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id))]
450fn process_connection<RpcMiddleware, HttpMiddleware>(
451    params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
452) where
453    RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
454    for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
455    HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
456    <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
457    + Service<
458        String,
459        Response = Option<String>,
460        Error = Box<dyn core::error::Error + Send + Sync + 'static>,
461    >,
462    <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
463    Send + Unpin,
464 {
465    let ProcessConnection {
466        http_middleware,
467        rpc_middleware,
468        conn_permit,
469        conn_id,
470        server_cfg,
471        stop_handle,
472        drop_on_completion,
473        id_provider,
474        methods,
475        local_socket_stream,
476    } = params;
477
478    let ipc = IpcConn(tokio_util::codec::Decoder::framed(
479        StreamCodec::stream_incoming(),
480        local_socket_stream,
481    ));
482
483    let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
484    let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
485    let tower_service = TowerServiceNoHttp {
486        inner: ServiceData {
487            methods,
488            id_provider,
489            stop_handle: stop_handle.clone(),
490            server_cfg: server_cfg.clone(),
491            conn_id,
492            conn_permit,
493            bounded_subscriptions: BoundedSubscriptions::new(
494                server_cfg.max_subscriptions_per_connection,
495            ),
496            method_sink,
497        },
498        rpc_middleware,
499    };
500
501    let service = http_middleware.service(tower_service);
502    tokio::spawn(async {
503        to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
504        drop(drop_on_completion)
505    });
506}
507
508async fn to_ipc_service<S, T>(
509    ipc: IpcConn<JsonRpcStream<T>>,
510    service: S,
511    stop_handle: StopHandle,
512    rx: mpsc::Receiver<Box<JsonRawValue>>,
513) where
514    S: Service<String, Response = Option<String>> + Send + 'static,
515    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
516    S::Future: Send + Unpin,
517    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
518{
519    let rx_item = ReceiverStream::new(rx);
520    let conn = IpcConnDriver {
521        conn: ipc,
522        service,
523        pending_calls: Default::default(),
524        items: Default::default(),
525    };
526    let stopped = stop_handle.shutdown();
527
528    let mut conn = pin!(conn);
529    let mut rx_item = pin!(rx_item);
530    let mut stopped = pin!(stopped);
531
532    loop {
533        tokio::select! {
534            _ = &mut conn => {
535               break
536            }
537            item = rx_item.next() => {
538                let Some(item) = item else { break };
539                conn.push_back(item.to_string());
540            }
541            _ = &mut stopped => {
542                // shutdown
543                break
544            }
545        }
546    }
547}
548
/// JSON-RPC IPC server settings.
///
/// Configured through [`Builder`]; see the [`Default`] implementation for the default values.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum size in bytes of a request.
    max_request_body_size: u32,
    /// Maximum size in bytes of a response.
    max_response_body_size: u32,
    /// Max length for logging for requests and responses
    ///
    /// Logs bigger than this limit will be truncated.
    max_log_length: u32,
    /// Maximum number of incoming connections allowed.
    max_connections: u32,
    /// Maximum number of subscriptions per connection.
    max_subscriptions_per_connection: u32,
    /// Number of messages that the server is allowed to buffer until backpressure kicks in.
    message_buffer_capacity: u32,
    /// Custom tokio runtime to run the server on.
    tokio_runtime: Option<tokio::runtime::Handle>,
    /// The permissions to create the IPC socket with, as an octal string (only used on unix).
    ipc_socket_permissions: Option<String>,
}
571
572impl Default for Settings {
573    fn default() -> Self {
574        Self {
575            max_request_body_size: TEN_MB_SIZE_BYTES,
576            max_response_body_size: TEN_MB_SIZE_BYTES,
577            max_log_length: 4096,
578            max_connections: 100,
579            max_subscriptions_per_connection: 1024,
580            message_buffer_capacity: 1024,
581            tokio_runtime: None,
582            ipc_socket_permissions: None,
583        }
584    }
585}
586
/// Builder to configure and create a JSON-RPC server
///
/// Call [`Builder::build`] to turn the configuration into an [`IpcServer`].
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Server settings collected by the setter methods.
    settings: Settings,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// Middleware applied to every JSON-RPC call.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Tower middleware applied around the per-connection service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
596
597impl Default for Builder<Identity, Identity> {
598    fn default() -> Self {
599        Self {
600            settings: Settings::default(),
601            id_provider: Arc::new(RandomIntegerIdProvider),
602            rpc_middleware: RpcServiceBuilder::new(),
603            http_middleware: tower::ServiceBuilder::new(),
604        }
605    }
606}
607
impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
    /// Set the maximum size of a request body in bytes. Default is 10 MiB.
    pub const fn max_request_body_size(mut self, size: u32) -> Self {
        self.settings.max_request_body_size = size;
        self
    }

    /// Set the maximum size of a response body in bytes. Default is 10 MiB.
    pub const fn max_response_body_size(mut self, size: u32) -> Self {
        self.settings.max_response_body_size = size;
        self
    }

    /// Set the maximum length of a logged request or response. Default is 4096.
    pub const fn max_log_length(mut self, size: u32) -> Self {
        self.settings.max_log_length = size;
        self
    }

    /// Set the maximum number of connections allowed. Default is 100.
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.settings.max_connections = max;
        self
    }

    /// Set the maximum number of subscriptions per connection. Default is 1024.
    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
        self.settings.max_subscriptions_per_connection = max;
        self
    }

    /// The server enforces backpressure, which means that at most
    /// `c` messages are buffered if the client
    /// can't keep up with the server. Default is 1024.
    ///
    /// This `capacity` is applied per connection and
    /// applies globally on the connection which implies
    /// all JSON-RPC messages.
    ///
    /// For example if a subscription produces plenty of new items
    /// and the client can't keep up then no new messages are handled.
    ///
    /// If this limit is exceeded then the server will "back-off"
    /// and only accept new messages once the client reads pending messages.
    ///
    /// # Panics
    ///
    /// This setter itself does not validate `c`. The value is later used as the buffer size of
    /// the per-connection `tokio::sync::mpsc` channel, whose constructor panics if the buffer
    /// capacity is 0.
    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
        self.settings.message_buffer_capacity = c;
        self
    }

    /// Configure a custom [`tokio::runtime::Handle`] to run the server on.
    ///
    /// Default: [`tokio::spawn`]
    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
        self.settings.tokio_runtime = Some(rt);
        self
    }

    /// Sets the permissions for the IPC socket file.
    ///
    /// The value is interpreted as an octal mode string (for example `"0777"` or `"0o777"`)
    /// and is only applied on unix. `None` leaves the permissions untouched.
    pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
        self.settings.ipc_socket_permissions = permissions;
        self
    }

    /// Configure a custom `subscription ID` provider for the server
    /// to use when handling new subscription calls.
    ///
    /// You may choose static dispatch or dynamic dispatch because
    /// `IdProvider` is implemented for `Box<T>`.
    ///
    /// Default: [`RandomIntegerIdProvider`].
    ///
    /// # Examples
    ///
    /// ```rust
    /// use jsonrpsee::server::RandomStringIdProvider;
    /// use reth_ipc::server::Builder;
    ///
    /// // static dispatch
    /// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
    ///
    /// // or dynamic dispatch
    /// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
    /// ```
    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
        self.id_provider = Arc::new(id_provider);
        self
    }

    /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
    /// to the RPC service.
    ///
    /// All other builder settings are carried over unchanged.
    ///
    /// Default: No tower layers are applied to the RPC service.
    ///
    /// # Examples
    ///
    /// ```rust
    /// #[tokio::main]
    /// async fn main() {
    ///     let builder = tower::ServiceBuilder::new();
    ///     let server = reth_ipc::server::Builder::default()
    ///         .set_http_middleware(builder)
    ///         .build("/tmp/my-uds".into());
    /// }
    /// ```
    pub fn set_http_middleware<T>(
        self,
        service_builder: tower::ServiceBuilder<T>,
    ) -> Builder<T, RpcMiddleware> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            http_middleware: service_builder,
            rpc_middleware: self.rpc_middleware,
        }
    }

    /// Enable middleware that is invoked on every JSON-RPC call.
    ///
    /// The middleware itself is very similar to the `tower middleware` but
    /// it has a different service trait which takes `&self` instead of `&mut self`,
    /// which means that you can't use built-in middleware from tower.
    ///
    /// Another consequence of `&self` is that you must wrap any of the middleware state in
    /// a type which is `Send` and provides interior mutability, such as `Arc<Mutex>`.
    ///
    /// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
    /// where it is possible to compose layers to the middleware.
    pub fn set_rpc_middleware<T>(
        self,
        rpc_middleware: RpcServiceBuilder<T>,
    ) -> Builder<HttpMiddleware, T> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            rpc_middleware,
            http_middleware: self.http_middleware,
        }
    }

    /// Finalize the configuration of the server. Consumes the [`Builder`].
    ///
    /// `endpoint` is the path of the local socket the server will listen on.
    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
        IpcServer {
            endpoint,
            cfg: self.settings,
            id_provider: self.id_provider,
            http_middleware: self.http_middleware,
            rpc_middleware: self.rpc_middleware,
        }
    }
}
762
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    // Random suffix so each test gets its own socket path.
    use rand::Rng;
    let num = rand::rng().random::<u64>();
    format!(r"/tmp/my-uds-{num}")
}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774    use crate::client::IpcClientBuilder;
775    use futures::future::select;
776    use jsonrpsee::{
777        core::{
778            client::{self, ClientT, Error, Subscription, SubscriptionClientT},
779            middleware::{Batch, BatchEntry, Notification},
780            params::BatchRequestBuilder,
781        },
782        rpc_params,
783        types::Request,
784        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
785    };
786    use reth_tracing::init_test_tracing;
787    use std::pin::pin;
788    use tokio::sync::broadcast;
789    use tokio_stream::wrappers::BroadcastStream;
790
791    #[tokio::test]
792    #[cfg(unix)]
793    async fn test_ipc_socket_permissions() {
794        use std::os::unix::fs::PermissionsExt;
795        let endpoint = &dummy_name();
796        let perms = "0777";
797        let server = Builder::default()
798            .set_ipc_socket_permissions(Some(perms.to_string()))
799            .build(endpoint.clone());
800        let module = RpcModule::new(());
801        let handle = server.start(module).await.unwrap();
802        tokio::spawn(handle.stopped());
803
804        let meta = std::fs::metadata(endpoint).unwrap();
805        let perms = meta.permissions();
806        assert_eq!(perms.mode() & 0o777, 0o777);
807    }
808
    /// Accepts the pending subscription and forwards every item of `stream` to it.
    ///
    /// Returns `Ok(())` when the subscription or the stream is closed or when sending fails,
    /// and an error if the stream yields an error or an item cannot be serialized.
    async fn pipe_from_stream_with_bounded_buffer(
        pending: PendingSubscriptionSink,
        stream: BroadcastStream<usize>,
    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
        let sink = pending.accept().await.unwrap();
        let closed = sink.closed();

        let mut closed = pin!(closed);
        let mut stream = pin!(stream);

        loop {
            // `closed` is moved into `select` and handed back in the `Right` case so it can
            // be reused on the next iteration.
            match select(closed, stream.next()).await {
                // subscription closed or stream is closed.
                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

                // received new item from the stream.
                Either::Right((Some(Ok(item)), c)) => {
                    let raw_value = serde_json::value::to_raw_value(&item)?;
                    let notif = SubscriptionMessage::from(raw_value);

                    // NOTE: this will block until there is a spot in the queue
                    // and you might want to do something smarter if it's
                    // critical that "the most recent item" must be sent when it is produced.
                    if sink.send(notif).await.is_err() {
                        break Ok(());
                    }

                    closed = c;
                }

                // Send back the error.
                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
            }
        }
    }
844
845    // Naive example that broadcasts the produced values to all active subscribers.
846    fn produce_items(tx: broadcast::Sender<usize>) {
847        for c in 1..=100 {
848            std::thread::sleep(std::time::Duration::from_millis(1));
849            let _ = tx.send(c);
850        }
851    }
852
853    #[tokio::test]
854    async fn can_set_the_max_response_body_size() {
855        // init_test_tracing();
856        let endpoint = &dummy_name();
857        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
858        let mut module = RpcModule::new(());
859        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
860        let handle = server.start(module).await.unwrap();
861        tokio::spawn(handle.stopped());
862
863        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
864        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
865        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
866    }
867
868    #[tokio::test]
869    async fn can_set_the_max_request_body_size() {
870        init_test_tracing();
871        let endpoint = &dummy_name();
872        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
873        let mut module = RpcModule::new(());
874        module.register_method("anything", |_, _, _| "succeed").unwrap();
875        let handle = server.start(module).await.unwrap();
876        tokio::spawn(handle.stopped());
877
878        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
879        let response: Result<String, Error> =
880            client.request("anything", rpc_params!["a".repeat(101)]).await;
881        assert!(response.is_err());
882        let mut batch_request_builder = BatchRequestBuilder::new();
883        let _ = batch_request_builder.insert("anything", rpc_params![]);
884        let _ = batch_request_builder.insert("anything", rpc_params![]);
885        let _ = batch_request_builder.insert("anything", rpc_params![]);
886        // the raw request string is:
887        //  [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \
888        //    "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]"
889        // which is 136 bytes, more than 100 bytes.
890        let response: Result<client::BatchResponse<'_, String>, Error> =
891            client.batch_request(batch_request_builder).await;
892        assert!(response.is_err());
893    }
894
895    #[tokio::test]
896    async fn can_set_max_connections() {
897        init_test_tracing();
898
899        let endpoint = &dummy_name();
900        let server = Builder::default().max_connections(2).build(endpoint.clone());
901        let mut module = RpcModule::new(());
902        module.register_method("anything", |_, _, _| "succeed").unwrap();
903        let handle = server.start(module).await.unwrap();
904        tokio::spawn(handle.stopped());
905
906        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
907        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
908        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
909
910        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
911        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
912        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
913
914        assert!(response1.is_ok());
915        assert!(response2.is_ok());
916        // Third connection is rejected
917        assert!(response3.is_err());
918
919        // Decrement connection count
920        drop(client2);
921        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
922
923        // Can connect again
924        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
925        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
926        assert!(response4.is_ok());
927    }
928
929    #[tokio::test]
930    async fn test_rpc_request() {
931        init_test_tracing();
932        let endpoint = &dummy_name();
933        let server = Builder::default().build(endpoint.clone());
934        let mut module = RpcModule::new(());
935        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
936        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
937        let handle = server.start(module).await.unwrap();
938        tokio::spawn(handle.stopped());
939
940        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
941        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
942        assert_eq!(response, msg);
943    }
944
945    #[tokio::test]
946    async fn test_batch_request() {
947        let endpoint = &dummy_name();
948        let server = Builder::default().build(endpoint.clone());
949        let mut module = RpcModule::new(());
950        module.register_method("anything", |_, _, _| "ok").unwrap();
951        let handle = server.start(module).await.unwrap();
952        tokio::spawn(handle.stopped());
953
954        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
955        let mut batch_request_builder = BatchRequestBuilder::new();
956        let _ = batch_request_builder.insert("anything", rpc_params![]);
957        let _ = batch_request_builder.insert("anything", rpc_params![]);
958        let _ = batch_request_builder.insert("anything", rpc_params![]);
959        let result = client
960            .batch_request(batch_request_builder)
961            .await
962            .unwrap()
963            .into_ok()
964            .unwrap()
965            .collect::<Vec<String>>();
966        assert_eq!(result, vec!["ok", "ok", "ok"]);
967    }
968
969    #[tokio::test]
970    async fn test_ipc_modules() {
971        reth_tracing::init_test_tracing();
972        let endpoint = &dummy_name();
973        let server = Builder::default().build(endpoint.clone());
974        let mut module = RpcModule::new(());
975        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
976        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
977        let handle = server.start(module).await.unwrap();
978        tokio::spawn(handle.stopped());
979
980        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
981        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
982        assert_eq!(response, msg);
983    }
984
985    #[tokio::test(flavor = "multi_thread")]
986    async fn test_rpc_subscription() {
987        let endpoint = &dummy_name();
988        let server = Builder::default().build(endpoint.clone());
989        let (tx, _rx) = broadcast::channel::<usize>(16);
990
991        let mut module = RpcModule::new(tx.clone());
992        std::thread::spawn(move || produce_items(tx));
993
994        module
995            .register_subscription(
996                "subscribe_hello",
997                "s_hello",
998                "unsubscribe_hello",
999                |_, pending, tx, _| async move {
1000                    let rx = tx.subscribe();
1001                    let stream = BroadcastStream::new(rx);
1002                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1003                    Ok(())
1004                },
1005            )
1006            .unwrap();
1007
1008        let handle = server.start(module).await.unwrap();
1009        tokio::spawn(handle.stopped());
1010
1011        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1012        let sub: Subscription<usize> =
1013            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1014
1015        let items = sub.take(16).collect::<Vec<_>>().await;
1016        assert_eq!(items.len(), 16);
1017    }
1018
1019    #[tokio::test]
1020    async fn test_rpc_middleware() {
1021        #[derive(Clone)]
1022        struct ModifyRequestIf<S>(S);
1023
1024        impl<S> RpcServiceT for ModifyRequestIf<S>
1025        where
1026            S: Send + Sync + RpcServiceT,
1027        {
1028            type MethodResponse = S::MethodResponse;
1029            type NotificationResponse = S::NotificationResponse;
1030            type BatchResponse = S::BatchResponse;
1031
1032            fn call<'a>(
1033                &self,
1034                mut req: Request<'a>,
1035            ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
1036                // Re-direct all calls that isn't `say_hello` to `say_goodbye`
1037                if req.method == "say_hello" {
1038                    req.method = "say_goodbye".into();
1039                } else if req.method == "say_goodbye" {
1040                    req.method = "say_hello".into();
1041                }
1042
1043                self.0.call(req)
1044            }
1045
1046            fn batch<'a>(
1047                &self,
1048                mut batch: Batch<'a>,
1049            ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1050                for call in batch.iter_mut() {
1051                    match call {
1052                        Ok(BatchEntry::Call(req)) => {
1053                            if req.method == "say_hello" {
1054                                req.method = "say_goodbye".into();
1055                            } else if req.method == "say_goodbye" {
1056                                req.method = "say_hello".into();
1057                            }
1058                        }
1059                        Ok(BatchEntry::Notification(n)) => {
1060                            if n.method == "say_hello" {
1061                                n.method = "say_goodbye".into();
1062                            } else if n.method == "say_goodbye" {
1063                                n.method = "say_hello".into();
1064                            }
1065                        }
1066                        // Invalid request, we don't care about it.
1067                        Err(_err) => {}
1068                    }
1069                }
1070
1071                self.0.batch(batch)
1072            }
1073
1074            fn notification<'a>(
1075                &self,
1076                mut n: Notification<'a>,
1077            ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1078                if n.method == "say_hello" {
1079                    n.method = "say_goodbye".into();
1080                } else if n.method == "say_goodbye" {
1081                    n.method = "say_hello".into();
1082                }
1083                self.0.notification(n)
1084            }
1085        }
1086
1087        reth_tracing::init_test_tracing();
1088        let endpoint = &dummy_name();
1089
1090        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1091        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1092
1093        let mut module = RpcModule::new(());
1094        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1095        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1096        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1097        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1098        let handle = server.start(module).await.unwrap();
1099        tokio::spawn(handle.stopped());
1100
1101        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1102        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1103        let say_goodbye_response: String =
1104            client.request("say_goodbye", rpc_params![]).await.unwrap();
1105
1106        assert_eq!(say_hello_response, goodbye_msg);
1107        assert_eq!(say_goodbye_response, hello_msg);
1108    }
1109}