1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::{
13 middleware::layer::{Either as RpcEither, RpcLoggerLayer},
14 JsonRawValue, TEN_MB_SIZE_BYTES,
15 },
16 server::{
17 middleware::rpc::RpcServiceT, stop_channel, ConnectionGuard, ConnectionPermit, IdProvider,
18 RandomIntegerIdProvider, ServerHandle, StopHandle,
19 },
20 BoundedSubscriptions, MethodResponse, MethodSink, Methods,
21};
22use std::{
23 future::Future,
24 io,
25 pin::{pin, Pin},
26 sync::Arc,
27 task::{Context, Poll},
28};
29use tokio::{
30 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
31 sync::oneshot,
32};
33use tower::{layer::util::Identity, Layer, Service};
34use tracing::{debug, instrument, trace, warn, Instrument};
35use crate::{
37 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
38 stream_codec::StreamCodec,
39};
40use tokio::sync::mpsc;
41use tokio_stream::wrappers::ReceiverStream;
42use tower::layer::{util::Stack, LayerFn};
43
44mod connection;
45mod ipc;
46mod rpc_service;
47
48pub use rpc_service::RpcService;
49
50pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
54 endpoint: String,
56 id_provider: Arc<dyn IdProvider>,
57 cfg: Settings,
58 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
59 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
60}
61
62impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
63 pub fn endpoint(&self) -> String {
65 self.endpoint.clone()
66 }
67}
68
69impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
70where
71 RpcMiddleware: Layer<RpcService, Service: RpcServiceT> + Clone + Send + 'static,
72 HttpMiddleware: Layer<
73 TowerServiceNoHttp<RpcMiddleware>,
74 Service: Service<
75 String,
76 Response = Option<String>,
77 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
78 Future: Send + Unpin,
79 > + Send,
80 > + Send
81 + 'static,
82{
83 pub async fn start(
105 mut self,
106 methods: impl Into<Methods>,
107 ) -> Result<ServerHandle, IpcServerStartError> {
108 let methods = methods.into();
109
110 let (stop_handle, server_handle) = stop_channel();
111
112 let (tx, rx) = oneshot::channel();
114
115 match self.cfg.tokio_runtime.take() {
116 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
117 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
118 };
119 rx.await.expect("channel is open")?;
120
121 Ok(server_handle)
122 }
123
124 async fn start_inner(
125 self,
126 methods: Methods,
127 stop_handle: StopHandle,
128 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
129 ) {
130 trace!(endpoint = ?self.endpoint, "starting ipc server");
131
132 if cfg!(unix) {
133 if std::fs::remove_file(&self.endpoint).is_ok() {
135 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
136 }
137 }
138
139 let listener = match self
140 .endpoint
141 .as_str()
142 .to_fs_name::<GenericFilePath>()
143 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
144 {
145 Ok(listener) => {
146 #[cfg(unix)]
147 {
148 use std::os::unix::fs::PermissionsExt;
150 if let Some(perms_str) = &self.cfg.ipc_socket_permissions &&
151 let Ok(mode) = u32::from_str_radix(&perms_str.replace("0o", ""), 8)
152 {
153 let perms = std::fs::Permissions::from_mode(mode);
154 let _ = std::fs::set_permissions(&self.endpoint, perms);
155 }
156 }
157 listener
158 }
159 Err(err) => {
160 on_ready
161 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
162 .ok();
163 return;
164 }
165 };
166
167 on_ready.send(Ok(())).ok();
169
170 let mut id: u32 = 0;
171 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
172
173 let stopped = stop_handle.clone().shutdown();
174 let mut stopped = pin!(stopped);
175
176 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
177
178 trace!("accepting ipc connections");
179 loop {
180 match try_accept_conn(&listener, stopped).await {
181 AcceptConnection::Established { local_socket_stream, stop } => {
182 let Some(conn_permit) = connection_guard.try_acquire() else {
183 let (_reader, mut writer) = local_socket_stream.split();
184 let _ = writer
185 .write_all(b"Too many connections. Please try again later.")
186 .await;
187 stopped = stop;
188 continue;
189 };
190
191 let max_conns = connection_guard.max_connections();
192 let curr_conns = max_conns - connection_guard.available_connections();
193 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
194
195 let conn_permit = Arc::new(conn_permit);
196
197 process_connection(ProcessConnection {
198 http_middleware: &self.http_middleware,
199 rpc_middleware: self.rpc_middleware.clone(),
200 conn_permit,
201 conn_id: id,
202 server_cfg: self.cfg.clone(),
203 stop_handle: stop_handle.clone(),
204 drop_on_completion: drop_on_completion.clone(),
205 methods: methods.clone(),
206 id_provider: self.id_provider.clone(),
207 local_socket_stream,
208 });
209
210 id = id.wrapping_add(1);
211 stopped = stop;
212 }
213 AcceptConnection::Shutdown => {
214 break;
215 }
216 AcceptConnection::Err((err, stop)) => {
217 tracing::error!(%err, "Failed accepting a new IPC connection");
218 stopped = stop;
219 }
220 }
221 }
222
223 drop(drop_on_completion);
225
226 while process_connection_awaiter.recv().await.is_some() {
229 }
232 }
233}
234
235enum AcceptConnection<S> {
236 Shutdown,
237 Established { local_socket_stream: LocalSocketStream, stop: S },
238 Err((io::Error, S)),
239}
240
241async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
242where
243 S: Future + Unpin,
244{
245 match futures_util::future::select(pin!(listener.accept()), stopped).await {
246 Either::Left((res, stop)) => match res {
247 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
248 Err(e) => AcceptConnection::Err((e, stop)),
249 },
250 Either::Right(_) => AcceptConnection::Shutdown,
251 }
252}
253
254impl<HttpMiddleware, RpcMiddleware> std::fmt::Debug for IpcServer<HttpMiddleware, RpcMiddleware> {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 f.debug_struct("IpcServer")
257 .field("endpoint", &self.endpoint)
258 .field("cfg", &self.cfg)
259 .field("id_provider", &self.id_provider)
260 .finish()
261 }
262}
263
264#[derive(Debug, thiserror::Error)]
266#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
267pub struct IpcServerStartError {
268 endpoint: String,
269 #[source]
270 source: io::Error,
271}
272
273#[derive(Debug, Clone)]
275#[allow(dead_code)]
276pub(crate) struct ServiceData {
277 pub(crate) methods: Methods,
279 pub(crate) id_provider: Arc<dyn IdProvider>,
281 pub(crate) stop_handle: StopHandle,
283 pub(crate) conn_id: u32,
285 pub(crate) conn_permit: Arc<ConnectionPermit>,
287 pub(crate) bounded_subscriptions: BoundedSubscriptions,
289 pub(crate) method_sink: MethodSink,
293 pub(crate) server_cfg: Settings,
295}
296
297#[derive(Debug, Clone)]
300pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
301
302impl Default for RpcServiceBuilder<Identity> {
303 fn default() -> Self {
304 Self(tower::ServiceBuilder::new())
305 }
306}
307
308impl RpcServiceBuilder<Identity> {
309 pub const fn new() -> Self {
311 Self(tower::ServiceBuilder::new())
312 }
313}
314
315impl<L> RpcServiceBuilder<L> {
316 pub fn option_layer<T>(
320 self,
321 layer: Option<T>,
322 ) -> RpcServiceBuilder<Stack<RpcEither<T, Identity>, L>> {
323 let layer = if let Some(layer) = layer {
324 RpcEither::Left(layer)
325 } else {
326 RpcEither::Right(Identity::new())
327 };
328 self.layer(layer)
329 }
330
331 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
335 RpcServiceBuilder(self.0.layer(layer))
336 }
337
338 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
343 RpcServiceBuilder(self.0.layer_fn(f))
344 }
345
346 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
350 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
351 }
352
353 pub(crate) fn service<S>(&self, service: S) -> L::Service
355 where
356 L: tower::Layer<S>,
357 {
358 self.0.service(service)
359 }
360}
361
362#[derive(Debug, Clone)]
367pub struct TowerServiceNoHttp<L> {
368 inner: ServiceData,
369 rpc_middleware: RpcServiceBuilder<L>,
370}
371
372impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
373where
374 RpcMiddleware: Layer<RpcService>,
375 <RpcMiddleware as Layer<RpcService>>::Service:
376 Send + Sync + 'static + RpcServiceT<MethodResponse = MethodResponse>,
377{
378 type Response = Option<String>;
384
385 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
386
387 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
388
389 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
391 Poll::Ready(Ok(()))
392 }
393
394 fn call(&mut self, request: String) -> Self::Future {
395 trace!("{:?}", request);
396
397 let cfg = RpcServiceCfg {
398 bounded_subscriptions: BoundedSubscriptions::new(
399 self.inner.server_cfg.max_subscriptions_per_connection,
400 ),
401 id_provider: self.inner.id_provider.clone(),
402 sink: self.inner.method_sink.clone(),
403 };
404
405 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
406 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
407 let conn = self.inner.conn_permit.clone();
408 let rpc_service = self.rpc_middleware.service(RpcService::new(
409 self.inner.methods.clone(),
410 max_response_body_size,
411 self.inner.conn_id.into(),
412 cfg,
413 ));
414 let f = tokio::task::spawn(async move {
421 ipc::call_with_service(
422 request,
423 rpc_service,
424 max_response_body_size,
425 max_request_body_size,
426 conn,
427 )
428 .await
429 });
430
431 Box::pin(async move { f.await.map_err(|err| err.into()) })
432 }
433}
434
435struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
436 http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
437 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
438 conn_permit: Arc<ConnectionPermit>,
439 conn_id: u32,
440 server_cfg: Settings,
441 stop_handle: StopHandle,
442 drop_on_completion: mpsc::Sender<()>,
443 methods: Methods,
444 id_provider: Arc<dyn IdProvider>,
445 local_socket_stream: LocalSocketStream,
446}
447
448#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id))]
450fn process_connection<RpcMiddleware, HttpMiddleware>(
451 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
452) where
453 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
454 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT,
455 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
456 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
457 + Service<
458 String,
459 Response = Option<String>,
460 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
461 >,
462 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
463 Send + Unpin,
464 {
465 let ProcessConnection {
466 http_middleware,
467 rpc_middleware,
468 conn_permit,
469 conn_id,
470 server_cfg,
471 stop_handle,
472 drop_on_completion,
473 id_provider,
474 methods,
475 local_socket_stream,
476 } = params;
477
478 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
479 StreamCodec::stream_incoming(),
480 local_socket_stream,
481 ));
482
483 let (tx, rx) = mpsc::channel::<Box<JsonRawValue>>(server_cfg.message_buffer_capacity as usize);
484 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
485 let tower_service = TowerServiceNoHttp {
486 inner: ServiceData {
487 methods,
488 id_provider,
489 stop_handle: stop_handle.clone(),
490 server_cfg: server_cfg.clone(),
491 conn_id,
492 conn_permit,
493 bounded_subscriptions: BoundedSubscriptions::new(
494 server_cfg.max_subscriptions_per_connection,
495 ),
496 method_sink,
497 },
498 rpc_middleware,
499 };
500
501 let service = http_middleware.service(tower_service);
502 tokio::spawn(async {
503 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
504 drop(drop_on_completion)
505 });
506}
507
508async fn to_ipc_service<S, T>(
509 ipc: IpcConn<JsonRpcStream<T>>,
510 service: S,
511 stop_handle: StopHandle,
512 rx: mpsc::Receiver<Box<JsonRawValue>>,
513) where
514 S: Service<String, Response = Option<String>> + Send + 'static,
515 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
516 S::Future: Send + Unpin,
517 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
518{
519 let rx_item = ReceiverStream::new(rx);
520 let conn = IpcConnDriver {
521 conn: ipc,
522 service,
523 pending_calls: Default::default(),
524 items: Default::default(),
525 };
526 let stopped = stop_handle.shutdown();
527
528 let mut conn = pin!(conn);
529 let mut rx_item = pin!(rx_item);
530 let mut stopped = pin!(stopped);
531
532 loop {
533 tokio::select! {
534 _ = &mut conn => {
535 break
536 }
537 item = rx_item.next() => {
538 let Some(item) = item else { break };
539 conn.push_back(item.to_string());
540 }
541 _ = &mut stopped => {
542 break
544 }
545 }
546 }
547}
548
549#[derive(Debug, Clone)]
551pub struct Settings {
552 max_request_body_size: u32,
554 max_response_body_size: u32,
556 max_log_length: u32,
560 max_connections: u32,
562 max_subscriptions_per_connection: u32,
564 message_buffer_capacity: u32,
566 tokio_runtime: Option<tokio::runtime::Handle>,
568 ipc_socket_permissions: Option<String>,
570}
571
572impl Default for Settings {
573 fn default() -> Self {
574 Self {
575 max_request_body_size: TEN_MB_SIZE_BYTES,
576 max_response_body_size: TEN_MB_SIZE_BYTES,
577 max_log_length: 4096,
578 max_connections: 100,
579 max_subscriptions_per_connection: 1024,
580 message_buffer_capacity: 1024,
581 tokio_runtime: None,
582 ipc_socket_permissions: None,
583 }
584 }
585}
586
587#[derive(Debug)]
589pub struct Builder<HttpMiddleware, RpcMiddleware> {
590 settings: Settings,
591 id_provider: Arc<dyn IdProvider>,
593 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
594 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
595}
596
597impl Default for Builder<Identity, Identity> {
598 fn default() -> Self {
599 Self {
600 settings: Settings::default(),
601 id_provider: Arc::new(RandomIntegerIdProvider),
602 rpc_middleware: RpcServiceBuilder::new(),
603 http_middleware: tower::ServiceBuilder::new(),
604 }
605 }
606}
607
608impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
609 pub const fn max_request_body_size(mut self, size: u32) -> Self {
611 self.settings.max_request_body_size = size;
612 self
613 }
614
615 pub const fn max_response_body_size(mut self, size: u32) -> Self {
617 self.settings.max_response_body_size = size;
618 self
619 }
620
621 pub const fn max_log_length(mut self, size: u32) -> Self {
623 self.settings.max_log_length = size;
624 self
625 }
626
627 pub const fn max_connections(mut self, max: u32) -> Self {
629 self.settings.max_connections = max;
630 self
631 }
632
633 pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
635 self.settings.max_subscriptions_per_connection = max;
636 self
637 }
638
639 pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
657 self.settings.message_buffer_capacity = c;
658 self
659 }
660
661 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
665 self.settings.tokio_runtime = Some(rt);
666 self
667 }
668
669 pub fn set_ipc_socket_permissions(mut self, permissions: Option<String>) -> Self {
671 self.settings.ipc_socket_permissions = permissions;
672 self
673 }
674
675 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
696 self.id_provider = Arc::new(id_provider);
697 self
698 }
699
700 pub fn set_http_middleware<T>(
717 self,
718 service_builder: tower::ServiceBuilder<T>,
719 ) -> Builder<T, RpcMiddleware> {
720 Builder {
721 settings: self.settings,
722 id_provider: self.id_provider,
723 http_middleware: service_builder,
724 rpc_middleware: self.rpc_middleware,
725 }
726 }
727
728 pub fn set_rpc_middleware<T>(
740 self,
741 rpc_middleware: RpcServiceBuilder<T>,
742 ) -> Builder<HttpMiddleware, T> {
743 Builder {
744 settings: self.settings,
745 id_provider: self.id_provider,
746 rpc_middleware,
747 http_middleware: self.http_middleware,
748 }
749 }
750
751 pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
753 IpcServer {
754 endpoint,
755 cfg: self.settings,
756 id_provider: self.id_provider,
757 http_middleware: self.http_middleware,
758 rpc_middleware: self.rpc_middleware,
759 }
760 }
761}
762
763#[cfg(test)]
764#[expect(missing_docs)]
765pub fn dummy_name() -> String {
766 use rand::Rng;
767 let num: u64 = rand::rng().random();
768 format!(r"/tmp/my-uds-{num}")
769}
770
771#[cfg(test)]
772mod tests {
773 use super::*;
774 use crate::client::IpcClientBuilder;
775 use futures::future::select;
776 use jsonrpsee::{
777 core::{
778 client::{self, ClientT, Error, Subscription, SubscriptionClientT},
779 middleware::{Batch, BatchEntry, Notification},
780 params::BatchRequestBuilder,
781 },
782 rpc_params,
783 types::Request,
784 PendingSubscriptionSink, RpcModule, SubscriptionMessage,
785 };
786 use reth_tracing::init_test_tracing;
787 use std::pin::pin;
788 use tokio::sync::broadcast;
789 use tokio_stream::wrappers::BroadcastStream;
790
791 #[tokio::test]
792 #[cfg(unix)]
793 async fn test_ipc_socket_permissions() {
794 use std::os::unix::fs::PermissionsExt;
795 let endpoint = &dummy_name();
796 let perms = "0777";
797 let server = Builder::default()
798 .set_ipc_socket_permissions(Some(perms.to_string()))
799 .build(endpoint.clone());
800 let module = RpcModule::new(());
801 let handle = server.start(module).await.unwrap();
802 tokio::spawn(handle.stopped());
803
804 let meta = std::fs::metadata(endpoint).unwrap();
805 let perms = meta.permissions();
806 assert_eq!(perms.mode() & 0o777, 0o777);
807 }
808
809 async fn pipe_from_stream_with_bounded_buffer(
810 pending: PendingSubscriptionSink,
811 stream: BroadcastStream<usize>,
812 ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
813 let sink = pending.accept().await.unwrap();
814 let closed = sink.closed();
815
816 let mut closed = pin!(closed);
817 let mut stream = pin!(stream);
818
819 loop {
820 match select(closed, stream.next()).await {
821 Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),
823
824 Either::Right((Some(Ok(item)), c)) => {
826 let raw_value = serde_json::value::to_raw_value(&item)?;
827 let notif = SubscriptionMessage::from(raw_value);
828
829 if sink.send(notif).await.is_err() {
833 break Ok(());
834 }
835
836 closed = c;
837 }
838
839 Either::Right((Some(Err(e)), _)) => break Err(e.into()),
841 }
842 }
843 }
844
845 fn produce_items(tx: broadcast::Sender<usize>) {
847 for c in 1..=100 {
848 std::thread::sleep(std::time::Duration::from_millis(1));
849 let _ = tx.send(c);
850 }
851 }
852
853 #[tokio::test]
854 async fn can_set_the_max_response_body_size() {
855 let endpoint = &dummy_name();
857 let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
858 let mut module = RpcModule::new(());
859 module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
860 let handle = server.start(module).await.unwrap();
861 tokio::spawn(handle.stopped());
862
863 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
864 let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
865 assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
866 }
867
868 #[tokio::test]
869 async fn can_set_the_max_request_body_size() {
870 init_test_tracing();
871 let endpoint = &dummy_name();
872 let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
873 let mut module = RpcModule::new(());
874 module.register_method("anything", |_, _, _| "succeed").unwrap();
875 let handle = server.start(module).await.unwrap();
876 tokio::spawn(handle.stopped());
877
878 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
879 let response: Result<String, Error> =
880 client.request("anything", rpc_params!["a".repeat(101)]).await;
881 assert!(response.is_err());
882 let mut batch_request_builder = BatchRequestBuilder::new();
883 let _ = batch_request_builder.insert("anything", rpc_params![]);
884 let _ = batch_request_builder.insert("anything", rpc_params![]);
885 let _ = batch_request_builder.insert("anything", rpc_params![]);
886 let response: Result<client::BatchResponse<'_, String>, Error> =
891 client.batch_request(batch_request_builder).await;
892 assert!(response.is_err());
893 }
894
895 #[tokio::test]
896 async fn can_set_max_connections() {
897 init_test_tracing();
898
899 let endpoint = &dummy_name();
900 let server = Builder::default().max_connections(2).build(endpoint.clone());
901 let mut module = RpcModule::new(());
902 module.register_method("anything", |_, _, _| "succeed").unwrap();
903 let handle = server.start(module).await.unwrap();
904 tokio::spawn(handle.stopped());
905
906 let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
907 let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
908 let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
909
910 let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
911 let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
912 let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
913
914 assert!(response1.is_ok());
915 assert!(response2.is_ok());
916 assert!(response3.is_err());
918
919 drop(client2);
921 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
922
923 let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
925 let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
926 assert!(response4.is_ok());
927 }
928
929 #[tokio::test]
930 async fn test_rpc_request() {
931 init_test_tracing();
932 let endpoint = &dummy_name();
933 let server = Builder::default().build(endpoint.clone());
934 let mut module = RpcModule::new(());
935 let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
936 module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
937 let handle = server.start(module).await.unwrap();
938 tokio::spawn(handle.stopped());
939
940 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
941 let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
942 assert_eq!(response, msg);
943 }
944
945 #[tokio::test]
946 async fn test_batch_request() {
947 let endpoint = &dummy_name();
948 let server = Builder::default().build(endpoint.clone());
949 let mut module = RpcModule::new(());
950 module.register_method("anything", |_, _, _| "ok").unwrap();
951 let handle = server.start(module).await.unwrap();
952 tokio::spawn(handle.stopped());
953
954 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
955 let mut batch_request_builder = BatchRequestBuilder::new();
956 let _ = batch_request_builder.insert("anything", rpc_params![]);
957 let _ = batch_request_builder.insert("anything", rpc_params![]);
958 let _ = batch_request_builder.insert("anything", rpc_params![]);
959 let result = client
960 .batch_request(batch_request_builder)
961 .await
962 .unwrap()
963 .into_ok()
964 .unwrap()
965 .collect::<Vec<String>>();
966 assert_eq!(result, vec!["ok", "ok", "ok"]);
967 }
968
969 #[tokio::test]
970 async fn test_ipc_modules() {
971 reth_tracing::init_test_tracing();
972 let endpoint = &dummy_name();
973 let server = Builder::default().build(endpoint.clone());
974 let mut module = RpcModule::new(());
975 let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
976 module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
977 let handle = server.start(module).await.unwrap();
978 tokio::spawn(handle.stopped());
979
980 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
981 let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
982 assert_eq!(response, msg);
983 }
984
985 #[tokio::test(flavor = "multi_thread")]
986 async fn test_rpc_subscription() {
987 let endpoint = &dummy_name();
988 let server = Builder::default().build(endpoint.clone());
989 let (tx, _rx) = broadcast::channel::<usize>(16);
990
991 let mut module = RpcModule::new(tx.clone());
992 std::thread::spawn(move || produce_items(tx));
993
994 module
995 .register_subscription(
996 "subscribe_hello",
997 "s_hello",
998 "unsubscribe_hello",
999 |_, pending, tx, _| async move {
1000 let rx = tx.subscribe();
1001 let stream = BroadcastStream::new(rx);
1002 pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1003 Ok(())
1004 },
1005 )
1006 .unwrap();
1007
1008 let handle = server.start(module).await.unwrap();
1009 tokio::spawn(handle.stopped());
1010
1011 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1012 let sub: Subscription<usize> =
1013 client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1014
1015 let items = sub.take(16).collect::<Vec<_>>().await;
1016 assert_eq!(items.len(), 16);
1017 }
1018
1019 #[tokio::test]
1020 async fn test_rpc_middleware() {
1021 #[derive(Clone)]
1022 struct ModifyRequestIf<S>(S);
1023
1024 impl<S> RpcServiceT for ModifyRequestIf<S>
1025 where
1026 S: Send + Sync + RpcServiceT,
1027 {
1028 type MethodResponse = S::MethodResponse;
1029 type NotificationResponse = S::NotificationResponse;
1030 type BatchResponse = S::BatchResponse;
1031
1032 fn call<'a>(
1033 &self,
1034 mut req: Request<'a>,
1035 ) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
1036 if req.method == "say_hello" {
1038 req.method = "say_goodbye".into();
1039 } else if req.method == "say_goodbye" {
1040 req.method = "say_hello".into();
1041 }
1042
1043 self.0.call(req)
1044 }
1045
1046 fn batch<'a>(
1047 &self,
1048 mut batch: Batch<'a>,
1049 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
1050 for call in batch.iter_mut() {
1051 match call {
1052 Ok(BatchEntry::Call(req)) => {
1053 if req.method == "say_hello" {
1054 req.method = "say_goodbye".into();
1055 } else if req.method == "say_goodbye" {
1056 req.method = "say_hello".into();
1057 }
1058 }
1059 Ok(BatchEntry::Notification(n)) => {
1060 if n.method == "say_hello" {
1061 n.method = "say_goodbye".into();
1062 } else if n.method == "say_goodbye" {
1063 n.method = "say_hello".into();
1064 }
1065 }
1066 Err(_err) => {}
1068 }
1069 }
1070
1071 self.0.batch(batch)
1072 }
1073
1074 fn notification<'a>(
1075 &self,
1076 mut n: Notification<'a>,
1077 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
1078 if n.method == "say_hello" {
1079 n.method = "say_goodbye".into();
1080 } else if n.method == "say_goodbye" {
1081 n.method = "say_hello".into();
1082 }
1083 self.0.notification(n)
1084 }
1085 }
1086
1087 reth_tracing::init_test_tracing();
1088 let endpoint = &dummy_name();
1089
1090 let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1091 let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1092
1093 let mut module = RpcModule::new(());
1094 let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1095 let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1096 module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1097 module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1098 let handle = server.start(module).await.unwrap();
1099 tokio::spawn(handle.stopped());
1100
1101 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1102 let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1103 let say_goodbye_response: String =
1104 client.request("say_goodbye", rpc_params![]).await.unwrap();
1105
1106 assert_eq!(say_hello_response, goodbye_msg);
1107 assert_eq!(say_goodbye_response, hello_msg);
1108 }
1109}