1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13 server::{
14 middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15 RandomIntegerIdProvider, ServerHandle, StopHandle,
16 },
17 BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20 future::Future,
21 io,
22 pin::{pin, Pin},
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::{
27 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28 sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32use crate::{
34 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35 stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
/// An IPC server that serves JSON-RPC requests over a local socket endpoint.
///
/// The type parameters select the tower middleware stacks stored in
/// `http_middleware` and `rpc_middleware`; both default to [`Identity`].
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
    /// The endpoint (file system name) the server listens on.
    endpoint: String,
    /// Id provider that is cloned into every accepted connection.
    id_provider: Arc<dyn IdProvider>,
    /// Server settings.
    cfg: Settings,
    /// Middleware applied around the RPC service of each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware applied around the per-connection tower service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60 pub fn endpoint(&self) -> String {
62 self.endpoint.clone()
63 }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69 HttpMiddleware: Layer<
70 TowerServiceNoHttp<RpcMiddleware>,
71 Service: Service<
72 String,
73 Response = Option<String>,
74 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75 Future: Send + Unpin,
76 > + Send,
77 > + Send
78 + 'static,
79{
80 pub async fn start(
102 mut self,
103 methods: impl Into<Methods>,
104 ) -> Result<ServerHandle, IpcServerStartError> {
105 let methods = methods.into();
106
107 let (stop_handle, server_handle) = stop_channel();
108
109 let (tx, rx) = oneshot::channel();
111
112 match self.cfg.tokio_runtime.take() {
113 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115 };
116 rx.await.expect("channel is open")?;
117
118 Ok(server_handle)
119 }
120
121 async fn start_inner(
122 self,
123 methods: Methods,
124 stop_handle: StopHandle,
125 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126 ) {
127 trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129 if cfg!(unix) {
130 if std::fs::remove_file(&self.endpoint).is_ok() {
132 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133 }
134 }
135
136 let listener = match self
137 .endpoint
138 .as_str()
139 .to_fs_name::<GenericFilePath>()
140 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141 {
142 Ok(listener) => {
143 #[cfg(unix)]
144 {
145 use std::os::unix::fs::PermissionsExt;
147 if let Some(perms_str) = &self.cfg.ipc_socket_permissions {
148 if let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8) {
149 let perms = std::fs::Permissions::from_mode(mode);
150 let _ = std::fs::set_permissions(&self.endpoint, perms);
151 }
152 }
153 }
154 listener
155 }
156 Err(err) => {
157 on_ready
158 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
159 .ok();
160 return;
161 }
162 };
163
164 on_ready.send(Ok(())).ok();
166
167 let mut id: u32 = 0;
168 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
169
170 let stopped = stop_handle.clone().shutdown();
171 let mut stopped = pin!(stopped);
172
173 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
174
175 trace!("accepting ipc connections");
176 loop {
177 match try_accept_conn(&listener, stopped).await {
178 AcceptConnection::Established { local_socket_stream, stop } => {
179 let Some(conn_permit) = connection_guard.try_acquire() else {
180 let (_reader, mut writer) = local_socket_stream.split();
181 let _ = writer
182 .write_all(b"Too many connections. Please try again later.")
183 .await;
184 stopped = stop;
185 continue;
186 };
187
188 let max_conns = connection_guard.max_connections();
189 let curr_conns = max_conns - connection_guard.available_connections();
190 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
191
192 let conn_permit = Arc::new(conn_permit);
193
194 process_connection(ProcessConnection {
195 http_middleware: &self.http_middleware,
196 rpc_middleware: self.rpc_middleware.clone(),
197 conn_permit,
198 conn_id: id,
199 server_cfg: self.cfg.clone(),
200 stop_handle: stop_handle.clone(),
201 drop_on_completion: drop_on_completion.clone(),
202 methods: methods.clone(),
203 id_provider: self.id_provider.clone(),
204 local_socket_stream,
205 });
206
207 id = id.wrapping_add(1);
208 stopped = stop;
209 }
210 AcceptConnection::Shutdown => {
211 break;
212 }
213 AcceptConnection::Err((err, stop)) => {
214 tracing::error!(%err, "Failed accepting a new IPC connection");
215 stopped = stop;
216 }
217 }
218 }
219
220 drop(drop_on_completion);
222
223 while process_connection_awaiter.recv().await.is_some() {
226 }
229 }
230}
231
/// Outcome of a single attempt to accept a connection, see `try_accept_conn`.
///
/// `S` is the shutdown future; it is handed back in every variant except
/// [`AcceptConnection::Shutdown`] so the caller can reuse it for the next attempt.
enum AcceptConnection<S> {
    /// The shutdown future completed before a connection was accepted.
    Shutdown,
    /// A new connection was accepted.
    Established { local_socket_stream: LocalSocketStream, stop: S },
    /// Accepting a connection failed with an I/O error.
    Err((io::Error, S)),
}
237
238async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
239where
240 S: Future + Unpin,
241{
242 match futures_util::future::select(pin!(listener.accept()), stopped).await {
243 Either::Left((res, stop)) => match res {
244 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
245 Err(e) => AcceptConnection::Err((e, stop)),
246 },
247 Either::Right(_) => AcceptConnection::Shutdown,
248 }
249}
250
251impl std::fmt::Debug for IpcServer {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 f.debug_struct("IpcServer")
254 .field("endpoint", &self.endpoint)
255 .field("cfg", &self.cfg)
256 .field("id_provider", &self.id_provider)
257 .finish()
258 }
259}
260
/// Error returned when the IPC server fails to listen on its endpoint.
#[derive(Debug, thiserror::Error)]
#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
pub struct IpcServerStartError {
    /// The endpoint the server tried to listen on.
    endpoint: String,
    /// The underlying I/O error.
    #[source]
    source: io::Error,
}
269
/// Per-connection data stored inside [`TowerServiceNoHttp`].
#[derive(Debug, Clone)]
// NOTE(review): presumably allowed because not every field is read — confirm.
#[allow(dead_code)]
pub(crate) struct ServiceData {
    /// The registered RPC methods.
    pub(crate) methods: Methods,
    /// Id provider passed on to the RPC service configuration.
    pub(crate) id_provider: Arc<dyn IdProvider>,
    /// Handle used to observe server shutdown.
    pub(crate) stop_handle: StopHandle,
    /// Identifier of the connection, assigned by the accept loop.
    pub(crate) conn_id: u32,
    /// Permit acquired from the connection guard for this connection.
    pub(crate) conn_permit: Arc<ConnectionPermit>,
    /// Subscription limiter created from `max_subscriptions_per_connection`.
    pub(crate) bounded_subscriptions: BoundedSubscriptions,
    /// Sink whose sending half feeds the connection's outgoing message channel.
    pub(crate) method_sink: MethodSink,
    /// Copy of the server settings.
    pub(crate) server_cfg: Settings,
}
293
/// Builder for the RPC middleware stack; a thin wrapper around [`tower::ServiceBuilder`].
#[derive(Debug, Clone)]
pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
298
299impl Default for RpcServiceBuilder<Identity> {
300 fn default() -> Self {
301 Self(tower::ServiceBuilder::new())
302 }
303}
304
305impl RpcServiceBuilder<Identity> {
306 pub const fn new() -> Self {
308 Self(tower::ServiceBuilder::new())
309 }
310}
311
312impl<L> RpcServiceBuilder<L> {
313 pub fn option_layer<T>(
317 self,
318 layer: Option<T>,
319 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
320 let layer = if let Some(layer) = layer {
321 Either::Left(layer)
322 } else {
323 Either::Right(Identity::new())
324 };
325 self.layer(layer)
326 }
327
328 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
332 RpcServiceBuilder(self.0.layer(layer))
333 }
334
335 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
340 RpcServiceBuilder(self.0.layer_fn(f))
341 }
342
343 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
347 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
348 }
349
350 pub(crate) fn service<S>(&self, service: S) -> L::Service
352 where
353 L: tower::Layer<S>,
354 {
355 self.0.service(service)
356 }
357}
358
/// Tower service that answers raw request strings of a single connection.
///
/// See the `Service<String>` implementation for how a request is processed.
#[derive(Debug, Clone)]
pub struct TowerServiceNoHttp<L> {
    /// Per-connection data.
    inner: ServiceData,
    /// Middleware wrapped around the RPC service created for each request.
    rpc_middleware: RpcServiceBuilder<L>,
}
368
369impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
370where
371 RpcMiddleware: for<'a> Layer<RpcService>,
372 for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
373 Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
374{
375 type Response = Option<String>;
381
382 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
383
384 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
385
386 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388 Poll::Ready(Ok(()))
389 }
390
391 fn call(&mut self, request: String) -> Self::Future {
392 trace!("{:?}", request);
393
394 let cfg = RpcServiceCfg::CallsAndSubscriptions {
395 bounded_subscriptions: BoundedSubscriptions::new(
396 self.inner.server_cfg.max_subscriptions_per_connection,
397 ),
398 id_provider: self.inner.id_provider.clone(),
399 sink: self.inner.method_sink.clone(),
400 };
401
402 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
403 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
404 let conn = self.inner.conn_permit.clone();
405 let rpc_service = self.rpc_middleware.service(RpcService::new(
406 self.inner.methods.clone(),
407 max_response_body_size,
408 self.inner.conn_id.into(),
409 cfg,
410 ));
411 let f = tokio::task::spawn(async move {
418 ipc::call_with_service(
419 request,
420 rpc_service,
421 max_response_body_size,
422 max_request_body_size,
423 conn,
424 )
425 .await
426 });
427
428 Box::pin(async move { f.await.map_err(|err| err.into()) })
429 }
430}
431
/// Everything `process_connection` needs to serve one accepted connection.
struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
    /// Middleware wrapped around the connection's tower service.
    http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
    /// Middleware wrapped around the RPC service of each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Permit acquired from the connection guard.
    conn_permit: Arc<ConnectionPermit>,
    /// Identifier assigned to this connection by the accept loop.
    conn_id: u32,
    /// Copy of the server settings.
    server_cfg: Settings,
    /// Handle used to observe server shutdown.
    stop_handle: StopHandle,
    /// Sender that is dropped when the connection task finishes.
    drop_on_completion: mpsc::Sender<()>,
    /// The registered RPC methods.
    methods: Methods,
    /// Id provider passed on to the RPC service configuration.
    id_provider: Arc<dyn IdProvider>,
    /// The accepted socket stream.
    local_socket_stream: LocalSocketStream,
}
444
445#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
447fn process_connection<RpcMiddleware, HttpMiddleware>(
448 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
449) where
450 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
451 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
452 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
453 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
454 + Service<
455 String,
456 Response = Option<String>,
457 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
458 >,
459 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
460 Send + Unpin,
461 {
462 let ProcessConnection {
463 http_middleware,
464 rpc_middleware,
465 conn_permit,
466 conn_id,
467 server_cfg,
468 stop_handle,
469 drop_on_completion,
470 id_provider,
471 methods,
472 local_socket_stream,
473 } = params;
474
475 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
476 StreamCodec::stream_incoming(),
477 local_socket_stream,
478 ));
479
480 let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
481 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
482 let tower_service = TowerServiceNoHttp {
483 inner: ServiceData {
484 methods,
485 id_provider,
486 stop_handle: stop_handle.clone(),
487 server_cfg: server_cfg.clone(),
488 conn_id,
489 conn_permit,
490 bounded_subscriptions: BoundedSubscriptions::new(
491 server_cfg.max_subscriptions_per_connection,
492 ),
493 method_sink,
494 },
495 rpc_middleware,
496 };
497
498 let service = http_middleware.service(tower_service);
499 tokio::spawn(async {
500 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
501 drop(drop_on_completion)
502 });
503}
504
505async fn to_ipc_service<S, T>(
506 ipc: IpcConn<JsonRpcStream<T>>,
507 service: S,
508 stop_handle: StopHandle,
509 rx: mpsc::Receiver<Box<JsonRawValue>>,
510) where
511 S: Service<String, Response = Option<String>> + Send + 'static,
512 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
513 S::Future: Send + Unpin,
514 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
515{
516 let rx_item = ReceiverStream::new(rx);
517 let conn = IpcConnDriver {
518 conn: ipc,
519 service,
520 pending_calls: Default::default(),
521 items: Default::default(),
522 };
523 let stopped = stop_handle.shutdown();
524
525 let mut conn = pin!(conn);
526 let mut rx_item = pin!(rx_item);
527 let mut stopped = pin!(stopped);
528
529 loop {
530 tokio::select! {
531 _ = &mut conn => {
532 break
533 }
534 item = rx_item.next() => {
535 if let Some(item) = item {
536 conn.push_back(item.to_string());
537 }
538 }
539 _ = &mut stopped => {
540 break
542 }
543 }
544 }
545}
546
/// Settings of the IPC server, populated through [`Builder`].
#[derive(Debug, Clone)]
pub struct Settings {
    /// Maximum request body size, passed to the request handler for every call.
    max_request_body_size: u32,
    /// Maximum response body size; also used as the limit of the method sink.
    max_response_body_size: u32,
    /// Maximum log length.
    // NOTE(review): not read anywhere in this file — confirm where it is consumed.
    max_log_length: u32,
    /// Number of permits of the connection guard, i.e. connections served at once.
    max_connections: u32,
    /// Limit used to construct the `BoundedSubscriptions` limiter.
    max_subscriptions_per_connection: u32,
    /// Capacity of the per-connection channel created for the method sink.
    message_buffer_capacity: u32,
    /// Runtime handle to spawn the server task on; `tokio::spawn` is used if `None`.
    tokio_runtime: Option<tokio::runtime::Handle>,
    /// Octal permission string applied to the endpoint file on unix after listening.
    ipc_socket_permissions: Option<String>,
}
569
570impl Default for Settings {
571 fn default() -> Self {
572 Self {
573 max_request_body_size: TEN_MB_SIZE_BYTES,
574 max_response_body_size: TEN_MB_SIZE_BYTES,
575 max_log_length: 4096,
576 max_connections: 100,
577 max_subscriptions_per_connection: 1024,
578 message_buffer_capacity: 1024,
579 tokio_runtime: None,
580 ipc_socket_permissions: None,
581 }
582 }
583}
584
/// Builder used to configure and create an [`IpcServer`].
#[derive(Debug)]
pub struct Builder<HttpMiddleware, RpcMiddleware> {
    /// Settings handed to the server on [`Builder::build`].
    settings: Settings,
    /// Id provider handed to the server.
    id_provider: Arc<dyn IdProvider>,
    /// Middleware wrapped around the RPC service of each request.
    rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
    /// Middleware wrapped around the per-connection tower service.
    http_middleware: tower::ServiceBuilder<HttpMiddleware>,
}
594
595impl Default for Builder<Identity, Identity> {
596 fn default() -> Self {
597 Self {
598 settings: Settings::default(),
599 id_provider: Arc::new(RandomIntegerIdProvider),
600 rpc_middleware: RpcServiceBuilder::new(),
601 http_middleware: tower::ServiceBuilder::new(),
602 }
603 }
604}
605
606impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
607 pub const fn max_request_body_size(mut self, size: u32) -> Self {
609 self.settings.max_request_body_size = size;
610 self
611 }
612
613 pub const fn max_response_body_size(mut self, size: u32) -> Self {
615 self.settings.max_response_body_size = size;
616 self
617 }
618
619 pub const fn max_log_length(mut self, size: u32) -> Self {
621 self.settings.max_log_length = size;
622 self
623 }
624
625 pub const fn max_connections(mut self, max: u32) -> Self {
627 self.settings.max_connections = max;
628 self
629 }
630
631 pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
633 self.settings.max_subscriptions_per_connection = max;
634 self
635 }
636
637 pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
655 self.settings.message_buffer_capacity = c;
656 self
657 }
658
659 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
663 self.settings.tokio_runtime = Some(rt);
664 self
665 }
666
667 pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
669 self.settings.ipc_socket_permissions = permissions;
670 self
671 }
672
673 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
694 self.id_provider = Arc::new(id_provider);
695 self
696 }
697
698 pub fn set_http_middleware<T>(
715 self,
716 service_builder: tower::ServiceBuilder<T>,
717 ) -> Builder<T, RpcMiddleware> {
718 Builder {
719 settings: self.settings,
720 id_provider: self.id_provider,
721 http_middleware: service_builder,
722 rpc_middleware: self.rpc_middleware,
723 }
724 }
725
726 pub fn set_rpc_middleware<T>(
738 self,
739 rpc_middleware: RpcServiceBuilder<T>,
740 ) -> Builder<HttpMiddleware, T> {
741 Builder {
742 settings: self.settings,
743 id_provider: self.id_provider,
744 rpc_middleware,
745 http_middleware: self.http_middleware,
746 }
747 }
748
749 pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
751 IpcServer {
752 endpoint,
753 cfg: self.settings,
754 id_provider: self.id_provider,
755 http_middleware: self.http_middleware,
756 rpc_middleware: self.rpc_middleware,
757 }
758 }
759}
760
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    // A random suffix gives every test its own endpoint.
    let num = rand::rng().random::<u64>();
    if !cfg!(windows) {
        return format!(r"/tmp/my-uds-{num}");
    }
    format!(r"\\.\pipe\my-pipe-{num}")
}
772
773#[cfg(test)]
774mod tests {
775 use super::*;
776 use crate::client::IpcClientBuilder;
777 use futures::future::select;
778 use jsonrpsee::{
779 core::{
780 client::{self, ClientT, Error, Subscription, SubscriptionClientT},
781 middleware::{Batch, BatchEntry, Notification},
782 params::BatchRequestBuilder,
783 },
784 rpc_params,
785 types::Request,
786 PendingSubscriptionSink, RpcModule, SubscriptionMessage,
787 };
788 use reth_tracing::init_test_tracing;
789 use std::pin::pin;
790 use tokio::sync::broadcast;
791 use tokio_stream::wrappers::BroadcastStream;
792
793 #[tokio::test]
794 #[cfg(unix)]
795 async fn test_ipc_socket_permissions() {
796 use std::os::unix::fs::PermissionsExt;
797 let endpoint = &dummy_name();
798 let perms = "0777";
799 let server = Builder::default()
800 .set_ipc_socket_permissions(Some(perms.to_string()))
801 .build(endpoint.clone());
802 let module = RpcModule::new(());
803 let handle = server.start(module).await.unwrap();
804 tokio::spawn(handle.stopped());
805
806 let meta = std::fs::metadata(endpoint).unwrap();
807 let perms = meta.permissions();
808 assert_eq!(perms.mode() & 0o777, 0o777);
809 }
810
/// Accepts the pending subscription and forwards every item of `stream` to it.
///
/// Stops with `Ok(())` when the sink is closed, the stream ends or sending fails.
/// Returns an error if an item cannot be serialized or the stream yields an error.
/// Panics if the subscription cannot be accepted.
async fn pipe_from_stream_with_bounded_buffer(
    pending: PendingSubscriptionSink,
    stream: BroadcastStream<usize>,
) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
    let sink = pending.accept().await.unwrap();
    let closed = sink.closed();

    let mut closed = pin!(closed);
    let mut stream = pin!(stream);

    loop {
        match select(closed, stream.next()).await {
            // The sink was closed or the stream is exhausted.
            Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),

            // Received a new item from the stream.
            Either::Right((Some(Ok(item)), c)) => {
                let raw_value = serde_json::value::to_raw_value(&item)?;
                let notif = SubscriptionMessage::from(raw_value);

                // Stop forwarding once the item can no longer be sent.
                if sink.send(notif).await.is_err() {
                    break Ok(());
                }

                // `select` consumed the `closed` future; reuse it for the next round.
                closed = c;
            }

            // The stream reported an error.
            Either::Right((Some(Err(e)), _)) => break Err(e.into()),
        }
    }
}
846
847 fn produce_items(tx: broadcast::Sender<usize>) {
849 for c in 1..=100 {
850 std::thread::sleep(std::time::Duration::from_millis(1));
851 let _ = tx.send(c);
852 }
853 }
854
855 #[tokio::test]
856 async fn can_set_the_max_response_body_size() {
857 let endpoint = &dummy_name();
859 let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
860 let mut module = RpcModule::new(());
861 module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
862 let handle = server.start(module).await.unwrap();
863 tokio::spawn(handle.stopped());
864
865 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
866 let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
867 assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
868 }
869
870 #[tokio::test]
871 async fn can_set_the_max_request_body_size() {
872 init_test_tracing();
873 let endpoint = &dummy_name();
874 let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
875 let mut module = RpcModule::new(());
876 module.register_method("anything", |_, _, _| "succeed").unwrap();
877 let handle = server.start(module).await.unwrap();
878 tokio::spawn(handle.stopped());
879
880 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
881 let response: Result<String, Error> =
882 client.request("anything", rpc_params!["a".repeat(101)]).await;
883 assert!(response.is_err());
884 let mut batch_request_builder = BatchRequestBuilder::new();
885 let _ = batch_request_builder.insert("anything", rpc_params![]);
886 let _ = batch_request_builder.insert("anything", rpc_params![]);
887 let _ = batch_request_builder.insert("anything", rpc_params![]);
888 let response: Result<client::BatchResponse<'_, String>, Error> =
893 client.batch_request(batch_request_builder).await;
894 assert!(response.is_err());
895 }
896
/// With `max_connections(2)` a third client fails, and a new client succeeds again
/// after one of the first two has been dropped.
#[tokio::test]
async fn can_set_max_connections() {
    init_test_tracing();

    let endpoint = &dummy_name();
    let server = Builder::default().max_connections(2).build(endpoint.clone());
    let mut module = RpcModule::new(());
    module.register_method("anything", |_, _, _| "succeed").unwrap();
    let handle = server.start(module).await.unwrap();
    tokio::spawn(handle.stopped());

    // Connect one client more than the configured limit.
    let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
    let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
    let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();

    let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
    let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
    let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;

    assert!(response1.is_ok());
    assert!(response2.is_ok());
    // The third client is over the limit.
    assert!(response3.is_err());

    // Drop one of the clients within the limit.
    drop(client2);
    // NOTE(review): presumably gives the server time to notice the closed connection — confirm.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    // A new client can now be served.
    let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
    let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
    assert!(response4.is_ok());
}
930
931 #[tokio::test]
932 async fn test_rpc_request() {
933 init_test_tracing();
934 let endpoint = &dummy_name();
935 let server = Builder::default().build(endpoint.clone());
936 let mut module = RpcModule::new(());
937 let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
938 module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
939 let handle = server.start(module).await.unwrap();
940 tokio::spawn(handle.stopped());
941
942 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
943 let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
944 assert_eq!(response, msg);
945 }
946
947 #[tokio::test]
948 async fn test_batch_request() {
949 let endpoint = &dummy_name();
950 let server = Builder::default().build(endpoint.clone());
951 let mut module = RpcModule::new(());
952 module.register_method("anything", |_, _, _| "ok").unwrap();
953 let handle = server.start(module).await.unwrap();
954 tokio::spawn(handle.stopped());
955
956 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
957 let mut batch_request_builder = BatchRequestBuilder::new();
958 let _ = batch_request_builder.insert("anything", rpc_params![]);
959 let _ = batch_request_builder.insert("anything", rpc_params![]);
960 let _ = batch_request_builder.insert("anything", rpc_params![]);
961 let result = client
962 .batch_request(batch_request_builder)
963 .await
964 .unwrap()
965 .into_ok()
966 .unwrap()
967 .collect::<Vec<String>>();
968 assert_eq!(result, vec!["ok", "ok", "ok"]);
969 }
970
971 #[tokio::test]
972 async fn test_ipc_modules() {
973 reth_tracing::init_test_tracing();
974 let endpoint = &dummy_name();
975 let server = Builder::default().build(endpoint.clone());
976 let mut module = RpcModule::new(());
977 let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
978 module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
979 let handle = server.start(module).await.unwrap();
980 tokio::spawn(handle.stopped());
981
982 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
983 let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
984 assert_eq!(response, msg);
985 }
986
/// A client can subscribe and receive 16 items produced on a background thread.
#[tokio::test(flavor = "multi_thread")]
async fn test_rpc_subscription() {
    let endpoint = &dummy_name();
    let server = Builder::default().build(endpoint.clone());
    let (tx, _rx) = broadcast::channel::<usize>(16);

    // The broadcast sender is the module context, so each subscription can create
    // its own receiver from it.
    let mut module = RpcModule::new(tx.clone());
    // Produce items on a plain thread because `produce_items` sleeps synchronously.
    std::thread::spawn(move || produce_items(tx));

    module
        .register_subscription(
            "subscribe_hello",
            "s_hello",
            "unsubscribe_hello",
            |_, pending, tx, _| async move {
                let rx = tx.subscribe();
                let stream = BroadcastStream::new(rx);
                pipe_from_stream_with_bounded_buffer(pending, stream).await?;
                Ok(())
            },
        )
        .unwrap();

    let handle = server.start(module).await.unwrap();
    tokio::spawn(handle.stopped());

    let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
    let sub: Subscription<usize> =
        client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();

    // Only the first 16 items are awaited.
    let items = sub.take(16).collect::<Vec<_>>().await;
    assert_eq!(items.len(), 16);
}
1020
/// An RPC middleware layer can rewrite requests before they reach the methods.
///
/// The middleware swaps `say_hello` and `say_goodbye`, so each request is expected
/// to be answered by the other method.
#[tokio::test]
async fn test_rpc_middleware() {
    /// Middleware that swaps the method names `say_hello` and `say_goodbye`.
    #[derive(Clone)]
    struct ModifyRequestIf<S>(S);

    impl<S> RpcServiceT for ModifyRequestIf<S>
    where
        S: Send + Sync + RpcServiceT,
    {
        type MethodResponse = S::MethodResponse;
        type NotificationResponse = S::NotificationResponse;
        type BatchResponse = S::BatchResponse;

        fn call<'a>(
            &self,
            mut req: Request<'a>,
        ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
            // Swap the two method names, leave every other method untouched.
            if req.method == "say_hello" {
                req.method = "say_goodbye".into();
            } else if req.method == "say_goodbye" {
                req.method = "say_hello".into();
            }

            self.0.call(req)
        }

        fn batch<'a>(
            &self,
            mut batch: Batch<'a>,
        ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
            // Apply the same swap to every call and notification of the batch.
            for call in batch.iter_mut() {
                match call {
                    Ok(BatchEntry::Call(req)) => {
                        if req.method == "say_hello" {
                            req.method = "say_goodbye".into();
                        } else if req.method == "say_goodbye" {
                            req.method = "say_hello".into();
                        }
                    }
                    Ok(BatchEntry::Notification(n)) => {
                        if n.method == "say_hello" {
                            n.method = "say_goodbye".into();
                        } else if n.method == "say_goodbye" {
                            n.method = "say_hello".into();
                        }
                    }
                    // Error entries are passed through unchanged.
                    Err(_err) => {}
                }
            }

            self.0.batch(batch)
        }

        fn notification<'a>(
            &self,
            mut n: Notification<'a>,
        ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
            if n.method == "say_hello" {
                n.method = "say_goodbye".into();
            } else if n.method == "say_goodbye" {
                n.method = "say_hello".into();
            }
            self.0.notification(n)
        }
    }

    reth_tracing::init_test_tracing();
    let endpoint = &dummy_name();

    let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
    let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());

    let mut module = RpcModule::new(());
    let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
    let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
    module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
    module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
    let handle = server.start(module).await.unwrap();
    tokio::spawn(handle.stopped());

    let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
    let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
    let say_goodbye_response: String =
        client.request("say_goodbye", rpc_params![]).await.unwrap();

    // Each request was answered by the other method.
    assert_eq!(say_hello_response, goodbye_msg);
    assert_eq!(say_goodbye_response, hello_msg);
}
1111}