1use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9 pin::Pin,
10 sync::Arc,
11 task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15 wrappers::{BroadcastStream, WatchStream},
16 Stream,
17};
18use tracing::debug;
19
20pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
22 broadcast::Receiver<CanonStateNotification<N>>;
23
24pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
26 broadcast::Sender<CanonStateNotification<N>>;
27
28pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37 CanonStateNotificationStream {
38 st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39 }
40 }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45 (*self).subscribe_to_canonical_state()
46 }
47
48 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49 (*self).canonical_state_stream()
50 }
51}
52
53#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
57{
58 #[pin]
59 st: BroadcastStream<CanonStateNotification<N>>,
60}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63 type Item = CanonStateNotification<N>;
64
65 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 loop {
67 return match ready!(self.as_mut().project().st.poll_next(cx)) {
68 Some(Ok(notification)) => Poll::Ready(Some(notification)),
69 Some(Err(err)) => {
70 debug!(%err, "canonical state notification stream lagging behind");
71 continue
72 }
73 None => Poll::Ready(None),
74 }
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85#[cfg_attr(feature = "serde", serde(bound = ""))]
86pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
87 Commit {
89 new: Arc<Chain<N>>,
91 },
92 Reorg {
99 old: Arc<Chain<N>>,
101 new: Arc<Chain<N>>,
106 },
107}
108
109impl<N: NodePrimitives> CanonStateNotification<N> {
110 pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
112 match self {
113 Self::Commit { .. } => None,
114 Self::Reorg { old, .. } => Some(old.clone()),
115 }
116 }
117
118 pub fn committed(&self) -> Arc<Chain<N>> {
120 match self {
121 Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
122 }
123 }
124
125 pub fn tip(&self) -> &RecoveredBlock<N::Block> {
134 match self {
135 Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
136 }
137 }
138
139 pub fn tip_checked(&self) -> Option<&RecoveredBlock<N::Block>> {
144 match self {
145 Self::Commit { new } | Self::Reorg { new, .. } => {
146 if new.is_empty() {
147 None
148 } else {
149 Some(new.tip())
150 }
151 }
152 }
153 }
154
155 pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
161 where
162 N::SignedTx: Encodable2718,
163 {
164 let mut receipts = Vec::new();
165
166 if let Some(old) = self.reverted() {
168 receipts
169 .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
170 }
171 receipts.extend(
173 self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
174 );
175 receipts
176 }
177}
178
179#[derive(Debug, Deref, DerefMut)]
181pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
182 pub watch::Receiver<Option<SealedHeader<T>>>,
183);
184
185pub trait ForkChoiceSubscriptions: Send + Sync {
188 type Header: Clone + Send + Sync + 'static;
190
191 fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
193
194 fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
196
197 fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
199 ForkChoiceStream::new(self.subscribe_safe_block().0)
200 }
201
202 fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
204 ForkChoiceStream::new(self.subscribe_finalized_block().0)
205 }
206}
207
208#[derive(Debug)]
210#[pin_project::pin_project]
211pub struct WatchValueStream<T> {
212 #[pin]
213 st: WatchStream<Option<T>>,
214}
215
216impl<T: Clone + Sync + Send + 'static> WatchValueStream<T> {
217 pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
219 Self { st: WatchStream::from_changes(rx) }
220 }
221}
222
223impl<T: Clone + Sync + Send + 'static> Stream for WatchValueStream<T> {
224 type Item = T;
225
226 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227 loop {
228 match ready!(self.as_mut().project().st.poll_next(cx)) {
229 Some(Some(notification)) => return Poll::Ready(Some(notification)),
230 Some(None) => {}
231 None => return Poll::Ready(None),
232 }
233 }
234 }
235}
236
237pub type ForkChoiceStream<T> = WatchValueStream<T>;
239
240#[derive(Debug, Deref, DerefMut)]
242pub struct PersistedBlockNotifications(pub watch::Receiver<Option<BlockNumHash>>);
243
244pub trait PersistedBlockSubscriptions: Send + Sync {
246 fn subscribe_persisted_block(&self) -> PersistedBlockNotifications;
248
249 fn persisted_block_stream(&self) -> WatchValueStream<BlockNumHash> {
251 WatchValueStream::new(self.subscribe_persisted_block().0)
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{BlockBody, SignableTransaction, TxLegacy};
    use alloy_primitives::{b256, Signature, B256};
    use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::SealedBlock;
    use std::collections::BTreeMap;

    /// Recovered block type used by every test in this module.
    type EthRecoveredBlock = RecoveredBlock<reth_ethereum_primitives::Block>;

    /// Hash the tests expect for a default legacy transaction signed with the test signature.
    const LEGACY_TX_HASH: B256 =
        b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab");

    /// Clones `template` and gives the copy the block number `number` and a hash made of
    /// 32 repetitions of `hash_byte`.
    fn renumbered(template: &EthRecoveredBlock, number: u64, hash_byte: u8) -> EthRecoveredBlock {
        let mut block = template.clone();
        block.set_block_number(number);
        block.set_hash(B256::new([hash_byte; 32]));
        block
    }

    /// Builds a recovered block with a default header and a body holding a single default
    /// legacy transaction signed with the test signature.
    fn block_with_legacy_tx() -> EthRecoveredBlock {
        let mut body = BlockBody::<TransactionSigned>::default();
        body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());

        SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
            SealedHeader::seal_slow(alloy_consensus::Header::default()),
            body,
        )
        .try_recover()
        .unwrap()
    }

    /// Wraps `blocks` and `outcome` in a shared [`Chain`] with an empty third argument map.
    fn chain_of(
        blocks: Vec<EthRecoveredBlock>,
        outcome: ExecutionOutcome<Receipt>,
    ) -> Arc<Chain> {
        Arc::new(Chain::new(blocks, outcome, BTreeMap::new()))
    }

    #[test]
    fn test_commit_notification() {
        let template = EthRecoveredBlock::default();
        let first = renumbered(&template, 1, 0x01);
        let second = renumbered(&template, 2, 0x02);

        let chain = chain_of(vec![first, second.clone()], ExecutionOutcome::default());
        let notification = CanonStateNotification::Commit { new: Arc::clone(&chain) };

        // A commit exposes the committed chain as-is.
        assert_eq!(notification.committed(), chain);

        // A commit never reverts anything.
        assert!(notification.reverted().is_none());

        // The tip is the last block of the committed chain.
        assert_eq!(*notification.tip(), second);
    }

    #[test]
    fn test_reorg_notification() {
        let template = EthRecoveredBlock::default();
        let reverted_block = renumbered(&template, 1, 0x01);
        let committed_first = renumbered(&template, 2, 0x02);
        let committed_tip = renumbered(&template, 3, 0x03);

        let old_chain = chain_of(vec![reverted_block], ExecutionOutcome::default());
        let new_chain =
            chain_of(vec![committed_first, committed_tip.clone()], ExecutionOutcome::default());

        let notification = CanonStateNotification::Reorg {
            old: Arc::clone(&old_chain),
            new: Arc::clone(&new_chain),
        };

        // The reverted chain is the old one.
        assert_eq!(notification.reverted(), Some(old_chain));

        // The committed chain is the new one.
        assert_eq!(notification.committed(), new_chain);

        // The tip is the last block of the new chain.
        assert_eq!(*notification.tip(), committed_tip);
    }

    #[test]
    fn test_block_receipts_commit() {
        let template = block_with_legacy_tx();
        let first = renumbered(&template, 1, 0x01);
        let second = renumbered(&template, 2, 0x02);

        let receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
        };

        // Only one block's worth of receipts is recorded in the execution outcome.
        let outcome =
            ExecutionOutcome { receipts: vec![vec![receipt.clone()]], ..Default::default() };

        let notification =
            CanonStateNotification::Commit { new: chain_of(vec![first.clone(), second], outcome) };

        let block_receipts = notification.block_receipts();

        assert_eq!(block_receipts.len(), 1);

        assert_eq!(
            block_receipts[0].0,
            BlockReceipts {
                block: first.num_hash(),
                timestamp: first.timestamp,
                tx_receipts: vec![(LEGACY_TX_HASH, receipt)]
            }
        );

        // Committed receipts carry the `false` flag.
        assert!(!block_receipts[0].1);
    }

    #[test]
    fn test_block_receipts_reorg() {
        // Reverted side: block 1 with a failed receipt.
        let old_block = renumbered(&block_with_legacy_tx(), 1, 0x01);
        let old_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 54321,
            logs: vec![],
            success: false,
        };
        let old_outcome =
            ExecutionOutcome { receipts: vec![vec![old_receipt.clone()]], ..Default::default() };
        let old_chain = chain_of(vec![old_block.clone()], old_outcome);

        // Committed side: block 2 with a successful receipt.
        let new_block = renumbered(&block_with_legacy_tx(), 2, 0x02);
        let new_receipt = Receipt {
            tx_type: TxType::Legacy,
            cumulative_gas_used: 12345,
            logs: vec![],
            success: true,
        };
        let new_outcome =
            ExecutionOutcome { receipts: vec![vec![new_receipt.clone()]], ..Default::default() };
        let new_chain = chain_of(vec![new_block.clone()], new_outcome);

        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

        let block_receipts = notification.block_receipts();

        assert_eq!(block_receipts.len(), 2);

        // Reverted receipts come first and carry the `true` flag.
        assert_eq!(
            block_receipts[0].0,
            BlockReceipts {
                block: old_block.num_hash(),
                timestamp: old_block.timestamp,
                tx_receipts: vec![(LEGACY_TX_HASH, old_receipt)]
            }
        );
        assert!(block_receipts[0].1);

        // Committed receipts follow and carry the `false` flag.
        assert_eq!(
            block_receipts[1].0,
            BlockReceipts {
                block: new_block.num_hash(),
                timestamp: new_block.timestamp,
                tx_receipts: vec![(LEGACY_TX_HASH, new_receipt)]
            }
        );
        assert!(!block_receipts[1].1);
    }
}