1use alloy_eips::eip2718::Encodable2718;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9 pin::Pin,
10 sync::Arc,
11 task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15 wrappers::{BroadcastStream, WatchStream},
16 Stream,
17};
18use tracing::debug;
19
20pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
22 broadcast::Receiver<CanonStateNotification<N>>;
23
24pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
26 broadcast::Sender<CanonStateNotification<N>>;
27
28pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37 CanonStateNotificationStream {
38 st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39 }
40 }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45 (*self).subscribe_to_canonical_state()
46 }
47
48 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49 (*self).canonical_state_stream()
50 }
51}
52
53#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
57{
58 #[pin]
59 st: BroadcastStream<CanonStateNotification<N>>,
60}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63 type Item = CanonStateNotification<N>;
64
65 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 loop {
67 return match ready!(self.as_mut().project().st.poll_next(cx)) {
68 Some(Ok(notification)) => Poll::Ready(Some(notification)),
69 Some(Err(err)) => {
70 debug!(%err, "canonical state notification stream lagging behind");
71 continue
72 }
73 None => Poll::Ready(None),
74 }
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85#[cfg_attr(feature = "serde", serde(bound = ""))]
86pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
87 Commit {
89 new: Arc<Chain<N>>,
91 },
92 Reorg {
99 old: Arc<Chain<N>>,
101 new: Arc<Chain<N>>,
106 },
107}
108
109impl<N: NodePrimitives> CanonStateNotification<N> {
110 pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
112 match self {
113 Self::Commit { .. } => None,
114 Self::Reorg { old, .. } => Some(old.clone()),
115 }
116 }
117
118 pub fn committed(&self) -> Arc<Chain<N>> {
120 match self {
121 Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
122 }
123 }
124
125 pub fn tip(&self) -> &RecoveredBlock<N::Block> {
134 match self {
135 Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
136 }
137 }
138
139 pub fn tip_checked(&self) -> Option<&RecoveredBlock<N::Block>> {
144 match self {
145 Self::Commit { new } | Self::Reorg { new, .. } => {
146 if new.is_empty() {
147 None
148 } else {
149 Some(new.tip())
150 }
151 }
152 }
153 }
154
155 pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
161 where
162 N::SignedTx: Encodable2718,
163 {
164 let mut receipts = Vec::new();
165
166 if let Some(old) = self.reverted() {
168 receipts
169 .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
170 }
171 receipts.extend(
173 self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
174 );
175 receipts
176 }
177}
178
179#[derive(Debug, Deref, DerefMut)]
181pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
182 pub watch::Receiver<Option<SealedHeader<T>>>,
183);
184
185pub trait ForkChoiceSubscriptions: Send + Sync {
188 type Header: Clone + Send + Sync + 'static;
190
191 fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
193
194 fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
196
197 fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
199 ForkChoiceStream::new(self.subscribe_safe_block().0)
200 }
201
202 fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
204 ForkChoiceStream::new(self.subscribe_finalized_block().0)
205 }
206}
207
208#[derive(Debug)]
210#[pin_project::pin_project]
211pub struct ForkChoiceStream<T> {
212 #[pin]
213 st: WatchStream<Option<T>>,
214}
215
216impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
217 pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
219 Self { st: WatchStream::from_changes(rx) }
220 }
221}
222
223impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
224 type Item = T;
225
226 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
227 loop {
228 match ready!(self.as_mut().project().st.poll_next(cx)) {
229 Some(Some(notification)) => return Poll::Ready(Some(notification)),
230 Some(None) => {}
231 None => return Poll::Ready(None),
232 }
233 }
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use alloy_consensus::{BlockBody, SignableTransaction, TxLegacy};
241 use alloy_primitives::{b256, Signature, B256};
242 use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
243 use reth_execution_types::ExecutionOutcome;
244 use reth_primitives_traits::SealedBlock;
245
246 #[test]
247 fn test_commit_notification() {
248 let block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
249 let block1_hash = B256::new([0x01; 32]);
250 let block2_hash = B256::new([0x02; 32]);
251
252 let mut block1 = block.clone();
253 block1.set_block_number(1);
254 block1.set_hash(block1_hash);
255
256 let mut block2 = block;
257 block2.set_block_number(2);
258 block2.set_hash(block2_hash);
259
260 let chain: Arc<Chain> = Arc::new(Chain::new(
261 vec![block1.clone(), block2.clone()],
262 ExecutionOutcome::default(),
263 None,
264 ));
265
266 let notification = CanonStateNotification::Commit { new: chain.clone() };
268
269 assert_eq!(notification.committed(), chain);
271
272 assert!(notification.reverted().is_none());
274
275 assert_eq!(*notification.tip(), block2);
277 }
278
279 #[test]
280 fn test_reorg_notification() {
281 let block: RecoveredBlock<reth_ethereum_primitives::Block> = Default::default();
282 let block1_hash = B256::new([0x01; 32]);
283 let block2_hash = B256::new([0x02; 32]);
284 let block3_hash = B256::new([0x03; 32]);
285
286 let mut block1 = block.clone();
287 block1.set_block_number(1);
288 block1.set_hash(block1_hash);
289
290 let mut block2 = block.clone();
291 block2.set_block_number(2);
292 block2.set_hash(block2_hash);
293
294 let mut block3 = block;
295 block3.set_block_number(3);
296 block3.set_hash(block3_hash);
297
298 let old_chain: Arc<Chain> =
299 Arc::new(Chain::new(vec![block1.clone()], ExecutionOutcome::default(), None));
300 let new_chain = Arc::new(Chain::new(
301 vec![block2.clone(), block3.clone()],
302 ExecutionOutcome::default(),
303 None,
304 ));
305
306 let notification =
308 CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };
309
310 assert_eq!(notification.reverted(), Some(old_chain));
312
313 assert_eq!(notification.committed(), new_chain);
315
316 assert_eq!(*notification.tip(), block3);
318 }
319
320 #[test]
321 fn test_block_receipts_commit() {
322 let mut body = BlockBody::<TransactionSigned>::default();
324
325 let block1_hash = B256::new([0x01; 32]);
327 let block2_hash = B256::new([0x02; 32]);
328
329 let tx = TxLegacy::default().into_signed(Signature::test_signature()).into();
331 body.transactions.push(tx);
332
333 let block = SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
334 SealedHeader::seal_slow(alloy_consensus::Header::default()),
335 body,
336 )
337 .try_recover()
338 .unwrap();
339
340 let mut block1 = block.clone();
342 block1.set_block_number(1);
343 block1.set_hash(block1_hash);
344
345 let mut block2 = block;
347 block2.set_block_number(2);
348 block2.set_hash(block2_hash);
349
350 let receipt1 = Receipt {
352 tx_type: TxType::Legacy,
353 cumulative_gas_used: 12345,
354 logs: vec![],
355 success: true,
356 };
357
358 let receipts = vec![vec![receipt1.clone()]];
360
361 let execution_outcome = ExecutionOutcome { receipts, ..Default::default() };
363
364 let new_chain: Arc<Chain> =
366 Arc::new(Chain::new(vec![block1.clone(), block2.clone()], execution_outcome, None));
367
368 let notification = CanonStateNotification::Commit { new: new_chain };
370
371 let block_receipts = notification.block_receipts();
373
374 assert_eq!(block_receipts.len(), 1);
376
377 assert_eq!(
379 block_receipts[0].0,
380 BlockReceipts {
381 block: block1.num_hash(),
382 timestamp: block1.timestamp,
383 tx_receipts: vec![(
384 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
386 receipt1
387 )]
388 }
389 );
390
391 assert!(!block_receipts[0].1);
393 }
394
395 #[test]
396 fn test_block_receipts_reorg() {
397 let mut body = BlockBody::<TransactionSigned>::default();
399 body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());
400 let mut old_block1 =
401 SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
402 SealedHeader::seal_slow(alloy_consensus::Header::default()),
403 body,
404 )
405 .try_recover()
406 .unwrap();
407 old_block1.set_block_number(1);
408 old_block1.set_hash(B256::new([0x01; 32]));
409
410 let old_receipt = Receipt {
412 tx_type: TxType::Legacy,
413 cumulative_gas_used: 54321,
414 logs: vec![],
415 success: false,
416 };
417 let old_receipts = vec![vec![old_receipt.clone()]];
418
419 let old_execution_outcome =
420 ExecutionOutcome { receipts: old_receipts, ..Default::default() };
421
422 let old_chain: Arc<Chain> =
424 Arc::new(Chain::new(vec![old_block1.clone()], old_execution_outcome, None));
425
426 let mut body = BlockBody::<TransactionSigned>::default();
428 body.transactions.push(TxLegacy::default().into_signed(Signature::test_signature()).into());
429 let mut new_block1 =
430 SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
431 SealedHeader::seal_slow(alloy_consensus::Header::default()),
432 body,
433 )
434 .try_recover()
435 .unwrap();
436 new_block1.set_block_number(2);
437 new_block1.set_hash(B256::new([0x02; 32]));
438
439 let new_receipt = Receipt {
441 tx_type: TxType::Legacy,
442 cumulative_gas_used: 12345,
443 logs: vec![],
444 success: true,
445 };
446 let new_receipts = vec![vec![new_receipt.clone()]];
447
448 let new_execution_outcome =
449 ExecutionOutcome { receipts: new_receipts, ..Default::default() };
450
451 let new_chain = Arc::new(Chain::new(vec![new_block1.clone()], new_execution_outcome, None));
453
454 let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };
456
457 let block_receipts = notification.block_receipts();
459
460 assert_eq!(block_receipts.len(), 2);
462
463 assert_eq!(
465 block_receipts[0].0,
466 BlockReceipts {
467 block: old_block1.num_hash(),
468 timestamp: old_block1.timestamp,
469 tx_receipts: vec![(
470 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
472 old_receipt
473 )]
474 }
475 );
476 assert!(block_receipts[0].1);
478
479 assert_eq!(
482 block_receipts[1].0,
483 BlockReceipts {
484 block: new_block1.num_hash(),
485 timestamp: new_block1.timestamp,
486 tx_receipts: vec![(
487 b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab"),
489 new_receipt
490 )]
491 }
492 );
493 assert!(!block_receipts[1].1);
495 }
496}