reth_ipc/server/
rpc_service.rs

1//! JSON-RPC service middleware.
2use futures::{
3    future::Either,
4    stream::{FuturesOrdered, StreamExt},
5};
6use jsonrpsee::{
7    core::middleware::{Batch, BatchEntry},
8    server::{
9        middleware::rpc::{ResponseFuture, RpcServiceT},
10        IdProvider,
11    },
12    types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Id, Request},
13    BatchResponse, BatchResponseBuilder, BoundedSubscriptions, ConnectionId, MethodCallback,
14    MethodResponse, MethodSink, Methods, SubscriptionState,
15};
16use std::{future::Future, sync::Arc};
17
18/// JSON-RPC service middleware.
19#[derive(Clone, Debug)]
20pub struct RpcService {
21    conn_id: ConnectionId,
22    methods: Methods,
23    max_response_body_size: usize,
24    cfg: RpcServiceCfg,
25}
26
27/// Configuration of the `RpcService`.
28#[derive(Clone, Debug)]
29pub(crate) struct RpcServiceCfg {
30    pub(crate) bounded_subscriptions: BoundedSubscriptions,
31    pub(crate) sink: MethodSink,
32    pub(crate) id_provider: Arc<dyn IdProvider>,
33}
34
35impl RpcService {
36    /// Create a new service.
37    pub(crate) const fn new(
38        methods: Methods,
39        max_response_body_size: usize,
40        conn_id: ConnectionId,
41        cfg: RpcServiceCfg,
42    ) -> Self {
43        Self { methods, max_response_body_size, conn_id, cfg }
44    }
45}
46
47impl RpcServiceT for RpcService {
48    type MethodResponse = MethodResponse;
49    type NotificationResponse = Option<MethodResponse>;
50    type BatchResponse = BatchResponse;
51
52    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
53        let conn_id = self.conn_id;
54        let max_response_body_size = self.max_response_body_size;
55
56        let params = req.params();
57        let name = req.method_name();
58        let id = req.id().clone();
59        let extensions = req.extensions.clone();
60
61        match self.methods.method_with_name(name) {
62            None => {
63                let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
64                ResponseFuture::ready(rp)
65            }
66            Some((_name, method)) => match method {
67                MethodCallback::Sync(callback) => {
68                    let rp = (callback)(id, params, max_response_body_size, extensions);
69                    ResponseFuture::ready(rp)
70                }
71                MethodCallback::Async(callback) => {
72                    let params = params.into_owned();
73                    let id = id.into_owned();
74
75                    let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
76                    ResponseFuture::future(fut)
77                }
78                MethodCallback::Subscription(callback) => {
79                    let cfg = &self.cfg;
80
81                    if let Some(p) = cfg.bounded_subscriptions.acquire() {
82                        let conn_state = SubscriptionState {
83                            conn_id,
84                            id_provider: &*cfg.id_provider,
85                            subscription_permit: p,
86                        };
87
88                        let fut =
89                            callback(id.clone(), params, cfg.sink.clone(), conn_state, extensions);
90                        ResponseFuture::future(fut)
91                    } else {
92                        let max = cfg.bounded_subscriptions.max();
93                        let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
94                        ResponseFuture::ready(rp)
95                    }
96                }
97                MethodCallback::Unsubscription(callback) => {
98                    // Don't adhere to any resource or subscription limits; always let unsubscribing
99                    // happen!
100
101                    let rp = callback(id, params, conn_id, max_response_body_size, extensions);
102                    ResponseFuture::ready(rp)
103                }
104            },
105        }
106    }
107
108    fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
109        let entries: Vec<_> = req.into_iter().collect();
110
111        let mut got_notif = false;
112        let mut batch_response = BatchResponseBuilder::new_with_limit(self.max_response_body_size);
113
114        let mut pending_calls: FuturesOrdered<_> = entries
115            .into_iter()
116            .filter_map(|v| match v {
117                Ok(BatchEntry::Call(call)) => Some(Either::Right(self.call(call))),
118                Ok(BatchEntry::Notification(_n)) => {
119                    got_notif = true;
120                    None
121                }
122                Err(_err) => Some(Either::Left(async {
123                    MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest))
124                })),
125            })
126            .collect();
127        async move {
128            while let Some(response) = pending_calls.next().await {
129                if let Err(too_large) = batch_response.append(response) {
130                    let mut error_batch = BatchResponseBuilder::new_with_limit(1);
131                    let _ = error_batch.append(too_large);
132                    return error_batch.finish();
133                }
134            }
135
136            batch_response.finish()
137        }
138    }
139
140    #[allow(clippy::manual_async_fn)]
141    fn notification<'a>(
142        &self,
143        _n: jsonrpsee::core::middleware::Notification<'a>,
144    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
145        async move { None }
146    }
147}