Skip to main content

reth_consensus_debug_client/
client.rs

1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4    BuiltPayload, ConsensusEngineHandle, ExecutionPayload, NodePrimitives, PayloadTypes,
5};
6use reth_primitives_traits::{Block, SealedBlock};
7use reth_tracing::tracing::warn;
8use ringbuffer::{AllocRingBuffer, RingBuffer};
9use std::future::Future;
10use tokio::sync::mpsc;
11
12/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
13/// by number to fetch past finalized and safe blocks.
14#[auto_impl::auto_impl(&, Arc, Box)]
15pub trait BlockProvider: Send + Sync + 'static {
16    /// The block type.
17    type Block: Block;
18
19    /// Runs a block provider to send new blocks to the given sender.
20    ///
21    /// Note: This is expected to be spawned in a separate task, and as such it should ignore
22    /// errors.
23    fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
24
25    /// Get a past block by number.
26    fn get_block(
27        &self,
28        block_number: u64,
29    ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
30
31    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
32    /// started more recently than `offset`), fetch it using `get_block`.
33    fn get_or_fetch_previous_block(
34        &self,
35        previous_block_hashes: &AllocRingBuffer<B256>,
36        current_block_number: u64,
37        offset: usize,
38    ) -> impl Future<Output = eyre::Result<B256>> + Send {
39        async move {
40            if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
41                return Ok(hash);
42            }
43
44            // Return zero hash if the chain isn't long enough to have the block at the offset.
45            let previous_block_number = match current_block_number.checked_sub(offset as u64) {
46                Some(number) => number,
47                None => return Ok(B256::default()),
48            };
49            let block = self.get_block(previous_block_number).await?;
50            Ok(block.header().hash_slow())
51        }
52    }
53}
54
55/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
56/// provider like Etherscan or an RPC endpoint.
57#[derive(Debug)]
58pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
59    /// Handle to execution client.
60    engine_handle: ConsensusEngineHandle<T>,
61    /// Provider to get consensus blocks from.
62    block_provider: P,
63}
64
65impl<P: BlockProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
66    /// Create a new debug consensus client with the given handle to execution
67    /// client and block provider.
68    pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
69        Self { engine_handle, block_provider }
70    }
71}
72
73impl<P, T> DebugConsensusClient<P, T>
74where
75    P: BlockProvider + Clone,
76    T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
77{
78    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
79    /// blocks.
80    pub async fn run(self) {
81        let mut previous_block_hashes = AllocRingBuffer::new(65);
82        let mut block_stream = {
83            let (tx, rx) = mpsc::channel::<P::Block>(64);
84            let block_provider = self.block_provider.clone();
85            tokio::spawn(async move {
86                block_provider.subscribe_blocks(tx).await;
87            });
88            rx
89        };
90
91        while let Some(block) = block_stream.recv().await {
92            let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
93
94            let block_hash = payload.block_hash();
95            let block_number = payload.block_number();
96
97            previous_block_hashes.enqueue(block_hash);
98
99            // Send new events to execution client
100            let _ = self.engine_handle.new_payload(payload).await;
101
102            // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
103            // finalized block hashes.
104            let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
105                &previous_block_hashes,
106                block_number,
107                32,
108            );
109            let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
110                &previous_block_hashes,
111                block_number,
112                64,
113            );
114            let (safe_block_hash, finalized_block_hash) =
115                tokio::join!(safe_block_hash, finalized_block_hash);
116            let (safe_block_hash, finalized_block_hash) = match (
117                safe_block_hash,
118                finalized_block_hash,
119            ) {
120                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
121                    (safe_block_hash, finalized_block_hash)
122                }
123                (safe_block_hash, finalized_block_hash) => {
124                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
125                    continue;
126                }
127            };
128            let state = alloy_rpc_types_engine::ForkchoiceState {
129                head_block_hash: block_hash,
130                safe_block_hash,
131                finalized_block_hash,
132            };
133            let _ = self.engine_handle.fork_choice_updated(state, None).await;
134        }
135    }
136}
137
138/// Looks up a block hash from the ring buffer at the given offset from the most recent entry.
139///
140/// Returns `None` if the buffer doesn't have enough entries to satisfy the offset.
141fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
142    buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 65-slot buffer and enqueues one hash per byte, in iteration order.
    fn buffer_with(last_bytes: impl IntoIterator<Item = u8>) -> AllocRingBuffer<B256> {
        let mut buffer = AllocRingBuffer::new(65);
        for byte in last_bytes {
            buffer.enqueue(B256::with_last_byte(byte));
        }
        buffer
    }

    #[test]
    fn test_get_hash_at_offset() {
        // Nothing can be looked up in an empty buffer.
        let empty: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
        for offset in [0, 1] {
            assert_eq!(get_hash_at_offset(&empty, offset), None);
        }

        // A full buffer holding hashes 0..65.
        let full = buffer_with(0..65);
        let expectations = [
            // Most recent entry.
            (0, Some(64u8)),
            // Safe block.
            (32, Some(32)),
            // Finalized block, which is the oldest entry.
            (64, Some(0)),
            // One past the capacity of the buffer.
            (65, None),
        ];
        for (offset, last_byte) in expectations {
            assert_eq!(get_hash_at_offset(&full, offset), last_byte.map(B256::with_last_byte));
        }
    }

    #[test]
    fn test_get_hash_at_offset_insufficient_entries() {
        // A single entry can only be reached with offset 0.
        let mut buffer = buffer_with([1]);
        assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
        for offset in [1, 32, 64] {
            assert_eq!(get_hash_at_offset(&buffer, offset), None);
        }

        // Grow to 33 entries: offset 32 becomes reachable, offset 64 still is not.
        for byte in 2..=33u8 {
            buffer.enqueue(B256::with_last_byte(byte));
        }
        assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
        assert_eq!(get_hash_at_offset(&buffer, 64), None);
    }
}