1pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12 eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13 OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
20pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
21use reqwest::Url;
22use reth_chainspec::{EthereumHardforks, Hardforks};
23use reth_evm::ConfigureEvm;
24use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
25use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
26use reth_optimism_flashblocks::{
27 FlashBlockBuildInfo, FlashBlockCompleteSequence, FlashBlockCompleteSequenceRx,
28 FlashBlockConsensusClient, FlashBlockRx, FlashBlockService, FlashblocksListeners,
29 PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
30};
31use reth_rpc::eth::core::EthApiInner;
32use reth_rpc_eth_api::{
33 helpers::{
34 pending_block::BuildPendingEnv, EthApiSpec, EthFees, EthState, LoadFee, LoadPendingBlock,
35 LoadState, SpawnBlocking, Trace,
36 },
37 EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
38 RpcNodeCoreExt, RpcTypes,
39};
40use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock};
41use reth_storage_api::{BlockReaderIdExt, ProviderHeader};
42use reth_tasks::{
43 pool::{BlockingTaskGuard, BlockingTaskPool},
44 TaskSpawner,
45};
46use std::{
47 fmt::{self, Formatter},
48 marker::PhantomData,
49 sync::Arc,
50 time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
/// Upper bound on how long `OpEthApi::flashblock` waits for the pending-block channel to change
/// before it falls back to whatever value the channel currently holds.
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
/// Alias for the [`EthApiInner`] backend that [`OpEthApi`] wraps and delegates to.
pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
/// `eth` API type produced by [`OpEthApiBuilder`].
///
/// This is a handle around a reference-counted [`OpEthApiInner`]; all state lives in the inner
/// value and is shared between clones of the handle.
pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
    /// Shared inner state (wrapped backend, optional sequencer client, fee floor, optional
    /// flashblock listeners).
    inner: Arc<OpEthApiInner<N, Rpc>>,
}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77 fn clone(&self) -> Self {
78 Self { inner: self.inner.clone() }
79 }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83 pub fn new(
85 eth_api: EthApiNodeBackend<N, Rpc>,
86 sequencer_client: Option<SequencerClient>,
87 min_suggested_priority_fee: U256,
88 flashblocks: Option<FlashblocksListeners<N::Primitives>>,
89 ) -> Self {
90 let inner = Arc::new(OpEthApiInner {
91 eth_api,
92 sequencer_client,
93 min_suggested_priority_fee,
94 flashblocks,
95 });
96 Self { inner }
97 }
98
99 pub const fn builder() -> OpEthApiBuilder<Rpc> {
101 OpEthApiBuilder::new()
102 }
103
104 pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
106 self.inner.eth_api()
107 }
108 pub fn sequencer_client(&self) -> Option<&SequencerClient> {
110 self.inner.sequencer_client()
111 }
112
113 pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
115 self.inner.flashblocks.as_ref().map(|f| f.pending_block_rx.clone())
116 }
117
118 pub fn subscribe_received_flashblocks(&self) -> Option<FlashBlockRx> {
120 self.inner.flashblocks.as_ref().map(|f| f.received_flashblocks.subscribe())
121 }
122
123 pub fn subscribe_flashblock_sequence(&self) -> Option<FlashBlockCompleteSequenceRx> {
125 self.inner.flashblocks.as_ref().map(|f| f.flashblocks_sequence.subscribe())
126 }
127
128 fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
130 self.inner.flashblocks.as_ref().and_then(|f| *f.in_progress_rx.borrow())
131 }
132
133 fn extract_matching_block(
135 &self,
136 block: Option<&PendingFlashBlock<N::Primitives>>,
137 parent_hash: B256,
138 ) -> Option<PendingBlock<N::Primitives>> {
139 block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
140 }
141
142 async fn flashblock(
144 &self,
145 parent_hash: B256,
146 ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
147 let Some(rx) = self.inner.flashblocks.as_ref().map(|f| &f.pending_block_rx) else {
148 return Ok(None)
149 };
150
151 if let Some(build_info) = self.flashblock_build_info() {
153 let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
154
155 let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
157
158 if build_info.parent_hash == parent_hash && is_next_index {
160 let mut rx_clone = rx.clone();
161 let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
163 }
164 }
165
166 Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
168 }
169
170 pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
176 where
177 OpEthApiError: FromEvmError<N::Evm>,
178 Rpc: RpcConvert<Primitives = N::Primitives>,
179 {
180 let Some(latest) = self.provider().latest_header()? else {
181 return Ok(None);
182 };
183
184 self.flashblock(latest.hash()).await
185 }
186}
187
188impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
189where
190 N: RpcNodeCore,
191 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
192{
193 type Error = OpEthApiError;
194 type NetworkTypes = Rpc::Network;
195 type RpcConvert = Rpc;
196
197 fn converter(&self) -> &Self::RpcConvert {
198 self.inner.eth_api.converter()
199 }
200}
201
202impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
203where
204 N: RpcNodeCore,
205 Rpc: RpcConvert<Primitives = N::Primitives>,
206{
207 type Primitives = N::Primitives;
208 type Provider = N::Provider;
209 type Pool = N::Pool;
210 type Evm = N::Evm;
211 type Network = N::Network;
212
213 #[inline]
214 fn pool(&self) -> &Self::Pool {
215 self.inner.eth_api.pool()
216 }
217
218 #[inline]
219 fn evm_config(&self) -> &Self::Evm {
220 self.inner.eth_api.evm_config()
221 }
222
223 #[inline]
224 fn network(&self) -> &Self::Network {
225 self.inner.eth_api.network()
226 }
227
228 #[inline]
229 fn provider(&self) -> &Self::Provider {
230 self.inner.eth_api.provider()
231 }
232}
233
234impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
235where
236 N: RpcNodeCore,
237 Rpc: RpcConvert<Primitives = N::Primitives>,
238{
239 #[inline]
240 fn cache(&self) -> &EthStateCache<N::Primitives> {
241 self.inner.eth_api.cache()
242 }
243}
244
245impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
246where
247 N: RpcNodeCore,
248 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
249{
250 #[inline]
251 fn starting_block(&self) -> U256 {
252 self.inner.eth_api.starting_block()
253 }
254}
255
256impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
257where
258 N: RpcNodeCore,
259 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
260{
261 #[inline]
262 fn io_task_spawner(&self) -> impl TaskSpawner {
263 self.inner.eth_api.task_spawner()
264 }
265
266 #[inline]
267 fn tracing_task_pool(&self) -> &BlockingTaskPool {
268 self.inner.eth_api.blocking_task_pool()
269 }
270
271 #[inline]
272 fn tracing_task_guard(&self) -> &BlockingTaskGuard {
273 self.inner.eth_api.blocking_task_guard()
274 }
275
276 #[inline]
277 fn blocking_io_task_guard(&self) -> &Arc<tokio::sync::Semaphore> {
278 self.inner.eth_api.blocking_io_request_semaphore()
279 }
280}
281
282impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
283where
284 N: RpcNodeCore,
285 OpEthApiError: FromEvmError<N::Evm>,
286 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
287{
288 #[inline]
289 fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
290 self.inner.eth_api.gas_oracle()
291 }
292
293 #[inline]
294 fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
295 self.inner.eth_api.fee_history_cache()
296 }
297
298 async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
299 self.inner
300 .eth_api
301 .gas_oracle()
302 .op_suggest_tip_cap(self.inner.min_suggested_priority_fee)
303 .await
304 .map_err(Into::into)
305 }
306}
307
// `LoadState` is implemented with an empty body: no trait items are overridden here.
// The impl only applies when `OpEthApi` also implements `LoadPendingBlock`.
impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    Rpc: RpcConvert<Primitives = N::Primitives>,
    Self: LoadPendingBlock,
{
}
315
316impl<N, Rpc> EthState for OpEthApi<N, Rpc>
317where
318 N: RpcNodeCore,
319 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
320 Self: LoadPendingBlock,
321{
322 #[inline]
323 fn max_proof_window(&self) -> u64 {
324 self.inner.eth_api.eth_proof_window()
325 }
326}
327
// `EthFees` is implemented with an empty body: no trait items are overridden here.
impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
{
}
335
// `Trace` is implemented with an empty body: no trait items are overridden here.
// Unlike the neighbouring impls, this one also requires the converter's `Evm` to be `N::Evm`.
impl<N, Rpc> Trace for OpEthApi<N, Rpc>
where
    N: RpcNodeCore,
    OpEthApiError: FromEvmError<N::Evm>,
    Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError, Evm = N::Evm>,
{
}
343
344impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 f.debug_struct("OpEthApi").finish_non_exhaustive()
347 }
348}
349
/// State shared by all clones of an [`OpEthApi`] handle.
pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
    /// Wrapped backend that the trait implementations on [`OpEthApi`] delegate to.
    eth_api: EthApiNodeBackend<N, Rpc>,
    /// Optional sequencer client; `None` when no sequencer is configured.
    sequencer_client: Option<SequencerClient>,
    /// Minimum fee handed to the gas oracle's `op_suggest_tip_cap` when a priority fee is
    /// suggested.
    min_suggested_priority_fee: U256,
    /// Flashblock channels (pending block, in-progress build info, received flashblocks and
    /// complete sequences); `None` when flashblocks are not configured.
    flashblocks: Option<FlashblocksListeners<N::Primitives>>,
}
366
367impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
368 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
369 f.debug_struct("OpEthApiInner").finish()
370 }
371}
372
373impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
374 const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
376 &self.eth_api
377 }
378
379 const fn sequencer_client(&self) -> Option<&SequencerClient> {
381 self.sequencer_client.as_ref()
382 }
383}
384
/// The [`RpcConverter`] instantiation used by [`OpEthApiBuilder`]: network type `NetworkT`, the
/// node's EVM configuration, and an [`OpReceiptConverter`] and [`OpTxInfoMapper`] over the node's
/// provider.
pub type OpRpcConvert<N, NetworkT> = RpcConverter<
    NetworkT,
    <N as FullNodeComponents>::Evm,
    OpReceiptConverter<<N as FullNodeTypes>::Provider>,
    (),
    OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
>;
393
/// Builder for [`OpEthApi`], generic over the RPC network type (defaults to [`Optimism`]).
#[derive(Debug)]
pub struct OpEthApiBuilder<NetworkT = Optimism> {
    /// Sequencer URL; when set, a [`SequencerClient`] is created for it while building.
    sequencer_url: Option<String>,
    /// Headers handed to the sequencer client when it is created.
    sequencer_headers: Vec<String>,
    /// Minimum suggested priority fee; converted to [`U256`] when the API is built.
    /// NOTE(review): the unit is not visible here (presumably wei) — confirm.
    min_suggested_priority_fee: u64,
    /// URL the flashblock stream connects to; when `None`, no flashblock service is launched.
    flashblocks_url: Option<Url>,
    /// Passed to the flashblock service and, when `true`, additionally launches a
    /// [`FlashBlockConsensusClient`]. Only has an effect when `flashblocks_url` is set.
    flashblock_consensus: bool,
    /// Marker tying the builder to the network type without storing a value of it.
    _nt: PhantomData<NetworkT>,
}
417
418impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
419 fn default() -> Self {
420 Self {
421 sequencer_url: None,
422 sequencer_headers: Vec::new(),
423 min_suggested_priority_fee: 1_000_000,
424 flashblocks_url: None,
425 flashblock_consensus: false,
426 _nt: PhantomData,
427 }
428 }
429}
430
431impl<NetworkT> OpEthApiBuilder<NetworkT> {
432 pub const fn new() -> Self {
434 Self {
435 sequencer_url: None,
436 sequencer_headers: Vec::new(),
437 min_suggested_priority_fee: 1_000_000,
438 flashblocks_url: None,
439 flashblock_consensus: false,
440 _nt: PhantomData,
441 }
442 }
443
444 pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
446 self.sequencer_url = sequencer_url;
447 self
448 }
449
450 pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
452 self.sequencer_headers = sequencer_headers;
453 self
454 }
455
456 pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
458 self.min_suggested_priority_fee = min;
459 self
460 }
461
462 pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
464 self.flashblocks_url = flashblocks_url;
465 self
466 }
467
468 pub const fn with_flashblock_consensus(mut self, flashblock_consensus: bool) -> Self {
470 self.flashblock_consensus = flashblock_consensus;
471 self
472 }
473}
474
475impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
476where
477 N: FullNodeComponents<
478 Evm: ConfigureEvm<
479 NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
480 + From<OpFlashblockPayloadBase>
481 + Unpin,
482 >,
483 Types: NodeTypes<
484 ChainSpec: Hardforks + EthereumHardforks,
485 Payload: reth_node_api::PayloadTypes<
486 ExecutionData: for<'a> TryFrom<
487 &'a FlashBlockCompleteSequence,
488 Error: std::fmt::Display,
489 >,
490 >,
491 >,
492 >,
493 NetworkT: RpcTypes,
494 OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
495 OpEthApi<N, OpRpcConvert<N, NetworkT>>:
496 FullEthApiServer<Provider = N::Provider, Pool = N::Pool>,
497{
498 type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
499
500 async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
501 let Self {
502 sequencer_url,
503 sequencer_headers,
504 min_suggested_priority_fee,
505 flashblocks_url,
506 flashblock_consensus,
507 ..
508 } = self;
509 let rpc_converter =
510 RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
511 .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
512
513 let sequencer_client = if let Some(url) = sequencer_url {
514 Some(
515 SequencerClient::new_with_headers(&url, sequencer_headers)
516 .await
517 .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
518 )
519 } else {
520 None
521 };
522
523 let flashblocks = if let Some(ws_url) = flashblocks_url {
524 info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
525
526 let (tx, pending_rx) = watch::channel(None);
527 let stream = WsFlashBlockStream::new(ws_url);
528 let service = FlashBlockService::new(
529 stream,
530 ctx.components.evm_config().clone(),
531 ctx.components.provider().clone(),
532 ctx.components.task_executor().clone(),
533 flashblock_consensus,
535 );
536
537 let flashblocks_sequence = service.block_sequence_broadcaster().clone();
538 let received_flashblocks = service.flashblocks_broadcaster().clone();
539 let in_progress_rx = service.subscribe_in_progress();
540 ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
541
542 if flashblock_consensus {
543 info!(target: "reth::cli", "Launching FlashBlockConsensusClient");
544 let flashblock_client = FlashBlockConsensusClient::new(
545 ctx.engine_handle.clone(),
546 flashblocks_sequence.subscribe(),
547 )?;
548 ctx.components.task_executor().spawn(Box::pin(flashblock_client.run()));
549 }
550
551 Some(FlashblocksListeners::new(
552 pending_rx,
553 flashblocks_sequence,
554 in_progress_rx,
555 received_flashblocks,
556 ))
557 } else {
558 None
559 };
560
561 let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
562
563 Ok(OpEthApi::new(
564 eth_api,
565 sequencer_client,
566 U256::from(min_suggested_priority_fee),
567 flashblocks,
568 ))
569 }
570}