1use crate::sequencer::Error;
4use alloy_eips::BlockId;
5use alloy_json_rpc::{RpcRecv, RpcSend};
6use alloy_primitives::{BlockNumber, B256};
7use alloy_rpc_client::RpcClient;
8use jsonrpsee_core::{
9 middleware::{Batch, Notification, RpcServiceT},
10 server::MethodResponse,
11};
12use jsonrpsee_types::{Params, Request};
13use reth_storage_api::{BlockReaderIdExt, TransactionsProvider};
14use std::{future::Future, sync::Arc};
15use tracing::{debug, warn};
16
17#[derive(Debug, Clone)]
22pub struct HistoricalRpcClient {
23 inner: Arc<HistoricalRpcClientInner>,
24}
25
26impl HistoricalRpcClient {
27 pub fn new(endpoint: &str) -> Result<Self, Error> {
29 let client = RpcClient::new_http(
30 endpoint.parse::<reqwest::Url>().map_err(|err| Error::InvalidUrl(err.to_string()))?,
31 );
32
33 Ok(Self {
34 inner: Arc::new(HistoricalRpcClientInner {
35 historical_endpoint: endpoint.to_string(),
36 client,
37 }),
38 })
39 }
40
41 fn client(&self) -> &RpcClient {
43 &self.inner.client
44 }
45
46 pub async fn request<Params: RpcSend, Resp: RpcRecv>(
48 &self,
49 method: &str,
50 params: Params,
51 ) -> Result<Resp, Error> {
52 let resp =
53 self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
54 |err| {
55 warn!(
56 target: "rpc::historical",
57 %err,
58 "HTTP request to historical endpoint failed"
59 );
60 },
61 )?;
62
63 Ok(resp)
64 }
65
66 pub fn endpoint(&self) -> &str {
68 &self.inner.historical_endpoint
69 }
70}
71
72#[derive(Debug)]
73struct HistoricalRpcClientInner {
74 historical_endpoint: String,
75 client: RpcClient,
76}
77
78#[derive(Debug, Clone)]
80pub struct HistoricalRpc<P> {
81 inner: Arc<HistoricalRpcInner<P>>,
82}
83
84impl<P> HistoricalRpc<P> {
85 pub fn new(provider: P, client: HistoricalRpcClient, bedrock_block: BlockNumber) -> Self {
88 let inner = Arc::new(HistoricalRpcInner { provider, client, bedrock_block });
89
90 Self { inner }
91 }
92}
93
94impl<S, P> tower::Layer<S> for HistoricalRpc<P> {
95 type Service = HistoricalRpcService<S, P>;
96
97 fn layer(&self, inner: S) -> Self::Service {
98 HistoricalRpcService::new(inner, self.inner.clone())
99 }
100}
101
102#[derive(Debug, Clone)]
108pub struct HistoricalRpcService<S, P> {
109 inner: S,
111 historical: Arc<HistoricalRpcInner<P>>,
113}
114
115impl<S, P> HistoricalRpcService<S, P> {
116 const fn new(inner: S, historical: Arc<HistoricalRpcInner<P>>) -> Self {
119 Self { inner, historical }
120 }
121}
122
123impl<S, P> RpcServiceT for HistoricalRpcService<S, P>
124where
125 S: RpcServiceT<MethodResponse = MethodResponse> + Send + Sync + Clone + 'static,
126
127 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone + 'static,
128{
129 type MethodResponse = S::MethodResponse;
130 type NotificationResponse = S::NotificationResponse;
131 type BatchResponse = S::BatchResponse;
132
133 fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
134 let inner_service = self.inner.clone();
135 let historical = self.historical.clone();
136
137 Box::pin(async move {
138 if let Some(response) = historical.maybe_forward_request(&req).await {
140 return response
141 }
142
143 inner_service.call(req).await
145 })
146 }
147
148 fn batch<'a>(&self, req: Batch<'a>) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
149 self.inner.batch(req)
150 }
151
152 fn notification<'a>(
153 &self,
154 n: Notification<'a>,
155 ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
156 self.inner.notification(n)
157 }
158}
159
160#[derive(Debug)]
161struct HistoricalRpcInner<P> {
162 provider: P,
164 client: HistoricalRpcClient,
166 bedrock_block: BlockNumber,
168}
169
170impl<P> HistoricalRpcInner<P>
171where
172 P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone,
173{
174 async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
177 let should_forward = match req.method_name() {
178 "debug_traceTransaction" |
179 "eth_getTransactionByHash" |
180 "eth_getTransactionReceipt" |
181 "eth_getRawTransactionByHash" => self.should_forward_transaction(req),
182 method => self.should_forward_block_request(method, req),
183 };
184
185 if should_forward {
186 return self.forward_to_historical(req).await
187 }
188
189 None
190 }
191
192 fn should_forward_transaction(&self, req: &Request<'_>) -> bool {
194 parse_transaction_hash_from_params(&req.params())
195 .ok()
196 .map(|tx_hash| {
197 match self.provider.transaction_by_hash_with_meta(tx_hash) {
199 Ok(Some((_, meta))) => {
200 let is_pre_bedrock = meta.block_number < self.bedrock_block;
202 if is_pre_bedrock {
203 debug!(
204 target: "rpc::historical",
205 ?tx_hash,
206 block_num = meta.block_number,
207 bedrock = self.bedrock_block,
208 "transaction found in pre-bedrock block, forwarding to historical endpoint"
209 );
210 }
211 is_pre_bedrock
212 }
213 _ => {
214 debug!(
216 target: "rpc::historical",
217 ?tx_hash,
218 "transaction not found locally, forwarding to historical endpoint"
219 );
220 true
221 }
222 }
223 })
224 .unwrap_or(false)
225 }
226
227 fn should_forward_block_request(&self, method: &str, req: &Request<'_>) -> bool {
229 let maybe_block_id = extract_block_id_for_method(method, &req.params());
230
231 maybe_block_id.map(|block_id| self.is_pre_bedrock(block_id)).unwrap_or(false)
232 }
233
234 fn is_pre_bedrock(&self, block_id: BlockId) -> bool {
236 match self.provider.block_number_for_id(block_id) {
237 Ok(Some(num)) => {
238 debug!(
239 target: "rpc::historical",
240 ?block_id,
241 block_num=num,
242 bedrock=self.bedrock_block,
243 "found block number"
244 );
245 num < self.bedrock_block
246 }
247 Ok(None) if block_id.is_hash() => {
248 debug!(
249 target: "rpc::historical",
250 ?block_id,
251 "block hash not found locally, assuming pre-bedrock"
252 );
253 true
254 }
255 _ => {
256 debug!(
257 target: "rpc::historical",
258 ?block_id,
259 "could not determine block number; not forwarding"
260 );
261 false
262 }
263 }
264 }
265
266 async fn forward_to_historical(&self, req: &Request<'_>) -> Option<MethodResponse> {
268 debug!(
269 target: "rpc::historical",
270 method = %req.method_name(),
271 params=?req.params(),
272 "forwarding request to historical endpoint"
273 );
274
275 let params = req.params();
276 let params_str = params.as_str().unwrap_or("[]");
277
278 let params = serde_json::from_str::<serde_json::Value>(params_str).ok()?;
279
280 let raw =
281 self.client.request::<_, serde_json::Value>(req.method_name(), params).await.ok()?;
282
283 let payload = jsonrpsee_types::ResponsePayload::success(raw).into();
284 Some(MethodResponse::response(req.id.clone(), payload, usize::MAX))
285 }
286}
287
288#[derive(Debug)]
290enum ParseError {
291 InvalidFormat,
292 MissingParameter,
293}
294
295fn extract_block_id_for_method(method: &str, params: &Params<'_>) -> Option<BlockId> {
297 match method {
298 "eth_getBlockByNumber" |
299 "eth_getBlockByHash" |
300 "debug_traceBlockByNumber" |
301 "debug_traceBlockByHash" => parse_block_id_from_params(params, 0),
302 "eth_getBalance" |
303 "eth_getCode" |
304 "eth_getTransactionCount" |
305 "eth_call" |
306 "eth_estimateGas" |
307 "eth_createAccessList" |
308 "debug_traceCall" => parse_block_id_from_params(params, 1),
309 "eth_getStorageAt" | "eth_getProof" => parse_block_id_from_params(params, 2),
310 _ => None,
311 }
312}
313
314fn parse_block_id_from_params(params: &Params<'_>, position: usize) -> Option<BlockId> {
316 let values: Vec<serde_json::Value> = params.parse().ok()?;
317 let val = values.into_iter().nth(position)?;
318 serde_json::from_value::<BlockId>(val).ok()
319}
320
321fn parse_transaction_hash_from_params(params: &Params<'_>) -> Result<B256, ParseError> {
323 let values: Vec<serde_json::Value> = params.parse().map_err(|_| ParseError::InvalidFormat)?;
324 let val = values.into_iter().next().ok_or(ParseError::MissingParameter)?;
325 serde_json::from_value::<B256>(val).map_err(|_| ParseError::InvalidFormat)
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331 use alloy_eips::{BlockId, BlockNumberOrTag};
332 use jsonrpsee::types::Params;
333 use jsonrpsee_core::middleware::layer::Either;
334 use reth_node_builder::rpc::RethRpcMiddleware;
335 use reth_storage_api::noop::NoopProvider;
336 use tower::layer::util::Identity;
337
338 #[test]
339 fn check_historical_rpc() {
340 fn assert_historical_rpc<T: RethRpcMiddleware>() {}
341 assert_historical_rpc::<HistoricalRpc<NoopProvider>>();
342 assert_historical_rpc::<Either<HistoricalRpc<NoopProvider>, Identity>>();
343 }
344
345 #[test]
347 fn parses_block_id_from_first_param() {
348 let params_num = Params::new(Some(r#"["0x64"]"#)); assert_eq!(
351 parse_block_id_from_params(¶ms_num, 0).unwrap(),
352 BlockId::Number(BlockNumberOrTag::Number(100))
353 );
354
355 let params_tag = Params::new(Some(r#"["earliest"]"#));
357 assert_eq!(
358 parse_block_id_from_params(¶ms_tag, 0).unwrap(),
359 BlockId::Number(BlockNumberOrTag::Earliest)
360 );
361 }
362
363 #[test]
365 fn parses_block_id_from_second_param() {
366 let params =
367 Params::new(Some(r#"["0x0000000000000000000000000000000000000000", "latest"]"#));
368 let result = parse_block_id_from_params(¶ms, 1).unwrap();
369 assert_eq!(result, BlockId::Number(BlockNumberOrTag::Latest));
370 }
371
372 #[test]
374 fn defaults_to_latest_when_param_is_missing() {
375 let params = Params::new(Some(r#"["0x0000000000000000000000000000000000000000"]"#));
376 let result = parse_block_id_from_params(¶ms, 1);
377 assert!(result.is_none());
378 }
379
380 #[test]
382 fn returns_error_for_invalid_input() {
383 let params = Params::new(Some(r#"[true]"#));
384 let result = parse_block_id_from_params(¶ms, 0);
385 assert!(result.is_none());
386 }
387
388 #[test]
390 fn parses_transaction_hash_from_params() {
391 let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702";
392 let params_str = format!(r#"["{hash}"]"#);
393 let params = Params::new(Some(¶ms_str));
394 let result = parse_transaction_hash_from_params(¶ms);
395 assert!(result.is_ok());
396 let parsed_hash = result.unwrap();
397 assert_eq!(format!("{parsed_hash:?}"), hash);
398 }
399
400 #[test]
402 fn returns_error_for_invalid_tx_hash() {
403 let params = Params::new(Some(r#"["not_a_hash"]"#));
404 let result = parse_transaction_hash_from_params(¶ms);
405 assert!(result.is_err());
406 assert!(matches!(result.unwrap_err(), ParseError::InvalidFormat));
407 }
408
409 #[test]
411 fn returns_error_for_missing_parameter() {
412 let params = Params::new(Some(r#"[]"#));
413 let result = parse_transaction_hash_from_params(¶ms);
414 assert!(result.is_err());
415 assert!(matches!(result.unwrap_err(), ParseError::MissingParameter));
416 }
417}