1pub mod ext;
4pub mod receipt;
5pub mod transaction;
6
7mod block;
8mod call;
9mod pending_block;
10
11use crate::{
12 eth::{receipt::OpReceiptConverter, transaction::OpTxInfoMapper},
13 OpEthApiError, SequencerClient,
14};
15use alloy_consensus::BlockHeader;
16use alloy_primitives::{B256, U256};
17use eyre::WrapErr;
18use op_alloy_network::Optimism;
19pub use receipt::{OpReceiptBuilder, OpReceiptFieldsBuilder};
20use reqwest::Url;
21use reth_chainspec::{EthereumHardforks, Hardforks};
22use reth_evm::ConfigureEvm;
23use reth_node_api::{FullNodeComponents, FullNodeTypes, HeaderTy, NodeTypes};
24use reth_node_builder::rpc::{EthApiBuilder, EthApiCtx};
25use reth_optimism_flashblocks::{
26 ExecutionPayloadBaseV1, FlashBlockBuildInfo, FlashBlockCompleteSequenceRx, FlashBlockService,
27 InProgressFlashBlockRx, PendingBlockRx, PendingFlashBlock, WsFlashBlockStream,
28};
29use reth_rpc::eth::{core::EthApiInner, DevSigner};
30use reth_rpc_eth_api::{
31 helpers::{
32 pending_block::BuildPendingEnv, AddDevSigners, EthApiSpec, EthFees, EthState, LoadFee,
33 LoadPendingBlock, LoadState, SpawnBlocking, Trace,
34 },
35 EthApiTypes, FromEvmError, FullEthApiServer, RpcConvert, RpcConverter, RpcNodeCore,
36 RpcNodeCoreExt, RpcTypes, SignableTxRequest,
37};
38use reth_rpc_eth_types::{
39 EthStateCache, FeeHistoryCache, GasPriceOracle, PendingBlock, PendingBlockEnvOrigin,
40};
41use reth_storage_api::{ProviderHeader, ProviderTx};
42use reth_tasks::{
43 pool::{BlockingTaskGuard, BlockingTaskPool},
44 TaskSpawner,
45};
46use std::{
47 fmt::{self, Formatter},
48 marker::PhantomData,
49 sync::Arc,
50 time::Duration,
51};
52use tokio::{sync::watch, time};
53use tracing::info;
54
/// Upper bound on how long `OpEthApi::flashblock` waits for the pending-block channel to change
/// when a relevant flashblock is reported as being built.
const MAX_FLASHBLOCK_WAIT_DURATION: Duration = Duration::from_millis(50);
57
58pub type EthApiNodeBackend<N, Rpc> = EthApiInner<N, Rpc>;
60
61pub struct OpEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
72 inner: Arc<OpEthApiInner<N, Rpc>>,
74}
75
76impl<N: RpcNodeCore, Rpc: RpcConvert> Clone for OpEthApi<N, Rpc> {
77 fn clone(&self) -> Self {
78 Self { inner: self.inner.clone() }
79 }
80}
81
82impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApi<N, Rpc> {
83 pub fn new(
85 eth_api: EthApiNodeBackend<N, Rpc>,
86 sequencer_client: Option<SequencerClient>,
87 min_suggested_priority_fee: U256,
88 pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
89 flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
90 in_progress_rx: Option<InProgressFlashBlockRx>,
91 ) -> Self {
92 let inner = Arc::new(OpEthApiInner {
93 eth_api,
94 sequencer_client,
95 min_suggested_priority_fee,
96 pending_block_rx,
97 flashblock_rx,
98 in_progress_rx,
99 });
100 Self { inner }
101 }
102
103 pub fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
105 self.inner.eth_api()
106 }
107 pub fn sequencer_client(&self) -> Option<&SequencerClient> {
109 self.inner.sequencer_client()
110 }
111
112 pub fn pending_block_rx(&self) -> Option<PendingBlockRx<N::Primitives>> {
114 self.inner.pending_block_rx.clone()
115 }
116
117 pub fn flashblock_rx(&self) -> Option<FlashBlockCompleteSequenceRx> {
119 self.inner.flashblock_rx.as_ref().map(|rx| rx.resubscribe())
120 }
121
122 fn flashblock_build_info(&self) -> Option<FlashBlockBuildInfo> {
124 self.inner.in_progress_rx.as_ref().and_then(|rx| *rx.borrow())
125 }
126
127 fn extract_matching_block(
129 &self,
130 block: Option<&PendingFlashBlock<N::Primitives>>,
131 parent_hash: B256,
132 ) -> Option<PendingBlock<N::Primitives>> {
133 block.filter(|b| b.block().parent_hash() == parent_hash).map(|b| b.pending.clone())
134 }
135
136 pub const fn builder() -> OpEthApiBuilder<Rpc> {
138 OpEthApiBuilder::new()
139 }
140
141 async fn flashblock(
143 &self,
144 parent_hash: B256,
145 ) -> eyre::Result<Option<PendingBlock<N::Primitives>>> {
146 let Some(rx) = self.inner.pending_block_rx.as_ref() else { return Ok(None) };
147
148 if let Some(build_info) = self.flashblock_build_info() {
150 let current_index = rx.borrow().as_ref().map(|b| b.last_flashblock_index);
151
152 let is_next_index = current_index.is_none_or(|idx| build_info.index == idx + 1);
154
155 if build_info.parent_hash == parent_hash && is_next_index {
157 let mut rx_clone = rx.clone();
158 let _ = time::timeout(MAX_FLASHBLOCK_WAIT_DURATION, rx_clone.changed()).await;
160 }
161 }
162
163 Ok(self.extract_matching_block(rx.borrow().as_ref(), parent_hash))
165 }
166
167 pub async fn pending_flashblock(&self) -> eyre::Result<Option<PendingBlock<N::Primitives>>>
173 where
174 OpEthApiError: FromEvmError<N::Evm>,
175 Rpc: RpcConvert<Primitives = N::Primitives>,
176 {
177 let pending = self.pending_block_env_and_cfg()?;
178 let parent = match pending.origin {
179 PendingBlockEnvOrigin::ActualPending(..) => return Ok(None),
180 PendingBlockEnvOrigin::DerivedFromLatest(parent) => parent,
181 };
182
183 self.flashblock(parent.hash()).await
184 }
185}
186
187impl<N, Rpc> EthApiTypes for OpEthApi<N, Rpc>
188where
189 N: RpcNodeCore,
190 Rpc: RpcConvert<Primitives = N::Primitives>,
191{
192 type Error = OpEthApiError;
193 type NetworkTypes = Rpc::Network;
194 type RpcConvert = Rpc;
195
196 fn tx_resp_builder(&self) -> &Self::RpcConvert {
197 self.inner.eth_api.tx_resp_builder()
198 }
199}
200
201impl<N, Rpc> RpcNodeCore for OpEthApi<N, Rpc>
202where
203 N: RpcNodeCore,
204 Rpc: RpcConvert<Primitives = N::Primitives>,
205{
206 type Primitives = N::Primitives;
207 type Provider = N::Provider;
208 type Pool = N::Pool;
209 type Evm = N::Evm;
210 type Network = N::Network;
211
212 #[inline]
213 fn pool(&self) -> &Self::Pool {
214 self.inner.eth_api.pool()
215 }
216
217 #[inline]
218 fn evm_config(&self) -> &Self::Evm {
219 self.inner.eth_api.evm_config()
220 }
221
222 #[inline]
223 fn network(&self) -> &Self::Network {
224 self.inner.eth_api.network()
225 }
226
227 #[inline]
228 fn provider(&self) -> &Self::Provider {
229 self.inner.eth_api.provider()
230 }
231}
232
233impl<N, Rpc> RpcNodeCoreExt for OpEthApi<N, Rpc>
234where
235 N: RpcNodeCore,
236 Rpc: RpcConvert<Primitives = N::Primitives>,
237{
238 #[inline]
239 fn cache(&self) -> &EthStateCache<N::Primitives> {
240 self.inner.eth_api.cache()
241 }
242}
243
244impl<N, Rpc> EthApiSpec for OpEthApi<N, Rpc>
245where
246 N: RpcNodeCore,
247 Rpc: RpcConvert<Primitives = N::Primitives>,
248{
249 #[inline]
250 fn starting_block(&self) -> U256 {
251 self.inner.eth_api.starting_block()
252 }
253}
254
255impl<N, Rpc> SpawnBlocking for OpEthApi<N, Rpc>
256where
257 N: RpcNodeCore,
258 Rpc: RpcConvert<Primitives = N::Primitives>,
259{
260 #[inline]
261 fn io_task_spawner(&self) -> impl TaskSpawner {
262 self.inner.eth_api.task_spawner()
263 }
264
265 #[inline]
266 fn tracing_task_pool(&self) -> &BlockingTaskPool {
267 self.inner.eth_api.blocking_task_pool()
268 }
269
270 #[inline]
271 fn tracing_task_guard(&self) -> &BlockingTaskGuard {
272 self.inner.eth_api.blocking_task_guard()
273 }
274}
275
276impl<N, Rpc> LoadFee for OpEthApi<N, Rpc>
277where
278 N: RpcNodeCore,
279 OpEthApiError: FromEvmError<N::Evm>,
280 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
281{
282 #[inline]
283 fn gas_oracle(&self) -> &GasPriceOracle<Self::Provider> {
284 self.inner.eth_api.gas_oracle()
285 }
286
287 #[inline]
288 fn fee_history_cache(&self) -> &FeeHistoryCache<ProviderHeader<N::Provider>> {
289 self.inner.eth_api.fee_history_cache()
290 }
291
292 async fn suggested_priority_fee(&self) -> Result<U256, Self::Error> {
293 let min_tip = U256::from(self.inner.min_suggested_priority_fee);
294 self.inner.eth_api.gas_oracle().op_suggest_tip_cap(min_tip).await.map_err(Into::into)
295 }
296}
297
298impl<N, Rpc> LoadState for OpEthApi<N, Rpc>
299where
300 N: RpcNodeCore,
301 Rpc: RpcConvert<Primitives = N::Primitives>,
302 Self: LoadPendingBlock,
303{
304}
305
306impl<N, Rpc> EthState for OpEthApi<N, Rpc>
307where
308 N: RpcNodeCore,
309 Rpc: RpcConvert<Primitives = N::Primitives>,
310 Self: LoadPendingBlock,
311{
312 #[inline]
313 fn max_proof_window(&self) -> u64 {
314 self.inner.eth_api.eth_proof_window()
315 }
316}
317
318impl<N, Rpc> EthFees for OpEthApi<N, Rpc>
319where
320 N: RpcNodeCore,
321 OpEthApiError: FromEvmError<N::Evm>,
322 Rpc: RpcConvert<Primitives = N::Primitives, Error = OpEthApiError>,
323{
324}
325
326impl<N, Rpc> Trace for OpEthApi<N, Rpc>
327where
328 N: RpcNodeCore,
329 OpEthApiError: FromEvmError<N::Evm>,
330 Rpc: RpcConvert<Primitives = N::Primitives>,
331{
332}
333
334impl<N, Rpc> AddDevSigners for OpEthApi<N, Rpc>
335where
336 N: RpcNodeCore,
337 Rpc: RpcConvert<
338 Network: RpcTypes<TransactionRequest: SignableTxRequest<ProviderTx<N::Provider>>>,
339 >,
340{
341 fn with_dev_accounts(&self) {
342 *self.inner.eth_api.signers().write() = DevSigner::random_signers(20)
343 }
344}
345
346impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApi<N, Rpc> {
347 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
348 f.debug_struct("OpEthApi").finish_non_exhaustive()
349 }
350}
351
352pub struct OpEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
354 eth_api: EthApiNodeBackend<N, Rpc>,
356 sequencer_client: Option<SequencerClient>,
359 min_suggested_priority_fee: U256,
363 pending_block_rx: Option<PendingBlockRx<N::Primitives>>,
367 flashblock_rx: Option<FlashBlockCompleteSequenceRx>,
371 in_progress_rx: Option<InProgressFlashBlockRx>,
373}
374
375impl<N: RpcNodeCore, Rpc: RpcConvert> fmt::Debug for OpEthApiInner<N, Rpc> {
376 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
377 f.debug_struct("OpEthApiInner").finish()
378 }
379}
380
381impl<N: RpcNodeCore, Rpc: RpcConvert> OpEthApiInner<N, Rpc> {
382 const fn eth_api(&self) -> &EthApiNodeBackend<N, Rpc> {
384 &self.eth_api
385 }
386
387 const fn sequencer_client(&self) -> Option<&SequencerClient> {
389 self.sequencer_client.as_ref()
390 }
391}
392
393pub type OpRpcConvert<N, NetworkT> = RpcConverter<
395 NetworkT,
396 <N as FullNodeComponents>::Evm,
397 OpReceiptConverter<<N as FullNodeTypes>::Provider>,
398 (),
399 OpTxInfoMapper<<N as FullNodeTypes>::Provider>,
400>;
401
402#[derive(Debug)]
404pub struct OpEthApiBuilder<NetworkT = Optimism> {
405 sequencer_url: Option<String>,
408 sequencer_headers: Vec<String>,
410 min_suggested_priority_fee: u64,
412 flashblocks_url: Option<Url>,
416 _nt: PhantomData<NetworkT>,
418}
419
420impl<NetworkT> Default for OpEthApiBuilder<NetworkT> {
421 fn default() -> Self {
422 Self {
423 sequencer_url: None,
424 sequencer_headers: Vec::new(),
425 min_suggested_priority_fee: 1_000_000,
426 flashblocks_url: None,
427 _nt: PhantomData,
428 }
429 }
430}
431
432impl<NetworkT> OpEthApiBuilder<NetworkT> {
433 pub const fn new() -> Self {
435 Self {
436 sequencer_url: None,
437 sequencer_headers: Vec::new(),
438 min_suggested_priority_fee: 1_000_000,
439 flashblocks_url: None,
440 _nt: PhantomData,
441 }
442 }
443
444 pub fn with_sequencer(mut self, sequencer_url: Option<String>) -> Self {
446 self.sequencer_url = sequencer_url;
447 self
448 }
449
450 pub fn with_sequencer_headers(mut self, sequencer_headers: Vec<String>) -> Self {
452 self.sequencer_headers = sequencer_headers;
453 self
454 }
455
456 pub const fn with_min_suggested_priority_fee(mut self, min: u64) -> Self {
458 self.min_suggested_priority_fee = min;
459 self
460 }
461
462 pub fn with_flashblocks(mut self, flashblocks_url: Option<Url>) -> Self {
464 self.flashblocks_url = flashblocks_url;
465 self
466 }
467}
468
469impl<N, NetworkT> EthApiBuilder<N> for OpEthApiBuilder<NetworkT>
470where
471 N: FullNodeComponents<
472 Evm: ConfigureEvm<
473 NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>
474 + From<ExecutionPayloadBaseV1>
475 + Unpin,
476 >,
477 Types: NodeTypes<ChainSpec: Hardforks + EthereumHardforks>,
478 >,
479 NetworkT: RpcTypes,
480 OpRpcConvert<N, NetworkT>: RpcConvert<Network = NetworkT>,
481 OpEthApi<N, OpRpcConvert<N, NetworkT>>:
482 FullEthApiServer<Provider = N::Provider, Pool = N::Pool> + AddDevSigners,
483{
484 type EthApi = OpEthApi<N, OpRpcConvert<N, NetworkT>>;
485
486 async fn build_eth_api(self, ctx: EthApiCtx<'_, N>) -> eyre::Result<Self::EthApi> {
487 let Self {
488 sequencer_url,
489 sequencer_headers,
490 min_suggested_priority_fee,
491 flashblocks_url,
492 ..
493 } = self;
494 let rpc_converter =
495 RpcConverter::new(OpReceiptConverter::new(ctx.components.provider().clone()))
496 .with_mapper(OpTxInfoMapper::new(ctx.components.provider().clone()));
497
498 let sequencer_client = if let Some(url) = sequencer_url {
499 Some(
500 SequencerClient::new_with_headers(&url, sequencer_headers)
501 .await
502 .wrap_err_with(|| format!("Failed to init sequencer client with: {url}"))?,
503 )
504 } else {
505 None
506 };
507
508 let (pending_block_rx, flashblock_rx, in_progress_rx) =
509 if let Some(ws_url) = flashblocks_url {
510 info!(target: "reth:cli", %ws_url, "Launching flashblocks service");
511
512 let (tx, pending_rx) = watch::channel(None);
513 let stream = WsFlashBlockStream::new(ws_url);
514 let service = FlashBlockService::new(
515 stream,
516 ctx.components.evm_config().clone(),
517 ctx.components.provider().clone(),
518 ctx.components.task_executor().clone(),
519 );
520
521 let flashblock_rx = service.subscribe_block_sequence();
522 let in_progress_rx = service.subscribe_in_progress();
523
524 ctx.components.task_executor().spawn(Box::pin(service.run(tx)));
525
526 (Some(pending_rx), Some(flashblock_rx), Some(in_progress_rx))
527 } else {
528 (None, None, None)
529 };
530
531 let eth_api = ctx.eth_api_builder().with_rpc_converter(rpc_converter).build_inner();
532
533 Ok(OpEthApi::new(
534 eth_api,
535 sequencer_client,
536 U256::from(min_suggested_priority_fee),
537 pending_block_rx,
538 flashblock_rx,
539 in_progress_rx,
540 ))
541 }
542}