reth_optimism_flashblocks/
cache.rs

1//! Sequence cache management for flashblocks.
2//!
3//! The `SequenceManager` maintains a ring buffer of recently completed flashblock sequences
4//! and intelligently selects which sequence to build based on the local chain tip.
5
6use crate::{
7    sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome},
8    worker::BuildArgs,
9    FlashBlock, FlashBlockCompleteSequence, PendingFlashBlock,
10};
11use alloy_eips::eip2718::WithEncoded;
12use alloy_primitives::B256;
13use reth_primitives_traits::{NodePrimitives, Recovered, SignedTransaction};
14use reth_revm::cached::CachedReads;
15use ringbuffer::{AllocRingBuffer, RingBuffer};
16use tokio::sync::broadcast;
17use tracing::*;
18
19/// Maximum number of cached sequences in the ring buffer.
20const CACHE_SIZE: usize = 3;
21/// 200 ms flashblock time.
22pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200;
23
24/// Manages flashblock sequences with caching support.
25///
26/// This struct handles:
27/// - Tracking the current pending sequence
28/// - Caching completed sequences in a fixed-size ring buffer
29/// - Finding the best sequence to build based on local chain tip
30/// - Broadcasting completed sequences to subscribers
31#[derive(Debug)]
32pub(crate) struct SequenceManager<T: SignedTransaction> {
33    /// Current pending sequence being built up from incoming flashblocks
34    pending: FlashBlockPendingSequence,
35    /// Cached recovered transactions for the pending sequence
36    pending_transactions: Vec<WithEncoded<Recovered<T>>>,
37    /// Ring buffer of recently completed sequences bundled with their decoded transactions (FIFO,
38    /// size 3)
39    completed_cache: AllocRingBuffer<(FlashBlockCompleteSequence, Vec<WithEncoded<Recovered<T>>>)>,
40    /// Broadcast channel for completed sequences
41    block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
42    /// Whether to compute state roots when building blocks
43    compute_state_root: bool,
44}
45
46impl<T: SignedTransaction> SequenceManager<T> {
47    /// Creates a new sequence manager.
48    pub(crate) fn new(compute_state_root: bool) -> Self {
49        let (block_broadcaster, _) = broadcast::channel(128);
50        Self {
51            pending: FlashBlockPendingSequence::new(),
52            pending_transactions: Vec::new(),
53            completed_cache: AllocRingBuffer::new(CACHE_SIZE),
54            block_broadcaster,
55            compute_state_root,
56        }
57    }
58
59    /// Returns the sender half of the flashblock sequence broadcast channel.
60    pub(crate) const fn block_sequence_broadcaster(
61        &self,
62    ) -> &broadcast::Sender<FlashBlockCompleteSequence> {
63        &self.block_broadcaster
64    }
65
66    /// Gets a subscriber to the flashblock sequences produced.
67    pub(crate) fn subscribe_block_sequence(&self) -> crate::FlashBlockCompleteSequenceRx {
68        self.block_broadcaster.subscribe()
69    }
70
71    /// Inserts a new flashblock into the pending sequence.
72    ///
73    /// When a flashblock with index 0 arrives (indicating a new block), the current
74    /// pending sequence is finalized, cached, and broadcast immediately. If the sequence
75    /// is later built on top of local tip, `on_build_complete()` will broadcast again
76    /// with computed `state_root`.
77    ///
78    /// Transactions are recovered once and cached for reuse during block building.
79    pub(crate) fn insert_flashblock(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
80        // If this starts a new block, finalize and cache the previous sequence BEFORE inserting
81        if flashblock.index == 0 && self.pending.count() > 0 {
82            let completed = self.pending.finalize()?;
83            let block_number = completed.block_number();
84            let parent_hash = completed.payload_base().parent_hash;
85
86            trace!(
87                target: "flashblocks",
88                block_number,
89                %parent_hash,
90                cache_size = self.completed_cache.len(),
91                "Caching completed flashblock sequence"
92            );
93
94            // Broadcast immediately to consensus client (even without state_root)
95            // This ensures sequences are forwarded during catch-up even if not buildable on tip.
96            // ConsensusClient checks execution_outcome and skips newPayload if state_root is zero.
97            if self.block_broadcaster.receiver_count() > 0 {
98                let _ = self.block_broadcaster.send(completed.clone());
99            }
100
101            // Bundle completed sequence with its decoded transactions and push to cache
102            // Ring buffer automatically evicts oldest entry when full
103            let txs = std::mem::take(&mut self.pending_transactions);
104            self.completed_cache.push((completed, txs));
105
106            // ensure cache is wiped on new flashblock
107            let _ = self.pending.take_cached_reads();
108        }
109
110        self.pending_transactions
111            .extend(flashblock.recover_transactions().collect::<Result<Vec<_>, _>>()?);
112        self.pending.insert(flashblock);
113        Ok(())
114    }
115
116    /// Returns the current pending sequence for inspection.
117    pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence {
118        &self.pending
119    }
120
121    /// Finds the next sequence to build and returns ready-to-use `BuildArgs`.
122    ///
123    /// Priority order:
124    /// 1. Current pending sequence (if parent matches local tip)
125    /// 2. Cached sequence with exact parent match
126    ///
127    /// Returns None if nothing is buildable right now.
128    pub(crate) fn next_buildable_args(
129        &mut self,
130        local_tip_hash: B256,
131        local_tip_timestamp: u64,
132    ) -> Option<BuildArgs<Vec<WithEncoded<Recovered<T>>>>> {
133        // Try to find a buildable sequence: (base, last_fb, transactions, cached_state,
134        // source_name)
135        let (base, last_flashblock, transactions, cached_state, source_name) =
136            // Priority 1: Try current pending sequence
137            if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash == local_tip_hash) {
138                let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash, r));
139                let last_fb = self.pending.last_flashblock()?;
140                let transactions = self.pending_transactions.clone();
141                (base, last_fb, transactions, cached_state, "pending")
142            }
143            // Priority 2: Try cached sequence with exact parent match
144            else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash == local_tip_hash) {
145                let base = cached.payload_base().clone();
146                let last_fb = cached.last();
147                let transactions = txs.clone();
148                let cached_state = None;
149                (base, last_fb, transactions, cached_state, "cached")
150            } else {
151                return None;
152            };
153
154        // Auto-detect when to compute state root: only if the builder didn't provide it (sent
155        // B256::ZERO) and we're near the expected final flashblock index.
156        //
157        // Background: Each block period receives multiple flashblocks at regular intervals.
158        // The sequencer sends an initial "base" flashblock at index 0 when a new block starts,
159        // then subsequent flashblocks are produced every FLASHBLOCK_BLOCK_TIME intervals (200ms).
160        //
161        // Examples with different block times:
162        // - Base (2s blocks):    expect 2000ms / 200ms = 10 intervals → Flashblocks: index 0 (base)
163        //   + indices 1-10 = potentially 11 total
164        //
165        // - Unichain (1s blocks): expect 1000ms / 200ms = 5 intervals → Flashblocks: index 0 (base)
166        //   + indices 1-5 = potentially 6 total
167        //
168        // Why compute at N-1 instead of N:
169        // 1. Timing variance in flashblock producing time may mean only N flashblocks were produced
170        //    instead of N+1 (missing the final one). Computing at N-1 ensures we get the state root
171        //    for most common cases.
172        //
173        // 2. The +1 case (index 0 base + N intervals): If all N+1 flashblocks do arrive, we'll
174        //    still calculate state root for flashblock N, which sacrifices a little performance but
175        //    still ensures correctness for common cases.
176        //
177        // Note: Pathological cases may result in fewer flashblocks than expected (e.g., builder
178        // downtime, flashblock execution exceeding timing budget). When this occurs, we won't
179        // compute the state root, causing FlashblockConsensusClient to lack precomputed state for
180        // engine_newPayload. This is safe: we still have op-node as backstop to maintain
181        // chain progression.
182        let block_time_ms = (base.timestamp - local_tip_timestamp) * 1000;
183        let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME;
184        let compute_state_root = self.compute_state_root &&
185            last_flashblock.diff.state_root.is_zero() &&
186            last_flashblock.index >= expected_final_flashblock.saturating_sub(1);
187
188        trace!(
189            target: "flashblocks",
190            block_number = base.block_number,
191            source = source_name,
192            flashblock_index = last_flashblock.index,
193            expected_final_flashblock,
194            compute_state_root_enabled = self.compute_state_root,
195            state_root_is_zero = last_flashblock.diff.state_root.is_zero(),
196            will_compute_state_root = compute_state_root,
197            "Building from flashblock sequence"
198        );
199
200        Some(BuildArgs {
201            base,
202            transactions,
203            cached_state,
204            last_flashblock_index: last_flashblock.index,
205            last_flashblock_hash: last_flashblock.diff.block_hash,
206            compute_state_root,
207        })
208    }
209
210    /// Records the result of building a sequence and re-broadcasts with execution outcome.
211    ///
212    /// Updates execution outcome and cached reads. For cached sequences (already broadcast
213    /// once during finalize), this broadcasts again with the computed `state_root`, allowing
214    /// the consensus client to submit via `engine_newPayload`.
215    pub(crate) fn on_build_complete<N: NodePrimitives>(
216        &mut self,
217        parent_hash: B256,
218        result: Option<(PendingFlashBlock<N>, CachedReads)>,
219    ) {
220        let Some((computed_block, cached_reads)) = result else {
221            return;
222        };
223
224        // Extract execution outcome
225        let execution_outcome = computed_block.computed_state_root().map(|state_root| {
226            SequenceExecutionOutcome { block_hash: computed_block.block().hash(), state_root }
227        });
228
229        // Update pending sequence with execution results
230        if self.pending.payload_base().is_some_and(|base| base.parent_hash == parent_hash) {
231            self.pending.set_execution_outcome(execution_outcome);
232            self.pending.set_cached_reads(cached_reads);
233            trace!(
234                target: "flashblocks",
235                block_number = self.pending.block_number(),
236                has_computed_state_root = execution_outcome.is_some(),
237                "Updated pending sequence with build results"
238            );
239        }
240        // Check if this completed sequence in cache and broadcast with execution outcome
241        else if let Some((cached, _)) = self
242            .completed_cache
243            .iter_mut()
244            .find(|(c, _)| c.payload_base().parent_hash == parent_hash)
245        {
246            // Only re-broadcast if we computed new information (state_root was missing).
247            // If sequencer already provided state_root, we already broadcast in insert_flashblock,
248            // so skip re-broadcast to avoid duplicate FCU calls.
249            let needs_rebroadcast =
250                execution_outcome.is_some() && cached.execution_outcome().is_none();
251
252            cached.set_execution_outcome(execution_outcome);
253
254            if needs_rebroadcast && self.block_broadcaster.receiver_count() > 0 {
255                trace!(
256                    target: "flashblocks",
257                    block_number = cached.block_number(),
258                    "Re-broadcasting sequence with computed state_root"
259                );
260                let _ = self.block_broadcaster.send(cached.clone());
261            }
262        }
263    }
264}
265
266#[cfg(test)]
267mod tests {
268    use super::*;
269    use crate::test_utils::TestFlashBlockFactory;
270    use alloy_primitives::B256;
271    use op_alloy_consensus::OpTxEnvelope;
272
273    #[test]
274    fn test_sequence_manager_new() {
275        let manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
276        assert_eq!(manager.pending().count(), 0);
277    }
278
279    #[test]
280    fn test_insert_flashblock_creates_pending_sequence() {
281        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
282        let factory = TestFlashBlockFactory::new();
283
284        let fb0 = factory.flashblock_at(0).build();
285        manager.insert_flashblock(fb0).unwrap();
286
287        assert_eq!(manager.pending().count(), 1);
288        assert_eq!(manager.pending().block_number(), Some(100));
289    }
290
291    #[test]
292    fn test_insert_flashblock_caches_completed_sequence() {
293        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
294        let factory = TestFlashBlockFactory::new();
295
296        // Build first sequence
297        let fb0 = factory.flashblock_at(0).build();
298        manager.insert_flashblock(fb0.clone()).unwrap();
299
300        let fb1 = factory.flashblock_after(&fb0).build();
301        manager.insert_flashblock(fb1).unwrap();
302
303        // Insert new base (index 0) which should finalize and cache previous sequence
304        let fb2 = factory.flashblock_for_next_block(&fb0).build();
305        manager.insert_flashblock(fb2).unwrap();
306
307        // New sequence should be pending
308        assert_eq!(manager.pending().count(), 1);
309        assert_eq!(manager.pending().block_number(), Some(101));
310        assert_eq!(manager.completed_cache.len(), 1);
311        let (cached_sequence, _txs) = manager.completed_cache.get(0).unwrap();
312        assert_eq!(cached_sequence.block_number(), 100);
313    }
314
315    #[test]
316    fn test_next_buildable_args_returns_none_when_empty() {
317        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
318        let local_tip_hash = B256::random();
319        let local_tip_timestamp = 1000;
320
321        let args = manager.next_buildable_args(local_tip_hash, local_tip_timestamp);
322        assert!(args.is_none());
323    }
324
325    #[test]
326    fn test_next_buildable_args_matches_pending_parent() {
327        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
328        let factory = TestFlashBlockFactory::new();
329
330        let fb0 = factory.flashblock_at(0).build();
331        let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
332        manager.insert_flashblock(fb0).unwrap();
333
334        let args = manager.next_buildable_args(parent_hash, 1000000);
335        assert!(args.is_some());
336
337        let build_args = args.unwrap();
338        assert_eq!(build_args.last_flashblock_index, 0);
339    }
340
341    #[test]
342    fn test_next_buildable_args_returns_none_when_parent_mismatch() {
343        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
344        let factory = TestFlashBlockFactory::new();
345
346        let fb0 = factory.flashblock_at(0).build();
347        manager.insert_flashblock(fb0).unwrap();
348
349        // Use different parent hash
350        let wrong_parent = B256::random();
351        let args = manager.next_buildable_args(wrong_parent, 1000000);
352        assert!(args.is_none());
353    }
354
355    #[test]
356    fn test_next_buildable_args_prefers_pending_over_cached() {
357        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
358        let factory = TestFlashBlockFactory::new();
359
360        // Create and finalize first sequence
361        let fb0 = factory.flashblock_at(0).build();
362        manager.insert_flashblock(fb0.clone()).unwrap();
363
364        // Create new sequence (finalizes previous)
365        let fb1 = factory.flashblock_for_next_block(&fb0).build();
366        let parent_hash = fb1.base.as_ref().unwrap().parent_hash;
367        manager.insert_flashblock(fb1).unwrap();
368
369        // Request with first sequence's parent (should find cached)
370        let args = manager.next_buildable_args(parent_hash, 1000000);
371        assert!(args.is_some());
372    }
373
374    #[test]
375    fn test_next_buildable_args_finds_cached_sequence() {
376        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
377        let factory = TestFlashBlockFactory::new();
378
379        // Build and cache first sequence
380        let fb0 = factory.flashblock_at(0).build();
381        let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
382        manager.insert_flashblock(fb0.clone()).unwrap();
383
384        // Start new sequence to finalize first
385        let fb1 = factory.flashblock_for_next_block(&fb0).build();
386        manager.insert_flashblock(fb1.clone()).unwrap();
387
388        // Clear pending by starting another sequence
389        let fb2 = factory.flashblock_for_next_block(&fb1).build();
390        manager.insert_flashblock(fb2).unwrap();
391
392        // Request first sequence's parent - should find in cache
393        let args = manager.next_buildable_args(parent_hash, 1000000);
394        assert!(args.is_some());
395    }
396
397    #[test]
398    fn test_compute_state_root_logic_near_expected_final() {
399        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
400        let block_time = 2u64;
401        let factory = TestFlashBlockFactory::new().with_block_time(block_time);
402
403        // Create sequence with zero state root (needs computation)
404        let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
405        let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
406        let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
407        manager.insert_flashblock(fb0.clone()).unwrap();
408
409        // Add flashblocks up to expected final index (2000ms / 200ms = 10)
410        for i in 1..=9 {
411            let fb = factory.flashblock_after(&fb0).index(i).state_root(B256::ZERO).build();
412            manager.insert_flashblock(fb).unwrap();
413        }
414
415        // Request with proper timing - should compute state root for index 9
416        let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
417        assert!(args.is_some());
418        assert!(args.unwrap().compute_state_root);
419    }
420
421    #[test]
422    fn test_no_compute_state_root_when_provided_by_sequencer() {
423        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
424        let block_time = 2u64;
425        let factory = TestFlashBlockFactory::new().with_block_time(block_time);
426
427        // Create sequence with non-zero state root (provided by sequencer)
428        let fb0 = factory.flashblock_at(0).state_root(B256::random()).build();
429        let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
430        let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
431        manager.insert_flashblock(fb0).unwrap();
432
433        let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
434        assert!(args.is_some());
435        assert!(!args.unwrap().compute_state_root);
436    }
437
438    #[test]
439    fn test_no_compute_state_root_when_disabled() {
440        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(false);
441        let block_time = 2u64;
442        let factory = TestFlashBlockFactory::new().with_block_time(block_time);
443
444        // Create sequence with zero state root (needs computation)
445        let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
446        let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
447        let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
448        manager.insert_flashblock(fb0.clone()).unwrap();
449
450        // Add flashblocks up to expected final index (2000ms / 200ms = 10)
451        for i in 1..=9 {
452            let fb = factory.flashblock_after(&fb0).index(i).state_root(B256::ZERO).build();
453            manager.insert_flashblock(fb).unwrap();
454        }
455
456        // Request with proper timing - should compute state root for index 9
457        let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
458        assert!(args.is_some());
459        assert!(!args.unwrap().compute_state_root);
460    }
461
462    #[test]
463    fn test_cache_ring_buffer_evicts_oldest() {
464        let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
465        let factory = TestFlashBlockFactory::new();
466
467        // Fill cache with 4 sequences (cache size is 3, so oldest should be evicted)
468        let mut last_fb = factory.flashblock_at(0).build();
469        manager.insert_flashblock(last_fb.clone()).unwrap();
470
471        for _ in 0..3 {
472            last_fb = factory.flashblock_for_next_block(&last_fb).build();
473            manager.insert_flashblock(last_fb.clone()).unwrap();
474        }
475
476        // The first sequence should have been evicted, so we can't build it
477        let first_parent = factory.flashblock_at(0).build().base.unwrap().parent_hash;
478        let args = manager.next_buildable_args(first_parent, 1000000);
479        // Should not find it (evicted from ring buffer)
480        assert!(args.is_none());
481    }
482}