reth_ipc/server/
rpc_service.rs
1use futures_util::future::BoxFuture;
3use jsonrpsee::{
4 server::{
5 middleware::rpc::{ResponseFuture, RpcServiceT},
6 IdProvider,
7 },
8 types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
9 BoundedSubscriptions, ConnectionId, MethodCallback, MethodResponse, MethodSink, Methods,
10 SubscriptionState,
11};
12use std::sync::Arc;
13
14#[derive(Clone, Debug)]
16pub struct RpcService {
17 conn_id: ConnectionId,
18 methods: Methods,
19 max_response_body_size: usize,
20 cfg: RpcServiceCfg,
21}
22
23#[expect(dead_code)]
25#[derive(Clone, Debug)]
26pub(crate) enum RpcServiceCfg {
27 OnlyCalls,
29 CallsAndSubscriptions {
31 bounded_subscriptions: BoundedSubscriptions,
32 sink: MethodSink,
33 id_provider: Arc<dyn IdProvider>,
34 },
35}
36
37impl RpcService {
38 pub(crate) const fn new(
40 methods: Methods,
41 max_response_body_size: usize,
42 conn_id: ConnectionId,
43 cfg: RpcServiceCfg,
44 ) -> Self {
45 Self { methods, max_response_body_size, conn_id, cfg }
46 }
47}
48
49impl<'a> RpcServiceT<'a> for RpcService {
50 type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
53
54 fn call(&self, req: Request<'a>) -> Self::Future {
55 let conn_id = self.conn_id;
56 let max_response_body_size = self.max_response_body_size;
57
58 let params = req.params();
59 let name = req.method_name();
60 let id = req.id().clone();
61 let extensions = req.extensions.clone();
62
63 match self.methods.method_with_name(name) {
64 None => {
65 let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
66 ResponseFuture::ready(rp)
67 }
68 Some((_name, method)) => match method {
69 MethodCallback::Sync(callback) => {
70 let rp = (callback)(id, params, max_response_body_size, extensions);
71 ResponseFuture::ready(rp)
72 }
73 MethodCallback::Async(callback) => {
74 let params = params.into_owned();
75 let id = id.into_owned();
76
77 let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
78 ResponseFuture::future(fut)
79 }
80 MethodCallback::Subscription(callback) => {
81 let RpcServiceCfg::CallsAndSubscriptions {
82 bounded_subscriptions,
83 sink,
84 id_provider,
85 } = &self.cfg
86 else {
87 tracing::warn!("Subscriptions not supported");
88 let rp =
89 MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
90 return ResponseFuture::ready(rp);
91 };
92
93 if let Some(p) = bounded_subscriptions.acquire() {
94 let conn_state = SubscriptionState {
95 conn_id,
96 id_provider: &**id_provider,
97 subscription_permit: p,
98 };
99
100 let fut =
101 callback(id.clone(), params, sink.clone(), conn_state, extensions);
102 ResponseFuture::future(fut)
103 } else {
104 let max = bounded_subscriptions.max();
105 let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
106 ResponseFuture::ready(rp)
107 }
108 }
109 MethodCallback::Unsubscription(callback) => {
110 let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
114 tracing::warn!("Subscriptions not supported");
115 let rp =
116 MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
117 return ResponseFuture::ready(rp);
118 };
119
120 let rp = callback(id, params, conn_id, max_response_body_size, extensions);
121 ResponseFuture::ready(rp)
122 }
123 },
124 }
125 }
126}