Skip to main content

reth_consensus_debug_client/
client.rs

1use alloy_primitives::B256;
2use reth_node_api::{ConsensusEngineHandle, ExecutionPayload, PayloadTypes};
3use reth_tracing::tracing::warn;
4use ringbuffer::{AllocRingBuffer, RingBuffer};
5use std::future::Future;
6use tokio::sync::mpsc;
7
8/// Supplies consensus client with new execution payloads sent in `tx` and a callback to find
9/// specific payloads by number to fetch past finalized and safe block hashes.
10#[auto_impl::auto_impl(&, Arc, Box)]
11pub trait PayloadProvider: Send + Sync + 'static {
12    /// The execution payload data type.
13    type ExecutionData: ExecutionPayload;
14
15    /// Runs a provider to send new execution payloads to the given sender.
16    ///
17    /// Note: This is expected to be spawned in a separate task, and as such it should ignore
18    /// errors.
19    fn subscribe_payloads(
20        &self,
21        tx: mpsc::Sender<Self::ExecutionData>,
22    ) -> impl Future<Output = ()> + Send;
23
24    /// Get a past execution payload by block number.
25    fn get_payload(
26        &self,
27        block_number: u64,
28    ) -> impl Future<Output = eyre::Result<Self::ExecutionData>> + Send;
29
30    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
31    /// started more recently than `offset`), fetch it using `get_payload`.
32    fn get_or_fetch_previous_block(
33        &self,
34        previous_block_hashes: &AllocRingBuffer<B256>,
35        current_block_number: u64,
36        offset: usize,
37    ) -> impl Future<Output = eyre::Result<B256>> + Send {
38        async move {
39            if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
40                return Ok(hash);
41            }
42
43            // Return zero hash if the chain isn't long enough to have the block at the offset.
44            let previous_block_number = match current_block_number.checked_sub(offset as u64) {
45                Some(number) => number,
46                None => return Ok(B256::default()),
47            };
48            let payload = self.get_payload(previous_block_number).await?;
49            Ok(payload.block_hash())
50        }
51    }
52}
53
54/// Debug consensus client that sends FCUs and new payloads using recent payloads from an external
55/// provider like Etherscan or an RPC endpoint.
56#[derive(Debug)]
57pub struct DebugConsensusClient<P: PayloadProvider, T: PayloadTypes> {
58    /// Handle to execution client.
59    engine_handle: ConsensusEngineHandle<T>,
60    /// Provider to get consensus payloads from.
61    block_provider: P,
62}
63
64impl<P: PayloadProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
65    /// Create a new debug consensus client with the given handle to execution
66    /// client and block provider.
67    pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
68        Self { engine_handle, block_provider }
69    }
70}
71
72impl<P, T> DebugConsensusClient<P, T>
73where
74    P: PayloadProvider<ExecutionData = T::ExecutionData> + Clone,
75    T: PayloadTypes,
76{
77    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
78    /// payloads.
79    pub async fn run(self) {
80        let mut previous_block_hashes = AllocRingBuffer::new(65);
81        let mut payload_stream = {
82            let (tx, rx) = mpsc::channel::<P::ExecutionData>(64);
83            let block_provider = self.block_provider.clone();
84            tokio::spawn(async move {
85                block_provider.subscribe_payloads(tx).await;
86            });
87            rx
88        };
89
90        while let Some(payload) = payload_stream.recv().await {
91            let block_hash = payload.block_hash();
92            let block_number = payload.block_number();
93
94            previous_block_hashes.enqueue(block_hash);
95
96            // Send new events to execution client
97            let _ = self.engine_handle.new_payload(payload).await;
98
99            // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
100            // finalized block hashes.
101            let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
102                &previous_block_hashes,
103                block_number,
104                32,
105            );
106            let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
107                &previous_block_hashes,
108                block_number,
109                64,
110            );
111            let (safe_block_hash, finalized_block_hash) =
112                tokio::join!(safe_block_hash, finalized_block_hash);
113            let (safe_block_hash, finalized_block_hash) = match (
114                safe_block_hash,
115                finalized_block_hash,
116            ) {
117                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
118                    (safe_block_hash, finalized_block_hash)
119                }
120                (safe_block_hash, finalized_block_hash) => {
121                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
122                    continue;
123                }
124            };
125            let state = alloy_rpc_types_engine::ForkchoiceState {
126                head_block_hash: block_hash,
127                safe_block_hash,
128                finalized_block_hash,
129            };
130            let _ = self.engine_handle.fork_choice_updated(state, None).await;
131        }
132    }
133}
134
135/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
136///
137/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
138fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
139    buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
140}
141
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for `B256::with_last_byte`.
    fn hash(last_byte: u8) -> B256 {
        B256::with_last_byte(last_byte)
    }

    #[test]
    fn test_get_hash_at_offset() {
        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);

        // Nothing has been enqueued yet, so every offset misses.
        for offset in [0, 1] {
            assert_eq!(get_hash_at_offset(&buffer, offset), None, "offset {offset}");
        }

        // Enqueue hashes 0..65.
        for i in 0..65u8 {
            buffer.enqueue(hash(i));
        }

        let cases = [
            // Most recent entry.
            (0, Some(hash(64))),
            // Safe block offset.
            (32, Some(hash(32))),
            // Finalized block offset, i.e. the oldest entry.
            (64, Some(hash(0))),
            // One past what the buffer holds.
            (65, None),
        ];
        for (offset, expected) in cases {
            assert_eq!(get_hash_at_offset(&buffer, offset), expected, "offset {offset}");
        }
    }

    #[test]
    fn test_get_hash_at_offset_insufficient_entries() {
        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);

        // A single entry can only satisfy offset 0.
        buffer.enqueue(hash(1));
        let single_entry_cases = [(0, Some(hash(1))), (1, None), (32, None), (64, None)];
        for (offset, expected) in single_entry_cases {
            assert_eq!(get_hash_at_offset(&buffer, offset), expected, "offset {offset}");
        }

        // Grow to 33 entries: offset 32 is now reachable, offset 64 still is not.
        for i in 2..=33u8 {
            buffer.enqueue(hash(i));
        }
        let partial_fill_cases = [(32, Some(hash(1))), (64, None)];
        for (offset, expected) in partial_fill_cases {
            assert_eq!(get_hash_at_offset(&buffer, offset), expected, "offset {offset}");
        }
    }
}