1use alloy_eips::BlockNumHash;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9 pin::Pin,
10 sync::Arc,
11 task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15 wrappers::{BroadcastStream, WatchStream},
16 Stream,
17};
18use tracing::debug;
19
20pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
22 broadcast::Receiver<CanonStateNotification<N>>;
23
24pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
26 broadcast::Sender<CanonStateNotification<N>>;
27
28pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37 CanonStateNotificationStream {
38 st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39 }
40 }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45 (*self).subscribe_to_canonical_state()
46 }
47
48 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49 (*self).canonical_state_stream()
50 }
51}
52
53#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
57{
58 #[pin]
59 st: BroadcastStream<CanonStateNotification<N>>,
60}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63 type Item = CanonStateNotification<N>;
64
65 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 loop {
67 return match ready!(self.as_mut().project().st.poll_next(cx)) {
68 Some(Ok(notification)) => Poll::Ready(Some(notification)),
69 Some(Err(err)) => {
70 debug!(%err, "canonical state notification stream lagging behind");
71 continue
72 }
73 None => Poll::Ready(None),
74 }
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85#[cfg_attr(feature = "serde", serde(bound = ""))]
86pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
87 Commit {
89 new: Arc<Chain<N>>,
91 },
92 Reorg {
99 old: Arc<Chain<N>>,
101 new: Arc<Chain<N>>,
106 },
107}
108
109impl<N: NodePrimitives> CanonStateNotification<N> {
110 pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
112 match self {
113 Self::Commit { .. } => None,
114 Self::Reorg { old, .. } => Some(old.clone()),
115 }
116 }
117
118 pub fn committed(&self) -> Arc<Chain<N>> {
120 match self {
121 Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
122 }
123 }
124
125 pub fn tip(&self) -> &RecoveredBlock<N::Block> {
134 match self {
135 Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
136 }
137 }
138
139 pub fn tip_checked(&self) -> Option<&RecoveredBlock<N::Block>> {
144 match self {
145 Self::Commit { new } | Self::Reorg { new, .. } => {
146 if new.is_empty() {
147 None
148 } else {
149 Some(new.tip())
150 }
151 }
152 }
153 }
154
155 pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)> {
161 let mut receipts = Vec::new();
162
163 if let Some(old) = self.reverted() {
165 receipts
166 .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
167 }
168 receipts.extend(
170 self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
171 );
172 receipts
173 }
174}
175
176#[derive(Debug, Deref, DerefMut)]
178pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
179 pub watch::Receiver<Option<SealedHeader<T>>>,
180);
181
182pub trait ForkChoiceSubscriptions: Send + Sync {
185 type Header: Clone + Send + Sync + 'static;
187
188 fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
190
191 fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
193
194 fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
196 ForkChoiceStream::new(self.subscribe_safe_block().0)
197 }
198
199 fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
201 ForkChoiceStream::new(self.subscribe_finalized_block().0)
202 }
203}
204
205#[derive(Debug)]
207#[pin_project::pin_project]
208pub struct WatchValueStream<T> {
209 #[pin]
210 st: WatchStream<Option<T>>,
211}
212
213impl<T: Clone + Sync + Send + 'static> WatchValueStream<T> {
214 pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
216 Self { st: WatchStream::from_changes(rx) }
217 }
218}
219
220impl<T: Clone + Sync + Send + 'static> Stream for WatchValueStream<T> {
221 type Item = T;
222
223 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
224 loop {
225 match ready!(self.as_mut().project().st.poll_next(cx)) {
226 Some(Some(notification)) => return Poll::Ready(Some(notification)),
227 Some(None) => {}
228 None => return Poll::Ready(None),
229 }
230 }
231 }
232}
233
234pub type ForkChoiceStream<T> = WatchValueStream<T>;
236
237#[derive(Debug, Deref, DerefMut)]
239pub struct PersistedBlockNotifications(pub watch::Receiver<Option<BlockNumHash>>);
240
241pub trait PersistedBlockSubscriptions: Send + Sync {
243 fn subscribe_persisted_block(&self) -> PersistedBlockNotifications;
245
246 fn persisted_block_stream(&self) -> WatchValueStream<BlockNumHash> {
248 WatchValueStream::new(self.subscribe_persisted_block().0)
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use alloy_consensus::{BlockBody, SignableTransaction, TxLegacy};
256 use alloy_primitives::{b256, Signature, B256};
257 use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
258 use reth_execution_types::ExecutionOutcome;
259 use reth_primitives_traits::SealedBlock;
260 use std::collections::BTreeMap;
261
262 #[test]
263 fn test_commit_notification() {
264 let block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
265 let block1_hash = B256::new([0x01; 32]);
266 let block2_hash = B256::new([0x02; 32]);
267
268 let mut block1 = block.clone();
269 block1.set_block_number(1);
270 block1.set_hash(block1_hash);
271
272 let mut block2 = block;
273 block2.set_block_number(2);
274 block2.set_hash(block2_hash);
275
276 let chain: Arc<Chain> = Arc::new(Chain::new(
277 vec![block1.clone(), block2.clone()],
278 ExecutionOutcome::default(),
279 BTreeMap::new(),
280 ));
281
282 let notification = CanonStateNotification::Commit { new: chain.clone() };
284
285 assert_eq!(notification.committed(), chain);
287
288 assert!(notification.reverted().is_none());
290
291 assert_eq!(*notification.tip(), block2);
293 }
294
295 #[test]
296 fn test_reorg_notification() {
297 let block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
298 let block1_hash = B256::new([0x01; 32]);
299 let block2_hash = B256::new([0x02; 32]);
300 let block3_hash = B256::new([0x03; 32]);
301
302 let mut block1 = block.clone();
303 block1.set_block_number(1);
304 block1.set_hash(block1_hash);
305
306 let mut block2 = block.clone();
307 block2.set_block_number(2);
308 block2.set_hash(block2_hash);
309
310 let mut block3 = block;
311 block3.set_block_number(3);
312 block3.set_hash(block3_hash);
313
314 let old_chain: Arc<Chain> = Arc::new(Chain::new(
315 vec![block1.clone()],
316 ExecutionOutcome::default(),
317 BTreeMap::new(),
318 ));
319 let new_chain = Arc::new(Chain::new(
320 vec![block2.clone(), block3.clone()],
321 ExecutionOutcome::default(),
322 BTreeMap::new(),
323 ));
324
325 let notification =
327 CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };
328
329 assert_eq!(notification.reverted(), Some(old_chain));
331
332 assert_eq!(notification.committed(), new_chain);
334
335 assert_eq!(*notification.tip(), block3);
337 }
338
339 #[test]
340 fn test_block_receipts_commit() {
341 let mut body = BlockBody::<TransactionSigned>::default();
343
344 let block1_hash = B256::new([0x01; 32]);
346 let block2_hash = B256::new([0x02; 32]);
347
348 let tx = TxLegacy::default().into_signed(Signature::test_signature()).into();
350 body.transactions.push(tx);
351
352 let block = SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
353 SealedHeader::seal_slow(alloy_consensus::Header::default()),
354 body,
355 )
356 .try_recover()
357 .unwrap();
358
359 let mut block1 = block.clone();
361 block1.set_block_number(1);
362 block1.set_hash(block1_hash);
363
364 let mut block2 = block;
366 block2.set_block_number(2);
367 block2.set_hash(block2_hash);
368
369 let receipt1 = Receipt {
371 tx_type: TxType::Legacy,
372 cumulative_gas_used: 12345,
373 logs: vec![],
374 success: true,
375 };
376
377 let receipts = vec![vec![receipt1.clone()]];
379
380 let execution_outcome = ExecutionOutcome { receipts, ..Default::default() };
382
383 let new_chain: Arc<Chain> = Arc::new(Chain::new(
385 vec![block1.clone(), block2.clone()],
386 execution_outcome,
387 BTreeMap::new(),
388 ));
389
390 let notification = CanonStateNotification::Commit { new: new_chain };
392
393 let block_receipts = notification.block_receipts();
395
396 assert_eq!(block_receipts.len(), 1);
398
399 assert_eq!(
401 block_receipts[0].0,
402 BlockReceipts {
403 block: block1.num_hash(),
404 timestamp: block1.timestamp,
405 tx_receipts: vec![(
406 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
408 receipt1
409 )]
410 }
411 );
412
413 assert!(!block_receipts[0].1);
415 }
416
417 #[test]
418 fn test_block_receipts_reorg() {
419 let mut body = BlockBody::<TransactionSigned>::default();
421 body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());
422 let mut old_block1 =
423 SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
424 SealedHeader::seal_slow(alloy_consensus::Header::default()),
425 body,
426 )
427 .try_recover()
428 .unwrap();
429 old_block1.set_block_number(1);
430 old_block1.set_hash(B256::new([0x01; 32]));
431
432 let old_receipt = Receipt {
434 tx_type: TxType::Legacy,
435 cumulative_gas_used: 54321,
436 logs: vec![],
437 success: false,
438 };
439 let old_receipts = vec![vec![old_receipt.clone()]];
440
441 let old_execution_outcome =
442 ExecutionOutcome { receipts: old_receipts, ..Default::default() };
443
444 let old_chain: Arc<Chain> =
446 Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, BTreeMap::new()));
447
448 let mut body = BlockBody::<TransactionSigned>::default();
450 body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());
451 let mut new_block1 =
452 SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
453 SealedHeader::seal_slow(alloy_consensus::Header::default()),
454 body,
455 )
456 .try_recover()
457 .unwrap();
458 new_block1.set_block_number(2);
459 new_block1.set_hash(B256::new([0x02; 32]));
460
461 let new_receipt = Receipt {
463 tx_type: TxType::Legacy,
464 cumulative_gas_used: 12345,
465 logs: vec![],
466 success: true,
467 };
468 let new_receipts = vec![vec![new_receipt.clone()]];
469
470 let new_execution_outcome =
471 ExecutionOutcome { receipts: new_receipts, ..Default::default() };
472
473 let new_chain =
475 Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, BTreeMap::new()));
476
477 let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };
479
480 let block_receipts = notification.block_receipts();
482
483 assert_eq!(block_receipts.len(), 2);
485
486 assert_eq!(
488 block_receipts[0].0,
489 BlockReceipts {
490 block: old_block1.num_hash(),
491 timestamp: old_block1.timestamp,
492 tx_receipts: vec![(
493 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
495 old_receipt
496 )]
497 }
498 );
499 assert!(block_receipts[0].1);
501
502 assert_eq!(
505 block_receipts[1].0,
506 BlockReceipts {
507 block: new_block1.num_hash(),
508 timestamp: new_block1.timestamp,
509 tx_receipts: vec![(
510 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
512 new_receipt
513 )]
514 }
515 );
516 assert!(!block_receipts[1].1);
518 }
519}