reth_ipc/server/
rpc_service.rs

1//! JSON-RPC service middleware.
2use futures_util::future::BoxFuture;
3use jsonrpsee::{
4    server::{
5        middleware::rpc::{ResponseFuture, RpcServiceT},
6        IdProvider,
7    },
8    types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
9    BoundedSubscriptions, ConnectionId, MethodCallback, MethodResponse, MethodSink, Methods,
10    SubscriptionState,
11};
12use std::sync::Arc;
13
14/// JSON-RPC service middleware.
15#[derive(Clone, Debug)]
16pub struct RpcService {
17    conn_id: ConnectionId,
18    methods: Methods,
19    max_response_body_size: usize,
20    cfg: RpcServiceCfg,
21}
22
23/// Configuration of the `RpcService`.
24#[expect(dead_code)]
25#[derive(Clone, Debug)]
26pub(crate) enum RpcServiceCfg {
27    /// The server supports only calls.
28    OnlyCalls,
29    /// The server supports both method calls and subscriptions.
30    CallsAndSubscriptions {
31        bounded_subscriptions: BoundedSubscriptions,
32        sink: MethodSink,
33        id_provider: Arc<dyn IdProvider>,
34    },
35}
36
37impl RpcService {
38    /// Create a new service.
39    pub(crate) const fn new(
40        methods: Methods,
41        max_response_body_size: usize,
42        conn_id: ConnectionId,
43        cfg: RpcServiceCfg,
44    ) -> Self {
45        Self { methods, max_response_body_size, conn_id, cfg }
46    }
47}
48
49impl<'a> RpcServiceT<'a> for RpcService {
50    // The rpc module is already boxing the futures and
51    // it's used to under the hood by the RpcService.
52    type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
53
54    fn call(&self, req: Request<'a>) -> Self::Future {
55        let conn_id = self.conn_id;
56        let max_response_body_size = self.max_response_body_size;
57
58        let params = req.params();
59        let name = req.method_name();
60        let id = req.id().clone();
61        let extensions = req.extensions.clone();
62
63        match self.methods.method_with_name(name) {
64            None => {
65                let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
66                ResponseFuture::ready(rp)
67            }
68            Some((_name, method)) => match method {
69                MethodCallback::Sync(callback) => {
70                    let rp = (callback)(id, params, max_response_body_size, extensions);
71                    ResponseFuture::ready(rp)
72                }
73                MethodCallback::Async(callback) => {
74                    let params = params.into_owned();
75                    let id = id.into_owned();
76
77                    let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
78                    ResponseFuture::future(fut)
79                }
80                MethodCallback::Subscription(callback) => {
81                    let RpcServiceCfg::CallsAndSubscriptions {
82                        bounded_subscriptions,
83                        sink,
84                        id_provider,
85                    } = &self.cfg
86                    else {
87                        tracing::warn!("Subscriptions not supported");
88                        let rp =
89                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
90                        return ResponseFuture::ready(rp);
91                    };
92
93                    if let Some(p) = bounded_subscriptions.acquire() {
94                        let conn_state = SubscriptionState {
95                            conn_id,
96                            id_provider: &**id_provider,
97                            subscription_permit: p,
98                        };
99
100                        let fut =
101                            callback(id.clone(), params, sink.clone(), conn_state, extensions);
102                        ResponseFuture::future(fut)
103                    } else {
104                        let max = bounded_subscriptions.max();
105                        let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
106                        ResponseFuture::ready(rp)
107                    }
108                }
109                MethodCallback::Unsubscription(callback) => {
110                    // Don't adhere to any resource or subscription limits; always let unsubscribing
111                    // happen!
112
113                    let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
114                        tracing::warn!("Subscriptions not supported");
115                        let rp =
116                            MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
117                        return ResponseFuture::ready(rp);
118                    };
119
120                    let rp = callback(id, params, conn_id, max_response_body_size, extensions);
121                    ResponseFuture::ready(rp)
122                }
123            },
124        }
125    }
126}