1use alloy_eips::eip2718::Encodable2718;
4use derive_more::{Deref, DerefMut};
5use reth_execution_types::{BlockReceipts, Chain};
6use reth_primitives_traits::{NodePrimitives, RecoveredBlock, SealedHeader};
7use reth_storage_api::NodePrimitivesProvider;
8use std::{
9 pin::Pin,
10 sync::Arc,
11 task::{ready, Context, Poll},
12};
13use tokio::sync::{broadcast, watch};
14use tokio_stream::{
15 wrappers::{BroadcastStream, WatchStream},
16 Stream,
17};
18use tracing::debug;
19
20pub type CanonStateNotifications<N = reth_ethereum_primitives::EthPrimitives> =
22 broadcast::Receiver<CanonStateNotification<N>>;
23
24pub type CanonStateNotificationSender<N = reth_ethereum_primitives::EthPrimitives> =
26 broadcast::Sender<CanonStateNotification<N>>;
27
28pub trait CanonStateSubscriptions: NodePrimitivesProvider + Send + Sync {
30 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives>;
34
35 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
37 CanonStateNotificationStream {
38 st: BroadcastStream::new(self.subscribe_to_canonical_state()),
39 }
40 }
41}
42
43impl<T: CanonStateSubscriptions> CanonStateSubscriptions for &T {
44 fn subscribe_to_canonical_state(&self) -> CanonStateNotifications<Self::Primitives> {
45 (*self).subscribe_to_canonical_state()
46 }
47
48 fn canonical_state_stream(&self) -> CanonStateNotificationStream<Self::Primitives> {
49 (*self).canonical_state_stream()
50 }
51}
52
53#[derive(Debug)]
55#[pin_project::pin_project]
56pub struct CanonStateNotificationStream<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives>
57{
58 #[pin]
59 st: BroadcastStream<CanonStateNotification<N>>,
60}
61
62impl<N: NodePrimitives> Stream for CanonStateNotificationStream<N> {
63 type Item = CanonStateNotification<N>;
64
65 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
66 loop {
67 return match ready!(self.as_mut().project().st.poll_next(cx)) {
68 Some(Ok(notification)) => Poll::Ready(Some(notification)),
69 Some(Err(err)) => {
70 debug!(%err, "canonical state notification stream lagging behind");
71 continue
72 }
73 None => Poll::Ready(None),
74 }
75 }
76 }
77}
78
79#[derive(Clone, Debug, PartialEq, Eq)]
84pub enum CanonStateNotification<N: NodePrimitives = reth_ethereum_primitives::EthPrimitives> {
85 Commit {
87 new: Arc<Chain<N>>,
89 },
90 Reorg {
97 old: Arc<Chain<N>>,
99 new: Arc<Chain<N>>,
104 },
105}
106
107impl<N: NodePrimitives> CanonStateNotification<N> {
108 pub fn reverted(&self) -> Option<Arc<Chain<N>>> {
110 match self {
111 Self::Commit { .. } => None,
112 Self::Reorg { old, .. } => Some(old.clone()),
113 }
114 }
115
116 pub fn committed(&self) -> Arc<Chain<N>> {
118 match self {
119 Self::Commit { new } | Self::Reorg { new, .. } => new.clone(),
120 }
121 }
122
123 pub fn tip(&self) -> &RecoveredBlock<N::Block> {
128 match self {
129 Self::Commit { new } | Self::Reorg { new, .. } => new.tip(),
130 }
131 }
132
133 pub fn block_receipts(&self) -> Vec<(BlockReceipts<N::Receipt>, bool)>
139 where
140 N::SignedTx: Encodable2718,
141 {
142 let mut receipts = Vec::new();
143
144 if let Some(old) = self.reverted() {
146 receipts
147 .extend(old.receipts_with_attachment().into_iter().map(|receipt| (receipt, true)));
148 }
149 receipts.extend(
151 self.committed().receipts_with_attachment().into_iter().map(|receipt| (receipt, false)),
152 );
153 receipts
154 }
155}
156
157#[derive(Debug, Deref, DerefMut)]
159pub struct ForkChoiceNotifications<T = alloy_consensus::Header>(
160 pub watch::Receiver<Option<SealedHeader<T>>>,
161);
162
163pub trait ForkChoiceSubscriptions: Send + Sync {
166 type Header: Clone + Send + Sync + 'static;
168
169 fn subscribe_safe_block(&self) -> ForkChoiceNotifications<Self::Header>;
171
172 fn subscribe_finalized_block(&self) -> ForkChoiceNotifications<Self::Header>;
174
175 fn safe_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
177 ForkChoiceStream::new(self.subscribe_safe_block().0)
178 }
179
180 fn finalized_block_stream(&self) -> ForkChoiceStream<SealedHeader<Self::Header>> {
182 ForkChoiceStream::new(self.subscribe_finalized_block().0)
183 }
184}
185
186#[derive(Debug)]
188#[pin_project::pin_project]
189pub struct ForkChoiceStream<T> {
190 #[pin]
191 st: WatchStream<Option<T>>,
192}
193
194impl<T: Clone + Sync + Send + 'static> ForkChoiceStream<T> {
195 pub fn new(rx: watch::Receiver<Option<T>>) -> Self {
197 Self { st: WatchStream::from_changes(rx) }
198 }
199}
200
201impl<T: Clone + Sync + Send + 'static> Stream for ForkChoiceStream<T> {
202 type Item = T;
203
204 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205 loop {
206 match ready!(self.as_mut().project().st.poll_next(cx)) {
207 Some(Some(notification)) => return Poll::Ready(Some(notification)),
208 Some(None) => {}
209 None => return Poll::Ready(None),
210 }
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::BlockBody;
    use alloy_primitives::{b256, B256};
    use reth_ethereum_primitives::{Receipt, TransactionSigned, TxType};
    use reth_execution_types::ExecutionOutcome;
    use reth_primitives_traits::SealedBlock;

    /// Recovered Ethereum block type used by every fixture in this module.
    type EthBlock = RecoveredBlock<reth_ethereum_primitives::Block>;

    /// Clones `template` and stamps it with `number` and a hash made of 32 copies of `hash_byte`.
    fn numbered(template: &EthBlock, number: u64, hash_byte: u8) -> EthBlock {
        let mut block = template.clone();
        block.set_block_number(number);
        block.set_hash(B256::new([hash_byte; 32]));
        block
    }

    /// Builds a recovered block with a default header and a single default transaction.
    fn single_tx_block() -> EthBlock {
        let mut body = BlockBody::<TransactionSigned>::default();
        body.transactions.push(TransactionSigned::default());

        SealedBlock::<alloy_consensus::Block<TransactionSigned>>::from_sealed_parts(
            SealedHeader::seal_slow(alloy_consensus::Header::default()),
            body,
        )
        .try_recover()
        .unwrap()
    }

    /// Builds a legacy receipt without logs.
    fn legacy_receipt(cumulative_gas_used: u64, success: bool) -> Receipt {
        Receipt { tx_type: TxType::Legacy, cumulative_gas_used, logs: vec![], success }
    }

    /// Wraps the blocks and execution outcome into a shared chain.
    fn chain_of(blocks: Vec<EthBlock>, outcome: ExecutionOutcome<Receipt>) -> Arc<Chain> {
        Arc::new(Chain::new(blocks, outcome, None))
    }

    /// Hash the receipt assertions expect for the default transaction.
    fn default_tx_hash() -> B256 {
        b256!("0x20b5378c6fe992c118b557d2f8e8bbe0b7567f6fe5483a8f0f1c51e93a9d91ab")
    }

    /// Expected receipts entry for `block` holding exactly one `receipt`.
    fn expected_receipts(block: &EthBlock, receipt: Receipt) -> BlockReceipts<Receipt> {
        BlockReceipts { block: block.num_hash(), tx_receipts: vec![(default_tx_hash(), receipt)] }
    }

    #[test]
    fn test_commit_notification() {
        let template = EthBlock::default();
        let block1 = numbered(&template, 1, 0x01);
        let block2 = numbered(&template, 2, 0x02);

        let chain = chain_of(vec![block1.clone(), block2.clone()], ExecutionOutcome::default());

        let notification = CanonStateNotification::Commit { new: chain.clone() };

        // A commit exposes the new chain, has nothing reverted, and its tip is the last block.
        assert_eq!(notification.committed(), chain);
        assert!(notification.reverted().is_none());
        assert_eq!(*notification.tip(), block2);
    }

    #[test]
    fn test_reorg_notification() {
        let template = EthBlock::default();
        let block1 = numbered(&template, 1, 0x01);
        let block2 = numbered(&template, 2, 0x02);
        let block3 = numbered(&template, 3, 0x03);

        let old_chain = chain_of(vec![block1.clone()], ExecutionOutcome::default());
        let new_chain =
            chain_of(vec![block2.clone(), block3.clone()], ExecutionOutcome::default());

        let notification =
            CanonStateNotification::Reorg { old: old_chain.clone(), new: new_chain.clone() };

        // A reorg exposes both chains; the tip comes from the new one.
        assert_eq!(notification.reverted(), Some(old_chain));
        assert_eq!(notification.committed(), new_chain);
        assert_eq!(*notification.tip(), block3);
    }

    #[test]
    fn test_block_receipts_commit() {
        let template = single_tx_block();
        let block1 = numbered(&template, 1, 0x01);
        let block2 = numbered(&template, 2, 0x02);

        // The outcome holds receipts for a single block only.
        let receipt1 = legacy_receipt(12345, true);
        let execution_outcome =
            ExecutionOutcome { receipts: vec![vec![receipt1.clone()]], ..Default::default() };

        let new_chain = chain_of(vec![block1.clone(), block2.clone()], execution_outcome);
        let notification = CanonStateNotification::Commit { new: new_chain };

        let block_receipts = notification.block_receipts();

        assert_eq!(block_receipts.len(), 1);
        assert_eq!(block_receipts[0].0, expected_receipts(&block1, receipt1));
        // Committed receipts carry the `false` flag.
        assert!(!block_receipts[0].1);
    }

    #[test]
    fn test_block_receipts_reorg() {
        // Reverted side: one block with one failed transaction.
        let old_block1 = numbered(&single_tx_block(), 1, 0x01);
        let old_receipt = legacy_receipt(54321, false);
        let old_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![old_receipt.clone()]], ..Default::default() };
        let old_chain = chain_of(vec![old_block1.clone()], old_execution_outcome);

        // Committed side: one block with one successful transaction.
        let new_block1 = numbered(&single_tx_block(), 2, 0x02);
        let new_receipt = legacy_receipt(12345, true);
        let new_execution_outcome =
            ExecutionOutcome { receipts: vec![vec![new_receipt.clone()]], ..Default::default() };
        let new_chain = chain_of(vec![new_block1.clone()], new_execution_outcome);

        let notification = CanonStateNotification::Reorg { old: old_chain, new: new_chain };

        let block_receipts = notification.block_receipts();

        assert_eq!(block_receipts.len(), 2);

        // Reverted receipts come first and carry the `true` flag.
        assert_eq!(block_receipts[0].0, expected_receipts(&old_block1, old_receipt));
        assert!(block_receipts[0].1);

        // Committed receipts follow and carry the `false` flag.
        assert_eq!(block_receipts[1].0, expected_receipts(&new_block1, new_receipt));
        assert!(!block_receipts[1].1);
    }
}