1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
use alloy_primitives::BlockNumber;
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_primitives::{BlockNumHash, SealedHeader};
use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Instant,
};
use tokio::sync::watch;

/// Tracks the chain info: canonical head, safe block, finalized block.
#[derive(Debug, Clone)]
pub struct ChainInfoTracker {
    inner: Arc<ChainInfoInner>,
}

impl ChainInfoTracker {
    /// Create a new chain info container for the given canonical head and finalized header if it
    /// exists.
    pub fn new(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
        let (finalized_block, _) = watch::channel(finalized);
        let (safe_block, _) = watch::channel(None);

        Self {
            inner: Arc::new(ChainInfoInner {
                last_forkchoice_update: RwLock::new(None),
                last_transition_configuration_exchange: RwLock::new(None),
                canonical_head_number: AtomicU64::new(head.number),
                canonical_head: RwLock::new(head),
                safe_block,
                finalized_block,
            }),
        }
    }

    /// Returns the [`ChainInfo`] for the canonical head.
    pub fn chain_info(&self) -> ChainInfo {
        let inner = self.inner.canonical_head.read();
        ChainInfo { best_hash: inner.hash(), best_number: inner.number }
    }

    /// Update the timestamp when we received a forkchoice update.
    pub fn on_forkchoice_update_received(&self) {
        self.inner.last_forkchoice_update.write().replace(Instant::now());
    }

    /// Returns the instant when we received the latest forkchoice update.
    pub fn last_forkchoice_update_received_at(&self) -> Option<Instant> {
        *self.inner.last_forkchoice_update.read()
    }

    /// Update the timestamp when we exchanged a transition configuration.
    pub fn on_transition_configuration_exchanged(&self) {
        self.inner.last_transition_configuration_exchange.write().replace(Instant::now());
    }

    /// Returns the instant when we exchanged the transition configuration last time.
    pub fn last_transition_configuration_exchanged_at(&self) -> Option<Instant> {
        *self.inner.last_transition_configuration_exchange.read()
    }

    /// Returns the canonical head of the chain.
    pub fn get_canonical_head(&self) -> SealedHeader {
        self.inner.canonical_head.read().clone()
    }

    /// Returns the safe header of the chain.
    pub fn get_safe_header(&self) -> Option<SealedHeader> {
        self.inner.safe_block.borrow().clone()
    }

    /// Returns the finalized header of the chain.
    pub fn get_finalized_header(&self) -> Option<SealedHeader> {
        self.inner.finalized_block.borrow().clone()
    }

    /// Returns the canonical head of the chain.
    #[allow(dead_code)]
    pub fn get_canonical_num_hash(&self) -> BlockNumHash {
        self.inner.canonical_head.read().num_hash()
    }

    /// Returns the canonical head of the chain.
    pub fn get_canonical_block_number(&self) -> BlockNumber {
        self.inner.canonical_head_number.load(Ordering::Relaxed)
    }

    /// Returns the safe header of the chain.
    pub fn get_safe_num_hash(&self) -> Option<BlockNumHash> {
        let h = self.inner.safe_block.borrow();
        h.as_ref().map(|h| h.num_hash())
    }

    /// Returns the finalized header of the chain.
    pub fn get_finalized_num_hash(&self) -> Option<BlockNumHash> {
        let h = self.inner.finalized_block.borrow();
        h.as_ref().map(|h| h.num_hash())
    }

    /// Sets the canonical head of the chain.
    pub fn set_canonical_head(&self, header: SealedHeader) {
        let number = header.number;
        *self.inner.canonical_head.write() = header;

        // also update the atomic number.
        self.inner.canonical_head_number.store(number, Ordering::Relaxed);
    }

    /// Sets the safe header of the chain.
    pub fn set_safe(&self, header: SealedHeader) {
        self.inner.safe_block.send_modify(|h| {
            let _ = h.replace(header);
        });
    }

    /// Sets the finalized header of the chain.
    pub fn set_finalized(&self, header: SealedHeader) {
        self.inner.finalized_block.send_modify(|h| {
            let _ = h.replace(header);
        });
    }

    /// Subscribe to the finalized block.
    pub fn subscribe_finalized_block(&self) -> watch::Receiver<Option<SealedHeader>> {
        self.inner.finalized_block.subscribe()
    }

    /// Subscribe to the safe block.
    pub fn subscribe_safe_block(&self) -> watch::Receiver<Option<SealedHeader>> {
        self.inner.safe_block.subscribe()
    }
}

/// Container type for all chain info fields
#[derive(Debug)]
struct ChainInfoInner {
    /// Timestamp when we received the last fork choice update.
    ///
    /// This is mainly used to track if we're connected to a beacon node.
    last_forkchoice_update: RwLock<Option<Instant>>,
    /// Timestamp when we exchanged the transition configuration last time.
    ///
    /// This is mainly used to track if we're connected to a beacon node.
    last_transition_configuration_exchange: RwLock<Option<Instant>>,
    /// Tracks the number of the `canonical_head`.
    canonical_head_number: AtomicU64,
    /// The canonical head of the chain.
    canonical_head: RwLock<SealedHeader>,
    /// The block that the beacon node considers safe.
    safe_block: watch::Sender<Option<SealedHeader>>,
    /// The block that the beacon node considers finalized.
    finalized_block: watch::Sender<Option<SealedHeader>>,
}