// reth_consensus_debug_client/client.rs
use alloy_consensus::Transaction;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::{Block, BlockTransactions};
use reth_node_api::EngineTypes;
use reth_rpc_builder::auth::AuthServerHandle;
use reth_tracing::tracing::warn;
use ringbuffer::{AllocRingBuffer, RingBuffer};
use std::future::Future;
use tokio::sync::mpsc;
/// A source of blocks for [`DebugConsensusClient`].
///
/// An implementor provides a stream of new blocks (`subscribe_blocks`) and a by-number lookup
/// (`get_block`). The provided method `get_or_fetch_previous_block` builds on `get_block` to
/// resolve the hash of an earlier block.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait BlockProvider: Send + Sync + 'static {
    /// Returns a future that is handed the sending half of a block channel.
    ///
    /// `DebugConsensusClient::run` spawns this future on its own task and consumes blocks from
    /// the receiving half; implementations are presumably expected to send every new block
    /// into `tx`.
    fn subscribe_blocks(&self, tx: mpsc::Sender<Block>) -> impl Future<Output = ()> + Send;

    /// Fetches the block with the given number, or an error if it cannot be obtained.
    fn get_block(&self, block_number: u64) -> impl Future<Output = eyre::Result<Block>> + Send;

    /// Resolves the hash of an earlier block, preferring the in-memory hash buffer.
    ///
    /// Resolution order:
    /// 1. If the buffer holds at least `offset` entries, the entry at index `len - offset` is
    ///    returned without any fetch.
    /// 2. Otherwise, if `current_block_number - offset` underflows, the zero hash
    ///    (`B256::default()`) is returned.
    /// 3. Otherwise block `current_block_number - offset` is fetched through
    ///    [`BlockProvider::get_block`] and its header hash is returned.
    ///
    /// # Errors
    ///
    /// Propagates the error of [`BlockProvider::get_block`] when a fetch is needed and fails.
    fn get_or_fetch_previous_block(
        &self,
        previous_block_hashes: &AllocRingBuffer<B256>,
        current_block_number: u64,
        offset: usize,
    ) -> impl Future<Output = eyre::Result<B256>> + Send {
        async move {
            // Cheap path: the hash is already buffered.
            let cached = match previous_block_hashes.len().checked_sub(offset) {
                Some(index) => previous_block_hashes.get(index),
                None => None,
            };
            if let Some(&hash) = cached {
                return Ok(hash);
            }

            // Block numbers below zero do not exist; answer with the zero hash instead.
            let Some(previous_block_number) = current_block_number.checked_sub(offset as u64)
            else {
                return Ok(B256::default());
            };

            let fetched = self.get_block(previous_block_number).await?;
            Ok(fetched.header.hash)
        }
    }
}
/// Debug consensus client that takes blocks from a [`BlockProvider`] and submits them to the
/// execution client reachable through the auth server (see `run`).
#[derive(Debug)]
pub struct DebugConsensusClient<P: BlockProvider> {
/// Handle to the auth server; `run` obtains its HTTP client from it to issue Engine API calls.
auth_server: AuthServerHandle,
/// Source of the blocks that `run` forwards, also used to look up earlier block hashes.
block_provider: P,
}
impl<P: BlockProvider> DebugConsensusClient<P> {
/// Creates a client from an auth server handle and a block provider.
///
/// Nothing is started here; the values are only stored until `run` is called.
pub const fn new(auth_server: AuthServerHandle, block_provider: P) -> Self {
Self { auth_server, block_provider }
}
}
impl<P: BlockProvider + Clone> DebugConsensusClient<P> {
    /// Drives the execution client with the blocks produced by the block provider.
    ///
    /// A clone of the provider is spawned on its own task to feed a bounded channel. For every
    /// block received from that channel this method:
    ///
    /// 1. converts it with [`block_to_execution_payload_v3`] and submits it via
    ///    `new_payload_v3`;
    /// 2. resolves the safe and finalized hashes as the blocks 32 and 64 behind the head,
    ///    using the local hash buffer when possible and the provider otherwise;
    /// 3. submits a `fork_choice_updated_v3` with the block as head.
    ///
    /// Engine API failures are logged as warnings and otherwise ignored. If either hash lookup
    /// fails, the fork choice update for that block is skipped. The method returns once the
    /// block channel is closed.
    ///
    /// # Panics
    ///
    /// Panics if [`block_to_execution_payload_v3`] panics for a received block.
    pub async fn run<T: EngineTypes>(self) {
        // Distance (in blocks) between the head and the block reported as safe.
        const SAFE_BLOCK_OFFSET: usize = 32;
        // Distance (in blocks) between the head and the block reported as finalized. The hash
        // buffer never needs to reach further back than this.
        const FINALIZED_BLOCK_OFFSET: usize = 64;

        let execution_client = self.auth_server.http_client();
        let mut previous_block_hashes = AllocRingBuffer::new(FINALIZED_BLOCK_OFFSET);

        let mut block_stream = {
            let (tx, rx) = mpsc::channel::<Block>(64);
            let block_provider = self.block_provider.clone();
            // Detached on purpose: the task ends when the provider's future completes.
            tokio::spawn(async move {
                block_provider.subscribe_blocks(tx).await;
            });
            rx
        };

        while let Some(block) = block_stream.recv().await {
            let payload = block_to_execution_payload_v3(block);
            let block_hash = payload.block_hash();
            let block_number = payload.block_number();

            // Best effort: a rejected payload is logged and the loop carries on.
            let _ = reth_rpc_api::EngineApiClient::<T>::new_payload_v3(
                &execution_client,
                payload.execution_payload_v3,
                payload.versioned_hashes,
                payload.parent_beacon_block_root,
            )
            .await
            .inspect_err(|err| {
                warn!(target: "consensus::debug-client", %err, %block_hash, %block_number, "failed to submit new payload to execution client");
            });

            // At this point the buffer holds only hashes of blocks received *before* this one,
            // so the buffered entry at `len - offset` and the fetched block
            // `block_number - offset` refer to the same block.
            // NOTE(review): this assumes the provider delivers consecutive block numbers.
            let (safe_block_hash, finalized_block_hash) = tokio::join!(
                self.block_provider.get_or_fetch_previous_block(
                    &previous_block_hashes,
                    block_number,
                    SAFE_BLOCK_OFFSET,
                ),
                self.block_provider.get_or_fetch_previous_block(
                    &previous_block_hashes,
                    block_number,
                    FINALIZED_BLOCK_OFFSET,
                ),
            );

            // Record the current hash only after the lookups (and before any `continue`), so
            // every received block is buffered exactly once without shifting the lookup above.
            previous_block_hashes.push(block_hash);

            let (safe_block_hash, finalized_block_hash) = match (
                safe_block_hash,
                finalized_block_hash,
            ) {
                (Ok(safe_block_hash), Ok(finalized_block_hash)) => {
                    (safe_block_hash, finalized_block_hash)
                }
                (safe_block_hash, finalized_block_hash) => {
                    warn!(target: "consensus::debug-client", ?safe_block_hash, ?finalized_block_hash, "failed to fetch safe or finalized hash from block provider");
                    continue;
                }
            };

            let state = alloy_rpc_types_engine::ForkchoiceState {
                head_block_hash: block_hash,
                safe_block_hash,
                finalized_block_hash,
            };
            // No payload attributes: this client only advances the head, it never builds blocks.
            let _ = reth_rpc_api::EngineApiClient::<T>::fork_choice_updated_v3(
                &execution_client,
                state,
                None,
            )
            .await
            .inspect_err(|err| {
                warn!(target: "consensus::debug-client", %err, ?state, "failed to submit fork choice update to execution client");
            });
        }
    }
}
/// The three values that `DebugConsensusClient::run` passes to `new_payload_v3`, bundled
/// together as produced by [`block_to_execution_payload_v3`].
#[derive(Debug)]
pub struct ExecutionNewPayload {
/// The execution payload built from the block's header, transactions and withdrawals.
pub execution_payload_v3: ExecutionPayloadV3,
/// Blob versioned hashes collected from the block's transactions, in transaction order.
pub versioned_hashes: Vec<B256>,
/// The `parent_beacon_block_root` taken from the block header.
pub parent_beacon_block_root: B256,
}
impl ExecutionNewPayload {
    /// Returns the block hash stored in the innermost (V1) payload.
    pub const fn block_hash(&self) -> B256 {
        let v1 = &self.execution_payload_v3.payload_inner.payload_inner;
        v1.block_hash
    }

    /// Returns the block number stored in the innermost (V1) payload.
    pub const fn block_number(&self) -> u64 {
        let v1 = &self.execution_payload_v3.payload_inner.payload_inner;
        v1.block_number
    }
}
/// Converts an RPC [`Block`] into the arguments needed for a `new_payload_v3` call.
///
/// The header fields are copied into an [`ExecutionPayloadV3`], every transaction is
/// re-encoded with EIP-2718 encoding, and the blob versioned hashes of all transactions are
/// collected in transaction order. Missing withdrawals are treated as an empty list.
///
/// # Panics
///
/// Panics if
/// - the block carries a non-empty list of transaction hashes instead of full transactions,
///   or is an uncle block;
/// - the header has no `base_fee_per_gas`, `blob_gas_used`, `excess_blob_gas` or
///   `parent_beacon_block_root`;
/// - the base fee cannot be converted into the payload's `base_fee_per_gas` type.
pub fn block_to_execution_payload_v3(block: Block) -> ExecutionNewPayload {
    // `block` is owned, so the transaction list is moved out rather than cloned.
    let transactions = match block.transactions {
        BlockTransactions::Full(txs) => txs,
        // An empty hash list is indistinguishable from an empty block and is accepted.
        BlockTransactions::Hashes(hashes) if hashes.is_empty() => vec![],
        BlockTransactions::Hashes(_) | BlockTransactions::Uncle => {
            panic!("Received uncle block or hash-only transactions from Etherscan API")
        }
    };

    // Transactions without blob hashes contribute nothing.
    let versioned_hashes = transactions
        .iter()
        .flat_map(|tx| tx.blob_versioned_hashes().unwrap_or_default())
        .copied()
        .collect();

    let payload: ExecutionPayloadV3 = ExecutionPayloadV3 {
        payload_inner: ExecutionPayloadV2 {
            payload_inner: ExecutionPayloadV1 {
                parent_hash: block.header.parent_hash,
                fee_recipient: block.header.beneficiary,
                state_root: block.header.state_root,
                receipts_root: block.header.receipts_root,
                logs_bloom: block.header.logs_bloom,
                prev_randao: block.header.mix_hash,
                block_number: block.header.number,
                gas_limit: block.header.gas_limit,
                gas_used: block.header.gas_used,
                timestamp: block.header.timestamp,
                // Cloned because the header is still read further down.
                extra_data: block.header.extra_data.clone(),
                base_fee_per_gas: block
                    .header
                    .base_fee_per_gas
                    .expect("block header has no base_fee_per_gas")
                    .try_into()
                    .expect("base fee does not fit the payload's base_fee_per_gas type"),
                block_hash: block.header.hash,
                transactions: transactions
                    .into_iter()
                    .map(|tx| tx.inner.encoded_2718().into())
                    .collect(),
            },
            // Moved out of the owned block; no copy of the withdrawal list is made.
            withdrawals: block.withdrawals.unwrap_or_default().into_inner(),
        },
        blob_gas_used: block.header.blob_gas_used.expect("block header has no blob_gas_used"),
        excess_blob_gas: block
            .header
            .excess_blob_gas
            .expect("block header has no excess_blob_gas"),
    };

    ExecutionNewPayload {
        execution_payload_v3: payload,
        versioned_hashes,
        parent_beacon_block_root: block
            .header
            .parent_beacon_block_root
            .expect("block header has no parent_beacon_block_root"),
    }
}