reth_optimism_rpc/eth/
mod.rs

1//! OP-Reth `eth_` endpoint implementation.
2
3pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12    eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13    OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
20pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
21use reqwest::Url;
22use reth_chainspec::{EthereumHardforks, Hardforks};
23use reth_evm::ConfigureEvm;
24use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
25use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
26use reth_optimism_flashblocks::{
27    FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
28    FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners,
29    PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
30};
31use reth_rpc::eth::core::EthApiInner;
32use reth_rpc_eth_api::{
33    helpers::{
34        pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
35        LoadState, SpawnBlocking, Trace,
36    },
37    EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
38    RpcNodeCoreExt, RpcTypes,
39};
40use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock};
41use reth_storage_api::{BlockReaderIdExt, ProviderHeader};
42use reth_tasks::{
43    pool::{BlockingTaskGuard, BlockingTaskPool},
44    TaskSpawner,
45};
46use std::{
47    fmt::{self, Formatter},
48    marker::PhantomData,
49    sync::Arc,
50    time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
55/// Maximum duration to wait for a fresh flashblock when one is being built.
56const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
58/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
59pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
61/// OP-Reth `Eth` API implementation.
62///
63/// This type provides the functionality for handling `eth_` related requests.
64///
65/// This wraps a default `Eth` implementation, and provides additional functionality where the
66/// optimism spec deviates from the default (ethereum) spec, e.g. transaction forwarding to the
67/// sequencer, receipts, additional RPC fields for transaction receipts.
68///
69/// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implemented
70/// all the `Eth` helper traits and prerequisite traits.
71pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
72    /// Gateway to node's core components.
73    inner: Arc<OpEthApiInner<N, Rpc>>,
74}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77    fn clone(&self) -> Self {
78        Self { inner: self.inner.clone() }
79    }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83    /// Creates a new `OpEthApi`.
84    pub fn new(
85        eth_api: EthApiNodeBackend<N, Rpc>,
86        sequencer_client: Option<SequencerClient>,
87        min_suggested_priority_fee: U256,
88        flashblocks: Option<FlashblocksListeners<N::Primitives>>,
89    ) -> Self {
90        let inner = Arc::new(OpEthApiInner {
91            eth_api,
92            sequencer_client,
93            min_suggested_priority_fee,
94            flashblocks,
95        });
96        Self { inner }
97    }
98
99    /// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
100    pub const fn builder() -> OpEthApiBuilder<Rpc> {
101        OpEthApiBuilder::new()
102    }
103
104    /// Returns a reference to the [`EthApiNodeBackend`].
105    pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
106        self.inner.eth_api()
107    }
108    /// Returns the configured sequencer client, if any.
109    pub fn sequencer_client(&self) -> Option<&SequencerClient> {
110        self.inner.sequencer_client()
111    }
112
113    /// Returns a cloned pending block receiver, if any.
114    pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
115        self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone())
116    }
117
118    /// Returns a new subscription to received flashblocks.
119    pub fn subscribe_received_flashblocks(&self) -> Option<FlashBlockRx> {
120        self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe())
121    }
122
123    /// Returns a new subscription to flashblock sequences.
124    pub fn subscribe_flashblock_sequence(&self) -> Option<FlashBlockCompleteSequenceRx> {
125        self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe())
126    }
127
128    /// Returns information about the flashblock currently being built, if any.
129    fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
130        self.inner.flashblocks.as_ref().and_then(|f| *f.in_progress_rx.borrow())
131    }
132
133    /// Extracts pending block if it matches the expected parent hash.
134    fn extract_matching_block(
135        &self,
136        block: Option<&PendingFlashBlock<N::Primitives>>,
137        parent_hash: B256,
138    ) -> Option<PendingBlock<N::Primitives>> {
139        block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
140    }
141
142    /// Awaits a fresh flashblock if one is being built, otherwise returns current.
143    async fn flashblock(
144        &self,
145        parent_hash: B256,
146    ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
147        let Some(rx) = self.inner.flashblocks.as_ref().map(|f| &f.pending_block_rx) else {
148            return Ok(None)
149        };
150
151        // Check if a flashblock is being built
152        if let Some(build_info) = self.flashblock_build_info() {
153            let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
154
155            // Check if this is the first flashblock or the next consecutive index
156            let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
157
158            // Wait only for relevant flashblocks: matching parent and next in sequence
159            if build_info.parent_hash == parent_hash && is_next_index {
160                let mut rx_clone = rx.clone();
161                // Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive
162                let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
163            }
164        }
165
166        // Fall back to current block
167        Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
168    }
169
170    /// Returns a [`PendingBlock`] that is built out of flashblocks.
171    ///
172    /// If flashblocks receiver is not set, then it always returns `None`.
173    ///
174    /// It may wait up to 50ms for a fresh flashblock if one is currently being built.
175    pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
176    where
177        OpEthApiError: FromEvmError<N::Evm>,
178        Rpc: RpcConvert<Primitives = N::Primitives>,
179    {
180        let Some(latest) = self.provider().latest_header()? else {
181            return Ok(None);
182        };
183
184        self.flashblock(latest.hash()).await
185    }
186}
187
188impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
189where
190    N: RpcNodeCore,
191    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
192{
193    type Error = OpEthApiError;
194    type NetworkTypes = Rpc::Network;
195    type RpcConvert = Rpc;
196
197    fn converter(&self) -> &Self::RpcConvert {
198        self.inner.eth_api.converter()
199    }
200}
201
202impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
203where
204    N: RpcNodeCore,
205    Rpc: RpcConvert<Primitives = N::Primitives>,
206{
207    type Primitives = N::Primitives;
208    type Provider = N::Provider;
209    type Pool = N::Pool;
210    type Evm = N::Evm;
211    type Network = N::Network;
212
213    #[inline]
214    fn pool(&self) -> &Self::Pool {
215        self.inner.eth_api.pool()
216    }
217
218    #[inline]
219    fn evm_config(&self) -> &Self::Evm {
220        self.inner.eth_api.evm_config()
221    }
222
223    #[inline]
224    fn network(&self) -> &Self::Network {
225        self.inner.eth_api.network()
226    }
227
228    #[inline]
229    fn provider(&self) -> &Self::Provider {
230        self.inner.eth_api.provider()
231    }
232}
233
234impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
235where
236    N: RpcNodeCore,
237    Rpc: RpcConvert<Primitives = N::Primitives>,
238{
239    #[inline]
240    fn cache(&self) -> &EthStateCache<N::Primitives> {
241        self.inner.eth_api.cache()
242    }
243}
244
245impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
246where
247    N: RpcNodeCore,
248    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
249{
250    #[inline]
251    fn starting_block(&self) -> U256 {
252        self.inner.eth_api.starting_block()
253    }
254}
255
256impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
257where
258    N: RpcNodeCore,
259    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
260{
261    #[inline]
262    fn io_task_spawner(&self) -> impl TaskSpawner {
263        self.inner.eth_api.task_spawner()
264    }
265
266    #[inline]
267    fn tracing_task_pool(&self) -> &BlockingTaskPool {
268        self.inner.eth_api.blocking_task_pool()
269    }
270
271    #[inline]
272    fn tracing_task_guard(&self) -> &BlockingTaskGuard {
273        self.inner.eth_api.blocking_task_guard()
274    }
275
276    #[inline]
277    fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
278        self.inner.eth_api.blocking_io_request_semaphore()
279    }
280}
281
282impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
283where
284    N: RpcNodeCore,
285    OpEthApiError: FromEvmError<N::Evm>,
286    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
287{
288    #[inline]
289    fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
290        self.inner.eth_api.gas_oracle()
291    }
292
293    #[inline]
294    fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
295        self.inner.eth_api.fee_history_cache()
296    }
297
298    async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
299        self.inner
300            .eth_api
301            .gas_oracle()
302            .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
303            .await
304            .map_err(Into::into)
305    }
306}
307
308impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
309where
310    N: RpcNodeCore,
311    Rpc: RpcConvert<Primitives = N::Primitives>,
312    Self: LoadPendingBlock,
313{
314}
315
316impl<N, Rpc> EthState for OpEthApi<N, Rpc>
317where
318    N: RpcNodeCore,
319    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
320    Self: LoadPendingBlock,
321{
322    #[inline]
323    fn max_proof_window(&self) -> u64 {
324        self.inner.eth_api.eth_proof_window()
325    }
326}
327
328impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
329where
330    N: RpcNodeCore,
331    OpEthApiError: FromEvmError<N::Evm>,
332    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
333{
334}
335
336impl<N, Rpc> Trace for OpEthApi<N, Rpc>
337where
338    N: RpcNodeCore,
339    OpEthApiError: FromEvmError<N::Evm>,
340    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError, Evm = N::Evm>,
341{
342}
343
344impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346        f.debug_struct("OpEthApi").finish_non_exhaustive()
347    }
348}
349
350/// Container type `OpEthApi`
351pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
352    /// Gateway to node's core components.
353    eth_api: EthApiNodeBackend<N, Rpc>,
354    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
355    /// network.
356    sequencer_client: Option<SequencerClient>,
357    /// Minimum priority fee enforced by OP-specific logic.
358    ///
359    /// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
360    min_suggested_priority_fee: U256,
361    /// Flashblocks listeners.
362    ///
363    /// If set, provides receivers for pending blocks, flashblock sequences, and build status.
364    flashblocks: Option<FlashblocksListeners<N::Primitives>>,
365}
366
367impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
368    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
369        f.debug_struct("OpEthApiInner").finish()
370    }
371}
372
373impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
374    /// Returns a reference to the [`EthApiNodeBackend`].
375    const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
376        &self.eth_api
377    }
378
379    /// Returns the configured sequencer client, if any.
380    const fn sequencer_client(&self) -> Option<&SequencerClient> {
381        self.sequencer_client.as_ref()
382    }
383}
384
385/// Converter for OP RPC types.
386pub type OpRpcConvert<N, NetworkT> = RpcConverter<
387    NetworkT,
388    <N as FullNodeComponents>::Evm,
389    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
390    (),
391    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
392>;
393
394/// Builds [`OpEthApi`] for Optimism.
395#[derive(Debug)]
396pub struct OpEthApiBuilder<NetworkT = Optimism> {
397    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
398    /// network.
399    sequencer_url: Option<String>,
400    /// Headers to use for the sequencer client requests.
401    sequencer_headers: Vec<String>,
402    /// Minimum suggested priority fee (tip)
403    min_suggested_priority_fee: u64,
404    /// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks].
405    ///
406    /// [flashblocks]: reth_optimism_flashblocks
407    flashblocks_url: Option<Url>,
408    /// Enable flashblock consensus client to drive the chain forward.
409    ///
410    /// When enabled, flashblock sequences are submitted to the engine API via
411    /// `newPayload` and `forkchoiceUpdated` calls, advancing the canonical chain state.
412    /// Requires `flashblocks_url` to be set.
413    flashblock_consensus: bool,
414    /// Marker for network types.
415    _nt: PhantomData<NetworkT>,
416}
417
418impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
419    fn default() -> Self {
420        Self {
421            sequencer_url: None,
422            sequencer_headers: Vec::new(),
423            min_suggested_priority_fee: 1_000_000,
424            flashblocks_url: None,
425            flashblock_consensus: false,
426            _nt: PhantomData,
427        }
428    }
429}
430
431impl<NetworkT> OpEthApiBuilder<NetworkT> {
432    /// Creates a [`OpEthApiBuilder`] instance from core components.
433    pub const fn new() -> Self {
434        Self {
435            sequencer_url: None,
436            sequencer_headers: Vec::new(),
437            min_suggested_priority_fee: 1_000_000,
438            flashblocks_url: None,
439            flashblock_consensus: false,
440            _nt: PhantomData,
441        }
442    }
443
444    /// With a [`SequencerClient`].
445    pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
446        self.sequencer_url = sequencer_url;
447        self
448    }
449
450    /// With headers to use for the sequencer client requests.
451    pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
452        self.sequencer_headers = sequencer_headers;
453        self
454    }
455
456    /// With minimum suggested priority fee (tip).
457    pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
458        self.min_suggested_priority_fee = min;
459        self
460    }
461
462    /// With a subscription to flashblocks secure websocket connection.
463    pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
464        self.flashblocks_url = flashblocks_url;
465        self
466    }
467
468    /// With flashblock consensus client enabled to drive chain forward
469    pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
470        self.flashblock_consensus = flashblock_consensus;
471        self
472    }
473}
474
475impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
476where
477    N: FullNodeComponents<
478        Evm: ConfigureEvm<
479            NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
480                                 + From<OpFlashblockPayloadBase>
481                                 + Unpin,
482        >,
483        Types: NodeTypes<
484            ChainSpec: Hardforks + EthereumHardforks,
485            Payload: reth_node_api::PayloadTypes<
486                ExecutionData: for<'a> TryFrom<
487                    &'a FlashBlockCompleteSequence,
488                    Error: std::fmt::Display,
489                >,
490            >,
491        >,
492    >,
493    NetworkT: RpcTypes,
494    OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
495    OpEthApi<N, OpRpcConvert<N, NetworkT>>:
496        FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
497{
498    type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
499
500    async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
501        let Self {
502            sequencer_url,
503            sequencer_headers,
504            min_suggested_priority_fee,
505            flashblocks_url,
506            flashblock_consensus,
507            ..
508        } = self;
509        let rpc_converter =
510            RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
511                .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
512
513        let sequencer_client = if let Some(url) = sequencer_url {
514            Some(
515                SequencerClient::new_with_headers(&url, sequencer_headers)
516                    .await
517                    .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
518            )
519        } else {
520            None
521        };
522
523        let flashblocks = if let Some(ws_url) = flashblocks_url {
524            info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
525
526            let (tx, pending_rx) = watch::channel(None);
527            let stream = WsFlashBlockStream::new(ws_url);
528            let service = FlashBlockService::new(
529                stream,
530                ctx.components.evm_config().clone(),
531                ctx.components.provider().clone(),
532                ctx.components.task_executor().clone(),
533                // enable state root calculation if flashblock_consensus is enabled.
534                flashblock_consensus,
535            );
536
537            let flashblocks_sequence = service.block_sequence_broadcaster().clone();
538            let received_flashblocks = service.flashblocks_broadcaster().clone();
539            let in_progress_rx = service.subscribe_in_progress();
540            ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
541
542            if flashblock_consensus {
543                info!(target: "reth::cli", "Launching FlashBlockConsensusClient");
544                let flashblock_client = FlashBlockConsensusClient::new(
545                    ctx.engine_handle.clone(),
546                    flashblocks_sequence.subscribe(),
547                )?;
548                ctx.components.task_executor().spawn(Box::pin(flashblock_client.run()));
549            }
550
551            Some(FlashblocksListeners::new(
552                pending_rx,
553                flashblocks_sequence,
554                in_progress_rx,
555                received_flashblocks,
556            ))
557        } else {
558            None
559        };
560
561        let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
562
563        Ok(OpEthApi::new(
564            eth_api,
565            sequencer_client,
566            U256::from(min_suggested_priority_fee),
567            flashblocks,
568        ))
569    }
570}