reth_cli_commands/
re_execute.rs1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_consensus::FullConsensus;
13use reth_evm::{execute::Executor, ConfigureEvm};
14use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected, SignedTransaction};
15use reth_provider::{
16 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
17 StaticFileProviderFactory, TransactionVariant,
18};
19use reth_revm::database::StateProviderDatabase;
20use reth_stages::stages::calculate_gas_used_from_headers;
21use std::{
22 sync::Arc,
23 time::{Duration, Instant},
24};
25use tokio::{sync::mpsc, task::JoinSet};
26use tracing::*;
27
28#[derive(Debug, Parser)]
32pub struct Command<C: ChainSpecParser> {
33 #[command(flatten)]
34 env: EnvironmentArgs<C>,
35
36 #[arg(long, default_value = "1")]
38 from: u64,
39
40 #[arg(long)]
42 to: Option<u64>,
43
44 #[arg(long, default_value = "10")]
46 num_tasks: u64,
47}
48
49impl<C: ChainSpecParser> Command<C> {
50 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
52 Some(&self.env.chain)
53 }
54}
55
56impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
57 pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
59 where
60 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
61 {
62 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
63
64 let provider = provider_factory.database_provider_ro()?;
65 let components = components(provider_factory.chain_spec());
66
67 let min_block = self.from;
68 let max_block = self.to.unwrap_or(provider.best_block_number()?);
69
70 let total_blocks = max_block - min_block;
71 let total_gas = calculate_gas_used_from_headers(
72 &provider_factory.static_file_provider(),
73 min_block..=max_block,
74 )?;
75 let blocks_per_task = total_blocks / self.num_tasks;
76
77 let db_at = {
78 let provider_factory = provider_factory.clone();
79 move |block_number: u64| {
80 StateProviderDatabase(
81 provider_factory.history_by_block_number(block_number).unwrap(),
82 )
83 }
84 };
85
86 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
87
88 let mut tasks = JoinSet::new();
89 for i in 0..self.num_tasks {
90 let start_block = min_block + i * blocks_per_task;
91 let end_block =
92 if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
93
94 let provider_factory = provider_factory.clone();
96 let evm_config = components.evm_config().clone();
97 let consensus = components.consensus().clone();
98 let db_at = db_at.clone();
99 let stats_tx = stats_tx.clone();
100 tasks.spawn_blocking(move || {
101 let mut executor = evm_config.batch_executor(db_at(start_block - 1));
102 for block in start_block..end_block {
103 let block = provider_factory
104 .recovered_block(block.into(), TransactionVariant::NoHash)?
105 .unwrap();
106 let result = executor.execute_one(&block)?;
107
108 if let Err(err) = consensus
109 .validate_block_post_execution(&block, &result)
110 .wrap_err_with(|| format!("Failed to validate block {}", block.number()))
111 {
112 let correct_receipts =
113 provider_factory.receipts_by_block(block.number().into())?.unwrap();
114
115 for (i, (receipt, correct_receipt)) in
116 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
117 {
118 if receipt != correct_receipt {
119 let tx_hash = block.body().transactions()[i].tx_hash();
120 error!(
121 ?receipt,
122 ?correct_receipt,
123 index = i,
124 ?tx_hash,
125 "Invalid receipt"
126 );
127 let expected_gas_used = correct_receipt.cumulative_gas_used() -
128 if i == 0 {
129 0
130 } else {
131 correct_receipts[i - 1].cumulative_gas_used()
132 };
133 let got_gas_used = receipt.cumulative_gas_used() -
134 if i == 0 {
135 0
136 } else {
137 result.receipts[i - 1].cumulative_gas_used()
138 };
139 if got_gas_used != expected_gas_used {
140 let mismatch = GotExpected {
141 expected: expected_gas_used,
142 got: got_gas_used,
143 };
144
145 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
146 return Err(err);
147 }
148 } else {
149 continue;
150 }
151 }
152
153 return Err(err);
154 }
155 let _ = stats_tx.send(block.gas_used());
156
157 if executor.size_hint() > 1_000_000 {
159 executor = evm_config.batch_executor(db_at(block.number()));
160 }
161 }
162
163 eyre::Ok(())
164 });
165 }
166
167 let instant = Instant::now();
168 let mut total_executed_blocks = 0;
169 let mut total_executed_gas = 0;
170
171 let mut last_logged_gas = 0;
172 let mut last_logged_blocks = 0;
173 let mut last_logged_time = Instant::now();
174
175 let mut interval = tokio::time::interval(Duration::from_secs(10));
176
177 loop {
178 tokio::select! {
179 Some(gas_used) = stats_rx.recv() => {
180 total_executed_blocks += 1;
181 total_executed_gas += gas_used;
182 }
183 result = tasks.join_next() => {
184 if let Some(result) = result {
185 if matches!(result, Err(_) | Ok(Err(_))) {
186 error!(?result);
187 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
188 }
189 } else {
190 break;
191 }
192 }
193 _ = interval.tick() => {
194 let blocks_executed = total_executed_blocks - last_logged_blocks;
195 let gas_executed = total_executed_gas - last_logged_gas;
196
197 if blocks_executed > 0 {
198 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
199 info!(
200 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
201 progress=format!("{progress:.2}%"),
202 "Executed {blocks_executed} blocks"
203 );
204 }
205
206 last_logged_blocks = total_executed_blocks;
207 last_logged_gas = total_executed_gas;
208 last_logged_time = Instant::now();
209 }
210 }
211 }
212
213 info!(
214 start_block = min_block,
215 end_block = max_block,
216 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
217 "Re-executed successfully"
218 );
219
220 Ok(())
221 }
222}