reth_optimism_flashblocks/
cache.rs1use crate::{
7 sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome},
8 worker::BuildArgs,
9 FlashBlock, FlashBlockCompleteSequence, PendingFlashBlock,
10};
11use alloy_eips::eip2718::WithEncoded;
12use alloy_primitives::B256;
13use reth_primitives_traits::{NodePrimitives, Recovered, SignedTransaction};
14use reth_revm::cached::CachedReads;
15use ringbuffer::{AllocRingBuffer, RingBuffer};
16use tokio::sync::broadcast;
17use tracing::*;
18
19const CACHE_SIZE: usize = 3;
21pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200;
23
24#[derive(Debug)]
32pub(crate) struct SequenceManager<T: SignedTransaction> {
33 pending: FlashBlockPendingSequence,
35 pending_transactions: Vec<WithEncoded<Recovered<T>>>,
37 completed_cache: AllocRingBuffer<(FlashBlockCompleteSequence, Vec<WithEncoded<Recovered<T>>>)>,
40 block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
42 compute_state_root: bool,
44}
45
46impl<T: SignedTransaction> SequenceManager<T> {
47 pub(crate) fn new(compute_state_root: bool) -> Self {
49 let (block_broadcaster, _) = broadcast::channel(128);
50 Self {
51 pending: FlashBlockPendingSequence::new(),
52 pending_transactions: Vec::new(),
53 completed_cache: AllocRingBuffer::new(CACHE_SIZE),
54 block_broadcaster,
55 compute_state_root,
56 }
57 }
58
59 pub(crate) const fn block_sequence_broadcaster(
61 &self,
62 ) -> &broadcast::Sender<FlashBlockCompleteSequence> {
63 &self.block_broadcaster
64 }
65
66 pub(crate) fn subscribe_block_sequence(&self) -> crate::FlashBlockCompleteSequenceRx {
68 self.block_broadcaster.subscribe()
69 }
70
71 pub(crate) fn insert_flashblock(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
80 if flashblock.index == 0 && self.pending.count() > 0 {
82 let completed = self.pending.finalize()?;
83 let block_number = completed.block_number();
84 let parent_hash = completed.payload_base().parent_hash;
85
86 trace!(
87 target: "flashblocks",
88 block_number,
89 %parent_hash,
90 cache_size = self.completed_cache.len(),
91 "Caching completed flashblock sequence"
92 );
93
94 if self.block_broadcaster.receiver_count() > 0 {
98 let _ = self.block_broadcaster.send(completed.clone());
99 }
100
101 let txs = std::mem::take(&mut self.pending_transactions);
104 self.completed_cache.push((completed, txs));
105
106 let _ = self.pending.take_cached_reads();
108 }
109
110 self.pending_transactions
111 .extend(flashblock.recover_transactions().collect::<Result<Vec<_>, _>>()?);
112 self.pending.insert(flashblock);
113 Ok(())
114 }
115
116 pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence {
118 &self.pending
119 }
120
121 pub(crate) fn next_buildable_args(
129 &mut self,
130 local_tip_hash: B256,
131 local_tip_timestamp: u64,
132 ) -> Option<BuildArgs<Vec<WithEncoded<Recovered<T>>>>> {
133 let (base, last_flashblock, transactions, cached_state, source_name) =
136 if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash == local_tip_hash) {
138 let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash, r));
139 let last_fb = self.pending.last_flashblock()?;
140 let transactions = self.pending_transactions.clone();
141 (base, last_fb, transactions, cached_state, "pending")
142 }
143 else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash == local_tip_hash) {
145 let base = cached.payload_base().clone();
146 let last_fb = cached.last();
147 let transactions = txs.clone();
148 let cached_state = None;
149 (base, last_fb, transactions, cached_state, "cached")
150 } else {
151 return None;
152 };
153
154 let block_time_ms = (base.timestamp - local_tip_timestamp) * 1000;
183 let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME;
184 let compute_state_root = self.compute_state_root &&
185 last_flashblock.diff.state_root.is_zero() &&
186 last_flashblock.index >= expected_final_flashblock.saturating_sub(1);
187
188 trace!(
189 target: "flashblocks",
190 block_number = base.block_number,
191 source = source_name,
192 flashblock_index = last_flashblock.index,
193 expected_final_flashblock,
194 compute_state_root_enabled = self.compute_state_root,
195 state_root_is_zero = last_flashblock.diff.state_root.is_zero(),
196 will_compute_state_root = compute_state_root,
197 "Building from flashblock sequence"
198 );
199
200 Some(BuildArgs {
201 base,
202 transactions,
203 cached_state,
204 last_flashblock_index: last_flashblock.index,
205 last_flashblock_hash: last_flashblock.diff.block_hash,
206 compute_state_root,
207 })
208 }
209
210 pub(crate) fn on_build_complete<N: NodePrimitives>(
216 &mut self,
217 parent_hash: B256,
218 result: Option<(PendingFlashBlock<N>, CachedReads)>,
219 ) {
220 let Some((computed_block, cached_reads)) = result else {
221 return;
222 };
223
224 let execution_outcome = computed_block.computed_state_root().map(|state_root| {
226 SequenceExecutionOutcome { block_hash: computed_block.block().hash(), state_root }
227 });
228
229 if self.pending.payload_base().is_some_and(|base| base.parent_hash == parent_hash) {
231 self.pending.set_execution_outcome(execution_outcome);
232 self.pending.set_cached_reads(cached_reads);
233 trace!(
234 target: "flashblocks",
235 block_number = self.pending.block_number(),
236 has_computed_state_root = execution_outcome.is_some(),
237 "Updated pending sequence with build results"
238 );
239 }
240 else if let Some((cached, _)) = self
242 .completed_cache
243 .iter_mut()
244 .find(|(c, _)| c.payload_base().parent_hash == parent_hash)
245 {
246 let needs_rebroadcast =
250 execution_outcome.is_some() && cached.execution_outcome().is_none();
251
252 cached.set_execution_outcome(execution_outcome);
253
254 if needs_rebroadcast && self.block_broadcaster.receiver_count() > 0 {
255 trace!(
256 target: "flashblocks",
257 block_number = cached.block_number(),
258 "Re-broadcasting sequence with computed state_root"
259 );
260 let _ = self.block_broadcaster.send(cached.clone());
261 }
262 }
263 }
264}
265
266#[cfg(test)]
267mod tests {
268 use super::*;
269 use crate::test_utils::TestFlashBlockFactory;
270 use alloy_primitives::B256;
271 use op_alloy_consensus::OpTxEnvelope;
272
273 #[test]
274 fn test_sequence_manager_new() {
275 let manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
276 assert_eq!(manager.pending().count(), 0);
277 }
278
279 #[test]
280 fn test_insert_flashblock_creates_pending_sequence() {
281 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
282 let factory = TestFlashBlockFactory::new();
283
284 let fb0 = factory.flashblock_at(0).build();
285 manager.insert_flashblock(fb0).unwrap();
286
287 assert_eq!(manager.pending().count(), 1);
288 assert_eq!(manager.pending().block_number(), Some(100));
289 }
290
291 #[test]
292 fn test_insert_flashblock_caches_completed_sequence() {
293 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
294 let factory = TestFlashBlockFactory::new();
295
296 let fb0 = factory.flashblock_at(0).build();
298 manager.insert_flashblock(fb0.clone()).unwrap();
299
300 let fb1 = factory.flashblock_after(&fb0).build();
301 manager.insert_flashblock(fb1).unwrap();
302
303 let fb2 = factory.flashblock_for_next_block(&fb0).build();
305 manager.insert_flashblock(fb2).unwrap();
306
307 assert_eq!(manager.pending().count(), 1);
309 assert_eq!(manager.pending().block_number(), Some(101));
310 assert_eq!(manager.completed_cache.len(), 1);
311 let (cached_sequence, _txs) = manager.completed_cache.get(0).unwrap();
312 assert_eq!(cached_sequence.block_number(), 100);
313 }
314
315 #[test]
316 fn test_next_buildable_args_returns_none_when_empty() {
317 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
318 let local_tip_hash = B256::random();
319 let local_tip_timestamp = 1000;
320
321 let args = manager.next_buildable_args(local_tip_hash, local_tip_timestamp);
322 assert!(args.is_none());
323 }
324
325 #[test]
326 fn test_next_buildable_args_matches_pending_parent() {
327 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
328 let factory = TestFlashBlockFactory::new();
329
330 let fb0 = factory.flashblock_at(0).build();
331 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
332 manager.insert_flashblock(fb0).unwrap();
333
334 let args = manager.next_buildable_args(parent_hash, 1000000);
335 assert!(args.is_some());
336
337 let build_args = args.unwrap();
338 assert_eq!(build_args.last_flashblock_index, 0);
339 }
340
341 #[test]
342 fn test_next_buildable_args_returns_none_when_parent_mismatch() {
343 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
344 let factory = TestFlashBlockFactory::new();
345
346 let fb0 = factory.flashblock_at(0).build();
347 manager.insert_flashblock(fb0).unwrap();
348
349 let wrong_parent = B256::random();
351 let args = manager.next_buildable_args(wrong_parent, 1000000);
352 assert!(args.is_none());
353 }
354
355 #[test]
356 fn test_next_buildable_args_prefers_pending_over_cached() {
357 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
358 let factory = TestFlashBlockFactory::new();
359
360 let fb0 = factory.flashblock_at(0).build();
362 manager.insert_flashblock(fb0.clone()).unwrap();
363
364 let fb1 = factory.flashblock_for_next_block(&fb0).build();
366 let parent_hash = fb1.base.as_ref().unwrap().parent_hash;
367 manager.insert_flashblock(fb1).unwrap();
368
369 let args = manager.next_buildable_args(parent_hash, 1000000);
371 assert!(args.is_some());
372 }
373
374 #[test]
375 fn test_next_buildable_args_finds_cached_sequence() {
376 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
377 let factory = TestFlashBlockFactory::new();
378
379 let fb0 = factory.flashblock_at(0).build();
381 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
382 manager.insert_flashblock(fb0.clone()).unwrap();
383
384 let fb1 = factory.flashblock_for_next_block(&fb0).build();
386 manager.insert_flashblock(fb1.clone()).unwrap();
387
388 let fb2 = factory.flashblock_for_next_block(&fb1).build();
390 manager.insert_flashblock(fb2).unwrap();
391
392 let args = manager.next_buildable_args(parent_hash, 1000000);
394 assert!(args.is_some());
395 }
396
397 #[test]
398 fn test_compute_state_root_logic_near_expected_final() {
399 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
400 let block_time = 2u64;
401 let factory = TestFlashBlockFactory::new().with_block_time(block_time);
402
403 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
405 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
406 let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
407 manager.insert_flashblock(fb0.clone()).unwrap();
408
409 for i in 1..=9 {
411 let fb = factory.flashblock_after(&fb0).index(i).state_root(B256::ZERO).build();
412 manager.insert_flashblock(fb).unwrap();
413 }
414
415 let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
417 assert!(args.is_some());
418 assert!(args.unwrap().compute_state_root);
419 }
420
421 #[test]
422 fn test_no_compute_state_root_when_provided_by_sequencer() {
423 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
424 let block_time = 2u64;
425 let factory = TestFlashBlockFactory::new().with_block_time(block_time);
426
427 let fb0 = factory.flashblock_at(0).state_root(B256::random()).build();
429 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
430 let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
431 manager.insert_flashblock(fb0).unwrap();
432
433 let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
434 assert!(args.is_some());
435 assert!(!args.unwrap().compute_state_root);
436 }
437
438 #[test]
439 fn test_no_compute_state_root_when_disabled() {
440 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(false);
441 let block_time = 2u64;
442 let factory = TestFlashBlockFactory::new().with_block_time(block_time);
443
444 let fb0 = factory.flashblock_at(0).state_root(B256::ZERO).build();
446 let parent_hash = fb0.base.as_ref().unwrap().parent_hash;
447 let base_timestamp = fb0.base.as_ref().unwrap().timestamp;
448 manager.insert_flashblock(fb0.clone()).unwrap();
449
450 for i in 1..=9 {
452 let fb = factory.flashblock_after(&fb0).index(i).state_root(B256::ZERO).build();
453 manager.insert_flashblock(fb).unwrap();
454 }
455
456 let args = manager.next_buildable_args(parent_hash, base_timestamp - block_time);
458 assert!(args.is_some());
459 assert!(!args.unwrap().compute_state_root);
460 }
461
462 #[test]
463 fn test_cache_ring_buffer_evicts_oldest() {
464 let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
465 let factory = TestFlashBlockFactory::new();
466
467 let mut last_fb = factory.flashblock_at(0).build();
469 manager.insert_flashblock(last_fb.clone()).unwrap();
470
471 for _ in 0..3 {
472 last_fb = factory.flashblock_for_next_block(&last_fb).build();
473 manager.insert_flashblock(last_fb.clone()).unwrap();
474 }
475
476 let first_parent = factory.flashblock_at(0).build().base.unwrap().parent_hash;
478 let args = manager.next_buildable_args(first_parent, 1000000);
479 assert!(args.is_none());
481 }
482}