reth_optimism_rpc/eth/
mod.rs

1//! OP-Reth `eth_` endpoint implementation.
2
3pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12    eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13    OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_eips::BlockNumHash;
17use alloy_primitives::{B256, U256};
18use alloy_rpc_types_eth::{Filter, Log};
19use eyre::WrapErr;
20use futures::StreamExt;
21use op_alloy_network::Optimism;
22use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
23pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
24use reqwest::Url;
25use reth_chainspec::{EthereumHardforks, Hardforks};
26use reth_evm::ConfigureEvm;
27use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
28use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
29use reth_optimism_flashblocks::{
30    FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
31    FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners,
32    PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
33};
34use reth_rpc::eth::core::EthApiInner;
35use reth_rpc_eth_api::{
36    helpers::{
37        pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
38        LoadState, SpawnBlocking, Trace,
39    },
40    EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
41    RpcNodeCoreExt, RpcTypes,
42};
43use reth_rpc_eth_types::{
44    logs_utils::matching_block_logs_with_tx_hashes, EthStateCache, FeeHistoryCache, GasPriceOracle,
45    PendingBlock,
46};
47use reth_storage_api::{BlockReaderIdExt, ProviderHeader};
48use reth_tasks::{
49    pool::{BlockingTaskGuard, BlockingTaskPool},
50    TaskSpawner,
51};
52use std::{
53    fmt::{self, Formatter},
54    marker::PhantomData,
55    sync::Arc,
56    time::Duration,
57};
58use tokio::{sync::watch, time};
59use tokio_stream::{wrappers::BroadcastStream, Stream};
60use tracing::info;
61
/// Maximum duration to wait for a fresh flashblock when one is being built.
///
/// Used as the timeout when awaiting a change on the pending block receiver.
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
64
/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
///
/// This is a plain type alias, so both names refer to the same type.
pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
67
/// OP-Reth `Eth` API implementation.
///
/// This type provides the functionality for handling `eth_` related requests.
///
/// This wraps a default `Eth` implementation, and provides additional functionality where the
/// optimism spec deviates from the default (ethereum) spec, e.g. transaction forwarding to the
/// sequencer, receipts, additional RPC fields for transaction receipts.
///
/// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implementing
/// all the `Eth` helper traits and prerequisite traits.
///
/// All state lives behind an [`Arc`], so cloning a handle does not copy the inner state.
pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
    /// Gateway to node's core components.
    inner: Arc<OpEthApiInner<N, Rpc>>,
}
82
83impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
84    fn clone(&self) -> Self {
85        Self { inner: self.inner.clone() }
86    }
87}
88
89impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
90    /// Creates a new `OpEthApi`.
91    pub fn new(
92        eth_api: EthApiNodeBackend<N, Rpc>,
93        sequencer_client: Option<SequencerClient>,
94        min_suggested_priority_fee: U256,
95        flashblocks: Option<FlashblocksListeners<N::Primitives>>,
96    ) -> Self {
97        let inner = Arc::new(OpEthApiInner {
98            eth_api,
99            sequencer_client,
100            min_suggested_priority_fee,
101            flashblocks,
102        });
103        Self { inner }
104    }
105
106    /// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
107    pub const fn builder() -> OpEthApiBuilder<Rpc> {
108        OpEthApiBuilder::new()
109    }
110
111    /// Returns a reference to the [`EthApiNodeBackend`].
112    pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
113        self.inner.eth_api()
114    }
115    /// Returns the configured sequencer client, if any.
116    pub fn sequencer_client(&self) -> Option<&SequencerClient> {
117        self.inner.sequencer_client()
118    }
119
120    /// Returns a cloned pending block receiver, if any.
121    pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
122        self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone())
123    }
124
125    /// Returns a new subscription to received flashblocks.
126    pub fn subscribe_received_flashblocks(&self) -> Option<FlashBlockRx> {
127        self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe())
128    }
129
130    /// Returns a new subscription to flashblock sequences.
131    pub fn subscribe_flashblock_sequence(&self) -> Option<FlashBlockCompleteSequenceRx> {
132        self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe())
133    }
134
135    /// Returns a stream of matching flashblock receipts, if any.
136    ///
137    /// This will yield all new matching receipts received from _new_ flashblocks.
138    pub fn flashblock_receipts_stream(
139        &self,
140        filter: Filter,
141    ) -> Option<impl Stream<Item = Log> + Send + Unpin> {
142        self.subscribe_received_flashblocks().map(|rx| {
143            BroadcastStream::new(rx)
144                .scan(
145                    None::<(u64, u64)>, // state buffers base block number and timestamp
146                    move |state, result| {
147                        let fb = match result.ok() {
148                            Some(fb) => fb,
149                            None => return futures::future::ready(None),
150                        };
151
152                        // Update state from base flashblock for block level meta data.
153                        if let Some(base) = &fb.base {
154                            *state = Some((base.block_number, base.timestamp));
155                        }
156
157                        let Some((block_number, timestamp)) = *state else {
158                            // we haven't received a new flashblock sequence yet, so we can skip
159                            // until we receive the first index 0 (base)
160                            return futures::future::ready(Some(Vec::new()))
161                        };
162
163                        let receipts =
164                            fb.metadata.receipts.iter().map(|(tx, receipt)| (*tx, receipt));
165
166                        let all_logs = matching_block_logs_with_tx_hashes(
167                            &filter,
168                            BlockNumHash::new(block_number, fb.diff.block_hash),
169                            timestamp,
170                            receipts,
171                            false,
172                        );
173
174                        futures::future::ready(Some(all_logs))
175                    },
176                )
177                .flat_map(futures::stream::iter)
178        })
179    }
180
181    /// Returns information about the flashblock currently being built, if any.
182    fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
183        self.inner.flashblocks.as_ref().and_then(|f| *f.in_progress_rx.borrow())
184    }
185
186    /// Extracts pending block if it matches the expected parent hash.
187    fn extract_matching_block(
188        &self,
189        block: Option<&PendingFlashBlock<N::Primitives>>,
190        parent_hash: B256,
191    ) -> Option<PendingBlock<N::Primitives>> {
192        block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
193    }
194
195    /// Awaits a fresh flashblock if one is being built, otherwise returns current.
196    async fn flashblock(
197        &self,
198        parent_hash: B256,
199    ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
200        let Some(rx) = self.inner.flashblocks.as_ref().map(|f| &f.pending_block_rx) else {
201            return Ok(None)
202        };
203
204        // Check if a flashblock is being built
205        if let Some(build_info) = self.flashblock_build_info() {
206            let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
207
208            // Check if this is the first flashblock or the next consecutive index
209            let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
210
211            // Wait only for relevant flashblocks: matching parent and next in sequence
212            if build_info.parent_hash == parent_hash && is_next_index {
213                let mut rx_clone = rx.clone();
214                // Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive
215                let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
216            }
217        }
218
219        // Fall back to current block
220        Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
221    }
222
223    /// Returns a [`PendingBlock`] that is built out of flashblocks.
224    ///
225    /// If flashblocks receiver is not set, then it always returns `None`.
226    ///
227    /// It may wait up to 50ms for a fresh flashblock if one is currently being built.
228    pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
229    where
230        OpEthApiError: FromEvmError<N::Evm>,
231        Rpc: RpcConvert<Primitives = N::Primitives>,
232    {
233        let Some(latest) = self.provider().latest_header()? else {
234            return Ok(None);
235        };
236
237        self.flashblock(latest.hash()).await
238    }
239}
240
241impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
242where
243    N: RpcNodeCore,
244    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
245{
246    type Error = OpEthApiError;
247    type NetworkTypes = Rpc::Network;
248    type RpcConvert = Rpc;
249
250    fn converter(&self) -> &Self::RpcConvert {
251        self.inner.eth_api.converter()
252    }
253}
254
255impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
256where
257    N: RpcNodeCore,
258    Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260    type Primitives = N::Primitives;
261    type Provider = N::Provider;
262    type Pool = N::Pool;
263    type Evm = N::Evm;
264    type Network = N::Network;
265
266    #[inline]
267    fn pool(&self) -> &Self::Pool {
268        self.inner.eth_api.pool()
269    }
270
271    #[inline]
272    fn evm_config(&self) -> &Self::Evm {
273        self.inner.eth_api.evm_config()
274    }
275
276    #[inline]
277    fn network(&self) -> &Self::Network {
278        self.inner.eth_api.network()
279    }
280
281    #[inline]
282    fn provider(&self) -> &Self::Provider {
283        self.inner.eth_api.provider()
284    }
285}
286
287impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
288where
289    N: RpcNodeCore,
290    Rpc: RpcConvert<Primitives = N::Primitives>,
291{
292    #[inline]
293    fn cache(&self) -> &EthStateCache<N::Primitives> {
294        self.inner.eth_api.cache()
295    }
296}
297
298impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
299where
300    N: RpcNodeCore,
301    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
302{
303    #[inline]
304    fn starting_block(&self) -> U256 {
305        self.inner.eth_api.starting_block()
306    }
307}
308
309impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
310where
311    N: RpcNodeCore,
312    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
313{
314    #[inline]
315    fn io_task_spawner(&self) -> impl TaskSpawner {
316        self.inner.eth_api.task_spawner()
317    }
318
319    #[inline]
320    fn tracing_task_pool(&self) -> &BlockingTaskPool {
321        self.inner.eth_api.blocking_task_pool()
322    }
323
324    #[inline]
325    fn tracing_task_guard(&self) -> &BlockingTaskGuard {
326        self.inner.eth_api.blocking_task_guard()
327    }
328
329    #[inline]
330    fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
331        self.inner.eth_api.blocking_io_request_semaphore()
332    }
333}
334
335impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
336where
337    N: RpcNodeCore,
338    OpEthApiError: FromEvmError<N::Evm>,
339    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
340{
341    #[inline]
342    fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
343        self.inner.eth_api.gas_oracle()
344    }
345
346    #[inline]
347    fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
348        self.inner.eth_api.fee_history_cache()
349    }
350
351    async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
352        self.inner
353            .eth_api
354            .gas_oracle()
355            .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
356            .await
357            .map_err(Into::into)
358    }
359}
360
// `LoadState` is implemented with an empty body: nothing is overridden here, so only the items
// the trait itself provides are used. Requires `Self: LoadPendingBlock`.
impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    Rpc: RpcConvert<Primitives = N::Primitives>,
    Self: LoadPendingBlock,
{
}
368
369impl<N, Rpc> EthState for OpEthApi<N, Rpc>
370where
371    N: RpcNodeCore,
372    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
373    Self: LoadPendingBlock,
374{
375    #[inline]
376    fn max_proof_window(&self) -> u64 {
377        self.inner.eth_api.eth_proof_window()
378    }
379}
380
// `EthFees` is implemented with an empty body: nothing is overridden here, so only the items the
// trait itself provides are used.
impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
{
}
388
// `Trace` is implemented with an empty body: nothing is overridden here, so only the items the
// trait itself provides are used. Note the extra `Evm = N::Evm` bound on the converter.
impl<N, Rpc> Trace for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError, Evm = N::Evm>,
{
}
396
397impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
398    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399        f.debug_struct("OpEthApi").finish_non_exhaustive()
400    }
401}
402
/// Container type for the state held by `OpEthApi`.
pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
    /// Gateway to node's core components.
    eth_api: EthApiNodeBackend<N, Rpc>,
    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
    /// network.
    ///
    /// `None` when no sequencer client was configured.
    sequencer_client: Option<SequencerClient>,
    /// Minimum priority fee enforced by OP-specific logic.
    ///
    /// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
    min_suggested_priority_fee: U256,
    /// Flashblocks listeners.
    ///
    /// If set, provides receivers for pending blocks, flashblock sequences, and build status.
    flashblocks: Option<FlashblocksListeners<N::Primitives>>,
}
419
420impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
421    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422        f.debug_struct("OpEthApiInner").finish()
423    }
424}
425
impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
    /// Returns a reference to the [`EthApiNodeBackend`].
    ///
    /// `const` accessor for the private `eth_api` field.
    const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
        &self.eth_api
    }

    /// Returns the configured sequencer client, if any.
    ///
    /// Yields `None` when the `sequencer_client` field is unset.
    const fn sequencer_client(&self) -> Option<&SequencerClient> {
        self.sequencer_client.as_ref()
    }
}
437
/// Converter for OP RPC types.
///
/// An [`RpcConverter`] for network `NetworkT` that uses the node's EVM config, an
/// [`OpReceiptConverter`] and an [`OpTxInfoMapper`], the latter two being parameterized over the
/// node's provider.
pub type OpRpcConvert<N, NetworkT> = RpcConverter<
    NetworkT,
    <N as FullNodeComponents>::Evm,
    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
    (),
    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
>;
446
/// Builds [`OpEthApi`] for Optimism.
#[derive(Debug)]
pub struct OpEthApiBuilder<NetworkT = Optimism> {
    /// URL of the sequencer that the [`SequencerClient`] is created for when building.
    ///
    /// If `None`, no sequencer client is created.
    sequencer_url: Option<String>,
    /// Headers to use for the sequencer client requests.
    sequencer_headers: Vec<String>,
    /// Minimum suggested priority fee (tip). Defaults to `1_000_000`.
    min_suggested_priority_fee: u64,
    /// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks].
    ///
    /// If `None`, no flashblocks service is launched.
    ///
    /// [flashblocks]: reth_optimism_flashblocks
    flashblocks_url: Option<Url>,
    /// Enable flashblock consensus client to drive the chain forward.
    ///
    /// When enabled, flashblock sequences are submitted to the engine API via
    /// `newPayload` and `forkchoiceUpdated` calls, advancing the canonical chain state.
    /// Requires `flashblocks_url` to be set.
    flashblock_consensus: bool,
    /// Marker for network types.
    _nt: PhantomData<NetworkT>,
}
470
471impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
472    fn default() -> Self {
473        Self {
474            sequencer_url: None,
475            sequencer_headers: Vec::new(),
476            min_suggested_priority_fee: 1_000_000,
477            flashblocks_url: None,
478            flashblock_consensus: false,
479            _nt: PhantomData,
480        }
481    }
482}
483
484impl<NetworkT> OpEthApiBuilder<NetworkT> {
485    /// Creates a [`OpEthApiBuilder`] instance from core components.
486    pub const fn new() -> Self {
487        Self {
488            sequencer_url: None,
489            sequencer_headers: Vec::new(),
490            min_suggested_priority_fee: 1_000_000,
491            flashblocks_url: None,
492            flashblock_consensus: false,
493            _nt: PhantomData,
494        }
495    }
496
497    /// With a [`SequencerClient`].
498    pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
499        self.sequencer_url = sequencer_url;
500        self
501    }
502
503    /// With headers to use for the sequencer client requests.
504    pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
505        self.sequencer_headers = sequencer_headers;
506        self
507    }
508
509    /// With minimum suggested priority fee (tip).
510    pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
511        self.min_suggested_priority_fee = min;
512        self
513    }
514
515    /// With a subscription to flashblocks secure websocket connection.
516    pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
517        self.flashblocks_url = flashblocks_url;
518        self
519    }
520
521    /// With flashblock consensus client enabled to drive chain forward
522    pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
523        self.flashblock_consensus = flashblock_consensus;
524        self
525    }
526}
527
528impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
529where
530    N: FullNodeComponents<
531        Evm: ConfigureEvm<
532            NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
533                                 + From<OpFlashblockPayloadBase>
534                                 + Unpin,
535        >,
536        Types: NodeTypes<
537            ChainSpec: Hardforks + EthereumHardforks,
538            Payload: reth_node_api::PayloadTypes<
539                ExecutionData: for<'a> TryFrom<
540                    &'a FlashBlockCompleteSequence,
541                    Error: std::fmt::Display,
542                >,
543            >,
544        >,
545    >,
546    NetworkT: RpcTypes,
547    OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
548    OpEthApi<N, OpRpcConvert<N, NetworkT>>:
549        FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
550{
551    type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
552
553    async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
554        let Self {
555            sequencer_url,
556            sequencer_headers,
557            min_suggested_priority_fee,
558            flashblocks_url,
559            flashblock_consensus,
560            ..
561        } = self;
562        let rpc_converter =
563            RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
564                .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
565
566        let sequencer_client = if let Some(url) = sequencer_url {
567            Some(
568                SequencerClient::new_with_headers(&url, sequencer_headers)
569                    .await
570                    .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
571            )
572        } else {
573            None
574        };
575
576        let flashblocks = if let Some(ws_url) = flashblocks_url {
577            info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
578
579            let (tx, pending_rx) = watch::channel(None);
580            let stream = WsFlashBlockStream::new(ws_url);
581            let service = FlashBlockService::new(
582                stream,
583                ctx.components.evm_config().clone(),
584                ctx.components.provider().clone(),
585                ctx.components.task_executor().clone(),
586                // enable state root calculation if flashblock_consensus is enabled.
587                flashblock_consensus,
588            );
589
590            let flashblocks_sequence = service.block_sequence_broadcaster().clone();
591            let received_flashblocks = service.flashblocks_broadcaster().clone();
592            let in_progress_rx = service.subscribe_in_progress();
593            ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
594
595            if flashblock_consensus {
596                info!(target: "reth::cli", "Launching FlashBlockConsensusClient");
597                let flashblock_client = FlashBlockConsensusClient::new(
598                    ctx.engine_handle.clone(),
599                    flashblocks_sequence.subscribe(),
600                )?;
601                ctx.components.task_executor().spawn(Box::pin(flashblock_client.run()));
602            }
603
604            Some(FlashblocksListeners::new(
605                pending_rx,
606                flashblocks_sequence,
607                in_progress_rx,
608                received_flashblocks,
609            ))
610        } else {
611            None
612        };
613
614        let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
615
616        Ok(OpEthApi::new(
617            eth_api,
618            sequencer_client,
619            U256::from(min_suggested_priority_fee),
620            flashblocks,
621        ))
622    }
623}