1pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12 eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13 OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
20use reqwest::Url;
21use reth_chainspec::{EthereumHardforks, Hardforks};
22use reth_evm::ConfigureEvm;
23use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
24use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
25use reth_optimism_flashblocks::{
26 ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
27 InProgressFlashBlockRx, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
28};
29use reth_rpc::eth::core::EthApiInner;
30use reth_rpc_eth_api::{
31 helpers::{
32 pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
33 LoadState, SpawnBlocking, Trace,
34 },
35 EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
36 RpcNodeCoreExt, RpcTypes,
37};
38use reth_rpc_eth_types::{
39 EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, PendingBlockEnvOrigin,
40};
41use reth_storage_api::ProviderHeader;
42use reth_tasks::{
43 pool::{BlockingTaskGuard, BlockingTaskPool},
44 TaskSpawner,
45};
46use std::{
47 fmt::{self, Formatter},
48 marker::PhantomData,
49 sync::Arc,
50 time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
/// Longest time `OpEthApi::flashblock` waits for the pending-block channel to report a change
/// before it answers with whatever value the channel currently holds.
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
58pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
61pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
72 inner: Arc<OpEthApiInner<N, Rpc>>,
74}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77 fn clone(&self) -> Self {
78 Self { inner: self.inner.clone() }
79 }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83 pub fn new(
85 eth_api: EthApiNodeBackend<N, Rpc>,
86 sequencer_client: Option<SequencerClient>,
87 min_suggested_priority_fee: U256,
88 pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
89 flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
90 in_progress_rx: Option<InProgressFlashBlockRx>,
91 ) -> Self {
92 let inner = Arc::new(OpEthApiInner {
93 eth_api,
94 sequencer_client,
95 min_suggested_priority_fee,
96 pending_block_rx,
97 flashblock_rx,
98 in_progress_rx,
99 });
100 Self { inner }
101 }
102
103 pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
105 self.inner.eth_api()
106 }
107 pub fn sequencer_client(&self) -> Option<&SequencerClient> {
109 self.inner.sequencer_client()
110 }
111
112 pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
114 self.inner.pending_block_rx.clone()
115 }
116
117 pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
119 self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
120 }
121
122 fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
124 self.inner.in_progress_rx.as_ref().and_then(|rx| *rx.borrow())
125 }
126
127 fn extract_matching_block(
129 &self,
130 block: Option<&PendingFlashBlock<N::Primitives>>,
131 parent_hash: B256,
132 ) -> Option<PendingBlock<N::Primitives>> {
133 block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134 }
135
136 pub const fn builder() -> OpEthApiBuilder<Rpc> {
138 OpEthApiBuilder::new()
139 }
140
141 async fn flashblock(
143 &self,
144 parent_hash: B256,
145 ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146 let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147
148 if let Some(build_info) = self.flashblock_build_info() {
150 let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
151
152 let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
154
155 if build_info.parent_hash == parent_hash && is_next_index {
157 let mut rx_clone = rx.clone();
158 let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
160 }
161 }
162
163 Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
165 }
166
167 pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
173 where
174 OpEthApiError: FromEvmError<N::Evm>,
175 Rpc: RpcConvert<Primitives = N::Primitives>,
176 {
177 let pending = self.pending_block_env_and_cfg()?;
178 let parent = match pending.origin {
179 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
180 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
181 };
182
183 self.flashblock(parent.hash()).await
184 }
185}
186
187impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
188where
189 N: RpcNodeCore,
190 Rpc: RpcConvert<Primitives = N::Primitives>,
191{
192 type Error = OpEthApiError;
193 type NetworkTypes = Rpc::Network;
194 type RpcConvert = Rpc;
195
196 fn tx_resp_builder(&self) -> &Self::RpcConvert {
197 self.inner.eth_api.tx_resp_builder()
198 }
199}
200
201impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
202where
203 N: RpcNodeCore,
204 Rpc: RpcConvert<Primitives = N::Primitives>,
205{
206 type Primitives = N::Primitives;
207 type Provider = N::Provider;
208 type Pool = N::Pool;
209 type Evm = N::Evm;
210 type Network = N::Network;
211
212 #[inline]
213 fn pool(&self) -> &Self::Pool {
214 self.inner.eth_api.pool()
215 }
216
217 #[inline]
218 fn evm_config(&self) -> &Self::Evm {
219 self.inner.eth_api.evm_config()
220 }
221
222 #[inline]
223 fn network(&self) -> &Self::Network {
224 self.inner.eth_api.network()
225 }
226
227 #[inline]
228 fn provider(&self) -> &Self::Provider {
229 self.inner.eth_api.provider()
230 }
231}
232
233impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
234where
235 N: RpcNodeCore,
236 Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238 #[inline]
239 fn cache(&self) -> &EthStateCache<N::Primitives> {
240 self.inner.eth_api.cache()
241 }
242}
243
244impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
245where
246 N: RpcNodeCore,
247 Rpc: RpcConvert<Primitives = N::Primitives>,
248{
249 #[inline]
250 fn starting_block(&self) -> U256 {
251 self.inner.eth_api.starting_block()
252 }
253}
254
255impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
256where
257 N: RpcNodeCore,
258 Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260 #[inline]
261 fn io_task_spawner(&self) -> impl TaskSpawner {
262 self.inner.eth_api.task_spawner()
263 }
264
265 #[inline]
266 fn tracing_task_pool(&self) -> &BlockingTaskPool {
267 self.inner.eth_api.blocking_task_pool()
268 }
269
270 #[inline]
271 fn tracing_task_guard(&self) -> &BlockingTaskGuard {
272 self.inner.eth_api.blocking_task_guard()
273 }
274}
275
276impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
277where
278 N: RpcNodeCore,
279 OpEthApiError: FromEvmError<N::Evm>,
280 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
281{
282 #[inline]
283 fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
284 self.inner.eth_api.gas_oracle()
285 }
286
287 #[inline]
288 fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
289 self.inner.eth_api.fee_history_cache()
290 }
291
292 async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
293 self.inner
294 .eth_api
295 .gas_oracle()
296 .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
297 .await
298 .map_err(Into::into)
299 }
300}
301
302impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
303where
304 N: RpcNodeCore,
305 Rpc: RpcConvert<Primitives = N::Primitives>,
306 Self: LoadPendingBlock,
307{
308}
309
310impl<N, Rpc> EthState for OpEthApi<N, Rpc>
311where
312 N: RpcNodeCore,
313 Rpc: RpcConvert<Primitives = N::Primitives>,
314 Self: LoadPendingBlock,
315{
316 #[inline]
317 fn max_proof_window(&self) -> u64 {
318 self.inner.eth_api.eth_proof_window()
319 }
320}
321
322impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
323where
324 N: RpcNodeCore,
325 OpEthApiError: FromEvmError<N::Evm>,
326 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
327{
328}
329
330impl<N, Rpc> Trace for OpEthApi<N, Rpc>
331where
332 N: RpcNodeCore,
333 OpEthApiError: FromEvmError<N::Evm>,
334 Rpc: RpcConvert<Primitives = N::Primitives>,
335{
336}
337
338impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
339 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
340 f.debug_struct("OpEthApi").finish_non_exhaustive()
341 }
342}
343
344pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
346 eth_api: EthApiNodeBackend<N, Rpc>,
348 sequencer_client: Option<SequencerClient>,
351 min_suggested_priority_fee: U256,
355 pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
359 flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
363 in_progress_rx: Option<InProgressFlashBlockRx>,
365}
366
367impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
368 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
369 f.debug_struct("OpEthApiInner").finish()
370 }
371}
372
373impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
374 const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
376 &self.eth_api
377 }
378
379 const fn sequencer_client(&self) -> Option<&SequencerClient> {
381 self.sequencer_client.as_ref()
382 }
383}
384
385pub type OpRpcConvert<N, NetworkT> = RpcConverter<
387 NetworkT,
388 <N as FullNodeComponents>::Evm,
389 OpReceiptConverter<<N as FullNodeTypes>::Provider>,
390 (),
391 OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
392>;
393
394#[derive(Debug)]
396pub struct OpEthApiBuilder<NetworkT = Optimism> {
397 sequencer_url: Option<String>,
400 sequencer_headers: Vec<String>,
402 min_suggested_priority_fee: u64,
404 flashblocks_url: Option<Url>,
408 _nt: PhantomData<NetworkT>,
410}
411
412impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
413 fn default() -> Self {
414 Self {
415 sequencer_url: None,
416 sequencer_headers: Vec::new(),
417 min_suggested_priority_fee: 1_000_000,
418 flashblocks_url: None,
419 _nt: PhantomData,
420 }
421 }
422}
423
424impl<NetworkT> OpEthApiBuilder<NetworkT> {
425 pub const fn new() -> Self {
427 Self {
428 sequencer_url: None,
429 sequencer_headers: Vec::new(),
430 min_suggested_priority_fee: 1_000_000,
431 flashblocks_url: None,
432 _nt: PhantomData,
433 }
434 }
435
436 pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
438 self.sequencer_url = sequencer_url;
439 self
440 }
441
442 pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
444 self.sequencer_headers = sequencer_headers;
445 self
446 }
447
448 pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
450 self.min_suggested_priority_fee = min;
451 self
452 }
453
454 pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
456 self.flashblocks_url = flashblocks_url;
457 self
458 }
459}
460
461impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
462where
463 N: FullNodeComponents<
464 Evm: ConfigureEvm<
465 NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
466 + From<ExecutionPayloadBaseV1>
467 + Unpin,
468 >,
469 Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>,
470 >,
471 NetworkT: RpcTypes,
472 OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
473 OpEthApi<N, OpRpcConvert<N, NetworkT>>:
474 FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
475{
476 type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
477
478 async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
479 let Self {
480 sequencer_url,
481 sequencer_headers,
482 min_suggested_priority_fee,
483 flashblocks_url,
484 ..
485 } = self;
486 let rpc_converter =
487 RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
488 .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
489
490 let sequencer_client = if let Some(url) = sequencer_url {
491 Some(
492 SequencerClient::new_with_headers(&url, sequencer_headers)
493 .await
494 .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
495 )
496 } else {
497 None
498 };
499
500 let (pending_block_rx, flashblock_rx, in_progress_rx) =
501 if let Some(ws_url) = flashblocks_url {
502 info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
503
504 let (tx, pending_rx) = watch::channel(None);
505 let stream = WsFlashBlockStream::new(ws_url);
506 let service = FlashBlockService::new(
507 stream,
508 ctx.components.evm_config().clone(),
509 ctx.components.provider().clone(),
510 ctx.components.task_executor().clone(),
511 );
512
513 let flashblock_rx = service.subscribe_block_sequence();
514 let in_progress_rx = service.subscribe_in_progress();
515
516 ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
517
518 (Some(pending_rx), Some(flashblock_rx), Some(in_progress_rx))
519 } else {
520 (None, None, None)
521 };
522
523 let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
524
525 Ok(OpEthApi::new(
526 eth_api,
527 sequencer_client,
528 U256::from(min_suggested_priority_fee),
529 pending_block_rx,
530 flashblock_rx,
531 in_progress_rx,
532 ))
533 }
534}