// reth_ipc/server/rpc_service.rs
use futures_util::future::BoxFuture;
use jsonrpsee::{
server::{
middleware::rpc::{ResponseFuture, RpcServiceT},
IdProvider,
},
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
BoundedSubscriptions, ConnectionId, MethodCallback, MethodResponse, MethodSink, Methods,
SubscriptionState,
};
use std::sync::Arc;
/// JSON-RPC service that answers a request by invoking the callback registered for its
/// method name (see the `RpcServiceT` implementation in this file).
#[derive(Clone, Debug)]
pub struct RpcService {
/// Connection identifier forwarded to async, subscription and unsubscription callbacks.
conn_id: ConnectionId,
/// Registered methods; the callback for a request is looked up here by method name.
methods: Methods,
/// Response size limit handed to sync, async and unsubscription callbacks.
/// NOTE(review): presumably measured in bytes — confirm against jsonrpsee.
max_response_body_size: usize,
/// Decides whether subscription and unsubscription requests are served.
cfg: RpcServiceCfg,
}
/// Selects which kinds of requests an `RpcService` serves.
// NOTE(review): the `dead_code` allowance is presumably needed because one of the variants
// is never constructed inside this crate — confirm and document the reason.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) enum RpcServiceCfg {
/// Only plain method calls are served; subscription and unsubscription requests are
/// answered with an internal error.
OnlyCalls,
/// Method calls as well as subscriptions and unsubscriptions are served.
CallsAndSubscriptions {
/// Source of subscription permits; a subscription request is rejected when no permit
/// can be acquired.
bounded_subscriptions: BoundedSubscriptions,
/// Sink handed to every subscription callback.
sink: MethodSink,
/// Id provider exposed to subscription callbacks through `SubscriptionState`.
id_provider: Arc<dyn IdProvider>,
},
}
impl RpcService {
pub(crate) const fn new(
methods: Methods,
max_response_body_size: usize,
conn_id: ConnectionId,
cfg: RpcServiceCfg,
) -> Self {
Self { methods, max_response_body_size, conn_id, cfg }
}
}
impl<'a> RpcServiceT<'a> for RpcService {
type Future = ResponseFuture<BoxFuture<'a, MethodResponse>>;
fn call(&self, req: Request<'a>) -> Self::Future {
let conn_id = self.conn_id;
let max_response_body_size = self.max_response_body_size;
let params = req.params();
let name = req.method_name();
let id = req.id().clone();
let extensions = req.extensions.clone();
match self.methods.method_with_name(name) {
None => {
let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
ResponseFuture::ready(rp)
}
Some((_name, method)) => match method {
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
MethodCallback::Async(callback) => {
let params = params.into_owned();
let id = id.into_owned();
let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::future(fut)
}
MethodCallback::Subscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions {
bounded_subscriptions,
sink,
id_provider,
} = self.cfg.clone()
else {
tracing::warn!("Subscriptions not supported");
let rp =
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
return ResponseFuture::ready(rp);
};
if let Some(p) = bounded_subscriptions.acquire() {
let conn_state = SubscriptionState {
conn_id,
id_provider: &*id_provider.clone(),
subscription_permit: p,
};
let fut = callback(id.clone(), params, sink, conn_state, extensions);
ResponseFuture::future(fut)
} else {
let max = bounded_subscriptions.max();
let rp = MethodResponse::error(id, reject_too_many_subscriptions(max));
ResponseFuture::ready(rp)
}
}
MethodCallback::Unsubscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else {
tracing::warn!("Subscriptions not supported");
let rp =
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError));
return ResponseFuture::ready(rp);
};
let rp = callback(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
},
}
}
}