1use crate::sequencer::Error;
4use alloy_eips::BlockId;
5use alloy_json_rpc::{RpcRecv, RpcSend};
6use alloy_primitives::{BlockNumber, B256};
7use alloy_rpc_client::RpcClient;
8use jsonrpsee_core::{
9 middleware::{Batch, Notification, RpcServiceT},
10 server::MethodResponse,
11};
12use jsonrpsee_types::{Params, Request};
13use reth_storage_api::{BlockReaderIdExt, TransactionsProvider};
14use std::{future::Future, sync::Arc};
15use tracing::{debug, warn};
16
17#[derive(Debug, Clone)]
22pub struct HistoricalRpcClient {
23 inner: Arc<HistoricalRpcClientInner>,
24}
25
26impl HistoricalRpcClient {
27 pub fn new(endpoint: &str) -> Result<Self, Error> {
29 let client = RpcClient::new_http(
30 endpoint.parse::<reqwest::Url>().map_err(|err| Error::InvalidUrl(err.to_string()))?,
31 );
32
33 Ok(Self {
34 inner: Arc::new(HistoricalRpcClientInner {
35 historical_endpoint: endpoint.to_string(),
36 client,
37 }),
38 })
39 }
40
41 fn client(&self) -> &RpcClient {
43 &self.inner.client
44 }
45
46 pub async fn request<Params: RpcSend, Resp: RpcRecv>(
48 &self,
49 method: &str,
50 params: Params,
51 ) -> Result<Resp, Error> {
52 let resp =
53 self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
54 |err| {
55 warn!(
56 target: "rpc::historical",
57 %err,
58 "HTTP request to historical endpoint failed"
59 );
60 },
61 )?;
62
63 Ok(resp)
64 }
65
66 pub fn endpoint(&self) -> &str {
68 &self.inner.historical_endpoint
69 }
70}
71
72#[derive(Debug)]
73struct HistoricalRpcClientInner {
74 historical_endpoint: String,
75 client: RpcClient,
76}
77
78#[derive(Debug, Clone)]
80pub struct HistoricalRpc<P> {
81 inner: Arc<HistoricalRpcInner<P>>,
82}
83
84impl<P> HistoricalRpc<P> {
85 pub fn new(provider: P, client: HistoricalRpcClient, bedrock_block: BlockNumber) -> Self {
88 let inner = Arc::new(HistoricalRpcInner { provider, client, bedrock_block });
89
90 Self { inner }
91 }
92}
93
94impl<S, P> tower::Layer<S> for HistoricalRpc<P> {
95 type Service = HistoricalRpcService<S, P>;
96
97 fn layer(&self, inner: S) -> Self::Service {
98 HistoricalRpcService::new(inner, self.inner.clone())
99 }
100}
101
102#[derive(Debug, Clone)]
108pub struct HistoricalRpcService<S, P> {
109 inner: S,
111 historical: Arc<HistoricalRpcInner<P>>,
113}
114
115impl<S, P> HistoricalRpcService<S, P> {
116 const fn new(inner: S, historical: Arc<HistoricalRpcInner<P>>) -> Self {
119 Self { inner, historical }
120 }
121}
122
123impl<S, P> RpcServiceT for HistoricalRpcService<S, P>
124where
125 S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
126
127 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone + 'static,
128{
129 type MethodResponse = S::MethodResponse;
130 type NotificationResponse = S::NotificationResponse;
131 type BatchResponse = S::BatchResponse;
132
133 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
134 let inner_service = self.inner.clone();
135 let historical = self.historical.clone();
136
137 Box::pin(async move {
138 if let Some(response) = historical.maybe_forward_request(&req).await {
140 return response
141 }
142
143 inner_service.call(req).await
145 })
146 }
147
148 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
149 self.inner.batch(req)
150 }
151
152 fn notification<'a>(
153 &self,
154 n: Notification<'a>,
155 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
156 self.inner.notification(n)
157 }
158}
159
160#[derive(Debug)]
161struct HistoricalRpcInner<P> {
162 provider: P,
164 client: HistoricalRpcClient,
166 bedrock_block: BlockNumber,
168}
169
170impl<P> HistoricalRpcInner<P>
171where
172 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone,
173{
174 async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
177 let should_forward = match req.method_name() {
178 "debug_traceTransaction" => self.should_forward_transaction(req),
179 method => self.should_forward_block_request(method, req),
180 };
181
182 if should_forward {
183 return self.forward_to_historical(req).await
184 }
185
186 None
187 }
188
189 fn should_forward_transaction(&self, req: &Request<'_>) -> bool {
191 parse_transaction_hash_from_params(&req.params())
192 .ok()
193 .map(|tx_hash| {
194 match self.provider.transaction_by_hash_with_meta(tx_hash) {
196 Ok(Some((_, meta))) => {
197 let is_pre_bedrock = meta.block_number < self.bedrock_block;
199 if is_pre_bedrock {
200 debug!(
201 target: "rpc::historical",
202 ?tx_hash,
203 block_num = meta.block_number,
204 bedrock = self.bedrock_block,
205 "transaction found in pre-bedrock block, forwarding to historical endpoint"
206 );
207 }
208 is_pre_bedrock
209 }
210 _ => {
211 debug!(
213 target: "rpc::historical",
214 ?tx_hash,
215 "transaction not found locally, forwarding to historical endpoint"
216 );
217 true
218 }
219 }
220 })
221 .unwrap_or(false)
222 }
223
224 fn should_forward_block_request(&self, method: &str, req: &Request<'_>) -> bool {
226 let maybe_block_id = extract_block_id_for_method(method, &req.params());
227
228 maybe_block_id.map(|block_id| self.is_pre_bedrock(block_id)).unwrap_or(false)
229 }
230
231 fn is_pre_bedrock(&self, block_id: BlockId) -> bool {
233 match self.provider.block_number_for_id(block_id) {
234 Ok(Some(num)) => {
235 debug!(
236 target: "rpc::historical",
237 ?block_id,
238 block_num=num,
239 bedrock=self.bedrock_block,
240 "found block number"
241 );
242 num < self.bedrock_block
243 }
244 Ok(None) if block_id.is_hash() => {
245 debug!(
246 target: "rpc::historical",
247 ?block_id,
248 "block hash not found locally, assuming pre-bedrock"
249 );
250 true
251 }
252 _ => {
253 debug!(
254 target: "rpc::historical",
255 ?block_id,
256 "could not determine block number; not forwarding"
257 );
258 false
259 }
260 }
261 }
262
263 async fn forward_to_historical(&self, req: &Request<'_>) -> Option<MethodResponse> {
265 debug!(
266 target: "rpc::historical",
267 method = %req.method_name(),
268 params=?req.params(),
269 "forwarding request to historical endpoint"
270 );
271
272 let params = req.params();
273 let params_str = params.as_str().unwrap_or("[]");
274
275 let params = serde_json::from_str::<serde_json::Value>(params_str).ok()?;
276
277 let raw =
278 self.client.request::<_, serde_json::Value>(req.method_name(), params).await.ok()?;
279
280 let payload = jsonrpsee_types::ResponsePayload::success(raw).into();
281 Some(MethodResponse::response(req.id.clone(), payload, usize::MAX))
282 }
283}
284
285#[derive(Debug)]
287enum ParseError {
288 InvalidFormat,
289 MissingParameter,
290}
291
292fn extract_block_id_for_method(method: &str, params: &Params<'_>) -> Option<BlockId> {
294 match method {
295 "eth_getBlockByNumber" |
296 "eth_getBlockByHash" |
297 "debug_traceBlockByNumber" |
298 "debug_traceBlockByHash" => parse_block_id_from_params(params, 0),
299 "eth_getBalance" |
300 "eth_getCode" |
301 "eth_getTransactionCount" |
302 "eth_call" |
303 "eth_estimateGas" |
304 "eth_createAccessList" |
305 "debug_traceCall" => parse_block_id_from_params(params, 1),
306 "eth_getStorageAt" | "eth_getProof" => parse_block_id_from_params(params, 2),
307 _ => None,
308 }
309}
310
311fn parse_block_id_from_params(params: &Params<'_>, position: usize) -> Option<BlockId> {
313 let values: Vec<serde_json::Value> = params.parse().ok()?;
314 let val = values.into_iter().nth(position)?;
315 serde_json::from_value::<BlockId>(val).ok()
316}
317
318fn parse_transaction_hash_from_params(params: &Params<'_>) -> Result<B256, ParseError> {
320 let values: Vec<serde_json::Value> = params.parse().map_err(|_| ParseError::InvalidFormat)?;
321 let val = values.into_iter().next().ok_or(ParseError::MissingParameter)?;
322 serde_json::from_value::<B256>(val).map_err(|_| ParseError::InvalidFormat)
323}
324
325#[cfg(test)]
326mod tests {
327 use super::*;
328 use alloy_eips::{BlockId, BlockNumberOrTag};
329 use jsonrpsee::types::Params;
330 use jsonrpsee_core::middleware::layer::Either;
331 use reth_node_builder::rpc::RethRpcMiddleware;
332 use reth_storage_api::noop::NoopProvider;
333 use tower::layer::util::Identity;
334
335 #[test]
336 fn check_historical_rpc() {
337 fn assert_historical_rpc<T: RethRpcMiddleware>() {}
338 assert_historical_rpc::<HistoricalRpc<NoopProvider>>();
339 assert_historical_rpc::<Either<HistoricalRpc<NoopProvider>, Identity>>();
340 }
341
342 #[test]
344 fn parses_block_id_from_first_param() {
345 let params_num = Params::new(Some(r#"["0x64"]"#)); assert_eq!(
348 parse_block_id_from_params(¶ms_num, 0).unwrap(),
349 BlockId::Number(BlockNumberOrTag::Number(100))
350 );
351
352 let params_tag = Params::new(Some(r#"["earliest"]"#));
354 assert_eq!(
355 parse_block_id_from_params(¶ms_tag, 0).unwrap(),
356 BlockId::Number(BlockNumberOrTag::Earliest)
357 );
358 }
359
360 #[test]
362 fn parses_block_id_from_second_param() {
363 let params =
364 Params::new(Some(r#"["0x0000000000000000000000000000000000000000", "latest"]"#));
365 let result = parse_block_id_from_params(¶ms, 1).unwrap();
366 assert_eq!(result, BlockId::Number(BlockNumberOrTag::Latest));
367 }
368
369 #[test]
371 fn defaults_to_latest_when_param_is_missing() {
372 let params = Params::new(Some(r#"["0x0000000000000000000000000000000000000000"]"#));
373 let result = parse_block_id_from_params(¶ms, 1);
374 assert!(result.is_none());
375 }
376
377 #[test]
379 fn returns_error_for_invalid_input() {
380 let params = Params::new(Some(r#"[true]"#));
381 let result = parse_block_id_from_params(¶ms, 0);
382 assert!(result.is_none());
383 }
384
385 #[test]
387 fn parses_transaction_hash_from_params() {
388 let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702";
389 let params_str = format!(r#"["{}"]"#, hash);
390 let params = Params::new(Some(¶ms_str));
391 let result = parse_transaction_hash_from_params(¶ms);
392 assert!(result.is_ok());
393 let parsed_hash = result.unwrap();
394 assert_eq!(format!("{:?}", parsed_hash), hash);
395 }
396
397 #[test]
399 fn returns_error_for_invalid_tx_hash() {
400 let params = Params::new(Some(r#"["not_a_hash"]"#));
401 let result = parse_transaction_hash_from_params(¶ms);
402 assert!(result.is_err());
403 assert!(matches!(result.unwrap_err(), ParseError::InvalidFormat));
404 }
405
406 #[test]
408 fn returns_error_for_missing_parameter() {
409 let params = Params::new(Some(r#"[]"#));
410 let result = parse_transaction_hash_from_params(¶ms);
411 assert!(result.is_err());
412 assert!(matches!(result.unwrap_err(), ParseError::MissingParameter));
413 }
414}