reth_optimism_rpc/eth/
mod.rs

1//! OP-Reth `eth_` endpoint implementation.
2
3pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12    eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13    OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
20use reqwest::Url;
21use reth_chainspec::{EthereumHardforks, Hardforks};
22use reth_evm::ConfigureEvm;
23use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
24use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
25use reth_optimism_flashblocks::{
26    ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
27    InProgressFlashBlockRx, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
28};
29use reth_rpc::eth::core::EthApiInner;
30use reth_rpc_eth_api::{
31    helpers::{
32        pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
33        LoadState, SpawnBlocking, Trace,
34    },
35    EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
36    RpcNodeCoreExt, RpcTypes,
37};
38use reth_rpc_eth_types::{
39    EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, PendingBlockEnvOrigin,
40};
41use reth_storage_api::ProviderHeader;
42use reth_tasks::{
43    pool::{BlockingTaskGuard, BlockingTaskPool},
44    TaskSpawner,
45};
46use std::{
47    fmt::{self, Formatter},
48    marker::PhantomData,
49    sync::Arc,
50    time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
55/// Maximum duration to wait for a fresh flashblock when one is being built.
56const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
58/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
59pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
61/// OP-Reth `Eth` API implementation.
62///
63/// This type provides the functionality for handling `eth_` related requests.
64///
65/// This wraps a default `Eth` implementation, and provides additional functionality where the
66/// optimism spec deviates from the default (ethereum) spec, e.g. transaction forwarding to the
67/// sequencer, receipts, additional RPC fields for transaction receipts.
68///
69/// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implemented
70/// all the `Eth` helper traits and prerequisite traits.
71pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
72    /// Gateway to node's core components.
73    inner: Arc<OpEthApiInner<N, Rpc>>,
74}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77    fn clone(&self) -> Self {
78        Self { inner: self.inner.clone() }
79    }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83    /// Creates a new `OpEthApi`.
84    pub fn new(
85        eth_api: EthApiNodeBackend<N, Rpc>,
86        sequencer_client: Option<SequencerClient>,
87        min_suggested_priority_fee: U256,
88        pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
89        flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
90        in_progress_rx: Option<InProgressFlashBlockRx>,
91    ) -> Self {
92        let inner = Arc::new(OpEthApiInner {
93            eth_api,
94            sequencer_client,
95            min_suggested_priority_fee,
96            pending_block_rx,
97            flashblock_rx,
98            in_progress_rx,
99        });
100        Self { inner }
101    }
102
103    /// Returns a reference to the [`EthApiNodeBackend`].
104    pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
105        self.inner.eth_api()
106    }
107    /// Returns the configured sequencer client, if any.
108    pub fn sequencer_client(&self) -> Option<&SequencerClient> {
109        self.inner.sequencer_client()
110    }
111
112    /// Returns a cloned pending block receiver, if any.
113    pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
114        self.inner.pending_block_rx.clone()
115    }
116
117    /// Returns a flashblock receiver, if any, by resubscribing to it.
118    pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
119        self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
120    }
121
122    /// Returns information about the flashblock currently being built, if any.
123    fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
124        self.inner.in_progress_rx.as_ref().and_then(|rx| *rx.borrow())
125    }
126
127    /// Extracts pending block if it matches the expected parent hash.
128    fn extract_matching_block(
129        &self,
130        block: Option<&PendingFlashBlock<N::Primitives>>,
131        parent_hash: B256,
132    ) -> Option<PendingBlock<N::Primitives>> {
133        block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134    }
135
136    /// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
137    pub const fn builder() -> OpEthApiBuilder<Rpc> {
138        OpEthApiBuilder::new()
139    }
140
141    /// Awaits a fresh flashblock if one is being built, otherwise returns current.
142    async fn flashblock(
143        &self,
144        parent_hash: B256,
145    ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146        let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147
148        // Check if a flashblock is being built
149        if let Some(build_info) = self.flashblock_build_info() {
150            let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
151
152            // Check if this is the first flashblock or the next consecutive index
153            let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
154
155            // Wait only for relevant flashblocks: matching parent and next in sequence
156            if build_info.parent_hash == parent_hash && is_next_index {
157                let mut rx_clone = rx.clone();
158                // Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive
159                let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
160            }
161        }
162
163        // Fall back to current block
164        Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
165    }
166
167    /// Returns a [`PendingBlock`] that is built out of flashblocks.
168    ///
169    /// If flashblocks receiver is not set, then it always returns `None`.
170    ///
171    /// It may wait up to 50ms for a fresh flashblock if one is currently being built.
172    pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
173    where
174        OpEthApiError: FromEvmError<N::Evm>,
175        Rpc: RpcConvert<Primitives = N::Primitives>,
176    {
177        let pending = self.pending_block_env_and_cfg()?;
178        let parent = match pending.origin {
179            PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
180            PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
181        };
182
183        self.flashblock(parent.hash()).await
184    }
185}
186
187impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
188where
189    N: RpcNodeCore,
190    Rpc: RpcConvert<Primitives = N::Primitives>,
191{
192    type Error = OpEthApiError;
193    type NetworkTypes = Rpc::Network;
194    type RpcConvert = Rpc;
195
196    fn tx_resp_builder(&self) -> &Self::RpcConvert {
197        self.inner.eth_api.tx_resp_builder()
198    }
199}
200
201impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
202where
203    N: RpcNodeCore,
204    Rpc: RpcConvert<Primitives = N::Primitives>,
205{
206    type Primitives = N::Primitives;
207    type Provider = N::Provider;
208    type Pool = N::Pool;
209    type Evm = N::Evm;
210    type Network = N::Network;
211
212    #[inline]
213    fn pool(&self) -> &Self::Pool {
214        self.inner.eth_api.pool()
215    }
216
217    #[inline]
218    fn evm_config(&self) -> &Self::Evm {
219        self.inner.eth_api.evm_config()
220    }
221
222    #[inline]
223    fn network(&self) -> &Self::Network {
224        self.inner.eth_api.network()
225    }
226
227    #[inline]
228    fn provider(&self) -> &Self::Provider {
229        self.inner.eth_api.provider()
230    }
231}
232
233impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
234where
235    N: RpcNodeCore,
236    Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238    #[inline]
239    fn cache(&self) -> &EthStateCache<N::Primitives> {
240        self.inner.eth_api.cache()
241    }
242}
243
244impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
245where
246    N: RpcNodeCore,
247    Rpc: RpcConvert<Primitives = N::Primitives>,
248{
249    #[inline]
250    fn starting_block(&self) -> U256 {
251        self.inner.eth_api.starting_block()
252    }
253}
254
255impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
256where
257    N: RpcNodeCore,
258    Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260    #[inline]
261    fn io_task_spawner(&self) -> impl TaskSpawner {
262        self.inner.eth_api.task_spawner()
263    }
264
265    #[inline]
266    fn tracing_task_pool(&self) -> &BlockingTaskPool {
267        self.inner.eth_api.blocking_task_pool()
268    }
269
270    #[inline]
271    fn tracing_task_guard(&self) -> &BlockingTaskGuard {
272        self.inner.eth_api.blocking_task_guard()
273    }
274}
275
276impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
277where
278    N: RpcNodeCore,
279    OpEthApiError: FromEvmError<N::Evm>,
280    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
281{
282    #[inline]
283    fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
284        self.inner.eth_api.gas_oracle()
285    }
286
287    #[inline]
288    fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
289        self.inner.eth_api.fee_history_cache()
290    }
291
292    async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
293        self.inner
294            .eth_api
295            .gas_oracle()
296            .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
297            .await
298            .map_err(Into::into)
299    }
300}
301
302impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
303where
304    N: RpcNodeCore,
305    Rpc: RpcConvert<Primitives = N::Primitives>,
306    Self: LoadPendingBlock,
307{
308}
309
310impl<N, Rpc> EthState for OpEthApi<N, Rpc>
311where
312    N: RpcNodeCore,
313    Rpc: RpcConvert<Primitives = N::Primitives>,
314    Self: LoadPendingBlock,
315{
316    #[inline]
317    fn max_proof_window(&self) -> u64 {
318        self.inner.eth_api.eth_proof_window()
319    }
320}
321
322impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
323where
324    N: RpcNodeCore,
325    OpEthApiError: FromEvmError<N::Evm>,
326    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
327{
328}
329
330impl<N, Rpc> Trace for OpEthApi<N, Rpc>
331where
332    N: RpcNodeCore,
333    OpEthApiError: FromEvmError<N::Evm>,
334    Rpc: RpcConvert<Primitives = N::Primitives>,
335{
336}
337
338impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
339    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
340        f.debug_struct("OpEthApi").finish_non_exhaustive()
341    }
342}
343
344/// Container type `OpEthApi`
345pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
346    /// Gateway to node's core components.
347    eth_api: EthApiNodeBackend<N, Rpc>,
348    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
349    /// network.
350    sequencer_client: Option<SequencerClient>,
351    /// Minimum priority fee enforced by OP-specific logic.
352    ///
353    /// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
354    min_suggested_priority_fee: U256,
355    /// Pending block receiver.
356    ///
357    /// If set, then it provides current pending block based on received Flashblocks.
358    pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
359    /// Flashblocks receiver.
360    ///
361    /// If set, then it provides sequences of flashblock built.
362    flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
363    /// Receiver that signals when a flashblock is being built
364    in_progress_rx: Option<InProgressFlashBlockRx>,
365}
366
367impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
368    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
369        f.debug_struct("OpEthApiInner").finish()
370    }
371}
372
373impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
374    /// Returns a reference to the [`EthApiNodeBackend`].
375    const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
376        &self.eth_api
377    }
378
379    /// Returns the configured sequencer client, if any.
380    const fn sequencer_client(&self) -> Option<&SequencerClient> {
381        self.sequencer_client.as_ref()
382    }
383}
384
385/// Converter for OP RPC types.
386pub type OpRpcConvert<N, NetworkT> = RpcConverter<
387    NetworkT,
388    <N as FullNodeComponents>::Evm,
389    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
390    (),
391    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
392>;
393
394/// Builds [`OpEthApi`] for Optimism.
395#[derive(Debug)]
396pub struct OpEthApiBuilder<NetworkT = Optimism> {
397    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
398    /// network.
399    sequencer_url: Option<String>,
400    /// Headers to use for the sequencer client requests.
401    sequencer_headers: Vec<String>,
402    /// Minimum suggested priority fee (tip)
403    min_suggested_priority_fee: u64,
404    /// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks].
405    ///
406    /// [flashblocks]: reth_optimism_flashblocks
407    flashblocks_url: Option<Url>,
408    /// Marker for network types.
409    _nt: PhantomData<NetworkT>,
410}
411
412impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
413    fn default() -> Self {
414        Self {
415            sequencer_url: None,
416            sequencer_headers: Vec::new(),
417            min_suggested_priority_fee: 1_000_000,
418            flashblocks_url: None,
419            _nt: PhantomData,
420        }
421    }
422}
423
424impl<NetworkT> OpEthApiBuilder<NetworkT> {
425    /// Creates a [`OpEthApiBuilder`] instance from core components.
426    pub const fn new() -> Self {
427        Self {
428            sequencer_url: None,
429            sequencer_headers: Vec::new(),
430            min_suggested_priority_fee: 1_000_000,
431            flashblocks_url: None,
432            _nt: PhantomData,
433        }
434    }
435
436    /// With a [`SequencerClient`].
437    pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
438        self.sequencer_url = sequencer_url;
439        self
440    }
441
442    /// With headers to use for the sequencer client requests.
443    pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
444        self.sequencer_headers = sequencer_headers;
445        self
446    }
447
448    /// With minimum suggested priority fee (tip).
449    pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
450        self.min_suggested_priority_fee = min;
451        self
452    }
453
454    /// With a subscription to flashblocks secure websocket connection.
455    pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
456        self.flashblocks_url = flashblocks_url;
457        self
458    }
459}
460
461impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
462where
463    N: FullNodeComponents<
464        Evm: ConfigureEvm<
465            NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
466                                 + From<ExecutionPayloadBaseV1>
467                                 + Unpin,
468        >,
469        Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>,
470    >,
471    NetworkT: RpcTypes,
472    OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
473    OpEthApi<N, OpRpcConvert<N, NetworkT>>:
474        FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
475{
476    type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
477
478    async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
479        let Self {
480            sequencer_url,
481            sequencer_headers,
482            min_suggested_priority_fee,
483            flashblocks_url,
484            ..
485        } = self;
486        let rpc_converter =
487            RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
488                .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
489
490        let sequencer_client = if let Some(url) = sequencer_url {
491            Some(
492                SequencerClient::new_with_headers(&url, sequencer_headers)
493                    .await
494                    .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
495            )
496        } else {
497            None
498        };
499
500        let (pending_block_rx, flashblock_rx, in_progress_rx) =
501            if let Some(ws_url) = flashblocks_url {
502                info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
503
504                let (tx, pending_rx) = watch::channel(None);
505                let stream = WsFlashBlockStream::new(ws_url);
506                let service = FlashBlockService::new(
507                    stream,
508                    ctx.components.evm_config().clone(),
509                    ctx.components.provider().clone(),
510                    ctx.components.task_executor().clone(),
511                );
512
513                let flashblock_rx = service.subscribe_block_sequence();
514                let in_progress_rx = service.subscribe_in_progress();
515
516                ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
517
518                (Some(pending_rx), Some(flashblock_rx), Some(in_progress_rx))
519            } else {
520                (None, None, None)
521            };
522
523        let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
524
525        Ok(OpEthApi::new(
526            eth_api,
527            sequencer_client,
528            U256::from(min_suggested_priority_fee),
529            pending_block_rx,
530            flashblock_rx,
531            in_progress_rx,
532        ))
533    }
534}