reth_chain_state/
notifications.rs

1//! Canonical chain state notification trait and types.
2
3use alloy_eips::eip2718::Encodable2718;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9    pin::Pin,
10    sync::Arc,
11    task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15    wrappers::{BroadcastStream, WatchStream},
16    Stream,
17};
18use tracing::debug;
19
20/// Type alias for a receiver that receives [`CanonStateNotification`]
21pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
22    broadcast::Receiver<CanonStateNotification<N>>;
23
24/// Type alias for a sender that sends [`CanonStateNotification`]
25pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
26    broadcast::Sender<CanonStateNotification<N>>;
27
28/// A type that allows to register chain related event subscriptions.
29pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30    /// Get notified when a new canonical chain was imported.
31    ///
32    /// A canonical chain be one or more blocks, a reorg or a revert.
33    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35    /// Convenience method to get a stream of [`CanonStateNotification`].
36    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37        CanonStateNotificationStream {
38            st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39        }
40    }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44    fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45        (*self).subscribe_to_canonical_state()
46    }
47
48    fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49        (*self).canonical_state_stream()
50    }
51}
52
53/// A Stream of [`CanonStateNotification`].
54#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
57{
58    #[pin]
59    st: BroadcastStream<CanonStateNotification<N>>,
60}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63    type Item = CanonStateNotification<N>;
64
65    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66        loop {
67            return match ready!(self.as_mut().project().st.poll_next(cx)) {
68                Some(Ok(notification)) => Poll::Ready(Some(notification)),
69                Some(Err(err)) => {
70                    debug!(%err, "canonical state notification stream lagging behind");
71                    continue
72                }
73                None => Poll::Ready(None),
74            }
75        }
76    }
77}
78
79/// A notification that is sent when a new block is imported, or an old block is reverted.
80///
81/// The notification contains at least one [`Chain`] with the imported segment. If some blocks were
82/// reverted (e.g. during a reorg), the old chain is also returned.
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
85    /// The canonical chain was extended.
86    Commit {
87        /// The newly added chain segment.
88        new: Arc<Chain<N>>,
89    },
90    /// A chain segment was reverted or reorged.
91    ///
92    /// - In the case of a reorg, the reverted blocks are present in `old`, and the new blocks are
93    ///   present in `new`.
94    /// - In the case of a revert, the reverted blocks are present in `old`, and `new` is an empty
95    ///   chain segment.
96    Reorg {
97        /// The chain segment that was reverted.
98        old: Arc<Chain<N>>,
99        /// The chain segment that was added on top of the canonical chain, minus the reverted
100        /// blocks.
101        ///
102        /// In the case of a revert, not a reorg, this chain segment is empty.
103        new: Arc<Chain<N>>,
104    },
105}
106
107impl<N: NodePrimitives> CanonStateNotification<N> {
108    /// Get the chain segment that was reverted, if any.
109    pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
110        match self {
111            Self::Commit { .. } => None,
112            Self::Reorg { old, .. } => Some(old.clone()),
113        }
114    }
115
116    /// Get the newly imported chain segment, if any.
117    pub fn committed(&self) -> Arc<Chain<N>> {
118        match self {
119            Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
120        }
121    }
122
123    /// Get the new tip of the chain.
124    ///
125    /// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
126    /// 1 new block.
127    pub fn tip(&self) -> &RecoveredBlock<N::Block> {
128        match self {
129            Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
130        }
131    }
132
133    /// Get receipts in the reverted and newly imported chain segments with their corresponding
134    /// block numbers and transaction hashes.
135    ///
136    /// The boolean in the tuple (2nd element) denotes whether the receipt was from the reverted
137    /// chain segment.
138    pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
139    where
140        N::SignedTx: Encodable2718,
141    {
142        let mut receipts = Vec::new();
143
144        // get old receipts
145        if let Some(old) = self.reverted() {
146            receipts
147                .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
148        }
149        // get new receipts
150        receipts.extend(
151            self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
152        );
153        receipts
154    }
155}
156
157/// Wrapper around a broadcast receiver that receives fork choice notifications.
158#[derive(Debug, Deref, DerefMut)]
159pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
160    pub watch::Receiver<Option<SealedHeader<T>>>,
161);
162
163/// A trait that allows to register to fork choice related events
164/// and get notified when a new fork choice is available.
165pub trait ForkChoiceSubscriptions: Send + Sync {
166    /// Block Header type.
167    type Header: Clone + Send + Sync + 'static;
168
169    /// Get notified when a new safe block of the chain is selected.
170    fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
171
172    /// Get notified when a new finalized block of the chain is selected.
173    fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
174
175    /// Convenience method to get a stream of the new safe blocks of the chain.
176    fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
177        ForkChoiceStream::new(self.subscribe_safe_block().0)
178    }
179
180    /// Convenience method to get a stream of the new finalized blocks of the chain.
181    fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
182        ForkChoiceStream::new(self.subscribe_finalized_block().0)
183    }
184}
185
186/// A stream for fork choice watch channels (pending, safe or finalized watchers)
187#[derive(Debug)]
188#[pin_project::pin_project]
189pub struct ForkChoiceStream<T> {
190    #[pin]
191    st: WatchStream<Option<T>>,
192}
193
194impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
195    /// Creates a new `ForkChoiceStream`
196    pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
197        Self { st: WatchStream::from_changes(rx) }
198    }
199}
200
201impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
202    type Item = T;
203
204    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        loop {
206            match ready!(self.as_mut().project().st.poll_next(cx)) {
207                Some(Some(notification)) => return Poll::Ready(Some(notification)),
208                Some(None) => {}
209                None => return Poll::Ready(None),
210            }
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlockBody;
    use alloy_primitives::{b256, B256};
    use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::SealedBlock;

    /// Recovered Ethereum block type used by the fixtures in this module.
    type EthRecoveredBlock = RecoveredBlock<reth_ethereum_primitives::Block>;

    /// Returns a hash whose 32 bytes are all set to `byte`.
    fn repeated_hash(byte: u8) -> B256 {
        B256::new([byte; 32])
    }

    /// Returns a copy of `template` with the given block number and hash.
    fn block_at(template: &EthRecoveredBlock, number: u64, hash: B256) -> EthRecoveredBlock {
        let mut block = template.clone();
        block.set_block_number(number);
        block.set_hash(hash);
        block
    }

    /// Builds a recovered block with a default header and a single default transaction.
    fn block_with_default_tx() -> EthRecoveredBlock {
        let mut body = BlockBody::<TransactionSigned>::default();
        body.transactions.push(TransactionSigned::default());

        SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
            SealedHeader::seal_slow(alloy_consensus::Header::default()),
            body,
        )
        .try_recover()
        .unwrap()
    }

    /// Builds a legacy receipt without logs.
    fn legacy_receipt(cumulative_gas_used: u64, success: bool) -> Receipt {
        Receipt { tx_type: TxType::Legacy, cumulative_gas_used, logs: vec![], success }
    }

    /// Transaction hash of a `Transaction::default()`.
    fn default_tx_hash() -> B256 {
        b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab")
    }

    #[test]
    fn test_commit_notification() {
        let template = EthRecoveredBlock::default();
        let first = block_at(&template, 1, repeated_hash(0x01));
        let second = block_at(&template, 2, repeated_hash(0x02));

        let chain: Arc<Chain> =
            Arc::new(Chain::new(vec![first, second.clone()], ExecutionOutcome::default(), None));

        // A commit notification only carries the new segment.
        let notification = CanonStateNotification::Commit { new: chain.clone() };

        // `committed` hands back the committed chain.
        assert_eq!(notification.committed(), chain);

        // A commit never reverts anything.
        assert!(notification.reverted().is_none());

        // The tip is the last block of the committed chain.
        assert_eq!(*notification.tip(), second);
    }

    #[test]
    fn test_reorg_notification() {
        let template = EthRecoveredBlock::default();
        let first = block_at(&template, 1, repeated_hash(0x01));
        let second = block_at(&template, 2, repeated_hash(0x02));
        let third = block_at(&template, 3, repeated_hash(0x03));

        let old_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![first], ExecutionOutcome::default(), None));
        let new_chain =
            Arc::new(Chain::new(vec![second, third.clone()], ExecutionOutcome::default(), None));

        // A reorg notification carries both the reverted and the committed segment.
        let notification =
            CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };

        // `reverted` hands back the old chain.
        assert_eq!(notification.reverted(), Some(old_chain));

        // `committed` hands back the new chain.
        assert_eq!(notification.committed(), new_chain);

        // The tip is the last block of the new chain.
        assert_eq!(*notification.tip(), third);
    }

    #[test]
    fn test_block_receipts_commit() {
        // Two blocks that share the same body (one default transaction) but differ in number
        // and hash.
        let template = block_with_default_tx();
        let first = block_at(&template, 1, repeated_hash(0x01));
        let second = block_at(&template, 2, repeated_hash(0x02));

        // Only one block worth of receipts is recorded in the execution outcome.
        let receipt = legacy_receipt(12345, true);
        let execution_outcome =
            ExecutionOutcome { receipts: vec![vec![receipt.clone()]], ..Default::default() };

        let new_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![first.clone(), second], execution_outcome, None));
        let notification = CanonStateNotification::Commit { new: new_chain };

        let block_receipts = notification.block_receipts();

        // Exactly one receipt entry is expected.
        assert_eq!(block_receipts.len(), 1);

        // The entry belongs to the first block and holds its single receipt.
        assert_eq!(
            block_receipts[0].0,
            BlockReceipts {
                block: first.num_hash(),
                tx_receipts: vec![(default_tx_hash(), receipt)]
            }
        );

        // The entry comes from the committed segment, not from a reverted one.
        assert!(!block_receipts[0].1);
    }

    #[test]
    fn test_block_receipts_reorg() {
        // Reverted segment: one block with a single failed transaction.
        let old_block = block_at(&block_with_default_tx(), 1, repeated_hash(0x01));
        let old_receipt = legacy_receipt(54321, false);
        let old_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![old_receipt.clone()]], ..Default::default() };
        let old_chain: Arc<Chain> =
            Arc::new(Chain::new(vec![old_block.clone()], old_execution_outcome, None));

        // Committed segment: one block with a single successful transaction.
        let new_block = block_at(&block_with_default_tx(), 2, repeated_hash(0x02));
        let new_receipt = legacy_receipt(12345, true);
        let new_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![new_receipt.clone()]], ..Default::default() };
        let new_chain = Arc::new(Chain::new(vec![new_block.clone()], new_execution_outcome, None));

        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

        let block_receipts = notification.block_receipts();

        // One entry per chain segment.
        assert_eq!(block_receipts.len(), 2);

        // The reverted segment comes first and is flagged as reverted.
        assert_eq!(
            block_receipts[0].0,
            BlockReceipts {
                block: old_block.num_hash(),
                tx_receipts: vec![(default_tx_hash(), old_receipt)]
            }
        );
        assert!(block_receipts[0].1);

        // The committed segment follows and is not flagged as reverted.
        assert_eq!(
            block_receipts[1].0,
            BlockReceipts {
                block: new_block.num_hash(),
                tx_receipts: vec![(default_tx_hash(), new_receipt)]
            }
        );
        assert!(!block_receipts[1].1);
    }
}