1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::{middleware::layer::RpcLoggerLayer, JsonRawValue, TEN_MB_SIZE_BYTES},
13 server::{
14 middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
15 RandomIntegerIdProvider, ServerHandle, StopHandle,
16 },
17 BoundedSubscriptions, MethodResponse, MethodSink, Methods,
18};
19use std::{
20 future::Future,
21 io,
22 pin::{pin, Pin},
23 sync::Arc,
24 task::{Context, Poll},
25};
26use tokio::{
27 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
28 sync::oneshot,
29};
30use tower::{layer::util::Identity, Layer, Service};
31use tracing::{debug, instrument, trace, warn, Instrument};
32use crate::{
34 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
35 stream_codec::StreamCodec,
36};
37use tokio::sync::mpsc;
38use tokio_stream::wrappers::ReceiverStream;
39use tower::layer::{util::Stack, LayerFn};
40
41mod connection;
42mod ipc;
43mod rpc_service;
44
45pub use rpc_service::RpcService;
46
47pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
51 endpoint: String,
53 id_provider: Arc<dyn IdProvider>,
54 cfg: Settings,
55 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
56 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
57}
58
59impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
60 pub fn endpoint(&self) -> String {
62 self.endpoint.clone()
63 }
64}
65
66impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
67where
68 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
69 HttpMiddleware: Layer<
70 TowerServiceNoHttp<RpcMiddleware>,
71 Service: Service<
72 String,
73 Response = Option<String>,
74 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
75 Future: Send + Unpin,
76 > + Send,
77 > + Send
78 + 'static,
79{
80 pub async fn start(
102 mut self,
103 methods: impl Into<Methods>,
104 ) -> Result<ServerHandle, IpcServerStartError> {
105 let methods = methods.into();
106
107 let (stop_handle, server_handle) = stop_channel();
108
109 let (tx, rx) = oneshot::channel();
111
112 match self.cfg.tokio_runtime.take() {
113 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
114 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
115 };
116 rx.await.expect("channel is open")?;
117
118 Ok(server_handle)
119 }
120
121 async fn start_inner(
122 self,
123 methods: Methods,
124 stop_handle: StopHandle,
125 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
126 ) {
127 trace!(endpoint = ?self.endpoint, "starting ipc server");
128
129 if cfg!(unix) {
130 if std::fs::remove_file(&self.endpoint).is_ok() {
132 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
133 }
134 }
135
136 let listener = match self
137 .endpoint
138 .as_str()
139 .to_fs_name::<GenericFilePath>()
140 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
141 {
142 Ok(listener) => {
143 #[cfg(unix)]
144 {
145 use std::os::unix::fs::PermissionsExt;
147 if let Some(perms_str) = &self.cfg.ipc_socket_permissions &&
148 let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8)
149 {
150 let perms = std::fs::Permissions::from_mode(mode);
151 let _ = std::fs::set_permissions(&self.endpoint, perms);
152 }
153 }
154 listener
155 }
156 Err(err) => {
157 on_ready
158 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
159 .ok();
160 return;
161 }
162 };
163
164 on_ready.send(Ok(())).ok();
166
167 let mut id: u32 = 0;
168 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
169
170 let stopped = stop_handle.clone().shutdown();
171 let mut stopped = pin!(stopped);
172
173 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
174
175 trace!("accepting ipc connections");
176 loop {
177 match try_accept_conn(&listener, stopped).await {
178 AcceptConnection::Established { local_socket_stream, stop } => {
179 let Some(conn_permit) = connection_guard.try_acquire() else {
180 let (_reader, mut writer) = local_socket_stream.split();
181 let _ = writer
182 .write_all(b"Too many connections. Please try again later.")
183 .await;
184 stopped = stop;
185 continue;
186 };
187
188 let max_conns = connection_guard.max_connections();
189 let curr_conns = max_conns - connection_guard.available_connections();
190 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
191
192 let conn_permit = Arc::new(conn_permit);
193
194 process_connection(ProcessConnection {
195 http_middleware: &self.http_middleware,
196 rpc_middleware: self.rpc_middleware.clone(),
197 conn_permit,
198 conn_id: id,
199 server_cfg: self.cfg.clone(),
200 stop_handle: stop_handle.clone(),
201 drop_on_completion: drop_on_completion.clone(),
202 methods: methods.clone(),
203 id_provider: self.id_provider.clone(),
204 local_socket_stream,
205 });
206
207 id = id.wrapping_add(1);
208 stopped = stop;
209 }
210 AcceptConnection::Shutdown => {
211 break;
212 }
213 AcceptConnection::Err((err, stop)) => {
214 tracing::error!(%err, "Failed accepting a new IPC connection");
215 stopped = stop;
216 }
217 }
218 }
219
220 drop(drop_on_completion);
222
223 while process_connection_awaiter.recv().await.is_some() {
226 }
229 }
230}
231
232enum AcceptConnection<S> {
233 Shutdown,
234 Established { local_socket_stream: LocalSocketStream, stop: S },
235 Err((io::Error, S)),
236}
237
238async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
239where
240 S: Future + Unpin,
241{
242 match futures_util::future::select(pin!(listener.accept()), stopped).await {
243 Either::Left((res, stop)) => match res {
244 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
245 Err(e) => AcceptConnection::Err((e, stop)),
246 },
247 Either::Right(_) => AcceptConnection::Shutdown,
248 }
249}
250
251impl<HttpMiddleware, RpcMiddleware> std::fmt::Debug for IpcServer<HttpMiddleware, RpcMiddleware> {
252 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
253 f.debug_struct("IpcServer")
254 .field("endpoint", &self.endpoint)
255 .field("cfg", &self.cfg)
256 .field("id_provider", &self.id_provider)
257 .finish()
258 }
259}
260
261#[derive(Debug, thiserror::Error)]
263#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
264pub struct IpcServerStartError {
265 endpoint: String,
266 #[source]
267 source: io::Error,
268}
269
270#[derive(Debug, Clone)]
272#[allow(dead_code)]
273pub(crate) struct ServiceData {
274 pub(crate) methods: Methods,
276 pub(crate) id_provider: Arc<dyn IdProvider>,
278 pub(crate) stop_handle: StopHandle,
280 pub(crate) conn_id: u32,
282 pub(crate) conn_permit: Arc<ConnectionPermit>,
284 pub(crate) bounded_subscriptions: BoundedSubscriptions,
286 pub(crate) method_sink: MethodSink,
290 pub(crate) server_cfg: Settings,
292}
293
294#[derive(Debug, Clone)]
297pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
298
299impl Default for RpcServiceBuilder<Identity> {
300 fn default() -> Self {
301 Self(tower::ServiceBuilder::new())
302 }
303}
304
305impl RpcServiceBuilder<Identity> {
306 pub const fn new() -> Self {
308 Self(tower::ServiceBuilder::new())
309 }
310}
311
312impl<L> RpcServiceBuilder<L> {
313 pub fn option_layer<T>(
317 self,
318 layer: Option<T>,
319 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
320 let layer = if let Some(layer) = layer {
321 Either::Left(layer)
322 } else {
323 Either::Right(Identity::new())
324 };
325 self.layer(layer)
326 }
327
328 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
332 RpcServiceBuilder(self.0.layer(layer))
333 }
334
335 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
340 RpcServiceBuilder(self.0.layer_fn(f))
341 }
342
343 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
347 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
348 }
349
350 pub(crate) fn service<S>(&self, service: S) -> L::Service
352 where
353 L: tower::Layer<S>,
354 {
355 self.0.service(service)
356 }
357}
358
359#[derive(Debug, Clone)]
364pub struct TowerServiceNoHttp<L> {
365 inner: ServiceData,
366 rpc_middleware: RpcServiceBuilder<L>,
367}
368
369impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
370where
371 RpcMiddleware: for<'a> Layer<RpcService>,
372 for<'a> <RpcMiddleware as Layer<RpcService>>::Service:
373 Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
374{
375 type Response = Option<String>;
381
382 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
383
384 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
385
386 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
388 Poll::Ready(Ok(()))
389 }
390
391 fn call(&mut self, request: String) -> Self::Future {
392 trace!("{:?}", request);
393
394 let cfg = RpcServiceCfg {
395 bounded_subscriptions: BoundedSubscriptions::new(
396 self.inner.server_cfg.max_subscriptions_per_connection,
397 ),
398 id_provider: self.inner.id_provider.clone(),
399 sink: self.inner.method_sink.clone(),
400 };
401
402 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
403 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
404 let conn = self.inner.conn_permit.clone();
405 let rpc_service = self.rpc_middleware.service(RpcService::new(
406 self.inner.methods.clone(),
407 max_response_body_size,
408 self.inner.conn_id.into(),
409 cfg,
410 ));
411 let f = tokio::task::spawn(async move {
418 ipc::call_with_service(
419 request,
420 rpc_service,
421 max_response_body_size,
422 max_request_body_size,
423 conn,
424 )
425 .await
426 });
427
428 Box::pin(async move { f.await.map_err(|err| err.into()) })
429 }
430}
431
432struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
433 http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
434 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
435 conn_permit: Arc<ConnectionPermit>,
436 conn_id: u32,
437 server_cfg: Settings,
438 stop_handle: StopHandle,
439 drop_on_completion: mpsc::Sender<()>,
440 methods: Methods,
441 id_provider: Arc<dyn IdProvider>,
442 local_socket_stream: LocalSocketStream,
443}
444
445#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id))]
447fn process_connection<RpcMiddleware, HttpMiddleware>(
448 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
449) where
450 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
451 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
452 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
453 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
454 + Service<
455 String,
456 Response = Option<String>,
457 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
458 >,
459 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
460 Send + Unpin,
461 {
462 let ProcessConnection {
463 http_middleware,
464 rpc_middleware,
465 conn_permit,
466 conn_id,
467 server_cfg,
468 stop_handle,
469 drop_on_completion,
470 id_provider,
471 methods,
472 local_socket_stream,
473 } = params;
474
475 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
476 StreamCodec::stream_incoming(),
477 local_socket_stream,
478 ));
479
480 let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
481 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
482 let tower_service = TowerServiceNoHttp {
483 inner: ServiceData {
484 methods,
485 id_provider,
486 stop_handle: stop_handle.clone(),
487 server_cfg: server_cfg.clone(),
488 conn_id,
489 conn_permit,
490 bounded_subscriptions: BoundedSubscriptions::new(
491 server_cfg.max_subscriptions_per_connection,
492 ),
493 method_sink,
494 },
495 rpc_middleware,
496 };
497
498 let service = http_middleware.service(tower_service);
499 tokio::spawn(async {
500 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
501 drop(drop_on_completion)
502 });
503}
504
505async fn to_ipc_service<S, T>(
506 ipc: IpcConn<JsonRpcStream<T>>,
507 service: S,
508 stop_handle: StopHandle,
509 rx: mpsc::Receiver<Box<JsonRawValue>>,
510) where
511 S: Service<String, Response = Option<String>> + Send + 'static,
512 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
513 S::Future: Send + Unpin,
514 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
515{
516 let rx_item = ReceiverStream::new(rx);
517 let conn = IpcConnDriver {
518 conn: ipc,
519 service,
520 pending_calls: Default::default(),
521 items: Default::default(),
522 };
523 let stopped = stop_handle.shutdown();
524
525 let mut conn = pin!(conn);
526 let mut rx_item = pin!(rx_item);
527 let mut stopped = pin!(stopped);
528
529 loop {
530 tokio::select! {
531 _ = &mut conn => {
532 break
533 }
534 item = rx_item.next() => {
535 if let Some(item) = item {
536 conn.push_back(item.to_string());
537 }
538 }
539 _ = &mut stopped => {
540 break
542 }
543 }
544 }
545}
546
547#[derive(Debug, Clone)]
549pub struct Settings {
550 max_request_body_size: u32,
552 max_response_body_size: u32,
554 max_log_length: u32,
558 max_connections: u32,
560 max_subscriptions_per_connection: u32,
562 message_buffer_capacity: u32,
564 tokio_runtime: Option<tokio::runtime::Handle>,
566 ipc_socket_permissions: Option<String>,
568}
569
570impl Default for Settings {
571 fn default() -> Self {
572 Self {
573 max_request_body_size: TEN_MB_SIZE_BYTES,
574 max_response_body_size: TEN_MB_SIZE_BYTES,
575 max_log_length: 4096,
576 max_connections: 100,
577 max_subscriptions_per_connection: 1024,
578 message_buffer_capacity: 1024,
579 tokio_runtime: None,
580 ipc_socket_permissions: None,
581 }
582 }
583}
584
585#[derive(Debug)]
587pub struct Builder<HttpMiddleware, RpcMiddleware> {
588 settings: Settings,
589 id_provider: Arc<dyn IdProvider>,
591 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
592 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
593}
594
595impl Default for Builder<Identity, Identity> {
596 fn default() -> Self {
597 Self {
598 settings: Settings::default(),
599 id_provider: Arc::new(RandomIntegerIdProvider),
600 rpc_middleware: RpcServiceBuilder::new(),
601 http_middleware: tower::ServiceBuilder::new(),
602 }
603 }
604}
605
606impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
607 pub const fn max_request_body_size(mut self, size: u32) -> Self {
609 self.settings.max_request_body_size = size;
610 self
611 }
612
613 pub const fn max_response_body_size(mut self, size: u32) -> Self {
615 self.settings.max_response_body_size = size;
616 self
617 }
618
619 pub const fn max_log_length(mut self, size: u32) -> Self {
621 self.settings.max_log_length = size;
622 self
623 }
624
625 pub const fn max_connections(mut self, max: u32) -> Self {
627 self.settings.max_connections = max;
628 self
629 }
630
631 pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
633 self.settings.max_subscriptions_per_connection = max;
634 self
635 }
636
637 pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
655 self.settings.message_buffer_capacity = c;
656 self
657 }
658
659 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
663 self.settings.tokio_runtime = Some(rt);
664 self
665 }
666
667 pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
669 self.settings.ipc_socket_permissions = permissions;
670 self
671 }
672
673 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
694 self.id_provider = Arc::new(id_provider);
695 self
696 }
697
698 pub fn set_http_middleware<T>(
715 self,
716 service_builder: tower::ServiceBuilder<T>,
717 ) -> Builder<T, RpcMiddleware> {
718 Builder {
719 settings: self.settings,
720 id_provider: self.id_provider,
721 http_middleware: service_builder,
722 rpc_middleware: self.rpc_middleware,
723 }
724 }
725
726 pub fn set_rpc_middleware<T>(
738 self,
739 rpc_middleware: RpcServiceBuilder<T>,
740 ) -> Builder<HttpMiddleware, T> {
741 Builder {
742 settings: self.settings,
743 id_provider: self.id_provider,
744 rpc_middleware,
745 http_middleware: self.http_middleware,
746 }
747 }
748
749 pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
751 IpcServer {
752 endpoint,
753 cfg: self.settings,
754 id_provider: self.id_provider,
755 http_middleware: self.http_middleware,
756 rpc_middleware: self.rpc_middleware,
757 }
758 }
759}
760
#[cfg(test)]
#[expect(missing_docs)]
pub fn dummy_name() -> String {
    use rand::Rng;
    // Test helper: a random numeric suffix gives each caller its own socket path under `/tmp`.
    let num = rand::rng().random::<u64>();
    format!(r"/tmp/my-uds-{num}")
}
768
769#[cfg(test)]
770mod tests {
771 use super::*;
772 use crate::client::IpcClientBuilder;
773 use futures::future::select;
774 use jsonrpsee::{
775 core::{
776 client::{self, ClientT, Error, Subscription, SubscriptionClientT},
777 middleware::{Batch, BatchEntry, Notification},
778 params::BatchRequestBuilder,
779 },
780 rpc_params,
781 types::Request,
782 PendingSubscriptionSink, RpcModule, SubscriptionMessage,
783 };
784 use reth_tracing::init_test_tracing;
785 use std::pin::pin;
786 use tokio::sync::broadcast;
787 use tokio_stream::wrappers::BroadcastStream;
788
789 #[tokio::test]
790 #[cfg(unix)]
791 async fn test_ipc_socket_permissions() {
792 use std::os::unix::fs::PermissionsExt;
793 let endpoint = &dummy_name();
794 let perms = "0777";
795 let server = Builder::default()
796 .set_ipc_socket_permissions(Some(perms.to_string()))
797 .build(endpoint.clone());
798 let module = RpcModule::new(());
799 let handle = server.start(module).await.unwrap();
800 tokio::spawn(handle.stopped());
801
802 let meta = std::fs::metadata(endpoint).unwrap();
803 let perms = meta.permissions();
804 assert_eq!(perms.mode() & 0o777, 0o777);
805 }
806
807 async fn pipe_from_stream_with_bounded_buffer(
808 pending: PendingSubscriptionSink,
809 stream: BroadcastStream<usize>,
810 ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
811 let sink = pending.accept().await.unwrap();
812 let closed = sink.closed();
813
814 let mut closed = pin!(closed);
815 let mut stream = pin!(stream);
816
817 loop {
818 match select(closed, stream.next()).await {
819 Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),
821
822 Either::Right((Some(Ok(item)), c)) => {
824 let raw_value = serde_json::value::to_raw_value(&item)?;
825 let notif = SubscriptionMessage::from(raw_value);
826
827 if sink.send(notif).await.is_err() {
831 break Ok(());
832 }
833
834 closed = c;
835 }
836
837 Either::Right((Some(Err(e)), _)) => break Err(e.into()),
839 }
840 }
841 }
842
843 fn produce_items(tx: broadcast::Sender<usize>) {
845 for c in 1..=100 {
846 std::thread::sleep(std::time::Duration::from_millis(1));
847 let _ = tx.send(c);
848 }
849 }
850
851 #[tokio::test]
852 async fn can_set_the_max_response_body_size() {
853 let endpoint = &dummy_name();
855 let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
856 let mut module = RpcModule::new(());
857 module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
858 let handle = server.start(module).await.unwrap();
859 tokio::spawn(handle.stopped());
860
861 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
862 let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
863 assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
864 }
865
866 #[tokio::test]
867 async fn can_set_the_max_request_body_size() {
868 init_test_tracing();
869 let endpoint = &dummy_name();
870 let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
871 let mut module = RpcModule::new(());
872 module.register_method("anything", |_, _, _| "succeed").unwrap();
873 let handle = server.start(module).await.unwrap();
874 tokio::spawn(handle.stopped());
875
876 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
877 let response: Result<String, Error> =
878 client.request("anything", rpc_params!["a".repeat(101)]).await;
879 assert!(response.is_err());
880 let mut batch_request_builder = BatchRequestBuilder::new();
881 let _ = batch_request_builder.insert("anything", rpc_params![]);
882 let _ = batch_request_builder.insert("anything", rpc_params![]);
883 let _ = batch_request_builder.insert("anything", rpc_params![]);
884 let response: Result<client::BatchResponse<'_, String>, Error> =
889 client.batch_request(batch_request_builder).await;
890 assert!(response.is_err());
891 }
892
893 #[tokio::test]
894 async fn can_set_max_connections() {
895 init_test_tracing();
896
897 let endpoint = &dummy_name();
898 let server = Builder::default().max_connections(2).build(endpoint.clone());
899 let mut module = RpcModule::new(());
900 module.register_method("anything", |_, _, _| "succeed").unwrap();
901 let handle = server.start(module).await.unwrap();
902 tokio::spawn(handle.stopped());
903
904 let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
905 let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
906 let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
907
908 let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
909 let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
910 let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
911
912 assert!(response1.is_ok());
913 assert!(response2.is_ok());
914 assert!(response3.is_err());
916
917 drop(client2);
919 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
920
921 let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
923 let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
924 assert!(response4.is_ok());
925 }
926
927 #[tokio::test]
928 async fn test_rpc_request() {
929 init_test_tracing();
930 let endpoint = &dummy_name();
931 let server = Builder::default().build(endpoint.clone());
932 let mut module = RpcModule::new(());
933 let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
934 module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
935 let handle = server.start(module).await.unwrap();
936 tokio::spawn(handle.stopped());
937
938 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
939 let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
940 assert_eq!(response, msg);
941 }
942
943 #[tokio::test]
944 async fn test_batch_request() {
945 let endpoint = &dummy_name();
946 let server = Builder::default().build(endpoint.clone());
947 let mut module = RpcModule::new(());
948 module.register_method("anything", |_, _, _| "ok").unwrap();
949 let handle = server.start(module).await.unwrap();
950 tokio::spawn(handle.stopped());
951
952 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
953 let mut batch_request_builder = BatchRequestBuilder::new();
954 let _ = batch_request_builder.insert("anything", rpc_params![]);
955 let _ = batch_request_builder.insert("anything", rpc_params![]);
956 let _ = batch_request_builder.insert("anything", rpc_params![]);
957 let result = client
958 .batch_request(batch_request_builder)
959 .await
960 .unwrap()
961 .into_ok()
962 .unwrap()
963 .collect::<Vec<String>>();
964 assert_eq!(result, vec!["ok", "ok", "ok"]);
965 }
966
967 #[tokio::test]
968 async fn test_ipc_modules() {
969 reth_tracing::init_test_tracing();
970 let endpoint = &dummy_name();
971 let server = Builder::default().build(endpoint.clone());
972 let mut module = RpcModule::new(());
973 let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
974 module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
975 let handle = server.start(module).await.unwrap();
976 tokio::spawn(handle.stopped());
977
978 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
979 let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
980 assert_eq!(response, msg);
981 }
982
983 #[tokio::test(flavor = "multi_thread")]
984 async fn test_rpc_subscription() {
985 let endpoint = &dummy_name();
986 let server = Builder::default().build(endpoint.clone());
987 let (tx, _rx) = broadcast::channel::<usize>(16);
988
989 let mut module = RpcModule::new(tx.clone());
990 std::thread::spawn(move || produce_items(tx));
991
992 module
993 .register_subscription(
994 "subscribe_hello",
995 "s_hello",
996 "unsubscribe_hello",
997 |_, pending, tx, _| async move {
998 let rx = tx.subscribe();
999 let stream = BroadcastStream::new(rx);
1000 pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1001 Ok(())
1002 },
1003 )
1004 .unwrap();
1005
1006 let handle = server.start(module).await.unwrap();
1007 tokio::spawn(handle.stopped());
1008
1009 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1010 let sub: Subscription<usize> =
1011 client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1012
1013 let items = sub.take(16).collect::<Vec<_>>().await;
1014 assert_eq!(items.len(), 16);
1015 }
1016
1017 #[tokio::test]
1018 async fn test_rpc_middleware() {
1019 #[derive(Clone)]
1020 struct ModifyRequestIf<S>(S);
1021
1022 impl<S> RpcServiceT for ModifyRequestIf<S>
1023 where
1024 S: Send + Sync + RpcServiceT,
1025 {
1026 type MethodResponse = S::MethodResponse;
1027 type NotificationResponse = S::NotificationResponse;
1028 type BatchResponse = S::BatchResponse;
1029
1030 fn call<'a>(
1031 &self,
1032 mut req: Request<'a>,
1033 ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
1034 if req.method == "say_hello" {
1036 req.method = "say_goodbye".into();
1037 } else if req.method == "say_goodbye" {
1038 req.method = "say_hello".into();
1039 }
1040
1041 self.0.call(req)
1042 }
1043
1044 fn batch<'a>(
1045 &self,
1046 mut batch: Batch<'a>,
1047 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1048 for call in batch.iter_mut() {
1049 match call {
1050 Ok(BatchEntry::Call(req)) => {
1051 if req.method == "say_hello" {
1052 req.method = "say_goodbye".into();
1053 } else if req.method == "say_goodbye" {
1054 req.method = "say_hello".into();
1055 }
1056 }
1057 Ok(BatchEntry::Notification(n)) => {
1058 if n.method == "say_hello" {
1059 n.method = "say_goodbye".into();
1060 } else if n.method == "say_goodbye" {
1061 n.method = "say_hello".into();
1062 }
1063 }
1064 Err(_err) => {}
1066 }
1067 }
1068
1069 self.0.batch(batch)
1070 }
1071
1072 fn notification<'a>(
1073 &self,
1074 mut n: Notification<'a>,
1075 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1076 if n.method == "say_hello" {
1077 n.method = "say_goodbye".into();
1078 } else if n.method == "say_goodbye" {
1079 n.method = "say_hello".into();
1080 }
1081 self.0.notification(n)
1082 }
1083 }
1084
1085 reth_tracing::init_test_tracing();
1086 let endpoint = &dummy_name();
1087
1088 let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1089 let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1090
1091 let mut module = RpcModule::new(());
1092 let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1093 let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1094 module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1095 module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1096 let handle = server.start(module).await.unwrap();
1097 tokio::spawn(handle.stopped());
1098
1099 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1100 let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1101 let say_goodbye_response: String =
1102 client.request("say_goodbye", rpc_params![]).await.unwrap();
1103
1104 assert_eq!(say_hello_response, goodbye_msg);
1105 assert_eq!(say_goodbye_response, hello_msg);
1106 }
1107}