1pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12 eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13 OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_eips::BlockNumHash;
17use alloy_primitives::{B256, U256};
18use alloy_rpc_types_eth::{Filter, Log};
19use eyre::WrapErr;
20use futures::StreamExt;
21use op_alloy_network::Optimism;
22use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
23pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
24use reqwest::Url;
25use reth_chainspec::{EthereumHardforks, Hardforks};
26use reth_evm::ConfigureEvm;
27use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
28use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
29use reth_optimism_flashblocks::{
30 FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
31 FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners,
32 PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
33};
34use reth_rpc::eth::core::EthApiInner;
35use reth_rpc_eth_api::{
36 helpers::{
37 pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
38 LoadState, SpawnBlocking, Trace,
39 },
40 EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
41 RpcNodeCoreExt, RpcTypes,
42};
43use reth_rpc_eth_types::{
44 logs_utils::matching_block_logs_with_tx_hashes, EthStateCache, FeeHistoryCache, GasPriceOracle,
45 PendingBlock,
46};
47use reth_storage_api::{BlockReaderIdExt, ProviderHeader};
48use reth_tasks::{
49 pool::{BlockingTaskGuard, BlockingTaskPool},
50 TaskSpawner,
51};
52use std::{
53 fmt::{self, Formatter},
54 marker::PhantomData,
55 sync::Arc,
56 time::Duration,
57};
58use tokio::{sync::watch, time};
59use tokio_stream::{wrappers::BroadcastStream, Stream};
60use tracing::info;
61
/// Upper bound on how long `OpEthApi::flashblock` waits for the pending-block channel to change
/// before it falls back to whatever block the channel currently holds.
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
64
/// Alias for the [`EthApiInner`] instance that [`OpEthApi`] wraps and delegates to.
pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
67
/// `eth` API handle for OP nodes.
///
/// The only field is an [`Arc`] around [`OpEthApiInner`], so cloning a handle shares the same
/// underlying state instead of duplicating it.
pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
    /// Shared state: the wrapped eth API, the optional sequencer client, the minimum suggested
    /// priority fee and the optional flashblock listeners.
    inner: Arc<OpEthApiInner<N, Rpc>>,
}
82
83impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
84 fn clone(&self) -> Self {
85 Self { inner: self.inner.clone() }
86 }
87}
88
89impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
90 pub fn new(
92 eth_api: EthApiNodeBackend<N, Rpc>,
93 sequencer_client: Option<SequencerClient>,
94 min_suggested_priority_fee: U256,
95 flashblocks: Option<FlashblocksListeners<N::Primitives>>,
96 ) -> Self {
97 let inner = Arc::new(OpEthApiInner {
98 eth_api,
99 sequencer_client,
100 min_suggested_priority_fee,
101 flashblocks,
102 });
103 Self { inner }
104 }
105
106 pub const fn builder() -> OpEthApiBuilder<Rpc> {
108 OpEthApiBuilder::new()
109 }
110
111 pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
113 self.inner.eth_api()
114 }
115 pub fn sequencer_client(&self) -> Option<&SequencerClient> {
117 self.inner.sequencer_client()
118 }
119
120 pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
122 self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone())
123 }
124
125 pub fn subscribe_received_flashblocks(&self) -> Option<FlashBlockRx> {
127 self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe())
128 }
129
130 pub fn subscribe_flashblock_sequence(&self) -> Option<FlashBlockCompleteSequenceRx> {
132 self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe())
133 }
134
135 pub fn flashblock_receipts_stream(
139 &self,
140 filter: Filter,
141 ) -> Option<impl Stream<Item = Log> + Send + Unpin> {
142 self.subscribe_received_flashblocks().map(|rx| {
143 BroadcastStream::new(rx)
144 .scan(
145 None::<(u64, u64)>, move |state, result| {
147 let fb = match result.ok() {
148 Some(fb) => fb,
149 None => return futures::future::ready(None),
150 };
151
152 if let Some(base) = &fb.base {
154 *state = Some((base.block_number, base.timestamp));
155 }
156
157 let Some((block_number, timestamp)) = *state else {
158 return futures::future::ready(Some(Vec::new()))
161 };
162
163 let receipts =
164 fb.metadata.receipts.iter().map(|(tx, receipt)| (*tx, receipt));
165
166 let all_logs = matching_block_logs_with_tx_hashes(
167 &filter,
168 BlockNumHash::new(block_number, fb.diff.block_hash),
169 timestamp,
170 receipts,
171 false,
172 );
173
174 futures::future::ready(Some(all_logs))
175 },
176 )
177 .flat_map(futures::stream::iter)
178 })
179 }
180
181 fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
183 self.inner.flashblocks.as_ref().and_then(|f| *f.in_progress_rx.borrow())
184 }
185
186 fn extract_matching_block(
188 &self,
189 block: Option<&PendingFlashBlock<N::Primitives>>,
190 parent_hash: B256,
191 ) -> Option<PendingBlock<N::Primitives>> {
192 block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
193 }
194
195 async fn flashblock(
197 &self,
198 parent_hash: B256,
199 ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
200 let Some(rx) = self.inner.flashblocks.as_ref().map(|f| &f.pending_block_rx) else {
201 return Ok(None)
202 };
203
204 if let Some(build_info) = self.flashblock_build_info() {
206 let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
207
208 let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
210
211 if build_info.parent_hash == parent_hash && is_next_index {
213 let mut rx_clone = rx.clone();
214 let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
216 }
217 }
218
219 Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
221 }
222
223 pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
229 where
230 OpEthApiError: FromEvmError<N::Evm>,
231 Rpc: RpcConvert<Primitives = N::Primitives>,
232 {
233 let Some(latest) = self.provider().latest_header()? else {
234 return Ok(None);
235 };
236
237 self.flashblock(latest.hash()).await
238 }
239}
240
241impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
242where
243 N: RpcNodeCore,
244 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
245{
246 type Error = OpEthApiError;
247 type NetworkTypes = Rpc::Network;
248 type RpcConvert = Rpc;
249
250 fn converter(&self) -> &Self::RpcConvert {
251 self.inner.eth_api.converter()
252 }
253}
254
255impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
256where
257 N: RpcNodeCore,
258 Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260 type Primitives = N::Primitives;
261 type Provider = N::Provider;
262 type Pool = N::Pool;
263 type Evm = N::Evm;
264 type Network = N::Network;
265
266 #[inline]
267 fn pool(&self) -> &Self::Pool {
268 self.inner.eth_api.pool()
269 }
270
271 #[inline]
272 fn evm_config(&self) -> &Self::Evm {
273 self.inner.eth_api.evm_config()
274 }
275
276 #[inline]
277 fn network(&self) -> &Self::Network {
278 self.inner.eth_api.network()
279 }
280
281 #[inline]
282 fn provider(&self) -> &Self::Provider {
283 self.inner.eth_api.provider()
284 }
285}
286
287impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
288where
289 N: RpcNodeCore,
290 Rpc: RpcConvert<Primitives = N::Primitives>,
291{
292 #[inline]
293 fn cache(&self) -> &EthStateCache<N::Primitives> {
294 self.inner.eth_api.cache()
295 }
296}
297
298impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
299where
300 N: RpcNodeCore,
301 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
302{
303 #[inline]
304 fn starting_block(&self) -> U256 {
305 self.inner.eth_api.starting_block()
306 }
307}
308
309impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
310where
311 N: RpcNodeCore,
312 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
313{
314 #[inline]
315 fn io_task_spawner(&self) -> impl TaskSpawner {
316 self.inner.eth_api.task_spawner()
317 }
318
319 #[inline]
320 fn tracing_task_pool(&self) -> &BlockingTaskPool {
321 self.inner.eth_api.blocking_task_pool()
322 }
323
324 #[inline]
325 fn tracing_task_guard(&self) -> &BlockingTaskGuard {
326 self.inner.eth_api.blocking_task_guard()
327 }
328
329 #[inline]
330 fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
331 self.inner.eth_api.blocking_io_request_semaphore()
332 }
333}
334
335impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
336where
337 N: RpcNodeCore,
338 OpEthApiError: FromEvmError<N::Evm>,
339 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
340{
341 #[inline]
342 fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
343 self.inner.eth_api.gas_oracle()
344 }
345
346 #[inline]
347 fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
348 self.inner.eth_api.fee_history_cache()
349 }
350
351 async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
352 self.inner
353 .eth_api
354 .gas_oracle()
355 .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
356 .await
357 .map_err(Into::into)
358 }
359}
360
// Empty impl: `LoadState` is implemented without overriding any of its items.
// Only available when `OpEthApi` also implements `LoadPendingBlock`.
impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    Rpc: RpcConvert<Primitives = N::Primitives>,
    Self: LoadPendingBlock,
{
}
368
369impl<N, Rpc> EthState for OpEthApi<N, Rpc>
370where
371 N: RpcNodeCore,
372 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
373 Self: LoadPendingBlock,
374{
375 #[inline]
376 fn max_proof_window(&self) -> u64 {
377 self.inner.eth_api.eth_proof_window()
378 }
379}
380
// Empty impl: `EthFees` is implemented without overriding any of its items.
impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
{
}
388
// Empty impl: `Trace` is implemented without overriding any of its items.
// Unlike the other impls in this file, this one also requires `Rpc::Evm` to equal `N::Evm`.
impl<N, Rpc> Trace for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError, Evm = N::Evm>,
{
}
396
397impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
398 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
399 f.debug_struct("OpEthApi").finish_non_exhaustive()
400 }
401}
402
/// State shared by all clones of an [`OpEthApi`] handle.
pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
    /// The wrapped eth API that most trait implementations on [`OpEthApi`] delegate to.
    eth_api: EthApiNodeBackend<N, Rpc>,
    /// Optional sequencer client; `None` when no sequencer URL was configured on the builder.
    sequencer_client: Option<SequencerClient>,
    /// Value passed to the gas oracle's `op_suggest_tip_cap` when suggesting a priority fee.
    min_suggested_priority_fee: U256,
    /// Flashblock channels; `None` when no flashblocks URL was configured on the builder.
    flashblocks: Option<FlashblocksListeners<N::Primitives>>,
}
419
420impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
421 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
422 f.debug_struct("OpEthApiInner").finish()
423 }
424}
425
impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
    /// Returns a reference to the wrapped [`EthApiNodeBackend`].
    const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
        &self.eth_api
    }

    /// Returns the sequencer client, if one is set.
    const fn sequencer_client(&self) -> Option<&SequencerClient> {
        self.sequencer_client.as_ref()
    }
}
437
/// The [`RpcConverter`] instantiation produced by [`OpEthApiBuilder`].
///
/// It is parameterised over the network type `NetworkT`, the node's EVM configuration, an
/// [`OpReceiptConverter`] and an [`OpTxInfoMapper`], both of which are generic over the node's
/// provider.
pub type OpRpcConvert<N, NetworkT> = RpcConverter<
    NetworkT,
    <N as FullNodeComponents>::Evm,
    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
    (),
    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
>;
446
/// Builder for [`OpEthApi`]; consumed by its [`EthApiBuilder`] implementation.
#[derive(Debug)]
pub struct OpEthApiBuilder<NetworkT = Optimism> {
    /// Sequencer URL. When set, a [`SequencerClient`] is created for it while building.
    sequencer_url: Option<String>,
    /// Headers handed to the [`SequencerClient`] constructor together with the URL.
    sequencer_headers: Vec<String>,
    /// Minimum suggested priority fee; converted to [`U256`] when the API is built.
    min_suggested_priority_fee: u64,
    /// Flashblocks websocket URL. When set, a [`FlashBlockService`] is spawned while building.
    flashblocks_url: Option<Url>,
    /// Passed to the [`FlashBlockService`]; when `true` and a flashblocks URL is set, a
    /// [`FlashBlockConsensusClient`] is spawned as well.
    flashblock_consensus: bool,
    /// Marker tying the builder to its network type without storing a value of it.
    _nt: PhantomData<NetworkT>,
}
470
471impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
472 fn default() -> Self {
473 Self {
474 sequencer_url: None,
475 sequencer_headers: Vec::new(),
476 min_suggested_priority_fee: 1_000_000,
477 flashblocks_url: None,
478 flashblock_consensus: false,
479 _nt: PhantomData,
480 }
481 }
482}
483
484impl<NetworkT> OpEthApiBuilder<NetworkT> {
485 pub const fn new() -> Self {
487 Self {
488 sequencer_url: None,
489 sequencer_headers: Vec::new(),
490 min_suggested_priority_fee: 1_000_000,
491 flashblocks_url: None,
492 flashblock_consensus: false,
493 _nt: PhantomData,
494 }
495 }
496
497 pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
499 self.sequencer_url = sequencer_url;
500 self
501 }
502
503 pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
505 self.sequencer_headers = sequencer_headers;
506 self
507 }
508
509 pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
511 self.min_suggested_priority_fee = min;
512 self
513 }
514
515 pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
517 self.flashblocks_url = flashblocks_url;
518 self
519 }
520
521 pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
523 self.flashblock_consensus = flashblock_consensus;
524 self
525 }
526}
527
528impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
529where
530 N: FullNodeComponents<
531 Evm: ConfigureEvm<
532 NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
533 + From<OpFlashblockPayloadBase>
534 + Unpin,
535 >,
536 Types: NodeTypes<
537 ChainSpec: Hardforks + EthereumHardforks,
538 Payload: reth_node_api::PayloadTypes<
539 ExecutionData: for<'a> TryFrom<
540 &'a FlashBlockCompleteSequence,
541 Error: std::fmt::Display,
542 >,
543 >,
544 >,
545 >,
546 NetworkT: RpcTypes,
547 OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
548 OpEthApi<N, OpRpcConvert<N, NetworkT>>:
549 FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
550{
551 type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
552
553 async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
554 let Self {
555 sequencer_url,
556 sequencer_headers,
557 min_suggested_priority_fee,
558 flashblocks_url,
559 flashblock_consensus,
560 ..
561 } = self;
562 let rpc_converter =
563 RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
564 .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
565
566 let sequencer_client = if let Some(url) = sequencer_url {
567 Some(
568 SequencerClient::new_with_headers(&url, sequencer_headers)
569 .await
570 .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
571 )
572 } else {
573 None
574 };
575
576 let flashblocks = if let Some(ws_url) = flashblocks_url {
577 info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
578
579 let (tx, pending_rx) = watch::channel(None);
580 let stream = WsFlashBlockStream::new(ws_url);
581 let service = FlashBlockService::new(
582 stream,
583 ctx.components.evm_config().clone(),
584 ctx.components.provider().clone(),
585 ctx.components.task_executor().clone(),
586 flashblock_consensus,
588 );
589
590 let flashblocks_sequence = service.block_sequence_broadcaster().clone();
591 let received_flashblocks = service.flashblocks_broadcaster().clone();
592 let in_progress_rx = service.subscribe_in_progress();
593 ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
594
595 if flashblock_consensus {
596 info!(target: "reth::cli", "Launching FlashBlockConsensusClient");
597 let flashblock_client = FlashBlockConsensusClient::new(
598 ctx.engine_handle.clone(),
599 flashblocks_sequence.subscribe(),
600 )?;
601 ctx.components.task_executor().spawn(Box::pin(flashblock_client.run()));
602 }
603
604 Some(FlashblocksListeners::new(
605 pending_rx,
606 flashblocks_sequence,
607 in_progress_rx,
608 received_flashblocks,
609 ))
610 } else {
611 None
612 };
613
614 let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
615
616 Ok(OpEthApi::new(
617 eth_api,
618 sequencer_client,
619 U256::from(min_suggested_priority_fee),
620 flashblocks,
621 ))
622 }
623}