reth_consensus_debug_client/
client.rs
1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4 BeaconConsensusEngineHandle, BuiltPayload, EngineApiMessageVersion, EngineTypes,
5 ExecutionPayload, NodePrimitives,
6};
7use reth_primitives_traits::{Block, SealedBlock};
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17 type Block: Block;
19
20 fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
25
26 fn get_block(
28 &self,
29 block_number: u64,
30 ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
31
32 fn get_or_fetch_previous_block(
35 &self,
36 previous_block_hashes: &AllocRingBuffer<B256>,
37 current_block_number: u64,
38 offset: usize,
39 ) -> impl Future<Output = eyre::Result<B256>> + Send {
40 async move {
41 let stored_hash = previous_block_hashes
42 .len()
43 .checked_sub(offset)
44 .and_then(|index| previous_block_hashes.get(index));
45 if let Some(hash) = stored_hash {
46 return Ok(*hash);
47 }
48
49 let previous_block_number = match current_block_number.checked_sub(offset as u64) {
51 Some(number) => number,
52 None => return Ok(B256::default()),
53 };
54 let block = self.get_block(previous_block_number).await?;
55 Ok(block.header().hash_slow())
56 }
57 }
58}
59
60#[derive(Debug)]
63pub struct DebugConsensusClient<P: BlockProvider, T: EngineTypes> {
64 engine_handle: BeaconConsensusEngineHandle<T>,
66 block_provider: P,
68}
69
70impl<P: BlockProvider, T: EngineTypes> DebugConsensusClient<P, T> {
71 pub const fn new(engine_handle: BeaconConsensusEngineHandle<T>, block_provider: P) -> Self {
74 Self { engine_handle, block_provider }
75 }
76}
77
78impl<P, T> DebugConsensusClient<P, T>
79where
80 P: BlockProvider + Clone,
81 T: EngineTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
82{
83 pub async fn run(self) {
86 let mut previous_block_hashes = AllocRingBuffer::new(64);
87
88 let mut block_stream = {
89 let (tx, rx) = mpsc::channel::<P::Block>(64);
90 let block_provider = self.block_provider.clone();
91 tokio::spawn(async move {
92 block_provider.subscribe_blocks(tx).await;
93 });
94 rx
95 };
96
97 while let Some(block) = block_stream.recv().await {
98 let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
99
100 let block_hash = payload.block_hash();
101 let block_number = payload.block_number();
102
103 previous_block_hashes.push(block_hash);
104
105 let _ = self.engine_handle.new_payload(payload).await;
107
108 let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
111 &previous_block_hashes,
112 block_number,
113 32,
114 );
115 let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
116 &previous_block_hashes,
117 block_number,
118 64,
119 );
120 let (safe_block_hash, finalized_block_hash) =
121 tokio::join!(safe_block_hash, finalized_block_hash);
122 let (safe_block_hash, finalized_block_hash) = match (
123 safe_block_hash,
124 finalized_block_hash,
125 ) {
126 (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
127 (safe_block_hash, finalized_block_hash)
128 }
129 (safe_block_hash, finalized_block_hash) => {
130 warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
131 continue;
132 }
133 };
134 let state = alloy_rpc_types_engine::ForkchoiceState {
135 head_block_hash: block_hash,
136 safe_block_hash,
137 finalized_block_hash,
138 };
139 let _ = self
140 .engine_handle
141 .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
142 .await;
143 }
144 }
145}