reth_optimism_rpc/eth/
mod.rs

1//! OP-Reth `eth_` endpoint implementation.
2
3pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12    eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13    OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
20use reqwest::Url;
21use reth_chainspec::{EthereumHardforks, Hardforks};
22use reth_evm::ConfigureEvm;
23use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
24use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
25use reth_optimism_flashblocks::{
26    ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
27    InProgressFlashBlockRx, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
28};
29use reth_rpc::eth::{core::EthApiInner, DevSigner};
30use reth_rpc_eth_api::{
31    helpers::{
32        pending_block::BuildPendingEnv, AddDevSigners, EthApiSpec, EthFees, EthState, LoadFee,
33        LoadPendingBlock, LoadState, SpawnBlocking, Trace,
34    },
35    EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
36    RpcNodeCoreExt, RpcTypes, SignableTxRequest,
37};
38use reth_rpc_eth_types::{
39    EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, PendingBlockEnvOrigin,
40};
41use reth_storage_api::{ProviderHeader, ProviderTx};
42use reth_tasks::{
43    pool::{BlockingTaskGuard, BlockingTaskPool},
44    TaskSpawner,
45};
46use std::{
47    fmt::{self, Formatter},
48    marker::PhantomData,
49    sync::Arc,
50    time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
55/// Maximum duration to wait for a fresh flashblock when one is being built.
56const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
58/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
59pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
61/// OP-Reth `Eth` API implementation.
62///
63/// This type provides the functionality for handling `eth_` related requests.
64///
65/// This wraps a default `Eth` implementation, and provides additional functionality where the
66/// optimism spec deviates from the default (ethereum) spec, e.g. transaction forwarding to the
67/// sequencer, receipts, additional RPC fields for transaction receipts.
68///
69/// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implemented
70/// all the `Eth` helper traits and prerequisite traits.
71pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
72    /// Gateway to node's core components.
73    inner: Arc<OpEthApiInner<N, Rpc>>,
74}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77    fn clone(&self) -> Self {
78        Self { inner: self.inner.clone() }
79    }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83    /// Creates a new `OpEthApi`.
84    pub fn new(
85        eth_api: EthApiNodeBackend<N, Rpc>,
86        sequencer_client: Option<SequencerClient>,
87        min_suggested_priority_fee: U256,
88        pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
89        flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
90        in_progress_rx: Option<InProgressFlashBlockRx>,
91    ) -> Self {
92        let inner = Arc::new(OpEthApiInner {
93            eth_api,
94            sequencer_client,
95            min_suggested_priority_fee,
96            pending_block_rx,
97            flashblock_rx,
98            in_progress_rx,
99        });
100        Self { inner }
101    }
102
103    /// Returns a reference to the [`EthApiNodeBackend`].
104    pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
105        self.inner.eth_api()
106    }
107    /// Returns the configured sequencer client, if any.
108    pub fn sequencer_client(&self) -> Option<&SequencerClient> {
109        self.inner.sequencer_client()
110    }
111
112    /// Returns a cloned pending block receiver, if any.
113    pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
114        self.inner.pending_block_rx.clone()
115    }
116
117    /// Returns a flashblock receiver, if any, by resubscribing to it.
118    pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
119        self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
120    }
121
122    /// Returns information about the flashblock currently being built, if any.
123    fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
124        self.inner.in_progress_rx.as_ref().and_then(|rx| *rx.borrow())
125    }
126
127    /// Extracts pending block if it matches the expected parent hash.
128    fn extract_matching_block(
129        &self,
130        block: Option<&PendingFlashBlock<N::Primitives>>,
131        parent_hash: B256,
132    ) -> Option<PendingBlock<N::Primitives>> {
133        block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134    }
135
136    /// Build a [`OpEthApi`] using [`OpEthApiBuilder`].
137    pub const fn builder() -> OpEthApiBuilder<Rpc> {
138        OpEthApiBuilder::new()
139    }
140
141    /// Awaits a fresh flashblock if one is being built, otherwise returns current.
142    async fn flashblock(
143        &self,
144        parent_hash: B256,
145    ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146        let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147
148        // Check if a flashblock is being built
149        if let Some(build_info) = self.flashblock_build_info() {
150            let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
151
152            // Check if this is the first flashblock or the next consecutive index
153            let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
154
155            // Wait only for relevant flashblocks: matching parent and next in sequence
156            if build_info.parent_hash == parent_hash && is_next_index {
157                let mut rx_clone = rx.clone();
158                // Wait up to MAX_FLASHBLOCK_WAIT_DURATION for a new flashblock to arrive
159                let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
160            }
161        }
162
163        // Fall back to current block
164        Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
165    }
166
167    /// Returns a [`PendingBlock`] that is built out of flashblocks.
168    ///
169    /// If flashblocks receiver is not set, then it always returns `None`.
170    ///
171    /// It may wait up to 50ms for a fresh flashblock if one is currently being built.
172    pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
173    where
174        OpEthApiError: FromEvmError<N::Evm>,
175        Rpc: RpcConvert<Primitives = N::Primitives>,
176    {
177        let pending = self.pending_block_env_and_cfg()?;
178        let parent = match pending.origin {
179            PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
180            PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
181        };
182
183        self.flashblock(parent.hash()).await
184    }
185}
186
187impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
188where
189    N: RpcNodeCore,
190    Rpc: RpcConvert<Primitives = N::Primitives>,
191{
192    type Error = OpEthApiError;
193    type NetworkTypes = Rpc::Network;
194    type RpcConvert = Rpc;
195
196    fn tx_resp_builder(&self) -> &Self::RpcConvert {
197        self.inner.eth_api.tx_resp_builder()
198    }
199}
200
201impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
202where
203    N: RpcNodeCore,
204    Rpc: RpcConvert<Primitives = N::Primitives>,
205{
206    type Primitives = N::Primitives;
207    type Provider = N::Provider;
208    type Pool = N::Pool;
209    type Evm = N::Evm;
210    type Network = N::Network;
211
212    #[inline]
213    fn pool(&self) -> &Self::Pool {
214        self.inner.eth_api.pool()
215    }
216
217    #[inline]
218    fn evm_config(&self) -> &Self::Evm {
219        self.inner.eth_api.evm_config()
220    }
221
222    #[inline]
223    fn network(&self) -> &Self::Network {
224        self.inner.eth_api.network()
225    }
226
227    #[inline]
228    fn provider(&self) -> &Self::Provider {
229        self.inner.eth_api.provider()
230    }
231}
232
233impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
234where
235    N: RpcNodeCore,
236    Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238    #[inline]
239    fn cache(&self) -> &EthStateCache<N::Primitives> {
240        self.inner.eth_api.cache()
241    }
242}
243
244impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
245where
246    N: RpcNodeCore,
247    Rpc: RpcConvert<Primitives = N::Primitives>,
248{
249    #[inline]
250    fn starting_block(&self) -> U256 {
251        self.inner.eth_api.starting_block()
252    }
253}
254
255impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
256where
257    N: RpcNodeCore,
258    Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260    #[inline]
261    fn io_task_spawner(&self) -> impl TaskSpawner {
262        self.inner.eth_api.task_spawner()
263    }
264
265    #[inline]
266    fn tracing_task_pool(&self) -> &BlockingTaskPool {
267        self.inner.eth_api.blocking_task_pool()
268    }
269
270    #[inline]
271    fn tracing_task_guard(&self) -> &BlockingTaskGuard {
272        self.inner.eth_api.blocking_task_guard()
273    }
274}
275
276impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
277where
278    N: RpcNodeCore,
279    OpEthApiError: FromEvmError<N::Evm>,
280    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
281{
282    #[inline]
283    fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
284        self.inner.eth_api.gas_oracle()
285    }
286
287    #[inline]
288    fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
289        self.inner.eth_api.fee_history_cache()
290    }
291
292    async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
293        let min_tip = U256::from(self.inner.min_suggested_priority_fee);
294        self.inner.eth_api.gas_oracle().op_suggest_tip_cap(min_tip).await.map_err(Into::into)
295    }
296}
297
298impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
299where
300    N: RpcNodeCore,
301    Rpc: RpcConvert<Primitives = N::Primitives>,
302    Self: LoadPendingBlock,
303{
304}
305
306impl<N, Rpc> EthState for OpEthApi<N, Rpc>
307where
308    N: RpcNodeCore,
309    Rpc: RpcConvert<Primitives = N::Primitives>,
310    Self: LoadPendingBlock,
311{
312    #[inline]
313    fn max_proof_window(&self) -> u64 {
314        self.inner.eth_api.eth_proof_window()
315    }
316}
317
318impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
319where
320    N: RpcNodeCore,
321    OpEthApiError: FromEvmError<N::Evm>,
322    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
323{
324}
325
326impl<N, Rpc> Trace for OpEthApi<N, Rpc>
327where
328    N: RpcNodeCore,
329    OpEthApiError: FromEvmError<N::Evm>,
330    Rpc: RpcConvert<Primitives = N::Primitives>,
331{
332}
333
334impl<N, Rpc> AddDevSigners for OpEthApi<N, Rpc>
335where
336    N: RpcNodeCore,
337    Rpc: RpcConvert<
338        Network: RpcTypes<TransactionRequest: SignableTxRequest<ProviderTx<N::Provider>>>,
339    >,
340{
341    fn with_dev_accounts(&self) {
342        *self.inner.eth_api.signers().write() = DevSigner::random_signers(20)
343    }
344}
345
346impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
347    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348        f.debug_struct("OpEthApi").finish_non_exhaustive()
349    }
350}
351
352/// Container type `OpEthApi`
353pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
354    /// Gateway to node's core components.
355    eth_api: EthApiNodeBackend<N, Rpc>,
356    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
357    /// network.
358    sequencer_client: Option<SequencerClient>,
359    /// Minimum priority fee enforced by OP-specific logic.
360    ///
361    /// See also <https://github.com/ethereum-optimism/op-geth/blob/d4e0fe9bb0c2075a9bff269fb975464dd8498f75/eth/gasprice/optimism-gasprice.go#L38-L38>
362    min_suggested_priority_fee: U256,
363    /// Pending block receiver.
364    ///
365    /// If set, then it provides current pending block based on received Flashblocks.
366    pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
367    /// Flashblocks receiver.
368    ///
369    /// If set, then it provides sequences of flashblock built.
370    flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
371    /// Receiver that signals when a flashblock is being built
372    in_progress_rx: Option<InProgressFlashBlockRx>,
373}
374
375impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
376    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
377        f.debug_struct("OpEthApiInner").finish()
378    }
379}
380
381impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
382    /// Returns a reference to the [`EthApiNodeBackend`].
383    const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
384        &self.eth_api
385    }
386
387    /// Returns the configured sequencer client, if any.
388    const fn sequencer_client(&self) -> Option<&SequencerClient> {
389        self.sequencer_client.as_ref()
390    }
391}
392
393/// Converter for OP RPC types.
394pub type OpRpcConvert<N, NetworkT> = RpcConverter<
395    NetworkT,
396    <N as FullNodeComponents>::Evm,
397    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
398    (),
399    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
400>;
401
402/// Builds [`OpEthApi`] for Optimism.
403#[derive(Debug)]
404pub struct OpEthApiBuilder<NetworkT = Optimism> {
405    /// Sequencer client, configured to forward submitted transactions to sequencer of given OP
406    /// network.
407    sequencer_url: Option<String>,
408    /// Headers to use for the sequencer client requests.
409    sequencer_headers: Vec<String>,
410    /// Minimum suggested priority fee (tip)
411    min_suggested_priority_fee: u64,
412    /// A URL pointing to a secure websocket connection (wss) that streams out [flashblocks].
413    ///
414    /// [flashblocks]: reth_optimism_flashblocks
415    flashblocks_url: Option<Url>,
416    /// Marker for network types.
417    _nt: PhantomData<NetworkT>,
418}
419
420impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
421    fn default() -> Self {
422        Self {
423            sequencer_url: None,
424            sequencer_headers: Vec::new(),
425            min_suggested_priority_fee: 1_000_000,
426            flashblocks_url: None,
427            _nt: PhantomData,
428        }
429    }
430}
431
432impl<NetworkT> OpEthApiBuilder<NetworkT> {
433    /// Creates a [`OpEthApiBuilder`] instance from core components.
434    pub const fn new() -> Self {
435        Self {
436            sequencer_url: None,
437            sequencer_headers: Vec::new(),
438            min_suggested_priority_fee: 1_000_000,
439            flashblocks_url: None,
440            _nt: PhantomData,
441        }
442    }
443
444    /// With a [`SequencerClient`].
445    pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
446        self.sequencer_url = sequencer_url;
447        self
448    }
449
450    /// With headers to use for the sequencer client requests.
451    pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
452        self.sequencer_headers = sequencer_headers;
453        self
454    }
455
456    /// With minimum suggested priority fee (tip).
457    pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
458        self.min_suggested_priority_fee = min;
459        self
460    }
461
462    /// With a subscription to flashblocks secure websocket connection.
463    pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
464        self.flashblocks_url = flashblocks_url;
465        self
466    }
467}
468
469impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
470where
471    N: FullNodeComponents<
472        Evm: ConfigureEvm<
473            NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
474                                 + From<ExecutionPayloadBaseV1>
475                                 + Unpin,
476        >,
477        Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>,
478    >,
479    NetworkT: RpcTypes,
480    OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
481    OpEthApi<N, OpRpcConvert<N, NetworkT>>:
482        FullEthApiServer<Provider = N::Provider, Pool = N::Pool> + AddDevSigners,
483{
484    type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
485
486    async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
487        let Self {
488            sequencer_url,
489            sequencer_headers,
490            min_suggested_priority_fee,
491            flashblocks_url,
492            ..
493        } = self;
494        let rpc_converter =
495            RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
496                .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
497
498        let sequencer_client = if let Some(url) = sequencer_url {
499            Some(
500                SequencerClient::new_with_headers(&url, sequencer_headers)
501                    .await
502                    .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
503            )
504        } else {
505            None
506        };
507
508        let (pending_block_rx, flashblock_rx, in_progress_rx) =
509            if let Some(ws_url) = flashblocks_url {
510                info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
511
512                let (tx, pending_rx) = watch::channel(None);
513                let stream = WsFlashBlockStream::new(ws_url);
514                let service = FlashBlockService::new(
515                    stream,
516                    ctx.components.evm_config().clone(),
517                    ctx.components.provider().clone(),
518                    ctx.components.task_executor().clone(),
519                );
520
521                let flashblock_rx = service.subscribe_block_sequence();
522                let in_progress_rx = service.subscribe_in_progress();
523
524                ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
525
526                (Some(pending_rx), Some(flashblock_rx), Some(in_progress_rx))
527            } else {
528                (None, None, None)
529            };
530
531        let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
532
533        Ok(OpEthApi::new(
534            eth_api,
535            sequencer_client,
536            U256::from(min_suggested_priority_fee),
537            pending_block_rx,
538            flashblock_rx,
539            in_progress_rx,
540        ))
541    }
542}