reth_consensus_debug_client/
client.rs1use alloy_consensus::Sealable;
2use alloy_primitives::B256;
3use reth_node_api::{
4 BuiltPayload, ConsensusEngineHandle, EngineApiMessageVersion, ExecutionPayload, NodePrimitives,
5 PayloadTypes,
6};
7use reth_primitives_traits::{Block, SealedBlock};
8use reth_tracing::tracing::warn;
9use ringbuffer::{AllocRingBuffer, RingBuffer};
10use std::future::Future;
11use tokio::sync::mpsc;
12
13#[auto_impl::auto_impl(&, Arc, Box)]
16pub trait BlockProvider: Send + Sync + 'static {
17 type Block: Block;
19
20 fn subscribe_blocks(&self, tx: mpsc::Sender<Self::Block>) -> impl Future<Output = ()> + Send;
25
26 fn get_block(
28 &self,
29 block_number: u64,
30 ) -> impl Future<Output = eyre::Result<Self::Block>> + Send;
31
32 fn get_or_fetch_previous_block(
35 &self,
36 previous_block_hashes: &AllocRingBuffer<B256>,
37 current_block_number: u64,
38 offset: usize,
39 ) -> impl Future<Output = eyre::Result<B256>> + Send {
40 async move {
41 if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
42 return Ok(hash);
43 }
44
45 let previous_block_number = match current_block_number.checked_sub(offset as u64) {
47 Some(number) => number,
48 None => return Ok(B256::default()),
49 };
50 let block = self.get_block(previous_block_number).await?;
51 Ok(block.header().hash_slow())
52 }
53 }
54}
55
56#[derive(Debug)]
59pub struct DebugConsensusClient<P: BlockProvider, T: PayloadTypes> {
60 engine_handle: ConsensusEngineHandle<T>,
62 block_provider: P,
64}
65
66impl<P: BlockProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
67 pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
70 Self { engine_handle, block_provider }
71 }
72}
73
74impl<P, T> DebugConsensusClient<P, T>
75where
76 P: BlockProvider + Clone,
77 T: PayloadTypes<BuiltPayload: BuiltPayload<Primitives: NodePrimitives<Block = P::Block>>>,
78{
79 pub async fn run(self) {
82 let mut previous_block_hashes = AllocRingBuffer::new(65);
83 let mut block_stream = {
84 let (tx, rx) = mpsc::channel::<P::Block>(64);
85 let block_provider = self.block_provider.clone();
86 tokio::spawn(async move {
87 block_provider.subscribe_blocks(tx).await;
88 });
89 rx
90 };
91
92 while let Some(block) = block_stream.recv().await {
93 let payload = T::block_to_payload(SealedBlock::new_unhashed(block));
94
95 let block_hash = payload.block_hash();
96 let block_number = payload.block_number();
97
98 previous_block_hashes.enqueue(block_hash);
99
100 let _ = self.engine_handle.new_payload(payload).await;
102
103 let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
106 &previous_block_hashes,
107 block_number,
108 32,
109 );
110 let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
111 &previous_block_hashes,
112 block_number,
113 64,
114 );
115 let (safe_block_hash, finalized_block_hash) =
116 tokio::join!(safe_block_hash, finalized_block_hash);
117 let (safe_block_hash, finalized_block_hash) = match (
118 safe_block_hash,
119 finalized_block_hash,
120 ) {
121 (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
122 (safe_block_hash, finalized_block_hash)
123 }
124 (safe_block_hash, finalized_block_hash) => {
125 warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
126 continue;
127 }
128 };
129 let state = alloy_rpc_types_engine::ForkchoiceState {
130 head_block_hash: block_hash,
131 safe_block_hash,
132 finalized_block_hash,
133 };
134 let _ = self
135 .engine_handle
136 .fork_choice_updated(state, None, EngineApiMessageVersion::V3)
137 .await;
138 }
139 }
140}
141
142fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
146 buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// Enqueues one hash per byte in `last_bytes`, each built with `B256::with_last_byte`.
    fn enqueue_bytes(buffer: &mut AllocRingBuffer<B256>, last_bytes: impl IntoIterator<Item = u8>) {
        for byte in last_bytes {
            buffer.enqueue(B256::with_last_byte(byte));
        }
    }

    /// Covers the empty buffer and a buffer filled to its capacity of 65 entries.
    #[test]
    fn test_get_hash_at_offset() {
        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);

        // Nothing stored yet: every offset misses.
        for offset in [0, 1] {
            assert_eq!(get_hash_at_offset(&buffer, offset), None);
        }

        enqueue_bytes(&mut buffer, 0..65u8);

        // Offset 0 is the newest entry, offset 64 the oldest one still held.
        for (offset, expected_byte) in [(0, 64u8), (32, 32), (64, 0)] {
            assert_eq!(
                get_hash_at_offset(&buffer, offset),
                Some(B256::with_last_byte(expected_byte))
            );
        }

        // One past the oldest entry is out of range.
        assert_eq!(get_hash_at_offset(&buffer, 65), None);
    }

    /// Covers buffers that hold fewer entries than the requested offset needs.
    #[test]
    fn test_get_hash_at_offset_insufficient_entries() {
        let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);

        // A single entry only satisfies offset 0.
        enqueue_bytes(&mut buffer, [1u8]);
        assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
        for offset in [1, 32, 64] {
            assert_eq!(get_hash_at_offset(&buffer, offset), None);
        }

        // With 33 entries, offset 32 reaches the first one while offset 64 still misses.
        enqueue_bytes(&mut buffer, 2..=33u8);
        assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
        assert_eq!(get_hash_at_offset(&buffer, 64), None);
    }
}