reth_ipc/server/
mod.rs

1//! JSON-RPC IPC server implementation
2
3use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7    tokio::prelude::{LocalSocketListener, LocalSocketStream},
8    traits::tokio::{Listener, Stream},
9    GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12    core::TEN_MB_SIZE_BYTES,
13    server::{
14        middleware::rpc::{RpcLoggerLayer, RpcServiceT},
15        stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
16        ServerHandle, StopHandle,
17    },
18    BoundedSubscriptions, MethodSink, Methods,
19};
20use std::{
21    future::Future,
22    io,
23    pin::{pin, Pin},
24    sync::Arc,
25    task::{Context, Poll},
26};
27use tokio::{
28    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
29    sync::oneshot,
30};
31use tower::{layer::util::Identity, Layer, Service};
32use tracing::{debug, instrument, trace, warn, Instrument};
33// re-export so can be used during builder setup
34use crate::{
35    server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
36    stream_codec::StreamCodec,
37};
38use tokio::sync::mpsc;
39use tokio_stream::wrappers::ReceiverStream;
40use tower::layer::{util::Stack, LayerFn};
41
42mod connection;
43mod ipc;
44mod rpc_service;
45
46pub use rpc_service::RpcService;
47
48/// Ipc Server implementation
49///
50/// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
51pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
52    /// The endpoint we listen for incoming transactions
53    endpoint: String,
54    id_provider: Arc<dyn IdProvider>,
55    cfg: Settings,
56    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
57    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
58}
59
60impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
61    /// Returns the configured endpoint
62    pub fn endpoint(&self) -> String {
63        self.endpoint.clone()
64    }
65}
66
impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
where
    RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT<'a>> + Clone + Send + 'static,
    HttpMiddleware: Layer<
            TowerServiceNoHttp<RpcMiddleware>,
            Service: Service<
                String,
                Response = Option<String>,
                Error = Box<dyn core::error::Error + Send + Sync + 'static>,
                Future: Send + Unpin,
            > + Send,
        > + Send
        + 'static,
{
    /// Start responding to connection requests.
    ///
    /// This will run on the tokio runtime until the server is stopped or the `ServerHandle` is
    /// dropped.
    ///
    /// ```
    /// use jsonrpsee::RpcModule;
    /// use reth_ipc::server::Builder;
    /// async fn run_server() -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
    ///     let server = Builder::default().build("/tmp/my-uds".into());
    ///     let mut module = RpcModule::new(());
    ///     module.register_method("say_hello", |_, _, _| "lo")?;
    ///     let handle = server.start(module).await?;
    ///
    ///     // In this example we don't care about doing shutdown so let it run forever.
    ///     // You may use the `ServerHandle` to shut it down or manage it yourself.
    ///     let server = tokio::spawn(handle.stopped());
    ///     server.await.unwrap();
    ///     Ok(())
    /// }
    /// ```
    pub async fn start(
        mut self,
        methods: impl Into<Methods>,
    ) -> Result<ServerHandle, IpcServerStartError> {
        let methods = methods.into();

        let (stop_handle, server_handle) = stop_channel();

        // use a signal channel to wait until we're ready to accept connections
        let (tx, rx) = oneshot::channel();

        // run the accept loop on the configured runtime if one was set, otherwise on the
        // ambient tokio runtime
        match self.cfg.tokio_runtime.take() {
            Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
            None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
        };
        // wait until the listener is bound (or failed to bind) so the caller observes the
        // bind error here instead of silently losing it in the spawned task
        rx.await.expect("channel is open")?;

        Ok(server_handle)
    }

    /// Accept loop of the server: binds the IPC listener, signals readiness (or the bind
    /// error) via `on_ready`, then accepts connections and spawns one task per connection
    /// until `stop_handle` signals shutdown.
    ///
    /// On shutdown this waits for all spawned connection tasks to finish before returning.
    async fn start_inner(
        self,
        methods: Methods,
        stop_handle: StopHandle,
        on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
    ) {
        trace!(endpoint = ?self.endpoint, "starting ipc server");

        if cfg!(unix) {
            // ensure the file does not exist: a leftover socket file from a previous run
            // would otherwise make binding fail
            if std::fs::remove_file(&self.endpoint).is_ok() {
                debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
            }
        }

        let listener = match self
            .endpoint
            .as_str()
            .to_fs_name::<GenericFilePath>()
            .and_then(|name| ListenerOptions::new().name(name).create_tokio())
        {
            Ok(listener) => listener,
            Err(err) => {
                // report the bind failure to `start` and bail out
                on_ready
                    .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
                    .ok();
                return;
            }
        };

        // signal that we're ready to accept connections
        on_ready.send(Ok(())).ok();

        // connection id counter; only used for logging/tracing, hence the wrapping add below
        let mut id: u32 = 0;
        let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);

        let stopped = stop_handle.clone().shutdown();
        let mut stopped = pin!(stopped);

        // every connection task holds a clone of this sender; the receiver yields `None`
        // only once all tasks (and the local sender, dropped below) are gone
        let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);

        trace!("accepting ipc connections");
        loop {
            match try_accept_conn(&listener, stopped).await {
                AcceptConnection::Established { local_socket_stream, stop } => {
                    // reject the connection with a best-effort message if at capacity
                    let Some(conn_permit) = connection_guard.try_acquire() else {
                        let (_reader, mut writer) = local_socket_stream.split();
                        let _ = writer
                            .write_all(b"Too many connections. Please try again later.")
                            .await;
                        stopped = stop;
                        continue;
                    };

                    let max_conns = connection_guard.max_connections();
                    let curr_conns = max_conns - connection_guard.available_connections();
                    trace!("Accepting new connection {}/{}", curr_conns, max_conns);

                    let conn_permit = Arc::new(conn_permit);

                    process_connection(ProcessConnection {
                        http_middleware: &self.http_middleware,
                        rpc_middleware: self.rpc_middleware.clone(),
                        conn_permit,
                        conn_id: id,
                        server_cfg: self.cfg.clone(),
                        stop_handle: stop_handle.clone(),
                        drop_on_completion: drop_on_completion.clone(),
                        methods: methods.clone(),
                        id_provider: self.id_provider.clone(),
                        local_socket_stream,
                    });

                    id = id.wrapping_add(1);
                    // `try_accept_conn` consumed the shutdown future; reuse the returned one
                    stopped = stop;
                }
                AcceptConnection::Shutdown => {
                    break;
                }
                AcceptConnection::Err((err, stop)) => {
                    tracing::error!(%err, "Failed accepting a new IPC connection");
                    stopped = stop;
                }
            }
        }

        // Drop the last Sender
        drop(drop_on_completion);

        // Once this channel is closed it is safe to assume that all connections have been
        // gracefully shutdown
        while process_connection_awaiter.recv().await.is_some() {
            // Generally, messages should not be sent across this channel,
            // but we'll loop here to wait for `None` just to be on the safe side
        }
    }
}
219
/// Outcome of waiting for either the next incoming connection or the server shutdown signal.
///
/// `stop` carries the (still pending) shutdown future back to the caller so it can be
/// reused on the next accept iteration.
enum AcceptConnection<S> {
    /// The shutdown future resolved; stop accepting connections.
    Shutdown,
    /// A new connection was accepted before shutdown was signalled.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
225
226async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
227where
228    S: Future + Unpin,
229{
230    match futures_util::future::select(pin!(listener.accept()), stopped).await {
231        Either::Left((res, stop)) => match res {
232            Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
233            Err(e) => AcceptConnection::Err((e, stop)),
234        },
235        Either::Right(_) => AcceptConnection::Shutdown,
236    }
237}
238
239impl std::fmt::Debug for IpcServer {
240    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241        f.debug_struct("IpcServer")
242            .field("endpoint", &self.endpoint)
243            .field("cfg", &self.cfg)
244            .field("id_provider", &self.id_provider)
245            .finish()
246    }
247}
248
/// Error thrown when server couldn't be started.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// The endpoint path the server attempted to bind.
    endpoint: String,
    /// The underlying I/O error returned while creating the listener.
    #[source]
    source: io::Error,
}
257
/// Data required by the server to handle requests received via an IPC connection
#[derive(Debug, Clone)]
#[expect(dead_code)]
pub(crate) struct ServiceData {
    /// Registered server methods.
    pub(crate) methods: Methods,
    /// Subscription ID provider.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Stop handle, used to observe server shutdown.
    pub(crate) stop_handle: StopHandle,
    /// Connection ID (used for tracing).
    pub(crate) conn_id: u32,
    /// Connection permit; limits the number of concurrent connections.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Limits the number of subscriptions for this connection
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink that is used to send back responses to the connection.
    ///
    /// This is used for subscriptions.
    pub(crate) method_sink: MethodSink,
    /// Server configuration (limits, buffer sizes) for this connection.
    pub(crate) server_cfg: Settings,
}
281
/// Similar to [`tower::ServiceBuilder`] but doesn't
/// support any tower middleware implementations.
///
/// A thin newtype over [`tower::ServiceBuilder`] used to compose middleware layers for the
/// RPC service itself (layers whose service implements `RpcServiceT`).
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
286
287impl Default for RpcServiceBuilder<Identity> {
288    fn default() -> Self {
289        Self(tower::ServiceBuilder::new())
290    }
291}
292
impl RpcServiceBuilder<Identity> {
    /// Create a new [`RpcServiceBuilder`].
    ///
    /// The returned builder has no middleware layers configured ([`Identity`]).
    pub fn new() -> Self {
        Self(tower::ServiceBuilder::new())
    }
}
299
300impl<L> RpcServiceBuilder<L> {
301    /// Optionally add a new layer `T` to the [`RpcServiceBuilder`].
302    ///
303    /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details.
304    pub fn option_layer<T>(
305        self,
306        layer: Option<T>,
307    ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
308        let layer = if let Some(layer) = layer {
309            Either::Left(layer)
310        } else {
311            Either::Right(Identity::new())
312        };
313        self.layer(layer)
314    }
315
316    /// Add a new layer `T` to the [`RpcServiceBuilder`].
317    ///
318    /// See the documentation for [`tower::ServiceBuilder::layer`] for more details.
319    pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
320        RpcServiceBuilder(self.0.layer(layer))
321    }
322
323    /// Add a [`tower::Layer`] built from a function that accepts a service and returns another
324    /// service.
325    ///
326    /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details.
327    pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
328        RpcServiceBuilder(self.0.layer_fn(f))
329    }
330
331    /// Add a logging layer to [`RpcServiceBuilder`]
332    ///
333    /// This logs each request and response for every call.
334    pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
335        RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
336    }
337
338    /// Wrap the service `S` with the middleware.
339    pub(crate) fn service<S>(&self, service: S) -> L::Service
340    where
341        L: tower::Layer<S>,
342    {
343        self.0.service(service)
344    }
345}
346
/// `JsonRPSee` service compatible with `tower`.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html).
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Per-connection state shared by every call handled by this service.
    inner: ServiceData,
    /// Builder producing the middleware stack applied to each RPC call.
    rpc_middleware: RpcServiceBuilder<L>,
}
356
impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
where
    RpcMiddleware: for<'a> Layer<RpcService>,
    for<'a> <RpcMiddleware as Layer<RpcService>>::Service: Send + Sync + 'static + RpcServiceT<'a>,
{
    /// The response of a handled RPC call
    ///
    /// This is an `Option` because subscriptions and call responses are handled differently.
    /// This will be `Some` for calls, and `None` for subscriptions, because the subscription
    /// response will be emitted via the `method_sink`.
    type Response = Option<String>;

    type Error = Box<dyn core::error::Error + Send + Sync + 'static>;

    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

    /// Opens door for back pressure implementation.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Handles a raw JSON-RPC request string by dispatching it through the middleware stack
    /// on a separate task.
    fn call(&mut self, request: String) -> Self::Future {
        trace!("{:?}", request);

        // subscriptions are supported over IPC, so configure the service with this
        // connection's subscription limit, id provider and response sink
        let cfg = RpcServiceCfg::CallsAndSubscriptions {
            bounded_subscriptions: BoundedSubscriptions::new(
                self.inner.server_cfg.max_subscriptions_per_connection,
            ),
            id_provider: self.inner.id_provider.clone(),
            sink: self.inner.method_sink.clone(),
        };

        let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
        let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
        let conn = self.inner.conn_permit.clone();
        let rpc_service = self.rpc_middleware.service(RpcService::new(
            self.inner.methods.clone(),
            max_response_body_size,
            self.inner.conn_id.into(),
            cfg,
        ));
        // An IPC connection needs to handle read and write concurrently. Even if the
        // underlying rpc handler spawns the actual work or does a lot of async work, any
        // additional overhead performed by `handle_request` can result in I/O latencies; for
        // example tracing calls are relatively CPU expensive on serde::serialize alone.
        // Moving this work to a separate task takes the pressure off the connection, so all
        // concurrent responses are also serialized concurrently and the connection can focus
        // on read+write.
        let f = tokio::task::spawn(async move {
            ipc::call_with_service(
                request,
                rpc_service,
                max_response_body_size,
                max_request_body_size,
                conn,
            )
            .await
        });

        // surface a task join failure (panic or cancellation) as the service error
        Box::pin(async move { f.await.map_err(|err| err.into()) })
    }
}
418
/// Bundled parameters handed to [`process_connection`] for one accepted IPC connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Transport-level middleware, borrowed from the server.
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// Per-call middleware, cloned for this connection.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit limiting concurrent connections; released when dropped.
    conn_permit: Arc<ConnectionPermit>,
    /// Identifier of this connection (used for tracing).
    conn_id: u32,
    /// Snapshot of the server settings for this connection.
    server_cfg: Settings,
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Dropped when the connection task completes; lets the accept loop await all tasks.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered server methods.
    methods: Methods,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted local socket stream.
    local_socket_stream: LocalSocketStream,
}
431
/// Spawns the IPC connection onto a new task
#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
    params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
) where
    RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
    for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<'a>,
    HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
    <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
        + Service<
            String,
            Response = Option<String>,
            Error = Box<dyn core::error::Error + Send + Sync + 'static>,
        >,
    <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
        Send + Unpin,
{
    let ProcessConnection {
        http_middleware,
        rpc_middleware,
        conn_permit,
        conn_id,
        server_cfg,
        stop_handle,
        drop_on_completion,
        id_provider,
        methods,
        local_socket_stream,
    } = params;

    // frame the raw socket with the newline-delimited JSON-RPC stream codec
    let ipc = IpcConn(tokio_util::codec::Decoder::framed(
        StreamCodec::stream_incoming(),
        local_socket_stream,
    ));

    // bounded channel for out-of-band (subscription) messages; its capacity provides the
    // per-connection backpressure configured via `message_buffer_capacity`
    let (tx, rx) = mpsc::channel::<String>(server_cfg.message_buffer_capacity as usize);
    let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
    let tower_service = TowerServiceNoHttp {
        inner: ServiceData {
            methods,
            id_provider,
            stop_handle: stop_handle.clone(),
            server_cfg: server_cfg.clone(),
            conn_id,
            conn_permit,
            bounded_subscriptions: BoundedSubscriptions::new(
                server_cfg.max_subscriptions_per_connection,
            ),
            method_sink,
        },
        rpc_middleware,
    };

    // apply the transport-level middleware, then drive the connection on its own task
    let service = http_middleware.service(tower_service);
    tokio::spawn(async {
        to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
        // signal the accept loop that this connection has finished
        drop(drop_on_completion)
    });
}
491
/// Drives a single IPC connection to completion.
///
/// Polls the connection driver (which reads requests and writes responses) while also
/// forwarding out-of-band messages (subscription notifications) received on `rx` into the
/// connection, until either the connection terminates or `stop_handle` signals shutdown.
async fn to_ipc_service<S, T>(
    ipc: IpcConn<JsonRpcStream<T>>,
    service: S,
    stop_handle: StopHandle,
    rx: mpsc::Receiver<String>,
) where
    S: Service<String, Response = Option<String>> + Send + 'static,
    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
    S::Future: Send + Unpin,
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let rx_item = ReceiverStream::new(rx);
    let conn = IpcConnDriver {
        conn: ipc,
        service,
        pending_calls: Default::default(),
        items: Default::default(),
    };
    let stopped = stop_handle.shutdown();

    let mut conn = pin!(conn);
    let mut rx_item = pin!(rx_item);
    let mut stopped = pin!(stopped);

    loop {
        tokio::select! {
            // the connection driver completed: the socket closed or errored
            _ = &mut conn => {
               break
            }
            // forward a subscription message so the driver writes it to the socket.
            // NOTE(review): a `None` (all senders dropped) is silently ignored here;
            // presumably the sink's sender lives inside the service owned by `conn`, so the
            // connection future completes first — confirm the sender cannot drop earlier.
            item = rx_item.next() => {
                if let Some(item) = item {
                    conn.push_back(item);
                }
            }
            _ = &mut stopped => {
                // server shutdown was requested
                break
            }
        }
    }
}
533
/// JSON-RPC IPC server settings.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum size in bytes of a request.
    max_request_body_size: u32,
    /// Maximum size in bytes of a response.
    max_response_body_size: u32,
    /// Max length for logging for requests and responses
    ///
    /// Logs bigger than this limit will be truncated.
    max_log_length: u32,
    /// Maximum number of incoming connections allowed.
    max_connections: u32,
    /// Maximum number of subscriptions per connection.
    max_subscriptions_per_connection: u32,
    /// Number of messages that the server is allowed to buffer until backpressure kicks in.
    message_buffer_capacity: u32,
    /// Custom tokio runtime to run the server on; `None` uses the ambient runtime.
    tokio_runtime: Option<tokio::runtime::Handle>,
}
554
555impl Default for Settings {
556    fn default() -> Self {
557        Self {
558            max_request_body_size: TEN_MB_SIZE_BYTES,
559            max_response_body_size: TEN_MB_SIZE_BYTES,
560            max_log_length: 4096,
561            max_connections: 100,
562            max_subscriptions_per_connection: 1024,
563            message_buffer_capacity: 1024,
564            tokio_runtime: None,
565        }
566    }
567}
568
/// Builder to configure and create a JSON-RPC server
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Limits and runtime configuration for the server.
    settings: Settings,
    /// Subscription ID provider.
    id_provider: Arc<dyn IdProvider>,
    /// Middleware applied to each JSON-RPC call.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware applied to each connection's transport service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
578
579impl Default for Builder<Identity, Identity> {
580    fn default() -> Self {
581        Self {
582            settings: Settings::default(),
583            id_provider: Arc::new(RandomIntegerIdProvider),
584            rpc_middleware: RpcServiceBuilder::new(),
585            http_middleware: tower::ServiceBuilder::new(),
586        }
587    }
588}
589
impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
    /// Set the maximum size of a request body in bytes. Default is 10 MiB.
    pub const fn max_request_body_size(mut self, size: u32) -> Self {
        self.settings.max_request_body_size = size;
        self
    }

    /// Set the maximum size of a response body in bytes. Default is 10 MiB.
    pub const fn max_response_body_size(mut self, size: u32) -> Self {
        self.settings.max_response_body_size = size;
        self
    }

    /// Set the maximum length (in bytes) of logged requests and responses. Default is 4096.
    ///
    /// Logs bigger than this limit will be truncated.
    pub const fn max_log_length(mut self, size: u32) -> Self {
        self.settings.max_log_length = size;
        self
    }

    /// Set the maximum number of connections allowed. Default is 100.
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.settings.max_connections = max;
        self
    }

    /// Set the maximum number of subscriptions per connection. Default is 1024.
    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
        self.settings.max_subscriptions_per_connection = max;
        self
    }

    /// The server enforces backpressure which means that
    /// `n` messages can be buffered if the client
    /// can't keep up with the server.
    ///
    /// This `capacity` is applied per connection and
    /// applies globally on the connection which implies
    /// all JSON-RPC messages.
    ///
    /// For example if a subscription produces plenty of new items
    /// and the client can't keep up then no new messages are handled.
    ///
    /// If this limit is exceeded then the server will "back-off"
    /// and only accept new messages once the client reads pending messages.
    ///
    /// # Panics
    ///
    /// Panics if the buffer capacity is 0.
    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
        self.settings.message_buffer_capacity = c;
        self
    }

    /// Configure a custom [`tokio::runtime::Handle`] to run the server on.
    ///
    /// Default: [`tokio::spawn`]
    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
        self.settings.tokio_runtime = Some(rt);
        self
    }

    /// Configure custom `subscription ID` provider for the server to use
    /// when handling new subscription calls.
    ///
    /// You may choose static dispatch or dynamic dispatch because
    /// `IdProvider` is implemented for `Box<T>`.
    ///
    /// Default: [`RandomIntegerIdProvider`].
    ///
    /// # Examples
    ///
    /// ```rust
    /// use jsonrpsee::server::RandomStringIdProvider;
    /// use reth_ipc::server::Builder;
    ///
    /// // static dispatch
    /// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
    ///
    /// // or dynamic dispatch
    /// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
    /// ```
    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
        self.id_provider = Arc::new(id_provider);
        self
    }

    /// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
    /// to the RPC service.
    ///
    /// Default: No tower layers are applied to the RPC service.
    ///
    /// # Examples
    ///
    /// ```rust
    /// #[tokio::main]
    /// async fn main() {
    ///     let builder = tower::ServiceBuilder::new();
    ///     let server = reth_ipc::server::Builder::default()
    ///         .set_http_middleware(builder)
    ///         .build("/tmp/my-uds".into());
    /// }
    /// ```
    pub fn set_http_middleware<T>(
        self,
        service_builder: tower::ServiceBuilder<T>,
    ) -> Builder<T, RpcMiddleware> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            http_middleware: service_builder,
            rpc_middleware: self.rpc_middleware,
        }
    }

    /// Enable middleware that is invoked on every JSON-RPC call.
    ///
    /// The middleware itself is very similar to the `tower middleware` but
    /// it has a different service trait which takes `&self` instead of `&mut self`,
    /// which means that you can't use built-in middleware from tower.
    ///
    /// Another consequence of `&self` is that you must wrap any of the middleware state in
    /// a type which is Send and provides interior mutability, such as `Arc<Mutex>`.
    ///
    /// The builder itself exposes a similar API as the [`tower::ServiceBuilder`]
    /// where it is possible to compose layers to the middleware.
    ///
    /// ```
    /// use std::{
    ///     net::SocketAddr,
    ///     sync::{
    ///         atomic::{AtomicUsize, Ordering},
    ///         Arc,
    ///     },
    ///     time::Instant,
    /// };
    ///
    /// use futures_util::future::BoxFuture;
    /// use jsonrpsee::{
    ///     server::{middleware::rpc::RpcServiceT, ServerBuilder},
    ///     types::Request,
    ///     MethodResponse,
    /// };
    /// use reth_ipc::server::{Builder, RpcServiceBuilder};
    ///
    /// #[derive(Clone)]
    /// struct MyMiddleware<S> {
    ///     service: S,
    ///     count: Arc<AtomicUsize>,
    /// }
    ///
    /// impl<'a, S> RpcServiceT<'a> for MyMiddleware<S>
    /// where
    ///     S: RpcServiceT<'a> + Send + Sync + Clone + 'static,
    /// {
    ///     type Future = BoxFuture<'a, MethodResponse>;
    ///
    ///     fn call(&self, req: Request<'a>) -> Self::Future {
    ///         tracing::info!("MyMiddleware processed call {}", req.method);
    ///         let count = self.count.clone();
    ///         let service = self.service.clone();
    ///
    ///         Box::pin(async move {
    ///             let rp = service.call(req).await;
    ///             // Modify the state.
    ///             count.fetch_add(1, Ordering::Relaxed);
    ///             rp
    ///         })
    ///     }
    /// }
    ///
    /// // Create a state per connection
    /// // NOTE: The service type can be omitted once `start` is called on the server.
    /// let m = RpcServiceBuilder::new().layer_fn(move |service: ()| MyMiddleware {
    ///     service,
    ///     count: Arc::new(AtomicUsize::new(0)),
    /// });
    /// let builder = Builder::default().set_rpc_middleware(m);
    /// ```
    pub fn set_rpc_middleware<T>(
        self,
        rpc_middleware: RpcServiceBuilder<T>,
    ) -> Builder<HttpMiddleware, T> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            rpc_middleware,
            http_middleware: self.http_middleware,
        }
    }

    /// Finalize the configuration of the server. Consumes the [`Builder`].
    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
        IpcServer {
            endpoint,
            cfg: self.settings,
            id_provider: self.id_provider,
            http_middleware: self.http_middleware,
            rpc_middleware: self.rpc_middleware,
        }
    }
}
791
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    // random suffix so concurrently running tests don't collide on the same endpoint
    let num = rand::rng().random::<u64>();
    match cfg!(windows) {
        true => format!(r"\\.\pipe\my-pipe-{num}"),
        false => format!("/tmp/my-uds-{num}"),
    }
}
803
804#[cfg(test)]
805mod tests {
806    use super::*;
807    use crate::client::IpcClientBuilder;
808    use futures::future::select;
809    use jsonrpsee::{
810        core::{
811            client,
812            client::{ClientT, Error, Subscription, SubscriptionClientT},
813            params::BatchRequestBuilder,
814        },
815        rpc_params,
816        types::Request,
817        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
818    };
819    use reth_tracing::init_test_tracing;
820    use std::pin::pin;
821    use tokio::sync::broadcast;
822    use tokio_stream::wrappers::BroadcastStream;
823
    /// Forwards every item from `stream` into the subscription sink until either side
    /// closes, blocking on the sink's bounded queue (i.e. exercising backpressure).
    async fn pipe_from_stream_with_bounded_buffer(
        pending: PendingSubscriptionSink,
        stream: BroadcastStream<usize>,
    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
        let sink = pending.accept().await.unwrap();
        let closed = sink.closed();

        let mut closed = pin!(closed);
        let mut stream = pin!(stream);

        loop {
            match select(closed, stream.next()).await {
                // subscription closed or stream is closed.
                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

                // received new item from the stream.
                Either::Right((Some(Ok(item)), c)) => {
                    let notif = SubscriptionMessage::from_json(&item)?;

                    // NOTE: this will block until there is a spot in the queue
                    // and you might want to do something smarter if it's
                    // critical that "the most recent item" must be sent when it is produced.
                    if sink.send(notif).await.is_err() {
                        break Ok(());
                    }

                    // `select` consumed the `closed` future; reuse the still-pending one
                    // returned for the next iteration.
                    closed = c;
                }

                // Send back the error.
                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
            }
        }
    }
858
859    // Naive example that broadcasts the produced values to all active subscribers.
860    fn produce_items(tx: broadcast::Sender<usize>) {
861        for c in 1..=100 {
862            std::thread::sleep(std::time::Duration::from_millis(1));
863            let _ = tx.send(c);
864        }
865    }
866
867    #[tokio::test]
868    async fn can_set_the_max_response_body_size() {
869        // init_test_tracing();
870        let endpoint = &dummy_name();
871        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
872        let mut module = RpcModule::new(());
873        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
874        let handle = server.start(module).await.unwrap();
875        tokio::spawn(handle.stopped());
876
877        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
878        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
879        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
880    }
881
882    #[tokio::test]
883    async fn can_set_the_max_request_body_size() {
884        init_test_tracing();
885        let endpoint = &dummy_name();
886        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
887        let mut module = RpcModule::new(());
888        module.register_method("anything", |_, _, _| "succeed").unwrap();
889        let handle = server.start(module).await.unwrap();
890        tokio::spawn(handle.stopped());
891
892        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
893        let response: Result<String, Error> =
894            client.request("anything", rpc_params!["a".repeat(101)]).await;
895        assert!(response.is_err());
896        let mut batch_request_builder = BatchRequestBuilder::new();
897        let _ = batch_request_builder.insert("anything", rpc_params![]);
898        let _ = batch_request_builder.insert("anything", rpc_params![]);
899        let _ = batch_request_builder.insert("anything", rpc_params![]);
900        // the raw request string is:
901        //  [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \
902        //    "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]"
903        // which is 136 bytes, more than 100 bytes.
904        let response: Result<client::BatchResponse<'_, String>, Error> =
905            client.batch_request(batch_request_builder).await;
906        assert!(response.is_err());
907    }
908
909    #[tokio::test]
910    async fn can_set_max_connections() {
911        init_test_tracing();
912
913        let endpoint = &dummy_name();
914        let server = Builder::default().max_connections(2).build(endpoint.clone());
915        let mut module = RpcModule::new(());
916        module.register_method("anything", |_, _, _| "succeed").unwrap();
917        let handle = server.start(module).await.unwrap();
918        tokio::spawn(handle.stopped());
919
920        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
921        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
922        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
923
924        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
925        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
926        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
927
928        assert!(response1.is_ok());
929        assert!(response2.is_ok());
930        // Third connection is rejected
931        assert!(response3.is_err());
932
933        // Decrement connection count
934        drop(client2);
935        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
936
937        // Can connect again
938        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
939        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
940        assert!(response4.is_ok());
941    }
942
943    #[tokio::test]
944    async fn test_rpc_request() {
945        init_test_tracing();
946        let endpoint = &dummy_name();
947        let server = Builder::default().build(endpoint.clone());
948        let mut module = RpcModule::new(());
949        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
950        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
951        let handle = server.start(module).await.unwrap();
952        tokio::spawn(handle.stopped());
953
954        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
955        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
956        assert_eq!(response, msg);
957    }
958
959    #[tokio::test]
960    async fn test_batch_request() {
961        let endpoint = &dummy_name();
962        let server = Builder::default().build(endpoint.clone());
963        let mut module = RpcModule::new(());
964        module.register_method("anything", |_, _, _| "ok").unwrap();
965        let handle = server.start(module).await.unwrap();
966        tokio::spawn(handle.stopped());
967
968        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
969        let mut batch_request_builder = BatchRequestBuilder::new();
970        let _ = batch_request_builder.insert("anything", rpc_params![]);
971        let _ = batch_request_builder.insert("anything", rpc_params![]);
972        let _ = batch_request_builder.insert("anything", rpc_params![]);
973        let result = client
974            .batch_request(batch_request_builder)
975            .await
976            .unwrap()
977            .into_ok()
978            .unwrap()
979            .collect::<Vec<String>>();
980        assert_eq!(result, vec!["ok", "ok", "ok"]);
981    }
982
983    #[tokio::test]
984    async fn test_ipc_modules() {
985        reth_tracing::init_test_tracing();
986        let endpoint = &dummy_name();
987        let server = Builder::default().build(endpoint.clone());
988        let mut module = RpcModule::new(());
989        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
990        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
991        let handle = server.start(module).await.unwrap();
992        tokio::spawn(handle.stopped());
993
994        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
995        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
996        assert_eq!(response, msg);
997    }
998
999    #[tokio::test(flavor = "multi_thread")]
1000    async fn test_rpc_subscription() {
1001        let endpoint = &dummy_name();
1002        let server = Builder::default().build(endpoint.clone());
1003        let (tx, _rx) = broadcast::channel::<usize>(16);
1004
1005        let mut module = RpcModule::new(tx.clone());
1006        std::thread::spawn(move || produce_items(tx));
1007
1008        module
1009            .register_subscription(
1010                "subscribe_hello",
1011                "s_hello",
1012                "unsubscribe_hello",
1013                |_, pending, tx, _| async move {
1014                    let rx = tx.subscribe();
1015                    let stream = BroadcastStream::new(rx);
1016                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1017                    Ok(())
1018                },
1019            )
1020            .unwrap();
1021
1022        let handle = server.start(module).await.unwrap();
1023        tokio::spawn(handle.stopped());
1024
1025        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1026        let sub: Subscription<usize> =
1027            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1028
1029        let items = sub.take(16).collect::<Vec<_>>().await;
1030        assert_eq!(items.len(), 16);
1031    }
1032
    #[tokio::test]
    async fn test_rpc_middleware() {
        // Middleware that swaps the `say_hello` and `say_goodbye` method names
        // on every incoming request, demonstrating that configured RPC
        // middleware runs before method dispatch.
        #[derive(Clone)]
        struct ModifyRequestIf<S>(S);

        impl<'a, S> RpcServiceT<'a> for ModifyRequestIf<S>
        where
            S: Send + Sync + RpcServiceT<'a>,
        {
            type Future = S::Future;

            fn call(&self, mut req: Request<'a>) -> Self::Future {
                // Swap the two method names — `say_hello` becomes `say_goodbye`
                // and vice versa — then forward to the inner service.
                if req.method == "say_hello" {
                    req.method = "say_goodbye".into();
                } else if req.method == "say_goodbye" {
                    req.method = "say_hello".into();
                }

                self.0.call(req)
            }
        }

        reth_tracing::init_test_tracing();
        let endpoint = &dummy_name();

        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());

        let mut module = RpcModule::new(());
        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        // Because the middleware swaps method names, each call should reach
        // the *other* handler and return the opposite message.
        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
        let say_goodbye_response: String =
            client.request("say_goodbye", rpc_params![]).await.unwrap();

        assert_eq!(say_hello_response, goodbye_msg);
        assert_eq!(say_goodbye_response, hello_msg);
    }
1078}