reth_ipc/server/
ipc.rs
1use futures::{stream::FuturesOrdered, StreamExt};
4use jsonrpsee::{
5 batch_response_error,
6 core::{
7 server::helpers::prepare_error,
8 tracing::server::{rx_log_from_json, tx_log_from_str},
9 JsonRawValue,
10 },
11 server::middleware::rpc::RpcServiceT,
12 types::{
13 error::{reject_too_big_request, ErrorCode},
14 ErrorObject, Id, InvalidRequest, Notification, Request,
15 },
16 BatchResponseBuilder, MethodResponse, ResponsePayload,
17};
18use std::sync::Arc;
19use tokio::sync::OwnedSemaphorePermit;
20use tokio_util::either::Either;
21use tracing::instrument;
22
23type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;
24
25#[derive(Debug, Clone)]
26pub(crate) struct Batch<S> {
27 data: Vec<u8>,
28 rpc_service: S,
29}
30
31#[instrument(name = "batch", skip(b), level = "TRACE")]
35pub(crate) async fn process_batch_request<S>(
36 b: Batch<S>,
37 max_response_body_size: usize,
38) -> Option<String>
39where
40 for<'a> S: RpcServiceT<'a> + Send,
41{
42 let Batch { data, rpc_service } = b;
43
44 if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
45 let mut got_notif = false;
46 let mut batch_response = BatchResponseBuilder::new_with_limit(max_response_body_size);
47
48 let mut pending_calls: FuturesOrdered<_> = batch
49 .into_iter()
50 .filter_map(|v| {
51 if let Ok(req) = serde_json::from_str::<Request<'_>>(v.get()) {
52 Some(Either::Right(rpc_service.call(req)))
53 } else if let Ok(_notif) = serde_json::from_str::<Notif<'_>>(v.get()) {
54 got_notif = true;
56 None
57 } else {
58 let id = match serde_json::from_str::<InvalidRequest<'_>>(v.get()) {
60 Ok(err) => err.id,
61 Err(_) => Id::Null,
62 };
63
64 Some(Either::Left(async {
65 MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
66 }))
67 }
68 })
69 .collect();
70
71 while let Some(response) = pending_calls.next().await {
72 if let Err(too_large) = batch_response.append(&response) {
73 return Some(too_large.to_result())
74 }
75 }
76
77 if got_notif && batch_response.is_empty() {
78 None
79 } else {
80 let batch_resp = batch_response.finish();
81 Some(MethodResponse::from_batch(batch_resp).to_result())
82 }
83 } else {
84 Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError)))
85 }
86}
87
88pub(crate) async fn process_single_request<S>(
89 data: Vec<u8>,
90 rpc_service: &S,
91) -> Option<MethodResponse>
92where
93 for<'a> S: RpcServiceT<'a> + Send,
94{
95 if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
96 Some(execute_call_with_tracing(req, rpc_service).await)
97 } else if serde_json::from_slice::<Notif<'_>>(&data).is_ok() {
98 None
99 } else {
100 let (id, code) = prepare_error(&data);
101 Some(MethodResponse::error(id, ErrorObject::from(code)))
102 }
103}
104
105#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service), level = "TRACE")]
106pub(crate) async fn execute_call_with_tracing<'a, S>(
107 req: Request<'a>,
108 rpc_service: &S,
109) -> MethodResponse
110where
111 for<'b> S: RpcServiceT<'b> + Send,
112{
113 rpc_service.call(req).await
114}
115
116#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
117fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodResponse {
118 rx_log_from_json(notif, max_log_length);
119 let response =
120 MethodResponse::response(Id::Null, ResponsePayload::success(String::new()), usize::MAX);
121 tx_log_from_str(response.as_result(), max_log_length);
122 response
123}
124
125pub(crate) async fn call_with_service<S>(
126 request: String,
127 rpc_service: S,
128 max_response_body_size: usize,
129 max_request_body_size: usize,
130 conn: Arc<OwnedSemaphorePermit>,
131) -> Option<String>
132where
133 for<'a> S: RpcServiceT<'a> + Send,
134{
135 enum Kind {
136 Single,
137 Batch,
138 }
139
140 let request_kind = request
141 .chars()
142 .find_map(|c| match c {
143 '{' => Some(Kind::Single),
144 '[' => Some(Kind::Batch),
145 _ => None,
146 })
147 .unwrap_or(Kind::Single);
148
149 let data = request.into_bytes();
150 if data.len() > max_request_body_size {
151 return Some(batch_response_error(
152 Id::Null,
153 reject_too_big_request(max_request_body_size as u32),
154 ))
155 }
156
157 let res = if matches!(request_kind, Kind::Single) {
159 let response = process_single_request(data, &rpc_service).await;
160 match response {
161 Some(response) if response.is_method_call() => Some(response.to_result()),
162 _ => {
163 None
166 }
167 }
168 } else {
169 process_batch_request(Batch { data, rpc_service }, max_response_body_size).await
170 };
171
172 drop(conn);
173
174 res
175}