1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13 server::{
14 middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15 RandomIntegerIdProvider, ServerHandle, StopHandle,
16 },
17 BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20 future::Future,
21 io,
22 pin::{pin, Pin},
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::{
27 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28 sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32use crate::{
34 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35 stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
/// JSON-RPC server that accepts connections on a local (IPC) socket.
///
/// Values are assembled by [`Builder::build`] and consumed by `start`.
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// Endpoint name; it is resolved as a file-system path when the listener is created.
    endpoint: String,
    /// Id provider handed to the service of every accepted connection.
    id_provider: Arc<dyn IdProvider>,
    /// Limits and runtime options for the server.
    cfg: Settings,
    /// Layers applied around the [`RpcService`] created for each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Layers applied around the per-connection [`TowerServiceNoHttp`].
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60 pub fn endpoint(&self) -> String {
62 self.endpoint.clone()
63 }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69 HttpMiddleware: Layer<
70 TowerServiceNoHttp<RpcMiddleware>,
71 Service: Service<
72 String,
73 Response = Option<String>,
74 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75 Future: Send + Unpin,
76 > + Send,
77 > + Send
78 + 'static,
79{
80 pub async fn start(
102 mut self,
103 methods: impl Into<Methods>,
104 ) -> Result<ServerHandle, IpcServerStartError> {
105 let methods = methods.into();
106
107 let (stop_handle, server_handle) = stop_channel();
108
109 let (tx, rx) = oneshot::channel();
111
112 match self.cfg.tokio_runtime.take() {
113 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115 };
116 rx.await.expect("channel is open")?;
117
118 Ok(server_handle)
119 }
120
121 async fn start_inner(
122 self,
123 methods: Methods,
124 stop_handle: StopHandle,
125 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126 ) {
127 trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129 if cfg!(unix) {
130 if std::fs::remove_file(&self.endpoint).is_ok() {
132 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133 }
134 }
135
136 let listener = match self
137 .endpoint
138 .as_str()
139 .to_fs_name::<GenericFilePath>()
140 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141 {
142 Ok(listener) => {
143 #[cfg(unix)]
144 {
145 use std::os::unix::fs::PermissionsExt;
147 if let Some(perms_str) = &self.cfg.ipc_socket_permissions &&
148 let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8)
149 {
150 let perms = std::fs::Permissions::from_mode(mode);
151 let _ = std::fs::set_permissions(&self.endpoint, perms);
152 }
153 }
154 listener
155 }
156 Err(err) => {
157 on_ready
158 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
159 .ok();
160 return;
161 }
162 };
163
164 on_ready.send(Ok(())).ok();
166
167 let mut id: u32 = 0;
168 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
169
170 let stopped = stop_handle.clone().shutdown();
171 let mut stopped = pin!(stopped);
172
173 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
174
175 trace!("accepting ipc connections");
176 loop {
177 match try_accept_conn(&listener, stopped).await {
178 AcceptConnection::Established { local_socket_stream, stop } => {
179 let Some(conn_permit) = connection_guard.try_acquire() else {
180 let (_reader, mut writer) = local_socket_stream.split();
181 let _ = writer
182 .write_all(b"Too many connections. Please try again later.")
183 .await;
184 stopped = stop;
185 continue;
186 };
187
188 let max_conns = connection_guard.max_connections();
189 let curr_conns = max_conns - connection_guard.available_connections();
190 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
191
192 let conn_permit = Arc::new(conn_permit);
193
194 process_connection(ProcessConnection {
195 http_middleware: &self.http_middleware,
196 rpc_middleware: self.rpc_middleware.clone(),
197 conn_permit,
198 conn_id: id,
199 server_cfg: self.cfg.clone(),
200 stop_handle: stop_handle.clone(),
201 drop_on_completion: drop_on_completion.clone(),
202 methods: methods.clone(),
203 id_provider: self.id_provider.clone(),
204 local_socket_stream,
205 });
206
207 id = id.wrapping_add(1);
208 stopped = stop;
209 }
210 AcceptConnection::Shutdown => {
211 break;
212 }
213 AcceptConnection::Err((err, stop)) => {
214 tracing::error!(%err, "Failed accepting a new IPC connection");
215 stopped = stop;
216 }
217 }
218 }
219
220 drop(drop_on_completion);
222
223 while process_connection_awaiter.recv().await.is_some() {
226 }
229 }
230}
231
/// Outcome of one attempt to accept a connection while also watching for shutdown.
///
/// `S` is the shutdown future; it is handed back whenever it has not completed so the caller
/// can keep polling it on the next attempt.
enum AcceptConnection<S> {
    /// The shutdown future completed before a connection was accepted.
    Shutdown,
    /// A connection was accepted; `stop` is the still-pending shutdown future.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting failed with an I/O error; the still-pending shutdown future is returned too.
    Err((io::Error, S)),
}
237
238async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
239where
240 S: Future + Unpin,
241{
242 match futures_util::future::select(pin!(listener.accept()), stopped).await {
243 Either::Left((res, stop)) => match res {
244 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
245 Err(e) => AcceptConnection::Err((e, stop)),
246 },
247 Either::Right(_) => AcceptConnection::Shutdown,
248 }
249}
250
251impl<HttpMiddleware, RpcMiddleware> std::fmt::Debug for IpcServer<HttpMiddleware, RpcMiddleware> {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 f.debug_struct("IpcServer")
254 .field("endpoint", &self.endpoint)
255 .field("cfg", &self.cfg)
256 .field("id_provider", &self.id_provider)
257 .finish()
258 }
259}
260
/// Error returned when the IPC listener could not be created for an endpoint.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// Endpoint the server tried to listen on.
    endpoint: String,
    /// Underlying I/O error.
    #[source]
    source: io::Error,
}
269
/// Per-connection state carried by [`TowerServiceNoHttp`].
#[derive(Debug, Clone)]
// Not every field is read within this module, hence the lint is silenced for the whole struct.
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// Registered RPC methods.
    pub(crate) methods: Methods,
    /// Id provider passed on to the RPC service.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Handle that signals server shutdown.
    pub(crate) stop_handle: StopHandle,
    /// Identifier assigned to the connection by the accept loop.
    pub(crate) conn_id: u32,
    /// Permit obtained from the connection guard for this connection.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Subscription limiter created for this connection.
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink whose messages are written back to the connection.
    pub(crate) method_sink: MethodSink,
    /// Copy of the server settings.
    pub(crate) server_cfg: Settings,
}
293
/// Builder for the RPC middleware stack; a thin wrapper around [`tower::ServiceBuilder`].
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
298
299impl Default for RpcServiceBuilder<Identity> {
300 fn default() -> Self {
301 Self(tower::ServiceBuilder::new())
302 }
303}
304
305impl RpcServiceBuilder<Identity> {
306 pub const fn new() -> Self {
308 Self(tower::ServiceBuilder::new())
309 }
310}
311
312impl<L> RpcServiceBuilder<L> {
313 pub fn option_layer<T>(
317 self,
318 layer: Option<T>,
319 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
320 let layer = if let Some(layer) = layer {
321 Either::Left(layer)
322 } else {
323 Either::Right(Identity::new())
324 };
325 self.layer(layer)
326 }
327
328 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
332 RpcServiceBuilder(self.0.layer(layer))
333 }
334
335 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
340 RpcServiceBuilder(self.0.layer_fn(f))
341 }
342
343 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
347 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
348 }
349
350 pub(crate) fn service<S>(&self, service: S) -> L::Service
352 where
353 L: tower::Layer<S>,
354 {
355 self.0.service(service)
356 }
357}
358
/// Tower service that handles one raw JSON-RPC request string for a single IPC connection.
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// State belonging to the connection.
    inner: ServiceData,
    /// Middleware wrapped around the [`RpcService`] built for each request.
    rpc_middleware: RpcServiceBuilder<L>,
}
368
369impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
370where
371 RpcMiddleware: for<'a> Layer<RpcService>,
372 for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
373 Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
374{
375 type Response = Option<String>;
381
382 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
383
384 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
385
386 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388 Poll::Ready(Ok(()))
389 }
390
391 fn call(&mut self, request: String) -> Self::Future {
392 trace!("{:?}", request);
393
394 let cfg = RpcServiceCfg {
395 bounded_subscriptions: BoundedSubscriptions::new(
396 self.inner.server_cfg.max_subscriptions_per_connection,
397 ),
398 id_provider: self.inner.id_provider.clone(),
399 sink: self.inner.method_sink.clone(),
400 };
401
402 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
403 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
404 let conn = self.inner.conn_permit.clone();
405 let rpc_service = self.rpc_middleware.service(RpcService::new(
406 self.inner.methods.clone(),
407 max_response_body_size,
408 self.inner.conn_id.into(),
409 cfg,
410 ));
411 let f = tokio::task::spawn(async move {
418 ipc::call_with_service(
419 request,
420 rpc_service,
421 max_response_body_size,
422 max_request_body_size,
423 conn,
424 )
425 .await
426 });
427
428 Box::pin(async move { f.await.map_err(|err| err.into()) })
429 }
430}
431
/// Everything `process_connection` needs to serve one accepted connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Server-wide layers applied around the connection's tower service.
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// RPC middleware stack for this connection.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit obtained from the connection guard.
    conn_permit: Arc<ConnectionPermit>,
    /// Identifier assigned by the accept loop.
    conn_id: u32,
    /// Copy of the server settings.
    server_cfg: Settings,
    /// Handle that signals server shutdown.
    stop_handle: StopHandle,
    /// Sender that is dropped when the connection task finishes.
    drop_on_completion: mpsc::Sender<()>,
    /// Registered RPC methods.
    methods: Methods,
    /// Id provider passed on to the RPC service.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted socket.
    local_socket_stream: LocalSocketStream,
}
444
445#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id))]
447fn process_connection<RpcMiddleware, HttpMiddleware>(
448 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
449) where
450 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
451 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
452 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
453 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
454 + Service<
455 String,
456 Response = Option<String>,
457 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
458 >,
459 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
460 Send + Unpin,
461 {
462 let ProcessConnection {
463 http_middleware,
464 rpc_middleware,
465 conn_permit,
466 conn_id,
467 server_cfg,
468 stop_handle,
469 drop_on_completion,
470 id_provider,
471 methods,
472 local_socket_stream,
473 } = params;
474
475 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
476 StreamCodec::stream_incoming(),
477 local_socket_stream,
478 ));
479
480 let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
481 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
482 let tower_service = TowerServiceNoHttp {
483 inner: ServiceData {
484 methods,
485 id_provider,
486 stop_handle: stop_handle.clone(),
487 server_cfg: server_cfg.clone(),
488 conn_id,
489 conn_permit,
490 bounded_subscriptions: BoundedSubscriptions::new(
491 server_cfg.max_subscriptions_per_connection,
492 ),
493 method_sink,
494 },
495 rpc_middleware,
496 };
497
498 let service = http_middleware.service(tower_service);
499 tokio::spawn(async {
500 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
501 drop(drop_on_completion)
502 });
503}
504
/// Drives a single IPC connection until it finishes, its message channel closes, or the server
/// is stopped.
///
/// Items received on `rx` are converted to strings and queued on the connection driver.
async fn to_ipc_service<S, T>(
    ipc: IpcConn<JsonRpcStream<T>>,
    service: S,
    stop_handle: StopHandle,
    rx: mpsc::Receiver<Box<JsonRawValue>>,
) where
    S: Service<String, Response = Option<String>> + Send + 'static,
    S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
    S::Future: Send + Unpin,
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let rx_item = ReceiverStream::new(rx);
    let conn = IpcConnDriver {
        conn: ipc,
        service,
        pending_calls: Default::default(),
        items: Default::default(),
    };
    let stopped = stop_handle.shutdown();

    // Pin all three so they can be polled by reference on every loop iteration.
    let mut conn = pin!(conn);
    let mut rx_item = pin!(rx_item);
    let mut stopped = pin!(stopped);

    loop {
        tokio::select! {
            // The connection driver completed.
            _ = &mut conn => {
                break
            }
            item = rx_item.next() => {
                // `None` means the stream of outgoing messages has ended.
                let Some(item) = item else { break };
                conn.push_back(item.to_string());
            }
            // The server is shutting down.
            _ = &mut stopped => {
                break
            }
        }
    }
}
545
/// Limits and runtime options for the IPC server.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum size of a request body, passed to the request handler.
    max_request_body_size: u32,
    /// Maximum size of a response body; also used as the limit of the method sink.
    max_response_body_size: u32,
    /// Maximum log length. NOTE(review): not read anywhere in this file.
    max_log_length: u32,
    /// Maximum number of connections accepted at the same time.
    max_connections: u32,
    /// Subscription limit used when creating a connection's subscription limiter.
    max_subscriptions_per_connection: u32,
    /// Capacity of the per-connection channel of outgoing messages.
    message_buffer_capacity: u32,
    /// Runtime to spawn the accept loop on; the ambient runtime is used when `None`.
    tokio_runtime: Option<tokio::runtime::Handle>,
    /// Octal permission string applied to the socket file on unix.
    ipc_socket_permissions: Option<String>,
}
568
569impl Default for Settings {
570 fn default() -> Self {
571 Self {
572 max_request_body_size: TEN_MB_SIZE_BYTES,
573 max_response_body_size: TEN_MB_SIZE_BYTES,
574 max_log_length: 4096,
575 max_connections: 100,
576 max_subscriptions_per_connection: 1024,
577 message_buffer_capacity: 1024,
578 tokio_runtime: None,
579 ipc_socket_permissions: None,
580 }
581 }
582}
583
/// Builder that configures and creates an [`IpcServer`].
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Settings copied into the server on `build`.
    settings: Settings,
    /// Id provider handed to the server.
    id_provider: Arc<dyn IdProvider>,
    /// RPC middleware stack.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware applied around the per-connection service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
593
594impl Default for Builder<Identity, Identity> {
595 fn default() -> Self {
596 Self {
597 settings: Settings::default(),
598 id_provider: Arc::new(RandomIntegerIdProvider),
599 rpc_middleware: RpcServiceBuilder::new(),
600 http_middleware: tower::ServiceBuilder::new(),
601 }
602 }
603}
604
impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
    /// Sets the maximum size of a request body.
    pub const fn max_request_body_size(mut self, size: u32) -> Self {
        self.settings.max_request_body_size = size;
        self
    }

    /// Sets the maximum size of a response body.
    pub const fn max_response_body_size(mut self, size: u32) -> Self {
        self.settings.max_response_body_size = size;
        self
    }

    /// Sets the maximum log length.
    pub const fn max_log_length(mut self, size: u32) -> Self {
        self.settings.max_log_length = size;
        self
    }

    /// Sets the maximum number of connections accepted at the same time.
    pub const fn max_connections(mut self, max: u32) -> Self {
        self.settings.max_connections = max;
        self
    }

    /// Sets the subscription limit per connection.
    pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
        self.settings.max_subscriptions_per_connection = max;
        self
    }

    /// Sets the capacity of the per-connection channel that buffers outgoing messages.
    pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
        self.settings.message_buffer_capacity = c;
        self
    }

    /// Spawns the server's accept loop on the given runtime instead of the ambient one.
    pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
        self.settings.tokio_runtime = Some(rt);
        self
    }

    /// Sets the permissions for the socket file as an octal string such as `"0o660"` or
    /// `"660"`; they are only applied on unix. `None` leaves the permissions untouched.
    pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
        self.settings.ipc_socket_permissions = permissions;
        self
    }

    /// Replaces the id provider.
    pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
        self.id_provider = Arc::new(id_provider);
        self
    }

    /// Replaces the middleware applied around the per-connection service, keeping all other
    /// configuration.
    pub fn set_http_middleware<T>(
        self,
        service_builder: tower::ServiceBuilder<T>,
    ) -> Builder<T, RpcMiddleware> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            http_middleware: service_builder,
            rpc_middleware: self.rpc_middleware,
        }
    }

    /// Replaces the RPC middleware stack, keeping all other configuration.
    pub fn set_rpc_middleware<T>(
        self,
        rpc_middleware: RpcServiceBuilder<T>,
    ) -> Builder<HttpMiddleware, T> {
        Builder {
            settings: self.settings,
            id_provider: self.id_provider,
            rpc_middleware,
            http_middleware: self.http_middleware,
        }
    }

    /// Creates the [`IpcServer`] for `endpoint`; nothing is bound until the server is started.
    pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
        IpcServer {
            endpoint,
            cfg: self.settings,
            id_provider: self.id_provider,
            http_middleware: self.http_middleware,
            rpc_middleware: self.rpc_middleware,
        }
    }
}
759
// Test helper: returns a socket path under `/tmp` with a random numeric suffix.
// Plain comments are used on purpose so that `#[expect(missing_docs)]` stays fulfilled.
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    let num = rand::rng().random::<u64>();
    format!(r"/tmp/my-uds-{num}")
}
767
// Integration tests: each one starts a real server on a random endpoint and, except for the
// permissions test, talks to it through the IPC client.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::client::IpcClientBuilder;
    use futures::future::select;
    use jsonrpsee::{
        core::{
            client::{self, ClientT, Error, Subscription, SubscriptionClientT},
            middleware::{Batch, BatchEntry, Notification},
            params::BatchRequestBuilder,
        },
        rpc_params,
        types::Request,
        PendingSubscriptionSink, RpcModule, SubscriptionMessage,
    };
    use reth_tracing::init_test_tracing;
    use std::pin::pin;
    use tokio::sync::broadcast;
    use tokio_stream::wrappers::BroadcastStream;

    // The configured permission string must end up as the mode of the socket file.
    #[tokio::test]
    #[cfg(unix)]
    async fn test_ipc_socket_permissions() {
        use std::os::unix::fs::PermissionsExt;
        let endpoint = &dummy_name();
        let perms = "0777";
        let server = Builder::default()
            .set_ipc_socket_permissions(Some(perms.to_string()))
            .build(endpoint.clone());
        let module = RpcModule::new(());
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let meta = std::fs::metadata(endpoint).unwrap();
        let perms = meta.permissions();
        // Only the permission bits are compared.
        assert_eq!(perms.mode() & 0o777, 0o777);
    }

    // Accepts the pending subscription and forwards every item of `stream` to it until the
    // sink is closed, the stream ends, or the stream yields an error.
    async fn pipe_from_stream_with_bounded_buffer(
        pending: PendingSubscriptionSink,
        stream: BroadcastStream<usize>,
    ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
        let sink = pending.accept().await.unwrap();
        let closed = sink.closed();

        let mut closed = pin!(closed);
        let mut stream = pin!(stream);

        loop {
            match select(closed, stream.next()).await {
                // The sink was closed or the stream is exhausted.
                Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

                // Received a new item from the stream.
                Either::Right((Some(Ok(item)), c)) => {
                    let raw_value = serde_json::value::to_raw_value(&item)?;
                    let notif = SubscriptionMessage::from(raw_value);

                    // A failed send ends the subscription without an error.
                    if sink.send(notif).await.is_err() {
                        break Ok(());
                    }

                    // Reuse the still-pending `closed` future on the next iteration.
                    closed = c;
                }

                // The stream reported an error.
                Either::Right((Some(Err(e)), _)) => break Err(e.into()),
            }
        }
    }

    // Sends the numbers 1 through 100, one per millisecond. It blocks the calling thread, so it
    // is meant to run on a dedicated thread.
    fn produce_items(tx: broadcast::Sender<usize>) {
        for c in 1..=100 {
            std::thread::sleep(std::time::Duration::from_millis(1));
            let _ = tx.send(c);
        }
    }

    // A response larger than the configured limit must be reported as an error.
    #[tokio::test]
    async fn can_set_the_max_response_body_size() {
        let endpoint = &dummy_name();
        let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
        assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
    }

    // With a 100 byte request limit, both the oversized single request and the batch are
    // expected to fail.
    #[tokio::test]
    async fn can_set_the_max_request_body_size() {
        init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "succeed").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: Result<String, Error> =
            client.request("anything", rpc_params!["a".repeat(101)]).await;
        assert!(response.is_err());
        let mut batch_request_builder = BatchRequestBuilder::new();
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let response: Result<client::BatchResponse<'_, String>, Error> =
            client.batch_request(batch_request_builder).await;
        assert!(response.is_err());
    }

    // With a limit of two connections the third client fails, and a new client succeeds once
    // one of the first two has been dropped.
    #[tokio::test]
    async fn can_set_max_connections() {
        init_test_tracing();

        let endpoint = &dummy_name();
        let server = Builder::default().max_connections(2).build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "succeed").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();

        let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
        let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
        let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;

        assert!(response1.is_ok());
        assert!(response2.is_ok());
        // The third connection exceeds the limit.
        assert!(response3.is_err());

        // Free one slot and give the server time to notice the closed connection.
        drop(client2);
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
        assert!(response4.is_ok());
    }

    // A single request must return the value produced by the registered method.
    #[tokio::test]
    async fn test_rpc_request() {
        init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
        module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
        assert_eq!(response, msg);
    }

    // A batch of three calls must yield three successful results.
    #[tokio::test]
    async fn test_batch_request() {
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        module.register_method("anything", |_, _, _| "ok").unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let mut batch_request_builder = BatchRequestBuilder::new();
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let _ = batch_request_builder.insert("anything", rpc_params![]);
        let result = client
            .batch_request(batch_request_builder)
            .await
            .unwrap()
            .into_ok()
            .unwrap()
            .collect::<Vec<String>>();
        assert_eq!(result, vec!["ok", "ok", "ok"]);
    }

    // Same round trip as `test_rpc_request`, using an `rpc_modules` style payload.
    #[tokio::test]
    async fn test_ipc_modules() {
        reth_tracing::init_test_tracing();
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let mut module = RpcModule::new(());
        let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
        module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
        assert_eq!(response, msg);
    }

    // A subscription must deliver items produced on a background thread.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_rpc_subscription() {
        let endpoint = &dummy_name();
        let server = Builder::default().build(endpoint.clone());
        let (tx, _rx) = broadcast::channel::<usize>(16);

        let mut module = RpcModule::new(tx.clone());
        // The producer sleeps between items, so it gets its own OS thread.
        std::thread::spawn(move || produce_items(tx));

        module
            .register_subscription(
                "subscribe_hello",
                "s_hello",
                "unsubscribe_hello",
                |_, pending, tx, _| async move {
                    let rx = tx.subscribe();
                    let stream = BroadcastStream::new(rx);
                    pipe_from_stream_with_bounded_buffer(pending, stream).await?;
                    Ok(())
                },
            )
            .unwrap();

        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let sub: Subscription<usize> =
            client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();

        let items = sub.take(16).collect::<Vec<_>>().await;
        assert_eq!(items.len(), 16);
    }

    // RPC middleware must be able to rewrite requests before they reach the method handlers.
    #[tokio::test]
    async fn test_rpc_middleware() {
        // Middleware that swaps the `say_hello` and `say_goodbye` method names.
        #[derive(Clone)]
        struct ModifyRequestIf<S>(S);

        impl<S> RpcServiceT for ModifyRequestIf<S>
        where
            S: Send + Sync + RpcServiceT,
        {
            type MethodResponse = S::MethodResponse;
            type NotificationResponse = S::NotificationResponse;
            type BatchResponse = S::BatchResponse;

            fn call<'a>(
                &self,
                mut req: Request<'a>,
            ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
                // Swap the two method names.
                if req.method == "say_hello" {
                    req.method = "say_goodbye".into();
                } else if req.method == "say_goodbye" {
                    req.method = "say_hello".into();
                }

                self.0.call(req)
            }

            fn batch<'a>(
                &self,
                mut batch: Batch<'a>,
            ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
                // Apply the same swap to every call and notification in the batch.
                for call in batch.iter_mut() {
                    match call {
                        Ok(BatchEntry::Call(req)) => {
                            if req.method == "say_hello" {
                                req.method = "say_goodbye".into();
                            } else if req.method == "say_goodbye" {
                                req.method = "say_hello".into();
                            }
                        }
                        Ok(BatchEntry::Notification(n)) => {
                            if n.method == "say_hello" {
                                n.method = "say_goodbye".into();
                            } else if n.method == "say_goodbye" {
                                n.method = "say_hello".into();
                            }
                        }
                        // Invalid entries are passed through unchanged.
                        Err(_err) => {}
                    }
                }

                self.0.batch(batch)
            }

            fn notification<'a>(
                &self,
                mut n: Notification<'a>,
            ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
                if n.method == "say_hello" {
                    n.method = "say_goodbye".into();
                } else if n.method == "say_goodbye" {
                    n.method = "say_hello".into();
                }
                self.0.notification(n)
            }
        }

        reth_tracing::init_test_tracing();
        let endpoint = &dummy_name();

        let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
        let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());

        let mut module = RpcModule::new(());
        let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
        let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
        module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
        module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
        let handle = server.start(module).await.unwrap();
        tokio::spawn(handle.stopped());

        let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
        let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
        let say_goodbye_response: String =
            client.request("say_goodbye", rpc_params![]).await.unwrap();

        // Because of the swap, each request is answered by the other handler.
        assert_eq!(say_hello_response, goodbye_msg);
        assert_eq!(say_goodbye_response, hello_msg);
    }
}