reth_optimism_flashblocks/
service.rs

1use crate::{sequence::FlashBlockSequence, ExecutionPayloadBaseV1, FlashBlock};
2use alloy_eips::BlockNumberOrTag;
3use alloy_primitives::B256;
4use futures_util::{FutureExt, Stream, StreamExt};
5use reth_chain_state::{
6    CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, ExecutedBlock,
7};
8use reth_errors::RethError;
9use reth_evm::{
10    execute::{BlockBuilder, BlockBuilderOutcome},
11    ConfigureEvm,
12};
13use reth_execution_types::ExecutionOutcome;
14use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
15use reth_revm::{cached::CachedReads, database::StateProviderDatabase, db::State};
16use reth_rpc_eth_types::{EthApiError, PendingBlock};
17use reth_storage_api::{noop::NoopProvider, BlockReaderIdExt, StateProviderFactory};
18use std::{
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22    time::{Duration, Instant},
23};
24use tokio::pin;
25use tracing::{debug, trace, warn};
26
27/// The `FlashBlockService` maintains an in-memory [`PendingBlock`] built out of a sequence of
28/// [`FlashBlock`]s.
29#[derive(Debug)]
30pub struct FlashBlockService<
31    N: NodePrimitives,
32    S,
33    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: Unpin>,
34    Provider,
35> {
36    rx: S,
37    current: Option<PendingBlock<N>>,
38    blocks: FlashBlockSequence<N::SignedTx>,
39    rebuild: bool,
40    evm_config: EvmConfig,
41    provider: Provider,
42    canon_receiver: CanonStateNotifications<N>,
43    /// Cached state reads for the current block.
44    /// Current `PendingBlock` is built out of a sequence of `FlashBlocks`, and executed again when
45    /// fb received on top of the same block. Avoid redundant I/O across multiple executions
46    /// within the same block.
47    cached_state: Option<(B256, CachedReads)>,
48}
49
50impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
51where
52    N: NodePrimitives,
53    S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
54    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
55    Provider: StateProviderFactory
56        + CanonStateSubscriptions<Primitives = N>
57        + BlockReaderIdExt<
58            Header = HeaderTy<N>,
59            Block = BlockTy<N>,
60            Transaction = N::SignedTx,
61            Receipt = ReceiptTy<N>,
62        > + Unpin,
63{
64    /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
65    pub fn new(rx: S, evm_config: EvmConfig, provider: Provider) -> Self {
66        Self {
67            rx,
68            current: None,
69            blocks: FlashBlockSequence::new(),
70            evm_config,
71            canon_receiver: provider.subscribe_to_canonical_state(),
72            provider,
73            cached_state: None,
74            rebuild: false,
75        }
76    }
77
78    /// Drives the services and sends new blocks to the receiver
79    ///
80    /// Note: this should be spawned
81    pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingBlock<N>>>) {
82        while let Some(block) = self.next().await {
83            if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
84                let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
85            }
86        }
87
88        warn!("Flashblock service has stopped");
89    }
90
91    /// Returns the cached reads at the given head hash.
92    ///
93    /// Returns a new cache instance if this is new `head` hash.
94    fn cached_reads(&mut self, head: B256) -> CachedReads {
95        if let Some((tracked, cache)) = self.cached_state.take() {
96            if tracked == head {
97                return cache
98            }
99        }
100
101        // instantiate a new cache instance
102        CachedReads::default()
103    }
104
105    /// Updates the cached reads at the given head hash
106    fn update_cached_reads(&mut self, head: B256, cached_reads: CachedReads) {
107        self.cached_state = Some((head, cached_reads));
108    }
109
110    /// Returns the [`ExecutedBlock`] made purely out of [`FlashBlock`]s that were received earlier.
111    ///
112    /// Returns None if the flashblock doesn't attach to the latest header.
113    fn execute(&mut self) -> eyre::Result<Option<PendingBlock<N>>> {
114        trace!("Attempting new flashblock");
115
116        let latest = self
117            .provider
118            .latest_header()?
119            .ok_or(EthApiError::HeaderNotFound(BlockNumberOrTag::Latest.into()))?;
120        let latest_hash = latest.hash();
121
122        let Some(attrs) = self.blocks.payload_base() else {
123            trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing flashblock payload base");
124            return Ok(None)
125        };
126
127        if attrs.parent_hash != latest_hash {
128            trace!(flashblock_parent = ?attrs.parent_hash, local_latest=?latest.num_hash(),"Skipping non consecutive flashblock");
129            // doesn't attach to the latest block
130            return Ok(None)
131        }
132
133        let state_provider = self.provider.history_by_block_hash(latest.hash())?;
134
135        let mut request_cache = self.cached_reads(latest_hash);
136        let cached_db = request_cache.as_db_mut(StateProviderDatabase::new(&state_provider));
137        let mut state = State::builder().with_database(cached_db).with_bundle_update().build();
138
139        let mut builder = self
140            .evm_config
141            .builder_for_next_block(&mut state, &latest, attrs.into())
142            .map_err(RethError::other)?;
143
144        builder.apply_pre_execution_changes()?;
145
146        for tx in self.blocks.ready_transactions() {
147            let _gas_used = builder.execute_transaction(tx)?;
148        }
149
150        let BlockBuilderOutcome { execution_result, block, hashed_state, .. } =
151            builder.finish(NoopProvider::default())?;
152
153        let execution_outcome = ExecutionOutcome::new(
154            state.take_bundle(),
155            vec![execution_result.receipts],
156            block.number(),
157            vec![execution_result.requests],
158        );
159
160        // update cached reads
161        self.update_cached_reads(latest_hash, request_cache);
162
163        Ok(Some(PendingBlock::with_executed_block(
164            Instant::now() + Duration::from_secs(1),
165            ExecutedBlock {
166                recovered_block: block.into(),
167                execution_output: Arc::new(execution_outcome),
168                hashed_state: Arc::new(hashed_state),
169            },
170        )))
171    }
172
173    /// Takes out `current` [`PendingBlock`] if `state` is not preceding it.
174    fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingBlock<N>> {
175        let latest = state.tip_checked()?.hash();
176        self.current.take_if(|current| current.parent_hash() != latest)
177    }
178}
179
180impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
181where
182    N: NodePrimitives,
183    S: Stream<Item = eyre::Result<FlashBlock>> + Unpin,
184    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>,
185    Provider: StateProviderFactory
186        + CanonStateSubscriptions<Primitives = N>
187        + BlockReaderIdExt<
188            Header = HeaderTy<N>,
189            Block = BlockTy<N>,
190            Transaction = N::SignedTx,
191            Receipt = ReceiptTy<N>,
192        > + Unpin,
193{
194    type Item = eyre::Result<Option<PendingBlock<N>>>;
195
196    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
197        let this = self.get_mut();
198
199        // consume new flashblocks while they're ready
200        while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
201            match result {
202                Ok(flashblock) => match this.blocks.insert(flashblock) {
203                    Ok(_) => this.rebuild = true,
204                    Err(err) => debug!(%err, "Failed to prepare flashblock"),
205                },
206                Err(err) => return Poll::Ready(Some(Err(err))),
207            }
208        }
209
210        if let Poll::Ready(Ok(state)) = {
211            let fut = this.canon_receiver.recv();
212            pin!(fut);
213            fut.poll_unpin(cx)
214        } {
215            if let Some(current) = this.on_new_tip(state) {
216                trace!(
217                    parent_hash = %current.block().parent_hash(),
218                    block_number = current.block().number(),
219                    "Clearing current flashblock on new canonical block"
220                );
221
222                return Poll::Ready(Some(Ok(None)))
223            }
224        }
225
226        if !this.rebuild && this.current.is_some() {
227            return Poll::Pending
228        }
229
230        let now = Instant::now();
231        // try to build a block on top of latest
232        match this.execute() {
233            Ok(Some(new_pending)) => {
234                // built a new pending block
235                this.current = Some(new_pending.clone());
236                this.rebuild = false;
237                trace!(parent_hash=%new_pending.block().parent_hash(), block_number=new_pending.block().number(), flash_blocks=this.blocks.count(), elapsed=?now.elapsed(), "Built new block with flashblocks");
238                return Poll::Ready(Some(Ok(Some(new_pending))));
239            }
240            Ok(None) => {
241                // nothing to do because tracked flashblock doesn't attach to latest
242            }
243            Err(err) => {
244                // we can ignore this error
245                debug!(%err, "failed to execute flashblock");
246            }
247        }
248
249        Poll::Pending
250    }
251}