reth_optimism_flashblocks/
sequence.rs1use crate::{FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_primitives::{Bytes, B256};
3use alloy_rpc_types_engine::PayloadId;
4use core::mem;
5use eyre::{bail, OptionExt};
6use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
7use reth_revm::cached::CachedReads;
8use std::{collections::BTreeMap, ops::Deref};
9use tokio::sync::broadcast;
10use tracing::*;
11
/// Capacity handed to `broadcast::channel` when a [`FlashBlockPendingSequence`] creates the
/// channel used to publish completed sequences.
const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct SequenceExecutionOutcome {
18 pub block_hash: B256,
20 pub state_root: B256,
22}
23
24#[derive(Debug)]
26pub struct FlashBlockPendingSequence {
27 inner: BTreeMap<u64, FlashBlock>,
29 block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
31 execution_outcome: Option<SequenceExecutionOutcome>,
33 cached_reads: Option<CachedReads>,
38}
39
40impl FlashBlockPendingSequence {
41 pub fn new() -> Self {
43 let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
46 Self {
47 inner: BTreeMap::new(),
48 block_broadcaster: tx,
49 execution_outcome: None,
50 cached_reads: None,
51 }
52 }
53
54 pub const fn block_sequence_broadcaster(
56 &self,
57 ) -> &broadcast::Sender<FlashBlockCompleteSequence> {
58 &self.block_broadcaster
59 }
60
61 pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
63 self.block_broadcaster.subscribe()
64 }
65
66 pub fn insert(&mut self, flashblock: FlashBlock) {
70 if flashblock.index == 0 {
71 trace!(target: "flashblocks", number=%flashblock.block_number(), "Tracking new flashblock sequence");
72 self.inner.insert(flashblock.index, flashblock);
73 return;
74 }
75
76 let same_block = self.block_number() == Some(flashblock.block_number());
79 let same_payload = self.payload_id() == Some(flashblock.payload_id);
80
81 if same_block && same_payload {
82 trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len() ,"Received followup flashblock");
83 self.inner.insert(flashblock.index, flashblock);
84 } else {
85 trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number() ,"Ignoring untracked flashblock following");
86 }
87 }
88
89 pub const fn set_execution_outcome(
91 &mut self,
92 execution_outcome: Option<SequenceExecutionOutcome>,
93 ) {
94 self.execution_outcome = execution_outcome;
95 }
96
97 pub fn set_cached_reads(&mut self, cached_reads: CachedReads) {
99 self.cached_reads = Some(cached_reads);
100 }
101
102 pub const fn take_cached_reads(&mut self) -> Option<CachedReads> {
104 self.cached_reads.take()
105 }
106
107 pub fn block_number(&self) -> Option<u64> {
109 Some(self.inner.values().next()?.block_number())
110 }
111
112 pub fn payload_base(&self) -> Option<OpFlashblockPayloadBase> {
114 self.inner.values().next()?.base.clone()
115 }
116
117 pub fn count(&self) -> usize {
119 self.inner.len()
120 }
121
122 pub fn last_flashblock(&self) -> Option<&FlashBlock> {
124 self.inner.last_key_value().map(|(_, b)| b)
125 }
126
127 pub fn index(&self) -> Option<u64> {
129 Some(self.inner.values().last()?.index)
130 }
131 pub fn payload_id(&self) -> Option<PayloadId> {
133 Some(self.inner.values().next()?.payload_id)
134 }
135
136 pub fn finalize(&mut self) -> eyre::Result<FlashBlockCompleteSequence> {
140 if self.inner.is_empty() {
141 bail!("Cannot finalize empty flashblock sequence");
142 }
143
144 let flashblocks = mem::take(&mut self.inner);
145 let execution_outcome = mem::take(&mut self.execution_outcome);
146 self.cached_reads = None;
147
148 FlashBlockCompleteSequence::new(flashblocks.into_values().collect(), execution_outcome)
149 }
150
151 pub fn flashblocks(&self) -> impl Iterator<Item = &FlashBlock> {
153 self.inner.values()
154 }
155}
156
157impl Default for FlashBlockPendingSequence {
158 fn default() -> Self {
159 Self::new()
160 }
161}
162
163#[derive(Debug, Clone)]
169pub struct FlashBlockCompleteSequence {
170 inner: Vec<FlashBlock>,
171 execution_outcome: Option<SequenceExecutionOutcome>,
173}
174
175impl FlashBlockCompleteSequence {
176 pub fn new(
182 blocks: Vec<FlashBlock>,
183 execution_outcome: Option<SequenceExecutionOutcome>,
184 ) -> eyre::Result<Self> {
185 let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
186
187 first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
189
190 if !blocks.iter().enumerate().all(|(idx, block)| {
192 idx == block.index as usize &&
193 block.payload_id == first_block.payload_id &&
194 block.block_number() == first_block.block_number()
195 }) {
196 bail!("Flashblock inconsistencies detected in sequence");
197 }
198
199 Ok(Self { inner: blocks, execution_outcome })
200 }
201
202 pub fn block_number(&self) -> u64 {
204 self.inner.first().unwrap().block_number()
205 }
206
207 pub fn payload_base(&self) -> &OpFlashblockPayloadBase {
209 self.inner.first().unwrap().base.as_ref().unwrap()
210 }
211
212 pub const fn count(&self) -> usize {
214 self.inner.len()
215 }
216
217 pub fn last(&self) -> &FlashBlock {
219 self.inner.last().unwrap()
220 }
221
222 pub const fn execution_outcome(&self) -> Option<SequenceExecutionOutcome> {
224 self.execution_outcome
225 }
226
227 pub const fn set_execution_outcome(
229 &mut self,
230 execution_outcome: Option<SequenceExecutionOutcome>,
231 ) {
232 self.execution_outcome = execution_outcome;
233 }
234
235 pub fn all_transactions(&self) -> Vec<Bytes> {
237 self.inner.iter().flat_map(|fb| fb.diff.transactions.iter().cloned()).collect()
238 }
239}
240
241impl Deref for FlashBlockCompleteSequence {
242 type Target = Vec<FlashBlock>;
243
244 fn deref(&self) -> &Self::Target {
245 &self.inner
246 }
247}
248
249impl TryFrom<FlashBlockPendingSequence> for FlashBlockCompleteSequence {
250 type Error = eyre::Error;
251 fn try_from(sequence: FlashBlockPendingSequence) -> Result<Self, Self::Error> {
252 Self::new(sequence.inner.into_values().collect(), sequence.execution_outcome)
253 }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestFlashBlockFactory;

    /// Behavior of [`FlashBlockPendingSequence::insert`].
    mod pending_sequence_insert {
        use super::*;

        #[test]
        fn test_insert_index_zero_creates_new_sequence() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();
            let fb0 = factory.flashblock_at(0).build();
            let payload_id = fb0.payload_id;

            sequence.insert(fb0);

            assert_eq!(sequence.count(), 1);
            assert_eq!(sequence.block_number(), Some(100));
            assert_eq!(sequence.payload_id(), Some(payload_id));
        }

        #[test]
        fn test_insert_followup_same_block_and_payload() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0.clone());

            let fb1 = factory.flashblock_after(&fb0).build();
            sequence.insert(fb1.clone());

            let fb2 = factory.flashblock_after(&fb1).build();
            sequence.insert(fb2);

            assert_eq!(sequence.count(), 3);
            assert_eq!(sequence.index(), Some(2));
        }

        #[test]
        fn test_insert_ignores_different_block_number() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0.clone());

            // Follow-up flashblock that claims a different block number.
            let fb1 = factory.flashblock_after(&fb0).block_number(101).build();
            sequence.insert(fb1);

            assert_eq!(sequence.count(), 1);
            assert_eq!(sequence.block_number(), Some(100));
        }

        #[test]
        fn test_insert_ignores_different_payload_id() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let payload_id1 = fb0.payload_id;
            sequence.insert(fb0.clone());

            // Follow-up flashblock that carries a different payload id.
            let payload_id2 = PayloadId::new([2u8; 8]);
            let fb1 = factory.flashblock_after(&fb0).payload_id(payload_id2).build();
            sequence.insert(fb1);

            assert_eq!(sequence.count(), 1);
            assert_eq!(sequence.payload_id(), Some(payload_id1));
        }

        #[test]
        fn test_insert_maintains_btree_order() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0.clone());

            // Insert index 2 before index 1 to exercise out-of-order arrival.
            let fb2 = factory.flashblock_after(&fb0).index(2).build();
            sequence.insert(fb2);

            let fb1 = factory.flashblock_after(&fb0).build();
            sequence.insert(fb1);

            let indices: Vec<u64> = sequence.flashblocks().map(|fb| fb.index).collect();
            assert_eq!(indices, vec![0, 1, 2]);
        }
    }

    /// Behavior of [`FlashBlockPendingSequence::finalize`].
    mod pending_sequence_finalize {
        use super::*;

        #[test]
        fn test_finalize_empty_sequence_fails() {
            let mut sequence = FlashBlockPendingSequence::new();
            let result = sequence.finalize();

            assert!(result.is_err());
            assert_eq!(
                result.unwrap_err().to_string(),
                "Cannot finalize empty flashblock sequence"
            );
        }

        #[test]
        fn test_finalize_clears_pending_state() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0);

            assert_eq!(sequence.count(), 1);

            let _complete = sequence.finalize().unwrap();

            assert_eq!(sequence.count(), 0);
            assert_eq!(sequence.block_number(), None);
        }

        #[test]
        fn test_finalize_preserves_execution_outcome() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0);

            let outcome =
                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
            sequence.set_execution_outcome(Some(outcome));

            let complete = sequence.finalize().unwrap();

            assert_eq!(complete.execution_outcome(), Some(outcome));
        }

        #[test]
        fn test_finalize_clears_cached_reads() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0);

            // Sanity check: stored reads can be taken back out.
            sequence.set_cached_reads(CachedReads::default());
            assert!(sequence.take_cached_reads().is_some());

            // Store them again so that `finalize` — not the `take` above — is what has to clear
            // them; otherwise the final assertion would pass trivially.
            sequence.set_cached_reads(CachedReads::default());

            let _complete = sequence.finalize().unwrap();

            assert!(sequence.take_cached_reads().is_none());
        }

        #[test]
        fn test_finalize_multiple_times_after_refill() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0);

            let complete1 = sequence.finalize().unwrap();
            assert_eq!(complete1.count(), 1);

            // Refill the (now empty) pending sequence with the first flashblock of the next block.
            let fb1 = factory.flashblock_for_next_block(complete1.last()).build();
            sequence.insert(fb1);

            let complete2 = sequence.finalize().unwrap();
            assert_eq!(complete2.count(), 1);
            assert_eq!(complete2.block_number(), 101);
        }
    }

    /// Invariants enforced by [`FlashBlockCompleteSequence::new`] and its accessors.
    mod complete_sequence_invariants {
        use super::*;

        #[test]
        fn test_new_empty_sequence_fails() {
            let result = FlashBlockCompleteSequence::new(vec![], None);
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), "No flashblocks in sequence");
        }

        #[test]
        fn test_new_requires_base_at_index_zero() {
            let factory = TestFlashBlockFactory::new();
            // Build a flashblock for index 1, then force it to index 0 without a base.
            let mut fb0_no_base = factory.flashblock_at(1).build();
            fb0_no_base.index = 0;
            fb0_no_base.base = None;

            let result = FlashBlockCompleteSequence::new(vec![fb0_no_base], None);
            assert!(result.is_err());
            assert_eq!(result.unwrap_err().to_string(), "Flashblock at index 0 has no base");
        }

        #[test]
        fn test_new_validates_successive_indices() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            // Skip index 1 to create a gap.
            let fb2 = factory.flashblock_after(&fb0).index(2).build();

            let result = FlashBlockCompleteSequence::new(vec![fb0, fb2], None);
            assert!(result.is_err());
            assert_eq!(
                result.unwrap_err().to_string(),
                "Flashblock inconsistencies detected in sequence"
            );
        }

        #[test]
        fn test_new_validates_same_block_number() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let fb1 = factory.flashblock_after(&fb0).block_number(101).build();

            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1], None);
            assert!(result.is_err());
            assert_eq!(
                result.unwrap_err().to_string(),
                "Flashblock inconsistencies detected in sequence"
            );
        }

        #[test]
        fn test_new_validates_same_payload_id() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let payload_id2 = PayloadId::new([2u8; 8]);
            let fb1 = factory.flashblock_after(&fb0).payload_id(payload_id2).build();

            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1], None);
            assert!(result.is_err());
            assert_eq!(
                result.unwrap_err().to_string(),
                "Flashblock inconsistencies detected in sequence"
            );
        }

        #[test]
        fn test_new_valid_single_flashblock() {
            let factory = TestFlashBlockFactory::new();
            let fb0 = factory.flashblock_at(0).build();

            let result = FlashBlockCompleteSequence::new(vec![fb0], None);
            assert!(result.is_ok());

            let complete = result.unwrap();
            assert_eq!(complete.count(), 1);
            assert_eq!(complete.block_number(), 100);
        }

        #[test]
        fn test_new_valid_multiple_flashblocks() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let fb1 = factory.flashblock_after(&fb0).build();
            let fb2 = factory.flashblock_after(&fb1).build();

            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], None);
            assert!(result.is_ok());

            let complete = result.unwrap();
            assert_eq!(complete.count(), 3);
            assert_eq!(complete.last().index, 2);
        }

        #[test]
        fn test_all_transactions_aggregates_correctly() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory
                .flashblock_at(0)
                .transactions(vec![Bytes::from_static(&[1, 2, 3]), Bytes::from_static(&[4, 5, 6])])
                .build();

            let fb1 = factory
                .flashblock_after(&fb0)
                .transactions(vec![Bytes::from_static(&[7, 8, 9])])
                .build();

            let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap();
            let all_txs = complete.all_transactions();

            assert_eq!(all_txs.len(), 3);
            assert_eq!(all_txs[0], Bytes::from_static(&[1, 2, 3]));
            assert_eq!(all_txs[1], Bytes::from_static(&[4, 5, 6]));
            assert_eq!(all_txs[2], Bytes::from_static(&[7, 8, 9]));
        }

        #[test]
        fn test_payload_base_returns_first_block_base() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let fb1 = factory.flashblock_after(&fb0).build();

            let complete = FlashBlockCompleteSequence::new(vec![fb0.clone(), fb1], None).unwrap();

            assert_eq!(complete.payload_base().block_number, fb0.base.unwrap().block_number);
        }

        #[test]
        fn test_execution_outcome_mutation() {
            let factory = TestFlashBlockFactory::new();
            let fb0 = factory.flashblock_at(0).build();

            let mut complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
            assert!(complete.execution_outcome().is_none());

            let outcome =
                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
            complete.set_execution_outcome(Some(outcome));

            assert_eq!(complete.execution_outcome(), Some(outcome));
        }

        #[test]
        fn test_deref_provides_vec_access() {
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            let fb1 = factory.flashblock_after(&fb0).build();

            let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap();

            // `len` and `is_empty` are `Vec` methods reached through `Deref`.
            assert_eq!(complete.len(), 2);
            assert!(!complete.is_empty());
        }
    }

    /// `TryFrom<FlashBlockPendingSequence>` for [`FlashBlockCompleteSequence`].
    mod sequence_conversion {
        use super::*;

        #[test]
        fn test_try_from_pending_to_complete_valid() {
            let mut pending = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            pending.insert(fb0);

            let complete: Result<FlashBlockCompleteSequence, _> = pending.try_into();
            assert!(complete.is_ok());
            assert_eq!(complete.unwrap().count(), 1);
        }

        #[test]
        fn test_try_from_pending_to_complete_empty_fails() {
            let pending = FlashBlockPendingSequence::new();

            let complete: Result<FlashBlockCompleteSequence, _> = pending.try_into();
            assert!(complete.is_err());
        }

        #[test]
        fn test_try_from_preserves_execution_outcome() {
            let mut pending = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            pending.insert(fb0);

            let outcome =
                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
            pending.set_execution_outcome(Some(outcome));

            let complete: FlashBlockCompleteSequence = pending.try_into().unwrap();
            assert_eq!(complete.execution_outcome(), Some(outcome));
        }
    }

    /// Accessors and channel plumbing of [`FlashBlockPendingSequence`].
    mod pending_sequence_helpers {
        use super::*;

        #[test]
        fn test_last_flashblock_returns_highest_index() {
            let mut sequence = FlashBlockPendingSequence::new();
            let factory = TestFlashBlockFactory::new();

            let fb0 = factory.flashblock_at(0).build();
            sequence.insert(fb0.clone());

            let fb1 = factory.flashblock_after(&fb0).build();
            sequence.insert(fb1);

            let last = sequence.last_flashblock().unwrap();
            assert_eq!(last.index, 1);
        }

        #[test]
        fn test_subscribe_block_sequence_channel() {
            let sequence = FlashBlockPendingSequence::new();
            let mut rx = sequence.subscribe_block_sequence();

            // Send from another thread while this thread blocks on the receiver.
            let tx = sequence.block_sequence_broadcaster().clone();
            std::thread::spawn(move || {
                let factory = TestFlashBlockFactory::new();
                let fb0 = factory.flashblock_at(0).build();
                let complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
                let _ = tx.send(complete);
            });

            let received = rx.blocking_recv();
            assert!(received.is_ok());
            assert_eq!(received.unwrap().count(), 1);
        }
    }
}