reth_optimism_flashblocks/
service.rs

1use crate::{
2    sequence::FlashBlockPendingSequence,
3    worker::{BuildArgs, FlashBlockBuilder},
4    ExecutionPayloadBaseV1, FlashBlock, FlashBlockCompleteSequenceRx, InProgressFlashBlockRx,
5    PendingFlashBlock,
6};
7use alloy_eips::eip2718::WithEncoded;
8use alloy_primitives::B256;
9use futures_util::{FutureExt, Stream, StreamExt};
10use metrics::Histogram;
11use reth_chain_state::{CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions};
12use reth_evm::ConfigureEvm;
13use reth_metrics::Metrics;
14use reth_primitives_traits::{
15    AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy, Recovered,
16};
17use reth_revm::cached::CachedReads;
18use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
19use reth_tasks::TaskExecutor;
20use std::{
21    pin::Pin,
22    task::{ready, Context, Poll},
23    time::Instant,
24};
25use tokio::{
26    pin,
27    sync::{oneshot, watch},
28};
29use tracing::{debug, trace, warn};
30
/// Flashblock index threshold for state root computation.
///
/// When state root calculation is enabled on the service, it is only requested for a build once
/// the index of the tracked flashblock sequence is at least this value.
pub(crate) const FB_STATE_ROOT_FROM_INDEX: usize = 9;
32
33/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
34/// [`FlashBlock`]s.
35#[derive(Debug)]
36pub struct FlashBlockService<
37    N: NodePrimitives,
38    S,
39    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: Unpin>,
40    Provider,
41> {
42    rx: S,
43    current: Option<PendingFlashBlock<N>>,
44    blocks: FlashBlockPendingSequence<N::SignedTx>,
45    rebuild: bool,
46    builder: FlashBlockBuilder<EvmConfig, Provider>,
47    canon_receiver: CanonStateNotifications<N>,
48    spawner: TaskExecutor,
49    job: Option<BuildJob<N>>,
50    /// Cached state reads for the current block.
51    /// Current `PendingFlashBlock` is built out of a sequence of `FlashBlocks`, and executed again
52    /// when fb received on top of the same block. Avoid redundant I/O across multiple
53    /// executions within the same block.
54    cached_state: Option<(B256, CachedReads)>,
55    /// Signals when a block build is in progress
56    in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
57    /// `FlashBlock` service's metrics
58    metrics: FlashBlockServiceMetrics,
59    /// Enable state root calculation from flashblock with index [`FB_STATE_ROOT_FROM_INDEX`]
60    compute_state_root: bool,
61}
62
63/// Information for a flashblock currently built
64#[derive(Debug, Clone, Copy)]
65pub struct FlashBlockBuildInfo {
66    /// Parent block hash
67    pub parent_hash: B256,
68    /// Flashblock index within the current block's sequence
69    pub index: u64,
70    /// Block number of the flashblock being built.
71    pub block_number: u64,
72}
73
74impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
75where
76    N: NodePrimitives,
77    S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
78    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
79        + Clone
80        + 'static,
81    Provider: StateProviderFactory
82        + CanonStateSubscriptions<Primitives = N>
83        + BlockReaderIdExt<
84            Header = HeaderTy<N>,
85            Block = BlockTy<N>,
86            Transaction = N::SignedTx,
87            Receipt = ReceiptTy<N>,
88        > + Unpin
89        + Clone
90        + 'static,
91{
92    /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
93    pub fn new(rx: S, evm_config: EvmConfig, provider: Provider, spawner: TaskExecutor) -> Self {
94        let (in_progress_tx, _) = watch::channel(None);
95        Self {
96            rx,
97            current: None,
98            blocks: FlashBlockPendingSequence::new(),
99            canon_receiver: provider.subscribe_to_canonical_state(),
100            builder: FlashBlockBuilder::new(evm_config, provider),
101            rebuild: false,
102            spawner,
103            job: None,
104            cached_state: None,
105            in_progress_tx,
106            metrics: FlashBlockServiceMetrics::default(),
107            compute_state_root: false,
108        }
109    }
110
111    /// Enable state root calculation from flashblock
112    pub const fn compute_state_root(mut self, enable_state_root: bool) -> Self {
113        self.compute_state_root = enable_state_root;
114        self
115    }
116
117    /// Returns a subscriber to the flashblock sequence.
118    pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
119        self.blocks.subscribe_block_sequence()
120    }
121
122    /// Returns a receiver that signals when a flashblock is being built.
123    pub fn subscribe_in_progress(&self) -> InProgressFlashBlockRx {
124        self.in_progress_tx.subscribe()
125    }
126
127    /// Drives the services and sends new blocks to the receiver
128    ///
129    /// Note: this should be spawned
130    pub async fn run(mut self, tx: tokio::sync::watch::Sender<Option<PendingFlashBlock<N>>>) {
131        while let Some(block) = self.next().await {
132            if let Ok(block) = block.inspect_err(|e| tracing::error!("{e}")) {
133                let _ = tx.send(block).inspect_err(|e| tracing::error!("{e}"));
134            }
135        }
136
137        warn!("Flashblock service has stopped");
138    }
139
140    /// Returns the [`BuildArgs`] made purely out of [`FlashBlock`]s that were received earlier.
141    ///
142    /// Returns `None` if the flashblock have no `base` or the base is not a child block of latest.
143    fn build_args(
144        &mut self,
145    ) -> Option<
146        BuildArgs<
147            impl IntoIterator<Item = WithEncoded<Recovered<N::SignedTx>>>
148                + use<N, S, EvmConfig, Provider>,
149        >,
150    > {
151        let Some(base) = self.blocks.payload_base() else {
152            trace!(
153                flashblock_number = ?self.blocks.block_number(),
154                count = %self.blocks.count(),
155                "Missing flashblock payload base"
156            );
157
158            return None
159        };
160
161        // attempt an initial consecutive check
162        if let Some(latest) = self.builder.provider().latest_header().ok().flatten() &&
163            latest.hash() != base.parent_hash
164        {
165            trace!(flashblock_parent=?base.parent_hash, flashblock_number=base.block_number, local_latest=?latest.num_hash(), "Skipping non consecutive build attempt");
166            return None
167        }
168
169        let Some(last_flashblock) = self.blocks.last_flashblock() else {
170            trace!(flashblock_number = ?self.blocks.block_number(), count = %self.blocks.count(), "Missing last flashblock");
171            return None
172        };
173
174        // Check if state root must be computed
175        let compute_state_root =
176            self.compute_state_root && self.blocks.index() >= Some(FB_STATE_ROOT_FROM_INDEX as u64);
177
178        Some(BuildArgs {
179            base,
180            transactions: self.blocks.ready_transactions().collect::<Vec<_>>(),
181            cached_state: self.cached_state.take(),
182            last_flashblock_index: last_flashblock.index,
183            last_flashblock_hash: last_flashblock.diff.block_hash,
184            compute_state_root,
185        })
186    }
187
188    /// Takes out `current` [`PendingFlashBlock`] if `state` is not preceding it.
189    fn on_new_tip(&mut self, state: CanonStateNotification<N>) -> Option<PendingFlashBlock<N>> {
190        let tip = state.tip_checked()?;
191        let tip_hash = tip.hash();
192        let current = self.current.take_if(|current| current.parent_hash() != tip_hash);
193
194        // Prefill the cache with state from the new canonical tip, similar to payload/basic
195        let mut cached = CachedReads::default();
196        let committed = state.committed();
197        let new_execution_outcome = committed.execution_outcome();
198        for (addr, acc) in new_execution_outcome.bundle_accounts_iter() {
199            if let Some(info) = acc.info.clone() {
200                // Pre-cache existing accounts and their storage (only changed accounts/storage)
201                let storage =
202                    acc.storage.iter().map(|(key, slot)| (*key, slot.present_value)).collect();
203                cached.insert_account(addr, info, storage);
204            }
205        }
206        self.cached_state = Some((tip_hash, cached));
207
208        current
209    }
210}
211
212impl<N, S, EvmConfig, Provider> Stream for FlashBlockService<N, S, EvmConfig, Provider>
213where
214    N: NodePrimitives,
215    S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
216    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<ExecutionPayloadBaseV1> + Unpin>
217        + Clone
218        + 'static,
219    Provider: StateProviderFactory
220        + CanonStateSubscriptions<Primitives = N>
221        + BlockReaderIdExt<
222            Header = HeaderTy<N>,
223            Block = BlockTy<N>,
224            Transaction = N::SignedTx,
225            Receipt = ReceiptTy<N>,
226        > + Unpin
227        + Clone
228        + 'static,
229{
230    type Item = eyre::Result<Option<PendingFlashBlock<N>>>;
231
232    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
233        let this = self.get_mut();
234
235        loop {
236            // drive pending build job to completion
237            let result = match this.job.as_mut() {
238                Some((now, rx)) => {
239                    let result = ready!(rx.poll_unpin(cx));
240                    result.ok().map(|res| (*now, res))
241                }
242                None => None,
243            };
244            // reset job
245            this.job.take();
246            // No build in progress
247            let _ = this.in_progress_tx.send(None);
248
249            if let Some((now, result)) = result {
250                match result {
251                    Ok(Some((new_pending, cached_reads))) => {
252                        // update state root of the current sequence
253                        this.blocks.set_state_root(new_pending.computed_state_root());
254
255                        // built a new pending block
256                        this.current = Some(new_pending.clone());
257                        // cache reads
258                        this.cached_state = Some((new_pending.parent_hash(), cached_reads));
259                        this.rebuild = false;
260
261                        let elapsed = now.elapsed();
262                        this.metrics.execution_duration.record(elapsed.as_secs_f64());
263                        trace!(
264                            parent_hash = %new_pending.block().parent_hash(),
265                            block_number = new_pending.block().number(),
266                            flash_blocks = this.blocks.count(),
267                            ?elapsed,
268                            "Built new block with flashblocks"
269                        );
270
271                        return Poll::Ready(Some(Ok(Some(new_pending))));
272                    }
273                    Ok(None) => {
274                        // nothing to do because tracked flashblock doesn't attach to latest
275                    }
276                    Err(err) => {
277                        // we can ignore this error
278                        debug!(%err, "failed to execute flashblock");
279                    }
280                }
281            }
282
283            // consume new flashblocks while they're ready
284            while let Poll::Ready(Some(result)) = this.rx.poll_next_unpin(cx) {
285                match result {
286                    Ok(flashblock) => {
287                        if flashblock.index == 0 {
288                            this.metrics.last_flashblock_length.record(this.blocks.count() as f64);
289                        }
290                        match this.blocks.insert(flashblock) {
291                            Ok(_) => this.rebuild = true,
292                            Err(err) => debug!(%err, "Failed to prepare flashblock"),
293                        }
294                    }
295                    Err(err) => return Poll::Ready(Some(Err(err))),
296                }
297            }
298
299            // update on new head block
300            if let Poll::Ready(Ok(state)) = {
301                let fut = this.canon_receiver.recv();
302                pin!(fut);
303                fut.poll_unpin(cx)
304            } && let Some(current) = this.on_new_tip(state)
305            {
306                trace!(
307                    parent_hash = %current.block().parent_hash(),
308                    block_number = current.block().number(),
309                    "Clearing current flashblock on new canonical block"
310                );
311
312                return Poll::Ready(Some(Ok(None)))
313            }
314
315            if !this.rebuild && this.current.is_some() {
316                return Poll::Pending
317            }
318
319            // try to build a block on top of latest
320            if let Some(args) = this.build_args() {
321                let now = Instant::now();
322
323                let fb_info = FlashBlockBuildInfo {
324                    parent_hash: args.base.parent_hash,
325                    index: args.last_flashblock_index,
326                    block_number: args.base.block_number,
327                };
328                // Signal that a flashblock build has started with build metadata
329                let _ = this.in_progress_tx.send(Some(fb_info));
330                let (tx, rx) = oneshot::channel();
331                let builder = this.builder.clone();
332
333                this.spawner.spawn_blocking(async move {
334                    let _ = tx.send(builder.execute(args));
335                });
336                this.job.replace((now, rx));
337
338                // continue and poll the spawned job
339                continue
340            }
341
342            return Poll::Pending
343        }
344    }
345}
346
/// A build job in flight: the [`Instant`] at which it was started, paired with the receiver for
/// its outcome.
///
/// A successful outcome carries the built [`PendingFlashBlock`] together with the [`CachedReads`]
/// to keep for the next execution, or `None` when no block was built.
type BuildJob<N> =
    (Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);
349
/// Metrics recorded by the flashblock service under the `flashblock_service` scope.
#[derive(Metrics)]
#[metrics(scope = "flashblock_service")]
struct FlashBlockServiceMetrics {
    /// The last complete length of flashblocks per block.
    last_flashblock_length: Histogram,
    /// The duration applying flashblock state changes in seconds.
    execution_duration: Histogram,
}