reth_consensus_debug_client/
client.rs

1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4    BeaconConsensusEngineHandle, BuiltPayload, EngineApiMessageVersion, EngineTypes,
5    ExecutionPayload, NodePrimitives,
6};
7use reth_primitives_traits::{Block, SealedBlock};
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13/// Supplies consensus client with new blocks sent in `tx` and a callback to find specific blocks
14/// by number to fetch past finalized and safe blocks.
15#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17    /// The block type.
18    type Block: Block;
19
20    /// Runs a block provider to send new blocks to the given sender.
21    ///
22    /// Note: This is expected to be spawned in a separate task, and as such it should ignore
23    /// errors.
24    fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
25
26    /// Get a past block by number.
27    fn get_block(
28        &self,
29        block_number: u64,
30    ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
31
32    /// Get previous block hash using previous block hash buffer. If it isn't available (buffer
33    /// started more recently than `offset`), fetch it using `get_block`.
34    fn get_or_fetch_previous_block(
35        &self,
36        previous_block_hashes: &AllocRingBuffer<B256>,
37        current_block_number: u64,
38        offset: usize,
39    ) -> impl Future<Output = eyre::Result<B256>> + Send {
40        async move {
41            let stored_hash = previous_block_hashes
42                .len()
43                .checked_sub(offset)
44                .and_then(|index| previous_block_hashes.get(index));
45            if let Some(hash) = stored_hash {
46                return Ok(*hash);
47            }
48
49            // Return zero hash if the chain isn't long enough to have the block at the offset.
50            let previous_block_number = match current_block_number.checked_sub(offset as u64) {
51                Some(number) => number,
52                None => return Ok(B256::default()),
53            };
54            let block = self.get_block(previous_block_number).await?;
55            Ok(block.header().hash_slow())
56        }
57    }
58}
59
60/// Debug consensus client that sends FCUs and new payloads using recent blocks from an external
61/// provider like Etherscan or an RPC endpoint.
62#[derive(Debug)]
63pub struct DebugConsensusClient<P: BlockProvider, T: EngineTypes> {
64    /// Handle to execution client.
65    engine_handle: BeaconConsensusEngineHandle<T>,
66    /// Provider to get consensus blocks from.
67    block_provider: P,
68}
69
70impl<P: BlockProvider, T: EngineTypes> DebugConsensusClient<P, T> {
71    /// Create a new debug consensus client with the given handle to execution
72    /// client and block provider.
73    pub const fn new(engine_handle: BeaconConsensusEngineHandle<T>, block_provider: P) -> Self {
74        Self { engine_handle, block_provider }
75    }
76}
77
78impl<P, T> DebugConsensusClient<P, T>
79where
80    P: BlockProvider + Clone,
81    T: EngineTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
82{
83    /// Spawn the client to start sending FCUs and new payloads by periodically fetching recent
84    /// blocks.
85    pub async fn run(self) {
86        let mut previous_block_hashes = AllocRingBuffer::new(64);
87
88        let mut block_stream = {
89            let (tx, rx) = mpsc::channel::<P::Block>(64);
90            let block_provider = self.block_provider.clone();
91            tokio::spawn(async move {
92                block_provider.subscribe_blocks(tx).await;
93            });
94            rx
95        };
96
97        while let Some(block) = block_stream.recv().await {
98            let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
99
100            let block_hash = payload.block_hash();
101            let block_number = payload.block_number();
102
103            previous_block_hashes.push(block_hash);
104
105            // Send new events to execution client
106            let _ = self.engine_handle.new_payload(payload).await;
107
108            // Load previous block hashes. We're using (head - 32) and (head - 64) as the safe and
109            // finalized block hashes.
110            let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
111                &previous_block_hashes,
112                block_number,
113                32,
114            );
115            let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
116                &previous_block_hashes,
117                block_number,
118                64,
119            );
120            let (safe_block_hash, finalized_block_hash) =
121                tokio::join!(safe_block_hash, finalized_block_hash);
122            let (safe_block_hash, finalized_block_hash) = match (
123                safe_block_hash,
124                finalized_block_hash,
125            ) {
126                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
127                    (safe_block_hash, finalized_block_hash)
128                }
129                (safe_block_hash, finalized_block_hash) => {
130                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
131                    continue;
132                }
133            };
134            let state = alloy_rpc_types_engine::ForkchoiceState {
135                head_block_hash: block_hash,
136                safe_block_hash,
137                finalized_block_hash,
138            };
139            let _ = self
140                .engine_handle
141                .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
142                .await;
143        }
144    }
145}