reth_optimism_rpc/
historical.rs

1//! Client support for optimism historical RPC requests.
2
3use crate::sequencer::Error;
4use alloy_eips::BlockId;
5use alloy_json_rpc::{RpcRecv, RpcSend};
6use alloy_primitives::{BlockNumber, B256};
7use alloy_rpc_client::RpcClient;
8use jsonrpsee::BatchResponseBuilder;
9use jsonrpsee_core::{
10    middleware::{Batch, BatchEntry, Notification, RpcServiceT},
11    server::MethodResponse,
12};
13use jsonrpsee_types::{Params, Request};
14use reth_storage_api::{BlockReaderIdExt, TransactionsProvider};
15use std::{future::Future, sync::Arc};
16use tracing::{debug, warn};
17
18/// A client that can be used to forward RPC requests for historical data to an endpoint.
19///
20/// This is intended to be used for OP-Mainnet pre-bedrock data, allowing users to query historical
21/// state.
22#[derive(Debug, Clone)]
23pub struct HistoricalRpcClient {
24    inner: Arc<HistoricalRpcClientInner>,
25}
26
27impl HistoricalRpcClient {
28    /// Constructs a new historical RPC client with the given endpoint URL.
29    pub fn new(endpoint: &str) -> Result<Self, Error> {
30        let client = RpcClient::new_http(
31            endpoint.parse::<reqwest::Url>().map_err(|err| Error::InvalidUrl(err.to_string()))?,
32        );
33
34        Ok(Self {
35            inner: Arc::new(HistoricalRpcClientInner {
36                historical_endpoint: endpoint.to_string(),
37                client,
38            }),
39        })
40    }
41
42    /// Returns a reference to the underlying RPC client
43    fn client(&self) -> &RpcClient {
44        &self.inner.client
45    }
46
47    /// Forwards a JSON-RPC request to the historical endpoint
48    pub async fn request<Params: RpcSend, Resp: RpcRecv>(
49        &self,
50        method: &str,
51        params: Params,
52    ) -> Result<Resp, Error> {
53        let resp =
54            self.client().request::<Params, Resp>(method.to_string(), params).await.inspect_err(
55                |err| {
56                    warn!(
57                        target: "rpc::historical",
58                        %err,
59                        "HTTP request to historical endpoint failed"
60                    );
61                },
62            )?;
63
64        Ok(resp)
65    }
66
67    /// Returns the configured historical endpoint URL
68    pub fn endpoint(&self) -> &str {
69        &self.inner.historical_endpoint
70    }
71}
72
73#[derive(Debug)]
74struct HistoricalRpcClientInner {
75    historical_endpoint: String,
76    client: RpcClient,
77}
78
79/// A layer that provides historical RPC forwarding functionality for a given service.
80#[derive(Debug, Clone)]
81pub struct HistoricalRpc<P> {
82    inner: Arc<HistoricalRpcInner<P>>,
83}
84
85impl<P> HistoricalRpc<P> {
86    /// Constructs a new historical RPC layer with the given provider, client and bedrock block
87    /// number.
88    pub fn new(provider: P, client: HistoricalRpcClient, bedrock_block: BlockNumber) -> Self {
89        let inner = Arc::new(HistoricalRpcInner { provider, client, bedrock_block });
90
91        Self { inner }
92    }
93}
94
95impl<S, P> tower::Layer<S> for HistoricalRpc<P> {
96    type Service = HistoricalRpcService<S, P>;
97
98    fn layer(&self, inner: S) -> Self::Service {
99        HistoricalRpcService::new(inner, self.inner.clone())
100    }
101}
102
103/// A service that intercepts RPC calls and forwards pre-bedrock historical requests
104/// to a dedicated endpoint.
105///
106/// This checks if the request is for a pre-bedrock block and forwards it via the configured
107/// historical RPC client.
108#[derive(Debug, Clone)]
109pub struct HistoricalRpcService<S, P> {
110    /// The inner service that handles regular RPC requests
111    inner: S,
112    /// The context required to forward historical requests.
113    historical: Arc<HistoricalRpcInner<P>>,
114}
115
116impl<S, P> HistoricalRpcService<S, P> {
117    /// Constructs a new historical RPC service with the given inner service, historical client,
118    /// provider, and bedrock block number.
119    const fn new(inner: S, historical: Arc<HistoricalRpcInner<P>>) -> Self {
120        Self { inner, historical }
121    }
122}
123
124impl<S, P> RpcServiceT for HistoricalRpcService<S, P>
125where
126    S: RpcServiceT<
127            MethodResponse = MethodResponse,
128            BatchResponse = MethodResponse,
129            NotificationResponse = MethodResponse,
130        > + Send
131        + Sync
132        + Clone
133        + 'static,
134    P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone + 'static,
135{
136    type MethodResponse = S::MethodResponse;
137    type NotificationResponse = S::NotificationResponse;
138    type BatchResponse = S::BatchResponse;
139
140    fn call<'a>(&self, req: Request<'a>) -> impl Future<Output = Self::MethodResponse> + Send + 'a {
141        let inner_service = self.inner.clone();
142        let historical = self.historical.clone();
143
144        Box::pin(async move {
145            // Check if request should be forwarded to historical endpoint
146            if let Some(response) = historical.maybe_forward_request(&req).await {
147                return response
148            }
149
150            // Handle the request with the inner service
151            inner_service.call(req).await
152        })
153    }
154
155    fn batch<'a>(
156        &self,
157        mut req: Batch<'a>,
158    ) -> impl Future<Output = Self::BatchResponse> + Send + 'a {
159        let this = self.clone();
160        let historical = self.historical.clone();
161
162        async move {
163            let mut needs_forwarding = false;
164            for entry in req.iter_mut() {
165                if let Ok(BatchEntry::Call(call)) = entry &&
166                    historical.should_forward_request(call)
167                {
168                    needs_forwarding = true;
169                    break;
170                }
171            }
172
173            if !needs_forwarding {
174                // no call needs to be forwarded and we can simply perform this batch request
175                return this.inner.batch(req).await;
176            }
177
178            // the entire response is checked above so we can assume that these don't exceed
179            let mut batch_rp = BatchResponseBuilder::new_with_limit(usize::MAX);
180            let mut got_notification = false;
181
182            for batch_entry in req {
183                match batch_entry {
184                    Ok(BatchEntry::Call(req)) => {
185                        let rp = this.call(req).await;
186                        if let Err(err) = batch_rp.append(rp) {
187                            return err;
188                        }
189                    }
190                    Ok(BatchEntry::Notification(n)) => {
191                        got_notification = true;
192                        this.notification(n).await;
193                    }
194                    Err(err) => {
195                        let (err, id) = err.into_parts();
196                        let rp = MethodResponse::error(id, err);
197                        if let Err(err) = batch_rp.append(rp) {
198                            return err;
199                        }
200                    }
201                }
202            }
203
204            // If the batch is empty and we got a notification, we return an empty response.
205            if batch_rp.is_empty() && got_notification {
206                MethodResponse::notification()
207            }
208            // An empty batch is regarded as an invalid request here.
209            else {
210                MethodResponse::from_batch(batch_rp.finish())
211            }
212        }
213    }
214
215    fn notification<'a>(
216        &self,
217        n: Notification<'a>,
218    ) -> impl Future<Output = Self::NotificationResponse> + Send + 'a {
219        self.inner.notification(n)
220    }
221}
222
223#[derive(Debug)]
224struct HistoricalRpcInner<P> {
225    /// Provider used to determine if a block is pre-bedrock
226    provider: P,
227    /// Client used to forward historical requests
228    client: HistoricalRpcClient,
229    /// Bedrock transition block number
230    bedrock_block: BlockNumber,
231}
232
233impl<P> HistoricalRpcInner<P>
234where
235    P: BlockReaderIdExt + TransactionsProvider + Send + Sync + Clone,
236{
237    /// Checks if a request should be forwarded to the historical endpoint (synchronous check).
238    fn should_forward_request(&self, req: &Request<'_>) -> bool {
239        match req.method_name() {
240            "debug_traceTransaction" |
241            "eth_getTransactionByHash" |
242            "eth_getTransactionReceipt" |
243            "eth_getRawTransactionByHash" => self.should_forward_transaction(req),
244            method => self.should_forward_block_request(method, req),
245        }
246    }
247
248    /// Checks if a request should be forwarded to the historical endpoint and returns
249    /// the response if it was forwarded.
250    async fn maybe_forward_request(&self, req: &Request<'_>) -> Option<MethodResponse> {
251        if self.should_forward_request(req) {
252            return self.forward_to_historical(req).await
253        }
254        None
255    }
256
257    /// Determines if a transaction request should be forwarded
258    fn should_forward_transaction(&self, req: &Request<'_>) -> bool {
259        parse_transaction_hash_from_params(&req.params())
260            .ok()
261            .map(|tx_hash| {
262                // Check if we can find the transaction locally and get its metadata
263                match self.provider.transaction_by_hash_with_meta(tx_hash) {
264                    Ok(Some((_, meta))) => {
265                        // Transaction found - check if it's pre-bedrock based on block number
266                        let is_pre_bedrock = meta.block_number < self.bedrock_block;
267                        if is_pre_bedrock {
268                            debug!(
269                                target: "rpc::historical",
270                                ?tx_hash,
271                                block_num = meta.block_number,
272                                bedrock = self.bedrock_block,
273                                "transaction found in pre-bedrock block, forwarding to historical endpoint"
274                            );
275                        }
276                        is_pre_bedrock
277                    }
278                    _ => {
279                        // Transaction not found locally, optimistically forward to historical endpoint
280                        debug!(
281                            target: "rpc::historical",
282                            ?tx_hash,
283                            "transaction not found locally, forwarding to historical endpoint"
284                        );
285                        true
286                    }
287                }
288            })
289            .unwrap_or(false)
290    }
291
292    /// Determines if a block-based request should be forwarded
293    fn should_forward_block_request(&self, method: &str, req: &Request<'_>) -> bool {
294        let maybe_block_id = extract_block_id_for_method(method, &req.params());
295
296        maybe_block_id.map(|block_id| self.is_pre_bedrock(block_id)).unwrap_or(false)
297    }
298
299    /// Checks if a block ID refers to a pre-bedrock block
300    fn is_pre_bedrock(&self, block_id: BlockId) -> bool {
301        match self.provider.block_number_for_id(block_id) {
302            Ok(Some(num)) => {
303                debug!(
304                    target: "rpc::historical",
305                    ?block_id,
306                    block_num=num,
307                    bedrock=self.bedrock_block,
308                    "found block number"
309                );
310                num < self.bedrock_block
311            }
312            Ok(None) if block_id.is_hash() => {
313                debug!(
314                    target: "rpc::historical",
315                    ?block_id,
316                    "block hash not found locally, assuming pre-bedrock"
317                );
318                true
319            }
320            _ => {
321                debug!(
322                    target: "rpc::historical",
323                    ?block_id,
324                    "could not determine block number; not forwarding"
325                );
326                false
327            }
328        }
329    }
330
331    /// Forwards a request to the historical endpoint
332    async fn forward_to_historical(&self, req: &Request<'_>) -> Option<MethodResponse> {
333        debug!(
334            target: "rpc::historical",
335            method = %req.method_name(),
336            params=?req.params(),
337            "forwarding request to historical endpoint"
338        );
339
340        let params = req.params();
341        let params_str = params.as_str().unwrap_or("[]");
342
343        let params = serde_json::from_str::<serde_json::Value>(params_str).ok()?;
344
345        let raw =
346            self.client.request::<_, serde_json::Value>(req.method_name(), params).await.ok()?;
347
348        let payload = jsonrpsee_types::ResponsePayload::success(raw).into();
349        Some(MethodResponse::response(req.id.clone(), payload, usize::MAX))
350    }
351}
352
/// Reasons why a value could not be extracted from request parameters.
#[derive(Debug)]
enum ParseError {
    /// The parameters or the requested value did not have the expected format.
    InvalidFormat,
    /// The parameter list did not contain the requested value.
    MissingParameter,
}
359
360/// Extracts the block ID from request parameters based on the method name
361fn extract_block_id_for_method(method: &str, params: &Params<'_>) -> Option<BlockId> {
362    match method {
363        "eth_getBlockByNumber" |
364        "eth_getBlockByHash" |
365        "debug_traceBlockByNumber" |
366        "debug_traceBlockByHash" => parse_block_id_from_params(params, 0),
367        "eth_getBalance" |
368        "eth_getCode" |
369        "eth_getTransactionCount" |
370        "eth_call" |
371        "eth_estimateGas" |
372        "eth_createAccessList" |
373        "debug_traceCall" => parse_block_id_from_params(params, 1),
374        "eth_getStorageAt" | "eth_getProof" => parse_block_id_from_params(params, 2),
375        _ => None,
376    }
377}
378
379/// Parses a `BlockId` from the given parameters at the specified position.
380fn parse_block_id_from_params(params: &Params<'_>, position: usize) -> Option<BlockId> {
381    let values: Vec<serde_json::Value> = params.parse().ok()?;
382    let val = values.into_iter().nth(position)?;
383    serde_json::from_value::<BlockId>(val).ok()
384}
385
386/// Parses a transaction hash from the first parameter.
387fn parse_transaction_hash_from_params(params: &Params<'_>) -> Result<B256, ParseError> {
388    let values: Vec<serde_json::Value> = params.parse().map_err(|_| ParseError::InvalidFormat)?;
389    let val = values.into_iter().next().ok_or(ParseError::MissingParameter)?;
390    serde_json::from_value::<B256>(val).map_err(|_| ParseError::InvalidFormat)
391}
392
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_eips::{BlockId, BlockNumberOrTag};
    use jsonrpsee::types::Params;
    use jsonrpsee_core::middleware::layer::Either;
    use reth_node_builder::rpc::RethRpcMiddleware;
    use reth_storage_api::noop::NoopProvider;
    use tower::layer::util::Identity;

    /// Compile-time check that the layer satisfies the middleware bound, both on its own and
    /// when wrapped in `Either`.
    #[test]
    fn check_historical_rpc() {
        fn assert_historical_rpc<T: RethRpcMiddleware>() {}
        assert_historical_rpc::<HistoricalRpc<NoopProvider>>();
        assert_historical_rpc::<Either<HistoricalRpc<NoopProvider>, Identity>>();
    }

    /// Tests that various valid id types can be parsed from the first parameter.
    #[test]
    fn parses_block_id_from_first_param() {
        // Test with a block number
        let params_num = Params::new(Some(r#"["0x64"]"#)); // 100
        assert_eq!(
            parse_block_id_from_params(&params_num, 0).unwrap(),
            BlockId::Number(BlockNumberOrTag::Number(100))
        );

        // Test with the "earliest" tag
        let params_tag = Params::new(Some(r#"["earliest"]"#));
        assert_eq!(
            parse_block_id_from_params(&params_tag, 0).unwrap(),
            BlockId::Number(BlockNumberOrTag::Earliest)
        );
    }

    /// Tests that the function correctly parses from a position other than 0.
    #[test]
    fn parses_block_id_from_second_param() {
        let params =
            Params::new(Some(r#"["0x0000000000000000000000000000000000000000", "latest"]"#));
        let result = parse_block_id_from_params(&params, 1).unwrap();
        assert_eq!(result, BlockId::Number(BlockNumberOrTag::Latest));
    }

    /// Tests that nothing is returned when the requested parameter is missing; the parser does
    /// not substitute a default block id.
    #[test]
    fn returns_none_when_param_is_missing() {
        let params = Params::new(Some(r#"["0x0000000000000000000000000000000000000000"]"#));
        let result = parse_block_id_from_params(&params, 1);
        assert!(result.is_none());
    }

    /// Tests that nothing is returned when the parameter is not a valid block id.
    #[test]
    fn returns_none_for_invalid_input() {
        let params = Params::new(Some(r#"[true]"#));
        let result = parse_block_id_from_params(&params, 0);
        assert!(result.is_none());
    }

    /// Tests that the block id is read from the position that belongs to the method and that
    /// unknown methods yield nothing.
    #[test]
    fn extracts_block_id_at_method_specific_position() {
        let params = Params::new(Some(
            r#"["0x0000000000000000000000000000000000000000", "0x0", "0x64"]"#,
        ));

        // `eth_getStorageAt` carries its block id as the third parameter.
        assert_eq!(
            extract_block_id_for_method("eth_getStorageAt", &params),
            Some(BlockId::Number(BlockNumberOrTag::Number(100)))
        );

        // Methods without a known block id position are never matched.
        assert!(extract_block_id_for_method("eth_chainId", &params).is_none());
    }

    /// Tests that transaction hashes can be parsed from params.
    #[test]
    fn parses_transaction_hash_from_params() {
        let hash = "0xdbdfa0f88b2cf815fdc1621bd20c2bd2b0eed4f0c56c9be2602957b5a60ec702";
        let params_str = format!(r#"["{hash}"]"#);
        let params = Params::new(Some(&params_str));
        let parsed_hash =
            parse_transaction_hash_from_params(&params).expect("valid hash should parse");
        assert_eq!(format!("{parsed_hash:?}"), hash);
    }

    /// Tests that invalid transaction hash returns error.
    #[test]
    fn returns_error_for_invalid_tx_hash() {
        let params = Params::new(Some(r#"["not_a_hash"]"#));
        let result = parse_transaction_hash_from_params(&params);
        assert!(matches!(result, Err(ParseError::InvalidFormat)));
    }

    /// Tests that missing parameter returns appropriate error.
    #[test]
    fn returns_error_for_missing_parameter() {
        let params = Params::new(Some(r#"[]"#));
        let result = parse_transaction_hash_from_params(&params);
        assert!(matches!(result, Err(ParseError::MissingParameter)));
    }
}