reth_node_events/
cl.rs
1use alloy_consensus::Header;
4use futures::Stream;
5use reth_storage_api::CanonChainTracker;
6use std::{
7 fmt,
8 pin::Pin,
9 task::{ready, Context, Poll},
10 time::Duration,
11};
12use tokio::time::{Instant, Interval};
13
14const CHECK_INTERVAL: Duration = Duration::from_secs(300);
16
17const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(120);
20
21pub struct ConsensusLayerHealthEvents<H = Header> {
23 interval: Interval,
24 canon_chain: Box<dyn CanonChainTracker<Header = H>>,
25}
26
27impl<H> fmt::Debug for ConsensusLayerHealthEvents<H> {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("ConsensusLayerHealthEvents").field("interval", &self.interval).finish()
30 }
31}
32
33impl<H> ConsensusLayerHealthEvents<H> {
34 pub fn new(canon_chain: Box<dyn CanonChainTracker<Header = H>>) -> Self {
36 let interval = tokio::time::interval_at(Instant::now() + CHECK_INTERVAL, CHECK_INTERVAL);
38 Self { interval, canon_chain }
39 }
40}
41
42impl<H: Send + Sync> Stream for ConsensusLayerHealthEvents<H> {
43 type Item = ConsensusLayerHealthEvent;
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 let this = self.get_mut();
47
48 loop {
49 ready!(this.interval.poll_tick(cx));
50
51 if let Some(fork_choice) = this.canon_chain.last_received_update_timestamp() {
52 if fork_choice.elapsed() <= NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD {
53 continue
55 }
56 return Poll::Ready(Some(
58 ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(
59 fork_choice.elapsed(),
60 ),
61 ))
62 }
63
64 return Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen))
66 }
67 }
68}
69
70#[derive(Clone, Copy, Debug)]
73pub enum ConsensusLayerHealthEvent {
74 NeverSeen,
76 HasNotBeenSeenForAWhile(Duration),
78 NeverReceivedUpdates,
80 HaveNotReceivedUpdatesForAWhile(Duration),
82}