reth_optimism_flashblocks/
service.rs

1use crate::{
2    cache::SequenceManager, worker::FlashBlockBuilder, FlashBlock, FlashBlockCompleteSequence,
3    FlashBlockCompleteSequenceRx, InProgressFlashBlockRx, PendingFlashBlock,
4};
5use alloy_primitives::B256;
6use futures_util::{FutureExt, Stream, StreamExt};
7use metrics::{Gauge, Histogram};
8use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
9use reth_evm::ConfigureEvm;
10use reth_metrics::Metrics;
11use reth_primitives_traits::{AlloyBlockHeader, BlockTy, HeaderTy, NodePrimitives, ReceiptTy};
12use reth_revm::cached::CachedReads;
13use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
14use reth_tasks::TaskExecutor;
15use std::{sync::Arc, time::Instant};
16use tokio::sync::{oneshot, watch};
17use tracing::*;
18
19/// The `FlashBlockService` maintains an in-memory [`PendingFlashBlock`] built out of a sequence of
20/// [`FlashBlock`]s.
21#[derive(Debug)]
22pub struct FlashBlockService<
23    N: NodePrimitives,
24    S,
25    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<OpFlashblockPayloadBase> + Unpin>,
26    Provider,
27> {
28    /// Incoming flashblock stream.
29    incoming_flashblock_rx: S,
30    /// Signals when a block build is in progress.
31    in_progress_tx: watch::Sender<Option<FlashBlockBuildInfo>>,
32    /// Broadcast channel to forward received flashblocks from the subscription.
33    received_flashblocks_tx: tokio::sync::broadcast::Sender<Arc<FlashBlock>>,
34
35    /// Executes flashblock sequences to build pending blocks.
36    builder: FlashBlockBuilder<EvmConfig, Provider>,
37    /// Task executor for spawning block build jobs.
38    spawner: TaskExecutor,
39    /// Currently running block build job with start time and result receiver.
40    job: Option<BuildJob<N>>,
41    /// Manages flashblock sequences with caching and intelligent build selection.
42    sequences: SequenceManager<N::SignedTx>,
43
44    /// `FlashBlock` service's metrics
45    metrics: FlashBlockServiceMetrics,
46}
47
48impl<N, S, EvmConfig, Provider> FlashBlockService<N, S, EvmConfig, Provider>
49where
50    N: NodePrimitives,
51    S: Stream<Item = eyre::Result<FlashBlock>> + Unpin + 'static,
52    EvmConfig: ConfigureEvm<Primitives = N, NextBlockEnvCtx: From<OpFlashblockPayloadBase> + Unpin>
53        + Clone
54        + 'static,
55    Provider: StateProviderFactory
56        + BlockReaderIdExt<
57            Header = HeaderTy<N>,
58            Block = BlockTy<N>,
59            Transaction = N::SignedTx,
60            Receipt = ReceiptTy<N>,
61        > + Unpin
62        + Clone
63        + 'static,
64{
65    /// Constructs a new `FlashBlockService` that receives [`FlashBlock`]s from `rx` stream.
66    pub fn new(
67        incoming_flashblock_rx: S,
68        evm_config: EvmConfig,
69        provider: Provider,
70        spawner: TaskExecutor,
71        compute_state_root: bool,
72    ) -> Self {
73        let (in_progress_tx, _) = watch::channel(None);
74        let (received_flashblocks_tx, _) = tokio::sync::broadcast::channel(128);
75        Self {
76            incoming_flashblock_rx,
77            in_progress_tx,
78            received_flashblocks_tx,
79            builder: FlashBlockBuilder::new(evm_config, provider),
80            spawner,
81            job: None,
82            sequences: SequenceManager::new(compute_state_root),
83            metrics: FlashBlockServiceMetrics::default(),
84        }
85    }
86
87    /// Returns the sender half to the received flashblocks.
88    pub const fn flashblocks_broadcaster(
89        &self,
90    ) -> &tokio::sync::broadcast::Sender<Arc<FlashBlock>> {
91        &self.received_flashblocks_tx
92    }
93
94    /// Returns the sender half to the flashblock sequence.
95    pub const fn block_sequence_broadcaster(
96        &self,
97    ) -> &tokio::sync::broadcast::Sender<FlashBlockCompleteSequence> {
98        self.sequences.block_sequence_broadcaster()
99    }
100
101    /// Returns a subscriber to the flashblock sequence.
102    pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
103        self.sequences.subscribe_block_sequence()
104    }
105
106    /// Returns a receiver that signals when a flashblock is being built.
107    pub fn subscribe_in_progress(&self) -> InProgressFlashBlockRx {
108        self.in_progress_tx.subscribe()
109    }
110
111    /// Drives the service and sends new blocks to the receiver.
112    ///
113    /// This loop:
114    /// 1. Checks if any build job has completed and processes results
115    /// 2. Receives and batches all immediately available flashblocks
116    /// 3. Attempts to build a block from the complete sequence
117    ///
118    /// Note: this should be spawned
119    pub async fn run(mut self, tx: watch::Sender<Option<PendingFlashBlock<N>>>) {
120        loop {
121            tokio::select! {
122                // Event 1: job exists, listen to job results
123                Some(result) = async {
124                    match self.job.as_mut() {
125                        Some((_, rx)) => rx.await.ok(),
126                        None => std::future::pending().await,
127                    }
128                } => {
129                    let (start_time, _) = self.job.take().unwrap();
130                    let _ = self.in_progress_tx.send(None);
131
132                    match result {
133                        Ok(Some((pending, cached_reads))) => {
134                            let parent_hash = pending.parent_hash();
135                            self.sequences
136                                .on_build_complete(parent_hash, Some((pending.clone(), cached_reads)));
137
138                            let elapsed = start_time.elapsed();
139                            self.metrics.execution_duration.record(elapsed.as_secs_f64());
140
141                            let _ = tx.send(Some(pending));
142                        }
143                        Ok(None) => {
144                            trace!(target: "flashblocks", "Build job returned None");
145                        }
146                        Err(err) => {
147                            warn!(target: "flashblocks", %err, "Build job failed");
148                        }
149                    }
150                }
151
152                // Event 2: New flashblock arrives (batch process all ready flashblocks)
153                result = self.incoming_flashblock_rx.next() => {
154                    match result {
155                        Some(Ok(flashblock)) => {
156                            // Process first flashblock
157                            self.process_flashblock(flashblock);
158
159                            // Batch process all other immediately available flashblocks
160                            while let Some(result) = self.incoming_flashblock_rx.next().now_or_never().flatten() {
161                                match result {
162                                    Ok(fb) => self.process_flashblock(fb),
163                                    Err(err) => warn!(target: "flashblocks", %err, "Error receiving flashblock"),
164                                }
165                            }
166
167                            self.try_start_build_job();
168                        }
169                        Some(Err(err)) => {
170                            warn!(target: "flashblocks", %err, "Error receiving flashblock");
171                        }
172                        None => {
173                            warn!(target: "flashblocks", "Flashblock stream ended");
174                            break;
175                        }
176                    }
177                }
178            }
179        }
180    }
181
182    /// Processes a single flashblock: notifies subscribers, records metrics, and inserts into
183    /// sequence.
184    fn process_flashblock(&mut self, flashblock: FlashBlock) {
185        self.notify_received_flashblock(&flashblock);
186
187        if flashblock.index == 0 {
188            self.metrics.last_flashblock_length.record(self.sequences.pending().count() as f64);
189        }
190
191        if let Err(err) = self.sequences.insert_flashblock(flashblock) {
192            trace!(target: "flashblocks", %err, "Failed to insert flashblock");
193        }
194    }
195
196    /// Notifies all subscribers about the received flashblock.
197    fn notify_received_flashblock(&self, flashblock: &FlashBlock) {
198        if self.received_flashblocks_tx.receiver_count() > 0 {
199            let _ = self.received_flashblocks_tx.send(Arc::new(flashblock.clone()));
200        }
201    }
202
203    /// Attempts to build a block if no job is currently running and a buildable sequence exists.
204    fn try_start_build_job(&mut self) {
205        if self.job.is_some() {
206            return; // Already building
207        }
208
209        let Some(latest) = self.builder.provider().latest_header().ok().flatten() else {
210            return;
211        };
212
213        let Some(args) = self.sequences.next_buildable_args(latest.hash(), latest.timestamp())
214        else {
215            return; // Nothing buildable
216        };
217
218        // Spawn build job
219        let fb_info = FlashBlockBuildInfo {
220            parent_hash: args.base.parent_hash,
221            index: args.last_flashblock_index,
222            block_number: args.base.block_number,
223        };
224        self.metrics.current_block_height.set(fb_info.block_number as f64);
225        self.metrics.current_index.set(fb_info.index as f64);
226        let _ = self.in_progress_tx.send(Some(fb_info));
227
228        let (tx, rx) = oneshot::channel();
229        let builder = self.builder.clone();
230        self.spawner.spawn_blocking(Box::pin(async move {
231            let _ = tx.send(builder.execute(args));
232        }));
233        self.job = Some((Instant::now(), rx));
234    }
235}
236
237/// Information for a flashblock currently built
238#[derive(Debug, Clone, Copy)]
239pub struct FlashBlockBuildInfo {
240    /// Parent block hash
241    pub parent_hash: B256,
242    /// Flashblock index within the current block's sequence
243    pub index: u64,
244    /// Block number of the flashblock being built.
245    pub block_number: u64,
246}
247
248type BuildJob<N> =
249    (Instant, oneshot::Receiver<eyre::Result<Option<(PendingFlashBlock<N>, CachedReads)>>>);
250
251#[derive(Metrics)]
252#[metrics(scope = "flashblock_service")]
253struct FlashBlockServiceMetrics {
254    /// The last complete length of flashblocks per block.
255    last_flashblock_length: Histogram,
256    /// The duration applying flashblock state changes in seconds.
257    execution_duration: Histogram,
258    /// Current block height.
259    current_block_height: Gauge,
260    /// Current flashblock index.
261    current_index: Gauge,
262}