reth_consensus_debug_client/
client.rs1use alloy_primitives::B256;
2use reth_node_api::{ConsensusEngineHandle, ExecutionPayload, PayloadTypes};
3use reth_tracing::tracing::warn;
4use ringbuffer::{AllocRingBuffer, RingBuffer};
5use std::future::Future;
6use tokio::sync::mpsc;
7
8#[auto_impl::auto_impl(&, Arc, Box)]
11pub trait PayloadProvider: Send + Sync + 'static {
12 type ExecutionData: ExecutionPayload;
14
15 fn subscribe_payloads(
20 &self,
21 tx: mpsc::Sender<Self::ExecutionData>,
22 ) -> impl Future<Output = ()> + Send;
23
24 fn get_payload(
26 &self,
27 block_number: u64,
28 ) -> impl Future<Output = eyre::Result<Self::ExecutionData>> + Send;
29
30 fn get_or_fetch_previous_block(
33 &self,
34 previous_block_hashes: &AllocRingBuffer<B256>,
35 current_block_number: u64,
36 offset: usize,
37 ) -> impl Future<Output = eyre::Result<B256>> + Send {
38 async move {
39 if let Some(hash) = get_hash_at_offset(previous_block_hashes, offset) {
40 return Ok(hash);
41 }
42
43 let previous_block_number = match current_block_number.checked_sub(offset as u64) {
45 Some(number) => number,
46 None => return Ok(B256::default()),
47 };
48 let payload = self.get_payload(previous_block_number).await?;
49 Ok(payload.block_hash())
50 }
51 }
52}
53
54#[derive(Debug)]
57pub struct DebugConsensusClient<P: PayloadProvider, T: PayloadTypes> {
58 engine_handle: ConsensusEngineHandle<T>,
60 block_provider: P,
62}
63
64impl<P: PayloadProvider, T: PayloadTypes> DebugConsensusClient<P, T> {
65 pub const fn new(engine_handle: ConsensusEngineHandle<T>, block_provider: P) -> Self {
68 Self { engine_handle, block_provider }
69 }
70}
71
72impl<P, T> DebugConsensusClient<P, T>
73where
74 P: PayloadProvider<ExecutionData = T::ExecutionData> + Clone,
75 T: PayloadTypes,
76{
77 pub async fn run(self) {
80 let mut previous_block_hashes = AllocRingBuffer::new(65);
81 let mut payload_stream = {
82 let (tx, rx) = mpsc::channel::<P::ExecutionData>(64);
83 let block_provider = self.block_provider.clone();
84 tokio::spawn(async move {
85 block_provider.subscribe_payloads(tx).await;
86 });
87 rx
88 };
89
90 while let Some(payload) = payload_stream.recv().await {
91 let block_hash = payload.block_hash();
92 let block_number = payload.block_number();
93
94 previous_block_hashes.enqueue(block_hash);
95
96 let _ = self.engine_handle.new_payload(payload).await;
98
99 let safe_block_hash = self.block_provider.get_or_fetch_previous_block(
102 &previous_block_hashes,
103 block_number,
104 32,
105 );
106 let finalized_block_hash = self.block_provider.get_or_fetch_previous_block(
107 &previous_block_hashes,
108 block_number,
109 64,
110 );
111 let (safe_block_hash, finalized_block_hash) =
112 tokio::join!(safe_block_hash, finalized_block_hash);
113 let (safe_block_hash, finalized_block_hash) = match (
114 safe_block_hash,
115 finalized_block_hash,
116 ) {
117 (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
118 (safe_block_hash, finalized_block_hash)
119 }
120 (safe_block_hash, finalized_block_hash) => {
121 warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from etherscan");
122 continue;
123 }
124 };
125 let state = alloy_rpc_types_engine::ForkchoiceState {
126 head_block_hash: block_hash,
127 safe_block_hash,
128 finalized_block_hash,
129 };
130 let _ = self.engine_handle.fork_choice_updated(state, None).await;
131 }
132 }
133}
134
135fn get_hash_at_offset(buffer: &AllocRingBuffer<B256>, offset: usize) -> Option<B256> {
139 buffer.len().checked_sub(offset + 1).and_then(|index| buffer.get(index).copied())
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145
146 #[test]
147 fn test_get_hash_at_offset() {
148 let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
149
150 assert_eq!(get_hash_at_offset(&buffer, 0), None);
152 assert_eq!(get_hash_at_offset(&buffer, 1), None);
153
154 for i in 0..65u8 {
156 buffer.enqueue(B256::with_last_byte(i));
157 }
158
159 assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(64)));
161
162 assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(32)));
164
165 assert_eq!(get_hash_at_offset(&buffer, 64), Some(B256::with_last_byte(0)));
167
168 assert_eq!(get_hash_at_offset(&buffer, 65), None);
170 }
171
172 #[test]
173 fn test_get_hash_at_offset_insufficient_entries() {
174 let mut buffer: AllocRingBuffer<B256> = AllocRingBuffer::new(65);
175
176 buffer.enqueue(B256::with_last_byte(1));
178 assert_eq!(get_hash_at_offset(&buffer, 0), Some(B256::with_last_byte(1)));
179 assert_eq!(get_hash_at_offset(&buffer, 1), None);
180 assert_eq!(get_hash_at_offset(&buffer, 32), None);
181 assert_eq!(get_hash_at_offset(&buffer, 64), None);
182
183 for i in 2..=33u8 {
185 buffer.enqueue(B256::with_last_byte(i));
186 }
187 assert_eq!(get_hash_at_offset(&buffer, 32), Some(B256::with_last_byte(1)));
188 assert_eq!(get_hash_at_offset(&buffer, 64), None);
189 }
190}