reth_consensus_debug_client/
client.rs1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4 BuiltPayload, ConsensusEngineHandle, ExecutionPayload, NodePrimitives, PayloadTypes,
5};
6use reth_primitives_traits::{Block, SealedBlock};
7use reth_tracing::tracing::warn;
8use ringbuffer::{AllocRingBuffer, RingBuffer};
9use std::future::Future;
10use tokio::sync::mpsc;
11
12#[auto_impl::auto_impl(&, Arc, Box)]
15pub trait BlockProvider: Send + Sync + 'static {
16 type Block: Block;
18
19 fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
24
25 fn get_block(
27 &self,
28 block_number: u64,
29 ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
30
31 fn get_or_fetch_previous_block(
34 &self,
35 previous_block_hashes: &AllocRingBuffer<B256>,
36 current_block_number: u64,
37 offset: usize,
38 ) -> impl Future<Output = eyre::Result<B256>> + Send {
39 async move {
40 if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
41 return Ok(hash);
42 }
43
44 let previous_block_number = match current_block_number.checked_sub(offset as u64) {
46 Some(number) => number,
47 None => return Ok(B256::default()),
48 };
49 let block = self.get_block(previous_block_number).await?;
50 Ok(block.header().hash_slow())
51 }
52 }
53}
54
55#[derive(Debug)]
58pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
59 engine_handle: ConsensusEngineHandle<T>,
61 block_provider: P,
63}
64
65impl<P: BlockProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
66 pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
69 Self { engine_handle, block_provider }
70 }
71}
72
73impl<P, T> DebugConsensusClient<P, T>
74where
75 P: BlockProvider + Clone,
76 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
77{
78 pub async fn run(self) {
81 let mut previous_block_hashes = AllocRingBuffer::new(65);
82 let mut block_stream = {
83 let (tx, rx) = mpsc::channel::<P::Block>(64);
84 let block_provider = self.block_provider.clone();
85 tokio::spawn(async move {
86 block_provider.subscribe_blocks(tx).await;
87 });
88 rx
89 };
90
91 while let Some(block) = block_stream.recv().await {
92 let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
93
94 let block_hash = payload.block_hash();
95 let block_number = payload.block_number();
96
97 previous_block_hashes.enqueue(block_hash);
98
99 let _ = self.engine_handle.new_payload(payload).await;
101
102 let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
105 &previous_block_hashes,
106 block_number,
107 32,
108 );
109 let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
110 &previous_block_hashes,
111 block_number,
112 64,
113 );
114 let (safe_block_hash, finalized_block_hash) =
115 tokio::join!(safe_block_hash, finalized_block_hash);
116 let (safe_block_hash, finalized_block_hash) = match (
117 safe_block_hash,
118 finalized_block_hash,
119 ) {
120 (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
121 (safe_block_hash, finalized_block_hash)
122 }
123 (safe_block_hash, finalized_block_hash) => {
124 warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
125 continue;
126 }
127 };
128 let state = alloy_rpc_types_engine::ForkchoiceState {
129 head_block_hash: block_hash,
130 safe_block_hash,
131 finalized_block_hash,
132 };
133 let _ = self.engine_handle.fork_choice_updated(state, None).await;
134 }
135 }
136}
137
138fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
142 buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
143}
144
145#[cfg(test)]
146mod tests {
147 use super::*;
148
149 #[test]
150 fn test_get_hash_at_offset() {
151 let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
152
153 assert_eq!(get_hash_at_offset(&buffer, 0), None);
155 assert_eq!(get_hash_at_offset(&buffer, 1), None);
156
157 for i in 0..65u8 {
159 buffer.enqueue(B256::with_last_byte(i));
160 }
161
162 assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
164
165 assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
167
168 assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
170
171 assert_eq!(get_hash_at_offset(&buffer, 65), None);
173 }
174
175 #[test]
176 fn test_get_hash_at_offset_insufficient_entries() {
177 let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
178
179 buffer.enqueue(B256::with_last_byte(1));
181 assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
182 assert_eq!(get_hash_at_offset(&buffer, 1), None);
183 assert_eq!(get_hash_at_offset(&buffer, 32), None);
184 assert_eq!(get_hash_at_offset(&buffer, 64), None);
185
186 for i in 2..=33u8 {
188 buffer.enqueue(B256::with_last_byte(i));
189 }
190 assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
191 assert_eq!(get_hash_at_offset(&buffer, 64), None);
192 }
193}