1use crate::sequencer::Error;
4use alloy_eips::BlockId;
5use alloy_json_rpc::{RpcRecv, RpcSend};
6use alloy_primitives::{BlockNumber, B256};
7use alloy_rpc_client::RpcClient;
8use jsonrpsee::BatchResponseBuilder;
9use jsonrpsee_core::{
10 middleware::{Batch, BatchEntry, Notification, RpcServiceT},
11 server::MethodResponse,
12};
13use jsonrpsee_types::{Params, Request};
14use reth_storage_api::{BlockReaderIdExt, TransactionsProvider};
15use std::{future::Future, sync::Arc};
16use tracing::{debug, warn};
17
18#[derive(Debug, Clone)]
23pub struct HistoricalRpcClient {
24 inner: Arc<HistoricalRpcClientInner>,
25}
26
27impl HistoricalRpcClient {
28 pub fn new(endpoint: &str) -> Result<Self, Error> {
30 let client = RpcClient::new_http(
31 endpoint.parse::<reqwest::Url>().map_err(|err| Error::InvalidUrl(err.to_string()))?,
32 );
33
34 Ok(Self {
35 inner: Arc::new(HistoricalRpcClientInner {
36 historical_endpoint: endpoint.to_string(),
37 client,
38 }),
39 })
40 }
41
42 fn client(&self) -> &RpcClient {
44 &self.inner.client
45 }
46
47 pub async fn request<Params: RpcSend, Resp: RpcRecv>(
49 &self,
50 method: &str,
51 params: Params,
52 ) -> Result<Resp, Error> {
53 let resp =
54 self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
55 |err| {
56 warn!(
57 target: "rpc::historical",
58 %err,
59 "HTTP request to historical endpoint failed"
60 );
61 },
62 )?;
63
64 Ok(resp)
65 }
66
67 pub fn endpoint(&self) -> &str {
69 &self.inner.historical_endpoint
70 }
71}
72
73#[derive(Debug)]
74struct HistoricalRpcClientInner {
75 historical_endpoint: String,
76 client: RpcClient,
77}
78
79#[derive(Debug, Clone)]
81pub struct HistoricalRpc<P> {
82 inner: Arc<HistoricalRpcInner<P>>,
83}
84
85impl<P> HistoricalRpc<P> {
86 pub fn new(provider: P, client: HistoricalRpcClient, bedrock_block: BlockNumber) -> Self {
89 let inner = Arc::new(HistoricalRpcInner { provider, client, bedrock_block });
90
91 Self { inner }
92 }
93}
94
95impl<S, P> tower::Layer<S> for HistoricalRpc<P> {
96 type Service = HistoricalRpcService<S, P>;
97
98 fn layer(&self, inner: S) -> Self::Service {
99 HistoricalRpcService::new(inner, self.inner.clone())
100 }
101}
102
103#[derive(Debug, Clone)]
109pub struct HistoricalRpcService<S, P> {
110 inner: S,
112 historical: Arc<HistoricalRpcInner<P>>,
114}
115
116impl<S, P> HistoricalRpcService<S, P> {
117 const fn new(inner: S, historical: Arc<HistoricalRpcInner<P>>) -> Self {
120 Self { inner, historical }
121 }
122}
123
124impl<S, P> RpcServiceT for HistoricalRpcService<S, P>
125where
126 S: RpcServiceT<
127 MethodResponse = MethodResponse,
128 BatchResponse = MethodResponse,
129 NotificationResponse = MethodResponse,
130 > + Send
131 + Sync
132 + Clone
133 + 'static,
134 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone + 'static,
135{
136 type MethodResponse = S::MethodResponse;
137 type NotificationResponse = S::NotificationResponse;
138 type BatchResponse = S::BatchResponse;
139
140 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
141 let inner_service = self.inner.clone();
142 let historical = self.historical.clone();
143
144 Box::pin(async move {
145 if let Some(response) = historical.maybe_forward_request(&req).await {
147 return response
148 }
149
150 inner_service.call(req).await
152 })
153 }
154
155 fn batch<'a>(
156 &self,
157 mut req: Batch<'a>,
158 ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
159 let this = self.clone();
160 let historical = self.historical.clone();
161
162 async move {
163 let mut needs_forwarding = false;
164 for entry in req.iter_mut() {
165 if let Ok(BatchEntry::Call(call)) = entry &&
166 historical.should_forward_request(call)
167 {
168 needs_forwarding = true;
169 break;
170 }
171 }
172
173 if !needs_forwarding {
174 return this.inner.batch(req).await;
176 }
177
178 let mut batch_rp = BatchResponseBuilder::new_with_limit(usize::MAX);
180 let mut got_notification = false;
181
182 for batch_entry in req {
183 match batch_entry {
184 Ok(BatchEntry::Call(req)) => {
185 let rp = this.call(req).await;
186 if let Err(err) = batch_rp.append(rp) {
187 return err;
188 }
189 }
190 Ok(BatchEntry::Notification(n)) => {
191 got_notification = true;
192 this.notification(n).await;
193 }
194 Err(err) => {
195 let (err, id) = err.into_parts();
196 let rp = MethodResponse::error(id, err);
197 if let Err(err) = batch_rp.append(rp) {
198 return err;
199 }
200 }
201 }
202 }
203
204 if batch_rp.is_empty() && got_notification {
206 MethodResponse::notification()
207 }
208 else {
210 MethodResponse::from_batch(batch_rp.finish())
211 }
212 }
213 }
214
215 fn notification<'a>(
216 &self,
217 n: Notification<'a>,
218 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
219 self.inner.notification(n)
220 }
221}
222
223#[derive(Debug)]
224struct HistoricalRpcInner<P> {
225 provider: P,
227 client: HistoricalRpcClient,
229 bedrock_block: BlockNumber,
231}
232
233impl<P> HistoricalRpcInner<P>
234where
235 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone,
236{
237 fn should_forward_request(&self, req: &Request<'_>) -> bool {
239 match req.method_name() {
240 "debug_traceTransaction" |
241 "eth_getTransactionByHash" |
242 "eth_getTransactionReceipt" |
243 "eth_getRawTransactionByHash" => self.should_forward_transaction(req),
244 method => self.should_forward_block_request(method, req),
245 }
246 }
247
248 async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
251 if self.should_forward_request(req) {
252 return self.forward_to_historical(req).await
253 }
254 None
255 }
256
257 fn should_forward_transaction(&self, req: &Request<'_>) -> bool {
259 parse_transaction_hash_from_params(&req.params())
260 .ok()
261 .map(|tx_hash| {
262 match self.provider.transaction_by_hash_with_meta(tx_hash) {
264 Ok(Some((_, meta))) => {
265 let is_pre_bedrock = meta.block_number < self.bedrock_block;
267 if is_pre_bedrock {
268 debug!(
269 target: "rpc::historical",
270 ?tx_hash,
271 block_num = meta.block_number,
272 bedrock = self.bedrock_block,
273 "transaction found in pre-bedrock block, forwarding to historical endpoint"
274 );
275 }
276 is_pre_bedrock
277 }
278 _ => {
279 debug!(
281 target: "rpc::historical",
282 ?tx_hash,
283 "transaction not found locally, forwarding to historical endpoint"
284 );
285 true
286 }
287 }
288 })
289 .unwrap_or(false)
290 }
291
292 fn should_forward_block_request(&self, method: &str, req: &Request<'_>) -> bool {
294 let maybe_block_id = extract_block_id_for_method(method, &req.params());
295
296 maybe_block_id.map(|block_id| self.is_pre_bedrock(block_id)).unwrap_or(false)
297 }
298
299 fn is_pre_bedrock(&self, block_id: BlockId) -> bool {
301 match self.provider.block_number_for_id(block_id) {
302 Ok(Some(num)) => {
303 debug!(
304 target: "rpc::historical",
305 ?block_id,
306 block_num=num,
307 bedrock=self.bedrock_block,
308 "found block number"
309 );
310 num < self.bedrock_block
311 }
312 Ok(None) if block_id.is_hash() => {
313 debug!(
314 target: "rpc::historical",
315 ?block_id,
316 "block hash not found locally, assuming pre-bedrock"
317 );
318 true
319 }
320 _ => {
321 debug!(
322 target: "rpc::historical",
323 ?block_id,
324 "could not determine block number; not forwarding"
325 );
326 false
327 }
328 }
329 }
330
331 async fn forward_to_historical(&self, req: &Request<'_>) -> Option<MethodResponse> {
333 debug!(
334 target: "rpc::historical",
335 method = %req.method_name(),
336 params=?req.params(),
337 "forwarding request to historical endpoint"
338 );
339
340 let params = req.params();
341 let params_str = params.as_str().unwrap_or("[]");
342
343 let params = serde_json::from_str::<serde_json::Value>(params_str).ok()?;
344
345 let raw =
346 self.client.request::<_, serde_json::Value>(req.method_name(), params).await.ok()?;
347
348 let payload = jsonrpsee_types::ResponsePayload::success(raw).into();
349 Some(MethodResponse::response(req.id.clone(), payload, usize::MAX))
350 }
351}
352
353#[derive(Debug)]
355enum ParseError {
356 InvalidFormat,
357 MissingParameter,
358}
359
360fn extract_block_id_for_method(method: &str, params: &Params<'_>) -> Option<BlockId> {
362 match method {
363 "eth_getBlockByNumber" |
364 "eth_getBlockByHash" |
365 "debug_traceBlockByNumber" |
366 "debug_traceBlockByHash" => parse_block_id_from_params(params, 0),
367 "eth_getBalance" |
368 "eth_getCode" |
369 "eth_getTransactionCount" |
370 "eth_call" |
371 "eth_estimateGas" |
372 "eth_createAccessList" |
373 "debug_traceCall" => parse_block_id_from_params(params, 1),
374 "eth_getStorageAt" | "eth_getProof" => parse_block_id_from_params(params, 2),
375 _ => None,
376 }
377}
378
379fn parse_block_id_from_params(params: &Params<'_>, position: usize) -> Option<BlockId> {
381 let values: Vec<serde_json::Value> = params.parse().ok()?;
382 let val = values.into_iter().nth(position)?;
383 serde_json::from_value::<BlockId>(val).ok()
384}
385
386fn parse_transaction_hash_from_params(params: &Params<'_>) -> Result<B256, ParseError> {
388 let values: Vec<serde_json::Value> = params.parse().map_err(|_| ParseError::InvalidFormat)?;
389 let val = values.into_iter().next().ok_or(ParseError::MissingParameter)?;
390 serde_json::from_value::<B256>(val).map_err(|_| ParseError::InvalidFormat)
391}
392
393#[cfg(test)]
394mod tests {
395 use super::*;
396 use alloy_eips::{BlockId, BlockNumberOrTag};
397 use jsonrpsee::types::Params;
398 use jsonrpsee_core::middleware::layer::Either;
399 use reth_node_builder::rpc::RethRpcMiddleware;
400 use reth_storage_api::noop::NoopProvider;
401 use tower::layer::util::Identity;
402
403 #[test]
404 fn check_historical_rpc() {
405 fn assert_historical_rpc<T: RethRpcMiddleware>() {}
406 assert_historical_rpc::<HistoricalRpc<NoopProvider>>();
407 assert_historical_rpc::<Either<HistoricalRpc<NoopProvider>, Identity>>();
408 }
409
410 #[test]
412 fn parses_block_id_from_first_param() {
413 let params_num = Params::new(Some(r#"["0x64"]"#)); assert_eq!(
416 parse_block_id_from_params(¶ms_num, 0).unwrap(),
417 BlockId::Number(BlockNumberOrTag::Number(100))
418 );
419
420 let params_tag = Params::new(Some(r#"["earliest"]"#));
422 assert_eq!(
423 parse_block_id_from_params(¶ms_tag, 0).unwrap(),
424 BlockId::Number(BlockNumberOrTag::Earliest)
425 );
426 }
427
428 #[test]
430 fn parses_block_id_from_second_param() {
431 let params =
432 Params::new(Some(r#"["0x0000000000000000000000000000000000000000", "latest"]"#));
433 let result = parse_block_id_from_params(¶ms, 1).unwrap();
434 assert_eq!(result, BlockId::Number(BlockNumberOrTag::Latest));
435 }
436
437 #[test]
439 fn defaults_to_latest_when_param_is_missing() {
440 let params = Params::new(Some(r#"["0x0000000000000000000000000000000000000000"]"#));
441 let result = parse_block_id_from_params(¶ms, 1);
442 assert!(result.is_none());
443 }
444
445 #[test]
447 fn returns_error_for_invalid_input() {
448 let params = Params::new(Some(r#"[true]"#));
449 let result = parse_block_id_from_params(¶ms, 0);
450 assert!(result.is_none());
451 }
452
453 #[test]
455 fn parses_transaction_hash_from_params() {
456 let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702";
457 let params_str = format!(r#"["{hash}"]"#);
458 let params = Params::new(Some(¶ms_str));
459 let result = parse_transaction_hash_from_params(¶ms);
460 assert!(result.is_ok());
461 let parsed_hash = result.unwrap();
462 assert_eq!(format!("{parsed_hash:?}"), hash);
463 }
464
465 #[test]
467 fn returns_error_for_invalid_tx_hash() {
468 let params = Params::new(Some(r#"["not_a_hash"]"#));
469 let result = parse_transaction_hash_from_params(¶ms);
470 assert!(result.is_err());
471 assert!(matches!(result.unwrap_err(), ParseError::InvalidFormat));
472 }
473
474 #[test]
476 fn returns_error_for_missing_parameter() {
477 let params = Params::new(Some(r#"[]"#));
478 let result = parse_transaction_hash_from_params(¶ms);
479 assert!(result.is_err());
480 assert!(matches!(result.unwrap_err(), ParseError::MissingParameter));
481 }
482}