reth_optimism_flashblocks/
sequence.rs

1use crate::{FlashBlock, FlashBlockCompleteSequenceRx};
2use alloy_primitives::{Bytes, B256};
3use alloy_rpc_types_engine::PayloadId;
4use core::mem;
5use eyre::{bail, OptionExt};
6use op_alloy_rpc_types_engine::OpFlashblockPayloadBase;
7use reth_revm::cached::CachedReads;
8use std::{collections::BTreeMap, ops::Deref};
9use tokio::sync::broadcast;
10use tracing::*;
11
/// Capacity of the broadcast channel that carries completed flashblock sequences.
const FLASHBLOCK_SEQUENCE_CHANNEL_SIZE: usize = 128;
14
15/// Outcome from executing a flashblock sequence.
16#[derive(Debug, Clone, Copy, PartialEq, Eq)]
17pub struct SequenceExecutionOutcome {
18    /// The block hash of the executed pending block
19    pub block_hash: B256,
20    /// Properly computed state root
21    pub state_root: B256,
22}
23
24/// An ordered B-tree keeping the track of a sequence of [`FlashBlock`]s by their indices.
25#[derive(Debug)]
26pub struct FlashBlockPendingSequence {
27    /// tracks the individual flashblocks in order
28    inner: BTreeMap<u64, FlashBlock>,
29    /// Broadcasts flashblocks to subscribers.
30    block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
31    /// Optional execution outcome from building the current sequence.
32    execution_outcome: Option<SequenceExecutionOutcome>,
33    /// Cached state reads for the current block.
34    /// Current `PendingFlashBlock` is built out of a sequence of `FlashBlocks`, and executed again
35    /// when fb received on top of the same block. Avoid redundant I/O across multiple
36    /// executions within the same block.
37    cached_reads: Option<CachedReads>,
38}
39
40impl FlashBlockPendingSequence {
41    /// Create a new pending sequence.
42    pub fn new() -> Self {
43        // Note: if the channel is full, send will not block but rather overwrite the oldest
44        // messages. Order is preserved.
45        let (tx, _) = broadcast::channel(FLASHBLOCK_SEQUENCE_CHANNEL_SIZE);
46        Self {
47            inner: BTreeMap::new(),
48            block_broadcaster: tx,
49            execution_outcome: None,
50            cached_reads: None,
51        }
52    }
53
54    /// Returns the sender half of the [`FlashBlockCompleteSequence`] channel.
55    pub const fn block_sequence_broadcaster(
56        &self,
57    ) -> &broadcast::Sender<FlashBlockCompleteSequence> {
58        &self.block_broadcaster
59    }
60
61    /// Gets a subscriber to the flashblock sequences produced.
62    pub fn subscribe_block_sequence(&self) -> FlashBlockCompleteSequenceRx {
63        self.block_broadcaster.subscribe()
64    }
65
66    /// Inserts a new block into the sequence.
67    ///
68    /// A [`FlashBlock`] with index 0 resets the set.
69    pub fn insert(&mut self, flashblock: FlashBlock) {
70        if flashblock.index == 0 {
71            trace!(target: "flashblocks", number=%flashblock.block_number(), "Tracking new flashblock sequence");
72            self.inner.insert(flashblock.index, flashblock);
73            return;
74        }
75
76        // only insert if we previously received the same block and payload, assume we received
77        // index 0
78        let same_block = self.block_number() == Some(flashblock.block_number());
79        let same_payload = self.payload_id() == Some(flashblock.payload_id);
80
81        if same_block && same_payload {
82            trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, block_count = self.inner.len()  ,"Received followup flashblock");
83            self.inner.insert(flashblock.index, flashblock);
84        } else {
85            trace!(target: "flashblocks", number=%flashblock.block_number(), index = %flashblock.index, current=?self.block_number()  ,"Ignoring untracked flashblock following");
86        }
87    }
88
89    /// Set execution outcome from building the flashblock sequence
90    pub const fn set_execution_outcome(
91        &mut self,
92        execution_outcome: Option<SequenceExecutionOutcome>,
93    ) {
94        self.execution_outcome = execution_outcome;
95    }
96
97    /// Set cached reads for this sequence
98    pub fn set_cached_reads(&mut self, cached_reads: CachedReads) {
99        self.cached_reads = Some(cached_reads);
100    }
101
102    /// Removes the cached reads for this sequence
103    pub const fn take_cached_reads(&mut self) -> Option<CachedReads> {
104        self.cached_reads.take()
105    }
106
107    /// Returns the first block number
108    pub fn block_number(&self) -> Option<u64> {
109        Some(self.inner.values().next()?.block_number())
110    }
111
112    /// Returns the payload base of the first tracked flashblock.
113    pub fn payload_base(&self) -> Option<OpFlashblockPayloadBase> {
114        self.inner.values().next()?.base.clone()
115    }
116
117    /// Returns the number of tracked flashblocks.
118    pub fn count(&self) -> usize {
119        self.inner.len()
120    }
121
122    /// Returns the reference to the last flashblock.
123    pub fn last_flashblock(&self) -> Option<&FlashBlock> {
124        self.inner.last_key_value().map(|(_, b)| b)
125    }
126
127    /// Returns the current/latest flashblock index in the sequence
128    pub fn index(&self) -> Option<u64> {
129        Some(self.inner.values().last()?.index)
130    }
131    /// Returns the payload id of the first tracked flashblock in the current sequence.
132    pub fn payload_id(&self) -> Option<PayloadId> {
133        Some(self.inner.values().next()?.payload_id)
134    }
135
136    /// Finalizes the current pending sequence and returns it as a complete sequence.
137    ///
138    /// Clears the internal state and returns an error if the sequence is empty or validation fails.
139    pub fn finalize(&mut self) -> eyre::Result<FlashBlockCompleteSequence> {
140        if self.inner.is_empty() {
141            bail!("Cannot finalize empty flashblock sequence");
142        }
143
144        let flashblocks = mem::take(&mut self.inner);
145        let execution_outcome = mem::take(&mut self.execution_outcome);
146        self.cached_reads = None;
147
148        FlashBlockCompleteSequence::new(flashblocks.into_values().collect(), execution_outcome)
149    }
150
151    /// Returns an iterator over all flashblocks in the sequence.
152    pub fn flashblocks(&self) -> impl Iterator<Item = &FlashBlock> {
153        self.inner.values()
154    }
155}
156
157impl Default for FlashBlockPendingSequence {
158    fn default() -> Self {
159        Self::new()
160    }
161}
162
163/// A complete sequence of flashblocks, often corresponding to a full block.
164///
165/// Ensures invariants of a complete flashblocks sequence.
166/// If this entire sequence of flashblocks was executed on top of latest block, this also includes
167/// the execution outcome with block hash and state root.
168#[derive(Debug, Clone)]
169pub struct FlashBlockCompleteSequence {
170    inner: Vec<FlashBlock>,
171    /// Optional execution outcome from building the flashblock sequence
172    execution_outcome: Option<SequenceExecutionOutcome>,
173}
174
175impl FlashBlockCompleteSequence {
176    /// Create a complete sequence from a vector of flashblocks.
177    /// Ensure that:
178    /// * vector is not empty
179    /// * first flashblock have the base payload
180    /// * sequence of flashblocks is sound (successive index from 0, same payload id, ...)
181    pub fn new(
182        blocks: Vec<FlashBlock>,
183        execution_outcome: Option<SequenceExecutionOutcome>,
184    ) -> eyre::Result<Self> {
185        let first_block = blocks.first().ok_or_eyre("No flashblocks in sequence")?;
186
187        // Ensure that first flashblock have base
188        first_block.base.as_ref().ok_or_eyre("Flashblock at index 0 has no base")?;
189
190        // Ensure that index are successive from 0, have same block number and payload id
191        if !blocks.iter().enumerate().all(|(idx, block)| {
192            idx == block.index as usize &&
193                block.payload_id == first_block.payload_id &&
194                block.block_number() == first_block.block_number()
195        }) {
196            bail!("Flashblock inconsistencies detected in sequence");
197        }
198
199        Ok(Self { inner: blocks, execution_outcome })
200    }
201
202    /// Returns the block number
203    pub fn block_number(&self) -> u64 {
204        self.inner.first().unwrap().block_number()
205    }
206
207    /// Returns the payload base of the first flashblock.
208    pub fn payload_base(&self) -> &OpFlashblockPayloadBase {
209        self.inner.first().unwrap().base.as_ref().unwrap()
210    }
211
212    /// Returns the number of flashblocks in the sequence.
213    pub const fn count(&self) -> usize {
214        self.inner.len()
215    }
216
217    /// Returns the last flashblock in the sequence.
218    pub fn last(&self) -> &FlashBlock {
219        self.inner.last().unwrap()
220    }
221
222    /// Returns the execution outcome of the sequence.
223    pub const fn execution_outcome(&self) -> Option<SequenceExecutionOutcome> {
224        self.execution_outcome
225    }
226
227    /// Updates execution outcome of the sequence.
228    pub const fn set_execution_outcome(
229        &mut self,
230        execution_outcome: Option<SequenceExecutionOutcome>,
231    ) {
232        self.execution_outcome = execution_outcome;
233    }
234
235    /// Returns all transactions from all flashblocks in the sequence
236    pub fn all_transactions(&self) -> Vec<Bytes> {
237        self.inner.iter().flat_map(|fb| fb.diff.transactions.iter().cloned()).collect()
238    }
239}
240
241impl Deref for FlashBlockCompleteSequence {
242    type Target = Vec<FlashBlock>;
243
244    fn deref(&self) -> &Self::Target {
245        &self.inner
246    }
247}
248
249impl TryFrom<FlashBlockPendingSequence> for FlashBlockCompleteSequence {
250    type Error = eyre::Error;
251    fn try_from(sequence: FlashBlockPendingSequence) -> Result<Self, Self::Error> {
252        Self::new(sequence.inner.into_values().collect(), sequence.execution_outcome)
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use crate::test_utils::TestFlashBlockFactory;
260
261    mod pending_sequence_insert {
262        use super::*;
263
264        #[test]
265        fn test_insert_index_zero_creates_new_sequence() {
266            let mut sequence = FlashBlockPendingSequence::new();
267            let factory = TestFlashBlockFactory::new();
268            let fb0 = factory.flashblock_at(0).build();
269            let payload_id = fb0.payload_id;
270
271            sequence.insert(fb0);
272
273            assert_eq!(sequence.count(), 1);
274            assert_eq!(sequence.block_number(), Some(100));
275            assert_eq!(sequence.payload_id(), Some(payload_id));
276        }
277
278        #[test]
279        fn test_insert_followup_same_block_and_payload() {
280            let mut sequence = FlashBlockPendingSequence::new();
281            let factory = TestFlashBlockFactory::new();
282
283            let fb0 = factory.flashblock_at(0).build();
284            sequence.insert(fb0.clone());
285
286            let fb1 = factory.flashblock_after(&fb0).build();
287            sequence.insert(fb1.clone());
288
289            let fb2 = factory.flashblock_after(&fb1).build();
290            sequence.insert(fb2);
291
292            assert_eq!(sequence.count(), 3);
293            assert_eq!(sequence.index(), Some(2));
294        }
295
296        #[test]
297        fn test_insert_ignores_different_block_number() {
298            let mut sequence = FlashBlockPendingSequence::new();
299            let factory = TestFlashBlockFactory::new();
300
301            let fb0 = factory.flashblock_at(0).build();
302            sequence.insert(fb0.clone());
303
304            // Try to insert followup with different block number
305            let fb1 = factory.flashblock_after(&fb0).block_number(101).build();
306            sequence.insert(fb1);
307
308            assert_eq!(sequence.count(), 1);
309            assert_eq!(sequence.block_number(), Some(100));
310        }
311
312        #[test]
313        fn test_insert_ignores_different_payload_id() {
314            let mut sequence = FlashBlockPendingSequence::new();
315            let factory = TestFlashBlockFactory::new();
316
317            let fb0 = factory.flashblock_at(0).build();
318            let payload_id1 = fb0.payload_id;
319            sequence.insert(fb0.clone());
320
321            // Try to insert followup with different payload_id
322            let payload_id2 = alloy_rpc_types_engine::PayloadId::new([2u8; 8]);
323            let fb1 = factory.flashblock_after(&fb0).payload_id(payload_id2).build();
324            sequence.insert(fb1);
325
326            assert_eq!(sequence.count(), 1);
327            assert_eq!(sequence.payload_id(), Some(payload_id1));
328        }
329
330        #[test]
331        fn test_insert_maintains_btree_order() {
332            let mut sequence = FlashBlockPendingSequence::new();
333            let factory = TestFlashBlockFactory::new();
334
335            let fb0 = factory.flashblock_at(0).build();
336            sequence.insert(fb0.clone());
337
338            let fb2 = factory.flashblock_after(&fb0).index(2).build();
339            sequence.insert(fb2);
340
341            let fb1 = factory.flashblock_after(&fb0).build();
342            sequence.insert(fb1);
343
344            let indices: Vec<u64> = sequence.flashblocks().map(|fb| fb.index).collect();
345            assert_eq!(indices, vec![0, 1, 2]);
346        }
347    }
348
349    mod pending_sequence_finalize {
350        use super::*;
351
352        #[test]
353        fn test_finalize_empty_sequence_fails() {
354            let mut sequence = FlashBlockPendingSequence::new();
355            let result = sequence.finalize();
356
357            assert!(result.is_err());
358            assert_eq!(
359                result.unwrap_err().to_string(),
360                "Cannot finalize empty flashblock sequence"
361            );
362        }
363
364        #[test]
365        fn test_finalize_clears_pending_state() {
366            let mut sequence = FlashBlockPendingSequence::new();
367            let factory = TestFlashBlockFactory::new();
368
369            let fb0 = factory.flashblock_at(0).build();
370            sequence.insert(fb0);
371
372            assert_eq!(sequence.count(), 1);
373
374            let _complete = sequence.finalize().unwrap();
375
376            // After finalize, sequence should be empty
377            assert_eq!(sequence.count(), 0);
378            assert_eq!(sequence.block_number(), None);
379        }
380
381        #[test]
382        fn test_finalize_preserves_execution_outcome() {
383            let mut sequence = FlashBlockPendingSequence::new();
384            let factory = TestFlashBlockFactory::new();
385
386            let fb0 = factory.flashblock_at(0).build();
387            sequence.insert(fb0);
388
389            let outcome =
390                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
391            sequence.set_execution_outcome(Some(outcome));
392
393            let complete = sequence.finalize().unwrap();
394
395            assert_eq!(complete.execution_outcome(), Some(outcome));
396        }
397
398        #[test]
399        fn test_finalize_clears_cached_reads() {
400            let mut sequence = FlashBlockPendingSequence::new();
401            let factory = TestFlashBlockFactory::new();
402
403            let fb0 = factory.flashblock_at(0).build();
404            sequence.insert(fb0);
405
406            let cached_reads = CachedReads::default();
407            sequence.set_cached_reads(cached_reads);
408            assert!(sequence.take_cached_reads().is_some());
409
410            let _complete = sequence.finalize().unwrap();
411
412            // Cached reads should be cleared
413            assert!(sequence.take_cached_reads().is_none());
414        }
415
416        #[test]
417        fn test_finalize_multiple_times_after_refill() {
418            let mut sequence = FlashBlockPendingSequence::new();
419            let factory = TestFlashBlockFactory::new();
420
421            // First sequence
422            let fb0 = factory.flashblock_at(0).build();
423            sequence.insert(fb0);
424
425            let complete1 = sequence.finalize().unwrap();
426            assert_eq!(complete1.count(), 1);
427
428            // Add new sequence for next block
429            let fb1 = factory.flashblock_for_next_block(&complete1.last().clone()).build();
430            sequence.insert(fb1);
431
432            let complete2 = sequence.finalize().unwrap();
433            assert_eq!(complete2.count(), 1);
434            assert_eq!(complete2.block_number(), 101);
435        }
436    }
437
438    mod complete_sequence_invariants {
439        use super::*;
440
441        #[test]
442        fn test_new_empty_sequence_fails() {
443            let result = FlashBlockCompleteSequence::new(vec![], None);
444            assert!(result.is_err());
445            assert_eq!(result.unwrap_err().to_string(), "No flashblocks in sequence");
446        }
447
448        #[test]
449        fn test_new_requires_base_at_index_zero() {
450            let factory = TestFlashBlockFactory::new();
451            // Use builder() with index 1 first to create a flashblock, then change its index to 0
452            // to bypass the auto-base creation logic
453            let mut fb0_no_base = factory.flashblock_at(1).build();
454            fb0_no_base.index = 0;
455            fb0_no_base.base = None;
456
457            let result = FlashBlockCompleteSequence::new(vec![fb0_no_base], None);
458            assert!(result.is_err());
459            assert_eq!(result.unwrap_err().to_string(), "Flashblock at index 0 has no base");
460        }
461
462        #[test]
463        fn test_new_validates_successive_indices() {
464            let factory = TestFlashBlockFactory::new();
465
466            let fb0 = factory.flashblock_at(0).build();
467            // Skip index 1, go straight to 2
468            let fb2 = factory.flashblock_after(&fb0).index(2).build();
469
470            let result = FlashBlockCompleteSequence::new(vec![fb0, fb2], None);
471            assert!(result.is_err());
472            assert_eq!(
473                result.unwrap_err().to_string(),
474                "Flashblock inconsistencies detected in sequence"
475            );
476        }
477
478        #[test]
479        fn test_new_validates_same_block_number() {
480            let factory = TestFlashBlockFactory::new();
481
482            let fb0 = factory.flashblock_at(0).build();
483            let fb1 = factory.flashblock_after(&fb0).block_number(101).build();
484
485            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1], None);
486            assert!(result.is_err());
487            assert_eq!(
488                result.unwrap_err().to_string(),
489                "Flashblock inconsistencies detected in sequence"
490            );
491        }
492
493        #[test]
494        fn test_new_validates_same_payload_id() {
495            let factory = TestFlashBlockFactory::new();
496
497            let fb0 = factory.flashblock_at(0).build();
498            let payload_id2 = alloy_rpc_types_engine::PayloadId::new([2u8; 8]);
499            let fb1 = factory.flashblock_after(&fb0).payload_id(payload_id2).build();
500
501            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1], None);
502            assert!(result.is_err());
503            assert_eq!(
504                result.unwrap_err().to_string(),
505                "Flashblock inconsistencies detected in sequence"
506            );
507        }
508
509        #[test]
510        fn test_new_valid_single_flashblock() {
511            let factory = TestFlashBlockFactory::new();
512            let fb0 = factory.flashblock_at(0).build();
513
514            let result = FlashBlockCompleteSequence::new(vec![fb0], None);
515            assert!(result.is_ok());
516
517            let complete = result.unwrap();
518            assert_eq!(complete.count(), 1);
519            assert_eq!(complete.block_number(), 100);
520        }
521
522        #[test]
523        fn test_new_valid_multiple_flashblocks() {
524            let factory = TestFlashBlockFactory::new();
525
526            let fb0 = factory.flashblock_at(0).build();
527            let fb1 = factory.flashblock_after(&fb0).build();
528            let fb2 = factory.flashblock_after(&fb1).build();
529
530            let result = FlashBlockCompleteSequence::new(vec![fb0, fb1, fb2], None);
531            assert!(result.is_ok());
532
533            let complete = result.unwrap();
534            assert_eq!(complete.count(), 3);
535            assert_eq!(complete.last().index, 2);
536        }
537
538        #[test]
539        fn test_all_transactions_aggregates_correctly() {
540            let factory = TestFlashBlockFactory::new();
541
542            let fb0 = factory
543                .flashblock_at(0)
544                .transactions(vec![Bytes::from_static(&[1, 2, 3]), Bytes::from_static(&[4, 5, 6])])
545                .build();
546
547            let fb1 = factory
548                .flashblock_after(&fb0)
549                .transactions(vec![Bytes::from_static(&[7, 8, 9])])
550                .build();
551
552            let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap();
553            let all_txs = complete.all_transactions();
554
555            assert_eq!(all_txs.len(), 3);
556            assert_eq!(all_txs[0], Bytes::from_static(&[1, 2, 3]));
557            assert_eq!(all_txs[1], Bytes::from_static(&[4, 5, 6]));
558            assert_eq!(all_txs[2], Bytes::from_static(&[7, 8, 9]));
559        }
560
561        #[test]
562        fn test_payload_base_returns_first_block_base() {
563            let factory = TestFlashBlockFactory::new();
564
565            let fb0 = factory.flashblock_at(0).build();
566            let fb1 = factory.flashblock_after(&fb0).build();
567
568            let complete = FlashBlockCompleteSequence::new(vec![fb0.clone(), fb1], None).unwrap();
569
570            assert_eq!(complete.payload_base().block_number, fb0.base.unwrap().block_number);
571        }
572
573        #[test]
574        fn test_execution_outcome_mutation() {
575            let factory = TestFlashBlockFactory::new();
576            let fb0 = factory.flashblock_at(0).build();
577
578            let mut complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
579            assert!(complete.execution_outcome().is_none());
580
581            let outcome =
582                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
583            complete.set_execution_outcome(Some(outcome));
584
585            assert_eq!(complete.execution_outcome(), Some(outcome));
586        }
587
588        #[test]
589        fn test_deref_provides_vec_access() {
590            let factory = TestFlashBlockFactory::new();
591
592            let fb0 = factory.flashblock_at(0).build();
593            let fb1 = factory.flashblock_after(&fb0).build();
594
595            let complete = FlashBlockCompleteSequence::new(vec![fb0, fb1], None).unwrap();
596
597            // Use deref to access Vec methods
598            assert_eq!(complete.len(), 2);
599            assert!(!complete.is_empty());
600        }
601    }
602
603    mod sequence_conversion {
604        use super::*;
605
606        #[test]
607        fn test_try_from_pending_to_complete_valid() {
608            let mut pending = FlashBlockPendingSequence::new();
609            let factory = TestFlashBlockFactory::new();
610
611            let fb0 = factory.flashblock_at(0).build();
612            pending.insert(fb0);
613
614            let complete: Result<FlashBlockCompleteSequence, _> = pending.try_into();
615            assert!(complete.is_ok());
616            assert_eq!(complete.unwrap().count(), 1);
617        }
618
619        #[test]
620        fn test_try_from_pending_to_complete_empty_fails() {
621            let pending = FlashBlockPendingSequence::new();
622
623            let complete: Result<FlashBlockCompleteSequence, _> = pending.try_into();
624            assert!(complete.is_err());
625        }
626
627        #[test]
628        fn test_try_from_preserves_execution_outcome() {
629            let mut pending = FlashBlockPendingSequence::new();
630            let factory = TestFlashBlockFactory::new();
631
632            let fb0 = factory.flashblock_at(0).build();
633            pending.insert(fb0);
634
635            let outcome =
636                SequenceExecutionOutcome { block_hash: B256::random(), state_root: B256::random() };
637            pending.set_execution_outcome(Some(outcome));
638
639            let complete: FlashBlockCompleteSequence = pending.try_into().unwrap();
640            assert_eq!(complete.execution_outcome(), Some(outcome));
641        }
642    }
643
644    mod pending_sequence_helpers {
645        use super::*;
646
647        #[test]
648        fn test_last_flashblock_returns_highest_index() {
649            let mut sequence = FlashBlockPendingSequence::new();
650            let factory = TestFlashBlockFactory::new();
651
652            let fb0 = factory.flashblock_at(0).build();
653            sequence.insert(fb0.clone());
654
655            let fb1 = factory.flashblock_after(&fb0).build();
656            sequence.insert(fb1);
657
658            let last = sequence.last_flashblock().unwrap();
659            assert_eq!(last.index, 1);
660        }
661
662        #[test]
663        fn test_subscribe_block_sequence_channel() {
664            let sequence = FlashBlockPendingSequence::new();
665            let mut rx = sequence.subscribe_block_sequence();
666
667            // Spawn a task that sends a complete sequence
668            let tx = sequence.block_sequence_broadcaster().clone();
669            std::thread::spawn(move || {
670                let factory = TestFlashBlockFactory::new();
671                let fb0 = factory.flashblock_at(0).build();
672                let complete = FlashBlockCompleteSequence::new(vec![fb0], None).unwrap();
673                let _ = tx.send(complete);
674            });
675
676            // Should receive the broadcast
677            let received = rx.blocking_recv();
678            assert!(received.is_ok());
679            assert_eq!(received.unwrap().count(), 1);
680        }
681    }
682}