reth_ipc/server/
rpc_service.rs1use futures::{
3 future::Either,
4 stream::{FuturesOrdered, StreamExt},
5};
6use jsonrpsee::{
7 core::middleware::{Batch, BatchEntry},
8 server::{
9 middleware::rpc::{ResponseFuture, RpcServiceT},
10 IdProvider,
11 },
12 types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Id, Request},
13 BatchResponse, BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodCallback,
14 MethodResponse, MethodSink, Methods, SubscriptionState,
15};
16use std::{future::Future, sync::Arc};
17
18#[derive(Clone, Debug)]
20pub struct RpcService {
21 conn_id: ConnectionId,
22 methods: Methods,
23 max_response_body_size: usize,
24 cfg: RpcServiceCfg,
25}
26
27#[derive(Clone, Debug)]
29pub(crate) struct RpcServiceCfg {
30 pub(crate) bounded_subscriptions: BoundedSubscriptions,
31 pub(crate) sink: MethodSink,
32 pub(crate) id_provider: Arc<dyn IdProvider>,
33}
34
35impl RpcService {
36 pub(crate) const fn new(
38 methods: Methods,
39 max_response_body_size: usize,
40 conn_id: ConnectionId,
41 cfg: RpcServiceCfg,
42 ) -> Self {
43 Self { methods, max_response_body_size, conn_id, cfg }
44 }
45}
46
47impl RpcServiceT for RpcService {
48 type MethodResponse = MethodResponse;
49 type NotificationResponse = Option<MethodResponse>;
50 type BatchResponse = BatchResponse;
51
52 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
53 let conn_id = self.conn_id;
54 let max_response_body_size = self.max_response_body_size;
55
56 let params = req.params();
57 let name = req.method_name();
58 let id = req.id().clone();
59 let extensions = req.extensions.clone();
60
61 match self.methods.method_with_name(name) {
62 None => {
63 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
64 ResponseFuture::ready(rp)
65 }
66 Some((_name, method)) => match method {
67 MethodCallback::Sync(callback) => {
68 let rp = (callback)(id, params, max_response_body_size, extensions);
69 ResponseFuture::ready(rp)
70 }
71 MethodCallback::Async(callback) => {
72 let params = params.into_owned();
73 let id = id.into_owned();
74
75 let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
76 ResponseFuture::future(fut)
77 }
78 MethodCallback::Subscription(callback) => {
79 let cfg = &self.cfg;
80
81 if let Some(p) = cfg.bounded_subscriptions.acquire() {
82 let conn_state = SubscriptionState {
83 conn_id,
84 id_provider: &*cfg.id_provider,
85 subscription_permit: p,
86 };
87
88 let fut =
89 callback(id.clone(), params, cfg.sink.clone(), conn_state, extensions);
90 ResponseFuture::future(fut)
91 } else {
92 let max = cfg.bounded_subscriptions.max();
93 let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
94 ResponseFuture::ready(rp)
95 }
96 }
97 MethodCallback::Unsubscription(callback) => {
98 let rp = callback(id, params, conn_id, max_response_body_size, extensions);
102 ResponseFuture::ready(rp)
103 }
104 },
105 }
106 }
107
108 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
109 let entries: Vec<_> = req.into_iter().collect();
110
111 let mut got_notif = false;
112 let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
113
114 let mut pending_calls: FuturesOrdered<_> = entries
115 .into_iter()
116 .filter_map(|v| match v {
117 Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
118 Ok(BatchEntry::Notification(_n)) => {
119 got_notif = true;
120 None
121 }
122 Err(_err) => Some(Either::Left(async {
123 MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
124 })),
125 })
126 .collect();
127 async move {
128 while let Some(response) = pending_calls.next().await {
129 if let Err(too_large) = batch_response.append(response) {
130 let mut error_batch = BatchResponseBuilder::new_with_limit(1);
131 let _ = error_batch.append(too_large);
132 return error_batch.finish();
133 }
134 }
135
136 batch_response.finish()
137 }
138 }
139
140 #[allow(clippy::manual_async_fn)]
141 fn notification<'a>(
142 &self,
143 _n: jsonrpsee::core::middleware::Notification<'a>,
144 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
145 async move { None }
146 }
147}