Skip to main content

reth_consensus_debug_client/
client.rs

1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4    BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
5    PayloadTypes,
6};
7use reth_primitives_traits::{Block, SealedBlock};
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
14/// by number to fetch past finalized and safe blocks.
15#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17    /// The block type.
18    type Block: Block;
19
20    /// Runs a block provider to send new blocks to the given sender.
21    ///
22    /// Note: This is expected to be spawned in a separate task, and as such it should ignore
23    /// errors.
24    fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
25
26    /// Get a past block by number.
27    fn get_block(
28        &self,
29        block_number: u64,
30    ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
31
32    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
33    /// started more recently than `offset`), fetch it using `get_block`.
34    fn get_or_fetch_previous_block(
35        &self,
36        previous_block_hashes: &AllocRingBuffer<B256>,
37        current_block_number: u64,
38        offset: usize,
39    ) -> impl Future<Output = eyre::Result<B256>> + Send {
40        async move {
41            if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
42                return Ok(hash);
43            }
44
45            // Return zero hash if the chain isn't long enough to have the block at the offset.
46            let previous_block_number = match current_block_number.checked_sub(offset as u64) {
47                Some(number) => number,
48                None => return Ok(B256::default()),
49            };
50            let block = self.get_block(previous_block_number).await?;
51            Ok(block.header().hash_slow())
52        }
53    }
54}
55
56/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
57/// provider like Etherscan or an RPC endpoint.
58#[derive(Debug)]
59pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
60    /// Handle to execution client.
61    engine_handle: ConsensusEngineHandle<T>,
62    /// Provider to get consensus blocks from.
63    block_provider: P,
64}
65
66impl<P: BlockProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
67    /// Create a new debug consensus client with the given handle to execution
68    /// client and block provider.
69    pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
70        Self { engine_handle, block_provider }
71    }
72}
73
74impl<P, T> DebugConsensusClient<P, T>
75where
76    P: BlockProvider + Clone,
77    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
78{
79    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
80    /// blocks.
81    pub async fn run(self) {
82        let mut previous_block_hashes = AllocRingBuffer::new(65);
83        let mut block_stream = {
84            let (tx, rx) = mpsc::channel::<P::Block>(64);
85            let block_provider = self.block_provider.clone();
86            tokio::spawn(async move {
87                block_provider.subscribe_blocks(tx).await;
88            });
89            rx
90        };
91
92        while let Some(block) = block_stream.recv().await {
93            let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
94
95            let block_hash = payload.block_hash();
96            let block_number = payload.block_number();
97
98            previous_block_hashes.enqueue(block_hash);
99
100            // Send new events to execution client
101            let _ = self.engine_handle.new_payload(payload).await;
102
103            // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
104            // finalized block hashes.
105            let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
106                &previous_block_hashes,
107                block_number,
108                32,
109            );
110            let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
111                &previous_block_hashes,
112                block_number,
113                64,
114            );
115            let (safe_block_hash, finalized_block_hash) =
116                tokio::join!(safe_block_hash, finalized_block_hash);
117            let (safe_block_hash, finalized_block_hash) = match (
118                safe_block_hash,
119                finalized_block_hash,
120            ) {
121                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
122                    (safe_block_hash, finalized_block_hash)
123                }
124                (safe_block_hash, finalized_block_hash) => {
125                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
126                    continue;
127                }
128            };
129            let state = alloy_rpc_types_engine::ForkchoiceState {
130                head_block_hash: block_hash,
131                safe_block_hash,
132                finalized_block_hash,
133            };
134            let _ = self
135                .engine_handle
136                .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
137                .await;
138        }
139    }
140}
141
142/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
143///
144/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
145fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
146    buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152
153    #[test]
154    fn test_get_hash_at_offset() {
155        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
156
157        // Empty buffer returns None for any offset
158        assert_eq!(get_hash_at_offset(&buffer, 0), None);
159        assert_eq!(get_hash_at_offset(&buffer, 1), None);
160
161        // Push hashes 0..65
162        for i in 0..65u8 {
163            buffer.enqueue(B256::with_last_byte(i));
164        }
165
166        // offset=0 should return the most recent (64)
167        assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
168
169        // offset=32 (safe block) should return hash 32
170        assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
171
172        // offset=64 (finalized block) should return hash 0 (the oldest)
173        assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
174
175        // offset=65 exceeds buffer, should return None
176        assert_eq!(get_hash_at_offset(&buffer, 65), None);
177    }
178
179    #[test]
180    fn test_get_hash_at_offset_insufficient_entries() {
181        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
182
183        // With only 1 entry, only offset=0 works
184        buffer.enqueue(B256::with_last_byte(1));
185        assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
186        assert_eq!(get_hash_at_offset(&buffer, 1), None);
187        assert_eq!(get_hash_at_offset(&buffer, 32), None);
188        assert_eq!(get_hash_at_offset(&buffer, 64), None);
189
190        // With 33 entries, offset=32 works but offset=64 doesn't
191        for i in 2..=33u8 {
192            buffer.enqueue(B256::with_last_byte(i));
193        }
194        assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
195        assert_eq!(get_hash_at_offset(&buffer, 64), None);
196    }
197}