Skip to main content

reth_chain_state/
notifications.rs

1//! Canonical chain state notification trait and types.
2
3use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9    pin::Pin,
10    sync::Arc,
11    task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15    wrappers::{BroadcastStream, WatchStream},
16    Stream,
17};
18use tracing::debug;
19
/// Type alias for a receiver that receives [`CanonStateNotification`].
///
/// This is the receiving half of a tokio [`broadcast`] channel. The primitives parameter `N`
/// defaults to [`reth_ethereum_primitives::EthPrimitives`].
pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
    broadcast::Receiver<CanonStateNotification<N>>;
23
/// Type alias for a sender that sends [`CanonStateNotification`].
///
/// This is the sending half of a tokio [`broadcast`] channel. The primitives parameter `N`
/// defaults to [`reth_ethereum_primitives::EthPrimitives`].
pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
    broadcast::Sender<CanonStateNotification<N>>;
27
28/// A type that allows to register chain related event subscriptions.
29pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30    /// Get notified when a new canonical chain was imported.
31    ///
32    /// A canonical chain be one or more blocks, a reorg or a revert.
33    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35    /// Convenience method to get a stream of [`CanonStateNotification`].
36    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37        CanonStateNotificationStream {
38            st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39        }
40    }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45        (*self).subscribe_to_canonical_state()
46    }
47
48    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49        (*self).canonical_state_stream()
50    }
51}
52
/// A Stream of [`CanonStateNotification`].
///
/// Thin wrapper around a [`BroadcastStream`] over the canonical state broadcast channel. The
/// `Stream` implementation yields only successfully received notifications; receive errors
/// reported by the inner stream are logged and skipped.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
{
    /// The wrapped broadcast stream, structurally pinned so it can be polled in place.
    #[pin]
    st: BroadcastStream<CanonStateNotification<N>>,
}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63    type Item = CanonStateNotification<N>;
64
65    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        loop {
67            return match ready!(self.as_mut().project().st.poll_next(cx)) {
68                Some(Ok(notification)) => Poll::Ready(Some(notification)),
69                Some(Err(err)) => {
70                    debug!(%err, "canonical state notification stream lagging behind");
71                    continue
72                }
73                None => Poll::Ready(None),
74            }
75        }
76    }
77}
78
/// A notification that is sent when a new block is imported, or an old block is reverted.
///
/// The notification contains at least one [`Chain`] with the imported segment. If some blocks were
/// reverted (e.g. during a reorg), the old chain is also returned.
///
/// Chain segments are held behind [`Arc`], so cloning a notification does not copy the segments
/// themselves.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound = ""))]
pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
    /// The canonical chain was extended.
    Commit {
        /// The newly added chain segment.
        new: Arc<Chain<N>>,
    },
    /// A chain segment was reverted or reorged.
    ///
    /// - In the case of a reorg, the reverted blocks are present in `old`, and the new blocks are
    ///   present in `new`.
    /// - In the case of a revert, the reverted blocks are present in `old`, and `new` is an empty
    ///   chain segment.
    Reorg {
        /// The chain segment that was reverted.
        old: Arc<Chain<N>>,
        /// The chain segment that was added on top of the canonical chain, minus the reverted
        /// blocks.
        ///
        /// In the case of a revert, not a reorg, this chain segment is empty.
        new: Arc<Chain<N>>,
    },
}
108
109impl<N: NodePrimitives> CanonStateNotification<N> {
110    /// Get the chain segment that was reverted, if any.
111    pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
112        match self {
113            Self::Commit { .. } => None,
114            Self::Reorg { old, .. } => Some(old.clone()),
115        }
116    }
117
118    /// Get the newly imported chain segment, if any.
119    pub fn committed(&self) -> Arc<Chain<N>> {
120        match self {
121            Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
122        }
123    }
124
125    /// Gets the new tip of the chain.
126    ///
127    /// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
128    /// 1 new block.
129    ///
130    /// # Panics
131    ///
132    /// If chain doesn't have any blocks.
133    pub fn tip(&self) -> &RecoveredBlock<N::Block> {
134        match self {
135            Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
136        }
137    }
138
139    /// Gets the new tip of the chain.
140    ///
141    /// If the chain has no blocks, it returns `None`. Otherwise, it returns the new tip for
142    /// [`Self::Reorg`] and [`Self::Commit`] variants.
143    pub fn tip_checked(&self) -> Option<&RecoveredBlock<N::Block>> {
144        match self {
145            Self::Commit { new } | Self::Reorg { new, .. } => {
146                if new.is_empty() {
147                    None
148                } else {
149                    Some(new.tip())
150                }
151            }
152        }
153    }
154
155    /// Get receipts in the reverted and newly imported chain segments with their corresponding
156    /// block numbers and transaction hashes.
157    ///
158    /// The boolean in the tuple (2nd element) denotes whether the receipt was from the reverted
159    /// chain segment.
160    pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
161    where
162        N::SignedTx: Encodable2718,
163    {
164        let mut receipts = Vec::new();
165
166        // get old receipts
167        if let Some(old) = self.reverted() {
168            receipts
169                .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
170        }
171        // get new receipts
172        receipts.extend(
173            self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
174        );
175        receipts
176    }
177}
178
/// Wrapper around a watch receiver that receives fork choice notifications.
///
/// The watched value is `None` or the [`SealedHeader`] of the selected block. The wrapper
/// derives `Deref`/`DerefMut`, so the inner [`watch::Receiver`] can be used through it directly.
#[derive(Debug, Deref, DerefMut)]
pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
    pub watch::Receiver<Option<SealedHeader<T>>>,
);
184
185/// A trait that allows to register to fork choice related events
186/// and get notified when a new fork choice is available.
187pub trait ForkChoiceSubscriptions: Send + Sync {
188    /// Block Header type.
189    type Header: Clone + Send + Sync + 'static;
190
191    /// Get notified when a new safe block of the chain is selected.
192    fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
193
194    /// Get notified when a new finalized block of the chain is selected.
195    fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
196
197    /// Convenience method to get a stream of the new safe blocks of the chain.
198    fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
199        ForkChoiceStream::new(self.subscribe_safe_block().0)
200    }
201
202    /// Convenience method to get a stream of the new finalized blocks of the chain.
203    fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
204        ForkChoiceStream::new(self.subscribe_finalized_block().0)
205    }
206}
207
/// A stream that yields values from a `watch::Receiver<Option<T>>`, filtering out `None` values.
///
/// Wraps a [`WatchStream`] over the optional value and unwraps the `Some` updates in its
/// `Stream` implementation.
#[derive(Debug)]
#[pin_project::pin_project]
pub struct WatchValueStream<T> {
    /// The wrapped watch stream, structurally pinned so it can be polled in place.
    #[pin]
    st: WatchStream<Option<T>>,
}
215
216impl<T: Clone + Sync + Send + 'static> WatchValueStream<T> {
217    /// Creates a new [`WatchValueStream`]
218    pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
219        Self { st: WatchStream::from_changes(rx) }
220    }
221}
222
223impl<T: Clone + Sync + Send + 'static> Stream for WatchValueStream<T> {
224    type Item = T;
225
226    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227        loop {
228            match ready!(self.as_mut().project().st.poll_next(cx)) {
229                Some(Some(notification)) => return Poll::Ready(Some(notification)),
230                Some(None) => {}
231                None => return Poll::Ready(None),
232            }
233        }
234    }
235}
236
/// Alias for [`WatchValueStream`] for fork choice watch channels.
///
/// Returned by the stream convenience methods of [`ForkChoiceSubscriptions`].
pub type ForkChoiceStream<T> = WatchValueStream<T>;
239
/// Wrapper around a watch receiver that receives persisted block notifications.
///
/// The watched value is `None` or the [`BlockNumHash`] of a persisted block. The wrapper derives
/// `Deref`/`DerefMut`, so the inner [`watch::Receiver`] can be used through it directly.
#[derive(Debug, Deref, DerefMut)]
pub struct PersistedBlockNotifications(pub watch::Receiver<Option<BlockNumHash>>);
243
244/// A trait that allows subscribing to persisted block events.
245pub trait PersistedBlockSubscriptions: Send + Sync {
246    /// Get notified when a new block is persisted to disk.
247    fn subscribe_persisted_block(&self) -> PersistedBlockNotifications;
248
249    /// Convenience method to get a stream of the persisted blocks.
250    fn persisted_block_stream(&self) -> WatchValueStream<BlockNumHash> {
251        WatchValueStream::new(self.subscribe_persisted_block().0)
252    }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{BlockBody, SignableTransaction, TxLegacy};
    use alloy_primitives::{b256, Signature, B256};
    use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::SealedBlock;
    use std::collections::BTreeMap;

    /// Hash of the default legacy transaction signed with the test signature, i.e. the single
    /// transaction contained in every block built by [`block_with_legacy_tx`].
    const LEGACY_TX_HASH: B256 =
        b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab");

    /// Builds a default (transaction-less) block with the given number and a hash made of
    /// `hash_byte` repeated 32 times.
    fn empty_block(number: u64, hash_byte: u8) -> RecoveredBlock<reth_ethereum_primitives::Block> {
        let mut block = RecoveredBlock::<reth_ethereum_primitives::Block>::default();
        block.set_block_number(number);
        block.set_hash(B256::new([hash_byte; 32]));
        block
    }

    /// Builds a block containing one default legacy transaction, with the given number and a
    /// hash made of `hash_byte` repeated 32 times.
    fn block_with_legacy_tx(
        number: u64,
        hash_byte: u8,
    ) -> RecoveredBlock<alloy_consensus::Block<TransactionSigned>> {
        let mut body = BlockBody::<TransactionSigned>::default();
        body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());

        let mut block = SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
            SealedHeader::seal_slow(alloy_consensus::Header::default()),
            body,
        )
        .try_recover()
        .unwrap();
        block.set_block_number(number);
        block.set_hash(B256::new([hash_byte; 32]));
        block
    }

    /// The [`BlockReceipts`] entry expected for `block` when its single transaction produced
    /// `receipt`.
    fn expected_block_receipts(
        block: &RecoveredBlock<alloy_consensus::Block<TransactionSigned>>,
        receipt: Receipt,
    ) -> BlockReceipts<Receipt> {
        BlockReceipts {
            block: block.num_hash(),
            timestamp: block.timestamp,
            tx_receipts: vec![(LEGACY_TX_HASH, receipt)],
        }
    }

    #[test]
    fn test_commit_notification() {
        let first = empty_block(1, 0x01);
        let last = empty_block(2, 0x02);

        let chain: Arc<Chain> = Arc::new(Chain::new(
            vec![first, last.clone()],
            ExecutionOutcome::default(),
            BTreeMap::new(),
        ));

        let notification = CanonStateNotification::Commit { new: chain.clone() };

        // `committed` hands back the chain the notification was built from.
        assert_eq!(notification.committed(), chain);

        // A `Commit` never carries a reverted segment.
        assert!(notification.reverted().is_none());

        // The tip is the last block of the committed chain.
        assert_eq!(*notification.tip(), last);
    }

    #[test]
    fn test_reorg_notification() {
        let reverted_block = empty_block(1, 0x01);
        let new_first = empty_block(2, 0x02);
        let new_last = empty_block(3, 0x03);

        let old_chain: Arc<Chain> = Arc::new(Chain::new(
            vec![reverted_block],
            ExecutionOutcome::default(),
            BTreeMap::new(),
        ));
        let new_chain: Arc<Chain> = Arc::new(Chain::new(
            vec![new_first, new_last.clone()],
            ExecutionOutcome::default(),
            BTreeMap::new(),
        ));

        let notification =
            CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };

        // `reverted` exposes the old chain, `committed` the new one.
        assert_eq!(notification.reverted(), Some(old_chain));
        assert_eq!(notification.committed(), new_chain);

        // The tip is the last block of the new chain.
        assert_eq!(*notification.tip(), new_last);
    }

    #[test]
    fn test_block_receipts_commit() {
        // Two blocks that each hold the same default legacy transaction.
        let block1 = block_with_legacy_tx(1, 0x01);
        let block2 = block_with_legacy_tx(2, 0x02);

        // The execution outcome only carries a receipt for the transaction in `block1`.
        let receipt1 = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
        };
        let execution_outcome =
            ExecutionOutcome { receipts: vec![vec![receipt1.clone()]], ..Default::default() };

        let new_chain: Arc<Chain> = Arc::new(Chain::new(
            vec![block1.clone(), block2],
            execution_outcome,
            BTreeMap::new(),
        ));

        let notification = CanonStateNotification::Commit { new: new_chain };
        let block_receipts = notification.block_receipts();

        // Exactly one entry is reported.
        assert_eq!(block_receipts.len(), 1);

        // It describes `block1` and its receipt ...
        assert_eq!(block_receipts[0].0, expected_block_receipts(&block1, receipt1));

        // ... and is flagged as committed (not reverted).
        assert!(!block_receipts[0].1);
    }

    #[test]
    fn test_block_receipts_reorg() {
        // Old segment: a single block with a failed transaction; it is going to be reverted.
        let old_block1 = block_with_legacy_tx(1, 0x01);
        let old_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 54321,
            logs: vec![],
            success: false,
        };
        let old_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![old_receipt.clone()]], ..Default::default() };
        let old_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, BTreeMap::new()));

        // New segment: a single block with a successful transaction; it is going to be committed.
        let new_block1 = block_with_legacy_tx(2, 0x02);
        let new_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
        };
        let new_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![new_receipt.clone()]], ..Default::default() };
        let new_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, BTreeMap::new()));

        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };
        let block_receipts = notification.block_receipts();

        // One entry per chain segment.
        assert_eq!(block_receipts.len(), 2);

        // First the reverted segment: `old_block1` with its receipt, flagged as reverted.
        assert_eq!(block_receipts[0].0, expected_block_receipts(&old_block1, old_receipt));
        assert!(block_receipts[0].1);

        // Then the committed segment: `new_block1` with its receipt, flagged as not reverted.
        assert_eq!(block_receipts[1].0, expected_block_receipts(&new_block1, new_receipt));
        assert!(!block_receipts[1].1);
    }
}