reth_node_events/
cl.rs

1//! Events related to Consensus Layer health.
2
3use alloy_consensus::Header;
4use futures::Stream;
5use reth_storage_api::CanonChainTracker;
6use std::{
7    fmt,
8    pin::Pin,
9    task::{ready, Context, Poll},
10    time::Duration,
11};
12use tokio::time::{Instant, Interval};
13
/// How often the Consensus Layer client health is checked (every five minutes).
const CHECK_INTERVAL: Duration = Duration::from_secs(5 * 60);
16
/// Maximum age (two minutes) of the latest fork choice update from the Consensus Layer
/// client; once it is older than this, a warning is issued.
const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(2 * 60);
20
/// A Stream of [`ConsensusLayerHealthEvent`].
///
/// On every tick of its internal timer the stream inspects when the last update from the
/// Consensus Layer client was received and yields an event if that is missing or too old.
pub struct ConsensusLayerHealthEvents<H = Header> {
    /// Timer that paces the health checks.
    interval: Interval,
    /// Tracker queried for the timestamp of the last received update.
    canon_chain: Box<dyn CanonChainTracker<Header = H>>,
}
26
27impl<H> fmt::Debug for ConsensusLayerHealthEvents<H> {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("ConsensusLayerHealthEvents").field("interval", &self.interval).finish()
30    }
31}
32
33impl<H> ConsensusLayerHealthEvents<H> {
34    /// Creates a new [`ConsensusLayerHealthEvents`] with the given canonical chain tracker.
35    pub fn new(canon_chain: Box<dyn CanonChainTracker<Header = H>>) -> Self {
36        // Skip the first tick to prevent the false `ConsensusLayerHealthEvent::NeverSeen` event.
37        let interval = tokio::time::interval_at(Instant::now() + CHECK_INTERVAL, CHECK_INTERVAL);
38        Self { interval, canon_chain }
39    }
40}
41
42impl<H: Send + Sync> Stream for ConsensusLayerHealthEvents<H> {
43    type Item = ConsensusLayerHealthEvent;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        let this = self.get_mut();
47
48        loop {
49            ready!(this.interval.poll_tick(cx));
50
51            if let Some(fork_choice) = this.canon_chain.last_received_update_timestamp() {
52                if fork_choice.elapsed() <= NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD {
53                    // We had an FCU, and it's recent. CL is healthy.
54                    continue
55                }
56                // We had an FCU, but it's too old.
57                return Poll::Ready(Some(
58                    ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(
59                        fork_choice.elapsed(),
60                    ),
61                ))
62            }
63
64            // We never had both FCU and transition config exchange.
65            return Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen))
66        }
67    }
68}
69
/// Event that is triggered when Consensus Layer health is degraded from the
/// Execution Layer point of view.
///
/// Events compare by value (`PartialEq`/`Eq`), which makes them easy to match against
/// expected values in tests and deduplication logic.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConsensusLayerHealthEvent {
    /// Consensus Layer client was never seen.
    ///
    /// Emitted by [`ConsensusLayerHealthEvents`] when no update timestamp is available at all.
    NeverSeen,
    /// Consensus Layer client has not been seen for a while.
    ///
    /// The payload is presumably the time since the client was last seen.
    HasNotBeenSeenForAWhile(Duration),
    /// Updates from the Consensus Layer client were never received.
    NeverReceivedUpdates,
    /// Updates from the Consensus Layer client have not been received for a while.
    ///
    /// The payload is the time elapsed since the last received update.
    HaveNotReceivedUpdatesForAWhile(Duration),
}