1use crate::server::connection::{IpcConn, JsonRpcStream};
4use futures::StreamExt;
5use futures_util::future::Either;
6use interprocess::local_socket::{
7 tokio::prelude::{LocalSocketListener, LocalSocketStream},
8 traits::tokio::{Listener, Stream},
9 GenericFilePath, ListenerOptions, ToFsName,
10};
11use jsonrpsee::{
12 core::TEN_MB_SIZE_BYTES,
13 server::{
14 middleware::rpc::{RpcLoggerLayer, RpcServiceT},
15 stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
16 ServerHandle, StopHandle,
17 },
18 BoundedSubscriptions, MethodSink, Methods,
19};
20use std::{
21 future::Future,
22 io,
23 pin::{pin, Pin},
24 sync::Arc,
25 task::{Context, Poll},
26};
27use tokio::{
28 io::{AsyncRead, AsyncWrite, AsyncWriteExt},
29 sync::oneshot,
30};
31use tower::{layer::util::Identity, Layer, Service};
32use tracing::{debug, instrument, trace, warn, Instrument};
33use crate::{
35 server::{connection::IpcConnDriver, rpc_service::RpcServiceCfg},
36 stream_codec::StreamCodec,
37};
38use tokio::sync::mpsc;
39use tokio_stream::wrappers::ReceiverStream;
40use tower::layer::{util::Stack, LayerFn};
41
42mod connection;
43mod ipc;
44mod rpc_service;
45
46pub use rpc_service::RpcService;
47
48pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
52 endpoint: String,
54 id_provider: Arc<dyn IdProvider>,
55 cfg: Settings,
56 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
57 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
58}
59
60impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
61 pub fn endpoint(&self) -> String {
63 self.endpoint.clone()
64 }
65}
66
67impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware>
68where
69 RpcMiddleware: for<'a> Layer<RpcService, Service: RpcServiceT<'a>> + Clone + Send + 'static,
70 HttpMiddleware: Layer<
71 TowerServiceNoHttp<RpcMiddleware>,
72 Service: Service<
73 String,
74 Response = Option<String>,
75 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
76 Future: Send + Unpin,
77 > + Send,
78 > + Send
79 + 'static,
80{
81 pub async fn start(
103 mut self,
104 methods: impl Into<Methods>,
105 ) -> Result<ServerHandle, IpcServerStartError> {
106 let methods = methods.into();
107
108 let (stop_handle, server_handle) = stop_channel();
109
110 let (tx, rx) = oneshot::channel();
112
113 match self.cfg.tokio_runtime.take() {
114 Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
115 None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
116 };
117 rx.await.expect("channel is open")?;
118
119 Ok(server_handle)
120 }
121
122 async fn start_inner(
123 self,
124 methods: Methods,
125 stop_handle: StopHandle,
126 on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
127 ) {
128 trace!(endpoint = ?self.endpoint, "starting ipc server");
129
130 if cfg!(unix) {
131 if std::fs::remove_file(&self.endpoint).is_ok() {
133 debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
134 }
135 }
136
137 let listener = match self
138 .endpoint
139 .as_str()
140 .to_fs_name::<GenericFilePath>()
141 .and_then(|name| ListenerOptions::new().name(name).create_tokio())
142 {
143 Ok(listener) => listener,
144 Err(err) => {
145 on_ready
146 .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
147 .ok();
148 return;
149 }
150 };
151
152 on_ready.send(Ok(())).ok();
154
155 let mut id: u32 = 0;
156 let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
157
158 let stopped = stop_handle.clone().shutdown();
159 let mut stopped = pin!(stopped);
160
161 let (drop_on_completion, mut process_connection_awaiter) = mpsc::channel::<()>(1);
162
163 trace!("accepting ipc connections");
164 loop {
165 match try_accept_conn(&listener, stopped).await {
166 AcceptConnection::Established { local_socket_stream, stop } => {
167 let Some(conn_permit) = connection_guard.try_acquire() else {
168 let (_reader, mut writer) = local_socket_stream.split();
169 let _ = writer
170 .write_all(b"Too many connections. Please try again later.")
171 .await;
172 stopped = stop;
173 continue;
174 };
175
176 let max_conns = connection_guard.max_connections();
177 let curr_conns = max_conns - connection_guard.available_connections();
178 trace!("Accepting new connection {}/{}", curr_conns, max_conns);
179
180 let conn_permit = Arc::new(conn_permit);
181
182 process_connection(ProcessConnection {
183 http_middleware: &self.http_middleware,
184 rpc_middleware: self.rpc_middleware.clone(),
185 conn_permit,
186 conn_id: id,
187 server_cfg: self.cfg.clone(),
188 stop_handle: stop_handle.clone(),
189 drop_on_completion: drop_on_completion.clone(),
190 methods: methods.clone(),
191 id_provider: self.id_provider.clone(),
192 local_socket_stream,
193 });
194
195 id = id.wrapping_add(1);
196 stopped = stop;
197 }
198 AcceptConnection::Shutdown => {
199 break;
200 }
201 AcceptConnection::Err((err, stop)) => {
202 tracing::error!(%err, "Failed accepting a new IPC connection");
203 stopped = stop;
204 }
205 }
206 }
207
208 drop(drop_on_completion);
210
211 while process_connection_awaiter.recv().await.is_some() {
214 }
217 }
218}
219
220enum AcceptConnection<S> {
221 Shutdown,
222 Established { local_socket_stream: LocalSocketStream, stop: S },
223 Err((io::Error, S)),
224}
225
226async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
227where
228 S: Future + Unpin,
229{
230 match futures_util::future::select(pin!(listener.accept()), stopped).await {
231 Either::Left((res, stop)) => match res {
232 Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
233 Err(e) => AcceptConnection::Err((e, stop)),
234 },
235 Either::Right(_) => AcceptConnection::Shutdown,
236 }
237}
238
239impl std::fmt::Debug for IpcServer {
240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
241 f.debug_struct("IpcServer")
242 .field("endpoint", &self.endpoint)
243 .field("cfg", &self.cfg)
244 .field("id_provider", &self.id_provider)
245 .finish()
246 }
247}
248
249#[derive(Debug, thiserror::Error)]
251#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")]
252pub struct IpcServerStartError {
253 endpoint: String,
254 #[source]
255 source: io::Error,
256}
257
258#[derive(Debug, Clone)]
260#[expect(dead_code)]
261pub(crate) struct ServiceData {
262 pub(crate) methods: Methods,
264 pub(crate) id_provider: Arc<dyn IdProvider>,
266 pub(crate) stop_handle: StopHandle,
268 pub(crate) conn_id: u32,
270 pub(crate) conn_permit: Arc<ConnectionPermit>,
272 pub(crate) bounded_subscriptions: BoundedSubscriptions,
274 pub(crate) method_sink: MethodSink,
278 pub(crate) server_cfg: Settings,
280}
281
282#[derive(Debug, Clone)]
285pub struct RpcServiceBuilder<L>(tower::ServiceBuilder<L>);
286
287impl Default for RpcServiceBuilder<Identity> {
288 fn default() -> Self {
289 Self(tower::ServiceBuilder::new())
290 }
291}
292
293impl RpcServiceBuilder<Identity> {
294 pub fn new() -> Self {
296 Self(tower::ServiceBuilder::new())
297 }
298}
299
300impl<L> RpcServiceBuilder<L> {
301 pub fn option_layer<T>(
305 self,
306 layer: Option<T>,
307 ) -> RpcServiceBuilder<Stack<Either<T, Identity>, L>> {
308 let layer = if let Some(layer) = layer {
309 Either::Left(layer)
310 } else {
311 Either::Right(Identity::new())
312 };
313 self.layer(layer)
314 }
315
316 pub fn layer<T>(self, layer: T) -> RpcServiceBuilder<Stack<T, L>> {
320 RpcServiceBuilder(self.0.layer(layer))
321 }
322
323 pub fn layer_fn<F>(self, f: F) -> RpcServiceBuilder<Stack<LayerFn<F>, L>> {
328 RpcServiceBuilder(self.0.layer_fn(f))
329 }
330
331 pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder<Stack<RpcLoggerLayer, L>> {
335 RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len)))
336 }
337
338 pub(crate) fn service<S>(&self, service: S) -> L::Service
340 where
341 L: tower::Layer<S>,
342 {
343 self.0.service(service)
344 }
345}
346
347#[derive(Debug, Clone)]
352pub struct TowerServiceNoHttp<L> {
353 inner: ServiceData,
354 rpc_middleware: RpcServiceBuilder<L>,
355}
356
357impl<RpcMiddleware> Service<String> for TowerServiceNoHttp<RpcMiddleware>
358where
359 RpcMiddleware: for<'a> Layer<RpcService>,
360 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: Send + Sync + 'static + RpcServiceT<'a>,
361{
362 type Response = Option<String>;
368
369 type Error = Box<dyn core::error::Error + Send + Sync + 'static>;
370
371 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
372
373 fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 Poll::Ready(Ok(()))
376 }
377
378 fn call(&mut self, request: String) -> Self::Future {
379 trace!("{:?}", request);
380
381 let cfg = RpcServiceCfg::CallsAndSubscriptions {
382 bounded_subscriptions: BoundedSubscriptions::new(
383 self.inner.server_cfg.max_subscriptions_per_connection,
384 ),
385 id_provider: self.inner.id_provider.clone(),
386 sink: self.inner.method_sink.clone(),
387 };
388
389 let max_response_body_size = self.inner.server_cfg.max_response_body_size as usize;
390 let max_request_body_size = self.inner.server_cfg.max_request_body_size as usize;
391 let conn = self.inner.conn_permit.clone();
392 let rpc_service = self.rpc_middleware.service(RpcService::new(
393 self.inner.methods.clone(),
394 max_response_body_size,
395 self.inner.conn_id.into(),
396 cfg,
397 ));
398 let f = tokio::task::spawn(async move {
405 ipc::call_with_service(
406 request,
407 rpc_service,
408 max_response_body_size,
409 max_request_body_size,
410 conn,
411 )
412 .await
413 });
414
415 Box::pin(async move { f.await.map_err(|err| err.into()) })
416 }
417}
418
419struct ProcessConnection<'a, HttpMiddleware, RpcMiddleware> {
420 http_middleware: &'a tower::ServiceBuilder<HttpMiddleware>,
421 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
422 conn_permit: Arc<ConnectionPermit>,
423 conn_id: u32,
424 server_cfg: Settings,
425 stop_handle: StopHandle,
426 drop_on_completion: mpsc::Sender<()>,
427 methods: Methods,
428 id_provider: Arc<dyn IdProvider>,
429 local_socket_stream: LocalSocketStream,
430}
431
432#[instrument(name = "connection", skip_all, fields(conn_id = %params.conn_id), level = "INFO")]
434fn process_connection<'b, RpcMiddleware, HttpMiddleware>(
435 params: ProcessConnection<'_, HttpMiddleware, RpcMiddleware>,
436) where
437 RpcMiddleware: Layer<RpcService> + Clone + Send + 'static,
438 for<'a> <RpcMiddleware as Layer<RpcService>>::Service: RpcServiceT<'a>,
439 HttpMiddleware: Layer<TowerServiceNoHttp<RpcMiddleware>> + Send + 'static,
440 <HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service: Send
441 + Service<
442 String,
443 Response = Option<String>,
444 Error = Box<dyn core::error::Error + Send + Sync + 'static>,
445 >,
446 <<HttpMiddleware as Layer<TowerServiceNoHttp<RpcMiddleware>>>::Service as Service<String>>::Future:
447 Send + Unpin,
448 {
449 let ProcessConnection {
450 http_middleware,
451 rpc_middleware,
452 conn_permit,
453 conn_id,
454 server_cfg,
455 stop_handle,
456 drop_on_completion,
457 id_provider,
458 methods,
459 local_socket_stream,
460 } = params;
461
462 let ipc = IpcConn(tokio_util::codec::Decoder::framed(
463 StreamCodec::stream_incoming(),
464 local_socket_stream,
465 ));
466
467 let (tx, rx) = mpsc::channel::<String>(server_cfg.message_buffer_capacity as usize);
468 let method_sink = MethodSink::new_with_limit(tx, server_cfg.max_response_body_size);
469 let tower_service = TowerServiceNoHttp {
470 inner: ServiceData {
471 methods,
472 id_provider,
473 stop_handle: stop_handle.clone(),
474 server_cfg: server_cfg.clone(),
475 conn_id,
476 conn_permit,
477 bounded_subscriptions: BoundedSubscriptions::new(
478 server_cfg.max_subscriptions_per_connection,
479 ),
480 method_sink,
481 },
482 rpc_middleware,
483 };
484
485 let service = http_middleware.service(tower_service);
486 tokio::spawn(async {
487 to_ipc_service(ipc, service, stop_handle, rx).in_current_span().await;
488 drop(drop_on_completion)
489 });
490}
491
492async fn to_ipc_service<S, T>(
493 ipc: IpcConn<JsonRpcStream<T>>,
494 service: S,
495 stop_handle: StopHandle,
496 rx: mpsc::Receiver<String>,
497) where
498 S: Service<String, Response = Option<String>> + Send + 'static,
499 S::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
500 S::Future: Send + Unpin,
501 T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
502{
503 let rx_item = ReceiverStream::new(rx);
504 let conn = IpcConnDriver {
505 conn: ipc,
506 service,
507 pending_calls: Default::default(),
508 items: Default::default(),
509 };
510 let stopped = stop_handle.shutdown();
511
512 let mut conn = pin!(conn);
513 let mut rx_item = pin!(rx_item);
514 let mut stopped = pin!(stopped);
515
516 loop {
517 tokio::select! {
518 _ = &mut conn => {
519 break
520 }
521 item = rx_item.next() => {
522 if let Some(item) = item {
523 conn.push_back(item);
524 }
525 }
526 _ = &mut stopped => {
527 break
529 }
530 }
531 }
532}
533
534#[derive(Debug, Clone)]
536pub struct Settings {
537 max_request_body_size: u32,
539 max_response_body_size: u32,
541 max_log_length: u32,
545 max_connections: u32,
547 max_subscriptions_per_connection: u32,
549 message_buffer_capacity: u32,
551 tokio_runtime: Option<tokio::runtime::Handle>,
553}
554
555impl Default for Settings {
556 fn default() -> Self {
557 Self {
558 max_request_body_size: TEN_MB_SIZE_BYTES,
559 max_response_body_size: TEN_MB_SIZE_BYTES,
560 max_log_length: 4096,
561 max_connections: 100,
562 max_subscriptions_per_connection: 1024,
563 message_buffer_capacity: 1024,
564 tokio_runtime: None,
565 }
566 }
567}
568
569#[derive(Debug)]
571pub struct Builder<HttpMiddleware, RpcMiddleware> {
572 settings: Settings,
573 id_provider: Arc<dyn IdProvider>,
575 rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
576 http_middleware: tower::ServiceBuilder<HttpMiddleware>,
577}
578
579impl Default for Builder<Identity, Identity> {
580 fn default() -> Self {
581 Self {
582 settings: Settings::default(),
583 id_provider: Arc::new(RandomIntegerIdProvider),
584 rpc_middleware: RpcServiceBuilder::new(),
585 http_middleware: tower::ServiceBuilder::new(),
586 }
587 }
588}
589
590impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
591 pub const fn max_request_body_size(mut self, size: u32) -> Self {
593 self.settings.max_request_body_size = size;
594 self
595 }
596
597 pub const fn max_response_body_size(mut self, size: u32) -> Self {
599 self.settings.max_response_body_size = size;
600 self
601 }
602
603 pub const fn max_log_length(mut self, size: u32) -> Self {
605 self.settings.max_log_length = size;
606 self
607 }
608
609 pub const fn max_connections(mut self, max: u32) -> Self {
611 self.settings.max_connections = max;
612 self
613 }
614
615 pub const fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
617 self.settings.max_subscriptions_per_connection = max;
618 self
619 }
620
621 pub const fn set_message_buffer_capacity(mut self, c: u32) -> Self {
639 self.settings.message_buffer_capacity = c;
640 self
641 }
642
643 pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
647 self.settings.tokio_runtime = Some(rt);
648 self
649 }
650
651 pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
672 self.id_provider = Arc::new(id_provider);
673 self
674 }
675
676 pub fn set_http_middleware<T>(
693 self,
694 service_builder: tower::ServiceBuilder<T>,
695 ) -> Builder<T, RpcMiddleware> {
696 Builder {
697 settings: self.settings,
698 id_provider: self.id_provider,
699 http_middleware: service_builder,
700 rpc_middleware: self.rpc_middleware,
701 }
702 }
703
704 pub fn set_rpc_middleware<T>(
769 self,
770 rpc_middleware: RpcServiceBuilder<T>,
771 ) -> Builder<HttpMiddleware, T> {
772 Builder {
773 settings: self.settings,
774 id_provider: self.id_provider,
775 rpc_middleware,
776 http_middleware: self.http_middleware,
777 }
778 }
779
780 pub fn build(self, endpoint: String) -> IpcServer<HttpMiddleware, RpcMiddleware> {
782 IpcServer {
783 endpoint,
784 cfg: self.settings,
785 id_provider: self.id_provider,
786 http_middleware: self.http_middleware,
787 rpc_middleware: self.rpc_middleware,
788 }
789 }
790}
791
792#[cfg(test)]
793#[expect(missing_docs)]
794pub fn dummy_name() -> String {
795 use rand::Rng;
796 let num: u64 = rand::rng().random();
797 if cfg!(windows) {
798 format!(r"\\.\pipe\my-pipe-{}", num)
799 } else {
800 format!(r"/tmp/my-uds-{}", num)
801 }
802}
803
804#[cfg(test)]
805mod tests {
806 use super::*;
807 use crate::client::IpcClientBuilder;
808 use futures::future::select;
809 use jsonrpsee::{
810 core::{
811 client,
812 client::{ClientT, Error, Subscription, SubscriptionClientT},
813 params::BatchRequestBuilder,
814 },
815 rpc_params,
816 types::Request,
817 PendingSubscriptionSink, RpcModule, SubscriptionMessage,
818 };
819 use reth_tracing::init_test_tracing;
820 use std::pin::pin;
821 use tokio::sync::broadcast;
822 use tokio_stream::wrappers::BroadcastStream;
823
824 async fn pipe_from_stream_with_bounded_buffer(
825 pending: PendingSubscriptionSink,
826 stream: BroadcastStream<usize>,
827 ) -> Result<(), Box<dyn core::error::Error + Send + Sync>> {
828 let sink = pending.accept().await.unwrap();
829 let closed = sink.closed();
830
831 let mut closed = pin!(closed);
832 let mut stream = pin!(stream);
833
834 loop {
835 match select(closed, stream.next()).await {
836 Either::Left((_, _)) | Either::Right((None, _)) => break Ok(()),
838
839 Either::Right((Some(Ok(item)), c)) => {
841 let notif = SubscriptionMessage::from_json(&item)?;
842
843 if sink.send(notif).await.is_err() {
847 break Ok(());
848 }
849
850 closed = c;
851 }
852
853 Either::Right((Some(Err(e)), _)) => break Err(e.into()),
855 }
856 }
857 }
858
859 fn produce_items(tx: broadcast::Sender<usize>) {
861 for c in 1..=100 {
862 std::thread::sleep(std::time::Duration::from_millis(1));
863 let _ = tx.send(c);
864 }
865 }
866
867 #[tokio::test]
868 async fn can_set_the_max_response_body_size() {
869 let endpoint = &dummy_name();
871 let server = Builder::default().max_response_body_size(100).build(endpoint.clone());
872 let mut module = RpcModule::new(());
873 module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
874 let handle = server.start(module).await.unwrap();
875 tokio::spawn(handle.stopped());
876
877 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
878 let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
879 assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
880 }
881
882 #[tokio::test]
883 async fn can_set_the_max_request_body_size() {
884 init_test_tracing();
885 let endpoint = &dummy_name();
886 let server = Builder::default().max_request_body_size(100).build(endpoint.clone());
887 let mut module = RpcModule::new(());
888 module.register_method("anything", |_, _, _| "succeed").unwrap();
889 let handle = server.start(module).await.unwrap();
890 tokio::spawn(handle.stopped());
891
892 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
893 let response: Result<String, Error> =
894 client.request("anything", rpc_params!["a".repeat(101)]).await;
895 assert!(response.is_err());
896 let mut batch_request_builder = BatchRequestBuilder::new();
897 let _ = batch_request_builder.insert("anything", rpc_params![]);
898 let _ = batch_request_builder.insert("anything", rpc_params![]);
899 let _ = batch_request_builder.insert("anything", rpc_params![]);
900 let response: Result<client::BatchResponse<'_, String>, Error> =
905 client.batch_request(batch_request_builder).await;
906 assert!(response.is_err());
907 }
908
909 #[tokio::test]
910 async fn can_set_max_connections() {
911 init_test_tracing();
912
913 let endpoint = &dummy_name();
914 let server = Builder::default().max_connections(2).build(endpoint.clone());
915 let mut module = RpcModule::new(());
916 module.register_method("anything", |_, _, _| "succeed").unwrap();
917 let handle = server.start(module).await.unwrap();
918 tokio::spawn(handle.stopped());
919
920 let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap();
921 let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap();
922 let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap();
923
924 let response1: Result<String, Error> = client1.request("anything", rpc_params![]).await;
925 let response2: Result<String, Error> = client2.request("anything", rpc_params![]).await;
926 let response3: Result<String, Error> = client3.request("anything", rpc_params![]).await;
927
928 assert!(response1.is_ok());
929 assert!(response2.is_ok());
930 assert!(response3.is_err());
932
933 drop(client2);
935 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
936
937 let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap();
939 let response4: Result<String, Error> = client4.request("anything", rpc_params![]).await;
940 assert!(response4.is_ok());
941 }
942
943 #[tokio::test]
944 async fn test_rpc_request() {
945 init_test_tracing();
946 let endpoint = &dummy_name();
947 let server = Builder::default().build(endpoint.clone());
948 let mut module = RpcModule::new(());
949 let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
950 module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
951 let handle = server.start(module).await.unwrap();
952 tokio::spawn(handle.stopped());
953
954 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
955 let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
956 assert_eq!(response, msg);
957 }
958
959 #[tokio::test]
960 async fn test_batch_request() {
961 let endpoint = &dummy_name();
962 let server = Builder::default().build(endpoint.clone());
963 let mut module = RpcModule::new(());
964 module.register_method("anything", |_, _, _| "ok").unwrap();
965 let handle = server.start(module).await.unwrap();
966 tokio::spawn(handle.stopped());
967
968 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
969 let mut batch_request_builder = BatchRequestBuilder::new();
970 let _ = batch_request_builder.insert("anything", rpc_params![]);
971 let _ = batch_request_builder.insert("anything", rpc_params![]);
972 let _ = batch_request_builder.insert("anything", rpc_params![]);
973 let result = client
974 .batch_request(batch_request_builder)
975 .await
976 .unwrap()
977 .into_ok()
978 .unwrap()
979 .collect::<Vec<String>>();
980 assert_eq!(result, vec!["ok", "ok", "ok"]);
981 }
982
983 #[tokio::test]
984 async fn test_ipc_modules() {
985 reth_tracing::init_test_tracing();
986 let endpoint = &dummy_name();
987 let server = Builder::default().build(endpoint.clone());
988 let mut module = RpcModule::new(());
989 let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
990 module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
991 let handle = server.start(module).await.unwrap();
992 tokio::spawn(handle.stopped());
993
994 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
995 let response: String = client.request("rpc_modules", rpc_params![]).await.unwrap();
996 assert_eq!(response, msg);
997 }
998
999 #[tokio::test(flavor = "multi_thread")]
1000 async fn test_rpc_subscription() {
1001 let endpoint = &dummy_name();
1002 let server = Builder::default().build(endpoint.clone());
1003 let (tx, _rx) = broadcast::channel::<usize>(16);
1004
1005 let mut module = RpcModule::new(tx.clone());
1006 std::thread::spawn(move || produce_items(tx));
1007
1008 module
1009 .register_subscription(
1010 "subscribe_hello",
1011 "s_hello",
1012 "unsubscribe_hello",
1013 |_, pending, tx, _| async move {
1014 let rx = tx.subscribe();
1015 let stream = BroadcastStream::new(rx);
1016 pipe_from_stream_with_bounded_buffer(pending, stream).await?;
1017 Ok(())
1018 },
1019 )
1020 .unwrap();
1021
1022 let handle = server.start(module).await.unwrap();
1023 tokio::spawn(handle.stopped());
1024
1025 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1026 let sub: Subscription<usize> =
1027 client.subscribe("subscribe_hello", rpc_params![], "unsubscribe_hello").await.unwrap();
1028
1029 let items = sub.take(16).collect::<Vec<_>>().await;
1030 assert_eq!(items.len(), 16);
1031 }
1032
1033 #[tokio::test]
1034 async fn test_rpc_middleware() {
1035 #[derive(Clone)]
1036 struct ModifyRequestIf<S>(S);
1037
1038 impl<'a, S> RpcServiceT<'a> for ModifyRequestIf<S>
1039 where
1040 S: Send + Sync + RpcServiceT<'a>,
1041 {
1042 type Future = S::Future;
1043
1044 fn call(&self, mut req: Request<'a>) -> Self::Future {
1045 if req.method == "say_hello" {
1047 req.method = "say_goodbye".into();
1048 } else if req.method == "say_goodbye" {
1049 req.method = "say_hello".into();
1050 }
1051
1052 self.0.call(req)
1053 }
1054 }
1055
1056 reth_tracing::init_test_tracing();
1057 let endpoint = &dummy_name();
1058
1059 let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf);
1060 let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone());
1061
1062 let mut module = RpcModule::new(());
1063 let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
1064 let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
1065 module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
1066 module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
1067 let handle = server.start(module).await.unwrap();
1068 tokio::spawn(handle.stopped());
1069
1070 let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
1071 let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap();
1072 let say_goodbye_response: String =
1073 client.request("say_goodbye", rpc_params![]).await.unwrap();
1074
1075 assert_eq!(say_hello_response, goodbye_msg);
1076 assert_eq!(say_goodbye_response, hello_msg);
1077 }
1078}