reth_ipc/server/
ipc.rs

1//! IPC request handling adapted from [`jsonrpsee`] http request handling
2
3use futures::{stream::FuturesOrdered, StreamExt};
4use jsonrpsee::{
5    batch_response_error,
6    core::{
7        server::helpers::prepare_error,
8        tracing::server::{rx_log_from_json, tx_log_from_str},
9        JsonRawValue,
10    },
11    server::middleware::rpc::RpcServiceT,
12    types::{
13        error::{reject_too_big_request, ErrorCode},
14        ErrorObject, Id, InvalidRequest, Notification, Request,
15    },
16    BatchResponseBuilder, MethodResponse, ResponsePayload,
17};
18use std::sync::Arc;
19use tokio::sync::OwnedSemaphorePermit;
20use tokio_util::either::Either;
21use tracing::instrument;
22
23type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;
24
25#[derive(Debug, Clone)]
26pub(crate) struct Batch<S> {
27    data: Vec<u8>,
28    rpc_service: S,
29}
30
31// Batch responses must be sent back as a single message so we read the results from each
32// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
33// complete batch response back to the client over `tx`.
34#[instrument(name = "batch", skip(b), level = "TRACE")]
35pub(crate) async fn process_batch_request<S>(
36    b: Batch<S>,
37    max_response_body_size: usize,
38) -> Option<String>
39where
40    for<'a> S: RpcServiceT<'a> + Send,
41{
42    let Batch { data, rpc_service } = b;
43
44    if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
45        let mut got_notif = false;
46        let mut batch_response = BatchResponseBuilder::new_with_limit(max_response_body_size);
47
48        let mut pending_calls: FuturesOrdered<_> = batch
49            .into_iter()
50            .filter_map(|v| {
51                if let Ok(req) = serde_json::from_str::<Request<'_>>(v.get()) {
52                    Some(Either::Right(rpc_service.call(req)))
53                } else if let Ok(_notif) = serde_json::from_str::<Notif<'_>>(v.get()) {
54                    // notifications should not be answered.
55                    got_notif = true;
56                    None
57                } else {
58                    // valid JSON but could be not parsable as `InvalidRequest`
59                    let id = match serde_json::from_str::<InvalidRequest<'_>>(v.get()) {
60                        Ok(err) => err.id,
61                        Err(_) => Id::Null,
62                    };
63
64                    Some(Either::Left(async {
65                        MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
66                    }))
67                }
68            })
69            .collect();
70
71        while let Some(response) = pending_calls.next().await {
72            if let Err(too_large) = batch_response.append(&response) {
73                return Some(too_large.to_result())
74            }
75        }
76
77        if got_notif && batch_response.is_empty() {
78            None
79        } else {
80            let batch_resp = batch_response.finish();
81            Some(MethodResponse::from_batch(batch_resp).to_result())
82        }
83    } else {
84        Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError)))
85    }
86}
87
88pub(crate) async fn process_single_request<S>(
89    data: Vec<u8>,
90    rpc_service: &S,
91) -> Option<MethodResponse>
92where
93    for<'a> S: RpcServiceT<'a> + Send,
94{
95    if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
96        Some(execute_call_with_tracing(req, rpc_service).await)
97    } else if serde_json::from_slice::<Notif<'_>>(&data).is_ok() {
98        None
99    } else {
100        let (id, code) = prepare_error(&data);
101        Some(MethodResponse::error(id, ErrorObject::from(code)))
102    }
103}
104
105#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service), level = "TRACE")]
106pub(crate) async fn execute_call_with_tracing<'a, S>(
107    req: Request<'a>,
108    rpc_service: &S,
109) -> MethodResponse
110where
111    for<'b> S: RpcServiceT<'b> + Send,
112{
113    rpc_service.call(req).await
114}
115
116#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
117fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodResponse {
118    rx_log_from_json(notif, max_log_length);
119    let response =
120        MethodResponse::response(Id::Null, ResponsePayload::success(String::new()), usize::MAX);
121    tx_log_from_str(response.as_result(), max_log_length);
122    response
123}
124
125pub(crate) async fn call_with_service<S>(
126    request: String,
127    rpc_service: S,
128    max_response_body_size: usize,
129    max_request_body_size: usize,
130    conn: Arc<OwnedSemaphorePermit>,
131) -> Option<String>
132where
133    for<'a> S: RpcServiceT<'a> + Send,
134{
135    enum Kind {
136        Single,
137        Batch,
138    }
139
140    let request_kind = request
141        .chars()
142        .find_map(|c| match c {
143            '{' => Some(Kind::Single),
144            '[' => Some(Kind::Batch),
145            _ => None,
146        })
147        .unwrap_or(Kind::Single);
148
149    let data = request.into_bytes();
150    if data.len() > max_request_body_size {
151        return Some(batch_response_error(
152            Id::Null,
153            reject_too_big_request(max_request_body_size as u32),
154        ))
155    }
156
157    // Single request or notification
158    let res = if matches!(request_kind, Kind::Single) {
159        let response = process_single_request(data, &rpc_service).await;
160        match response {
161            Some(response) if response.is_method_call() => Some(response.to_result()),
162            _ => {
163                // subscription responses are sent directly over the sink, return a response here
164                // would lead to duplicate responses for the subscription response
165                None
166            }
167        }
168    } else {
169        process_batch_request(Batch { data, rpc_service }, max_response_body_size).await
170    };
171
172    drop(conn);
173
174    res
175}