reth_cli_commands/
re_execute.rs1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18 StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use tokio::{sync::mpsc, task::JoinSet};
27use tracing::*;
28
29#[derive(Debug, Parser)]
33pub struct Command<C: ChainSpecParser> {
34 #[command(flatten)]
35 env: EnvironmentArgs<C>,
36
37 #[arg(long, default_value = "1")]
39 from: u64,
40
41 #[arg(long)]
43 to: Option<u64>,
44
45 #[arg(long)]
47 num_tasks: Option<u64>,
48
49 #[arg(long)]
51 skip_invalid_blocks: bool,
52}
53
54impl<C: ChainSpecParser> Command<C> {
55 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
57 Some(&self.env.chain)
58 }
59}
60
61impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
62 pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
64 where
65 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
66 {
67 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
68
69 let components = components(provider_factory.chain_spec());
70
71 let min_block = self.from;
72 let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
73 .best_block_number()?;
74 let mut max_block = best_block;
75 if let Some(to) = self.to {
76 if to > best_block {
77 warn!(
78 requested = to,
79 best_block,
80 "Requested --to is beyond available chain head; clamping to best block"
81 );
82 } else {
83 max_block = to;
84 }
85 };
86
87 let num_tasks = self.num_tasks.unwrap_or_else(|| {
88 std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
89 });
90
91 let total_blocks = max_block - min_block;
92 let total_gas = calculate_gas_used_from_headers(
93 &provider_factory.static_file_provider(),
94 min_block..=max_block,
95 )?;
96 let blocks_per_task = total_blocks / num_tasks;
97
98 let db_at = {
99 let provider_factory = provider_factory.clone();
100 move |block_number: u64| {
101 StateProviderDatabase(
102 provider_factory.history_by_block_number(block_number).unwrap(),
103 )
104 }
105 };
106
107 let skip_invalid_blocks = self.skip_invalid_blocks;
108 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
109 let (info_tx, mut info_rx) = mpsc::unbounded_channel();
110 let cancellation = CancellationToken::new();
111 let _guard = cancellation.drop_guard();
112
113 let mut tasks = JoinSet::new();
114 for i in 0..num_tasks {
115 let start_block = min_block + i * blocks_per_task;
116 let end_block =
117 if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
118
119 let provider_factory = provider_factory.clone();
121 let evm_config = components.evm_config().clone();
122 let consensus = components.consensus().clone();
123 let db_at = db_at.clone();
124 let stats_tx = stats_tx.clone();
125 let info_tx = info_tx.clone();
126 let cancellation = cancellation.clone();
127 tasks.spawn_blocking(move || {
128 let mut executor = evm_config.batch_executor(db_at(start_block - 1));
129 let mut executor_created = Instant::now();
130 let executor_lifetime = Duration::from_secs(120);
131
132 'blocks: for block in start_block..end_block {
133 if cancellation.is_cancelled() {
134 break
136 }
137
138 let block = provider_factory
139 .recovered_block(block.into(), TransactionVariant::NoHash)?
140 .unwrap();
141
142 let result = match executor.execute_one(&block) {
143 Ok(result) => result,
144 Err(err) => {
145 if skip_invalid_blocks {
146 executor = evm_config.batch_executor(db_at(block.number()));
147 let _ = info_tx.send((block, eyre::Report::new(err)));
148 continue
149 }
150 return Err(err.into())
151 }
152 };
153
154 if let Err(err) = consensus
155 .validate_block_post_execution(&block, &result, None)
156 .wrap_err_with(|| {
157 format!("Failed to validate block {} {}", block.number(), block.hash())
158 })
159 {
160 let correct_receipts =
161 provider_factory.receipts_by_block(block.number().into())?.unwrap();
162
163 for (i, (receipt, correct_receipt)) in
164 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
165 {
166 if receipt != correct_receipt {
167 let tx_hash = block.body().transactions()[i].tx_hash();
168 error!(
169 ?receipt,
170 ?correct_receipt,
171 index = i,
172 ?tx_hash,
173 "Invalid receipt"
174 );
175 let expected_gas_used = correct_receipt.cumulative_gas_used() -
176 if i == 0 {
177 0
178 } else {
179 correct_receipts[i - 1].cumulative_gas_used()
180 };
181 let got_gas_used = receipt.cumulative_gas_used() -
182 if i == 0 {
183 0
184 } else {
185 result.receipts[i - 1].cumulative_gas_used()
186 };
187 if got_gas_used != expected_gas_used {
188 let mismatch = GotExpected {
189 expected: expected_gas_used,
190 got: got_gas_used,
191 };
192
193 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
194 if skip_invalid_blocks {
195 executor = evm_config.batch_executor(db_at(block.number()));
196 let _ = info_tx.send((block, err));
197 continue 'blocks;
198 }
199 return Err(err);
200 }
201 } else {
202 continue;
203 }
204 }
205
206 return Err(err);
207 }
208 let _ = stats_tx.send(block.gas_used());
209
210 if executor.size_hint() > 1_000_000 ||
212 executor_created.elapsed() > executor_lifetime
213 {
214 executor = evm_config.batch_executor(db_at(block.number()));
215 executor_created = Instant::now();
216 }
217 }
218
219 eyre::Ok(())
220 });
221 }
222
223 let instant = Instant::now();
224 let mut total_executed_blocks = 0;
225 let mut total_executed_gas = 0;
226
227 let mut last_logged_gas = 0;
228 let mut last_logged_blocks = 0;
229 let mut last_logged_time = Instant::now();
230 let mut invalid_blocks = Vec::new();
231
232 let mut interval = tokio::time::interval(Duration::from_secs(10));
233
234 loop {
235 tokio::select! {
236 Some(gas_used) = stats_rx.recv() => {
237 total_executed_blocks += 1;
238 total_executed_gas += gas_used;
239 }
240 Some((block, err)) = info_rx.recv() => {
241 error!(?err, block=?block.num_hash(), "Invalid block");
242 invalid_blocks.push(block.num_hash());
243 }
244 result = tasks.join_next() => {
245 if let Some(result) = result {
246 if matches!(result, Err(_) | Ok(Err(_))) {
247 error!(?result);
248 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
249 }
250 } else {
251 break;
252 }
253 }
254 _ = interval.tick() => {
255 let blocks_executed = total_executed_blocks - last_logged_blocks;
256 let gas_executed = total_executed_gas - last_logged_gas;
257
258 if blocks_executed > 0 {
259 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
260 info!(
261 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
262 progress=format!("{progress:.2}%"),
263 "Executed {blocks_executed} blocks"
264 );
265 }
266
267 last_logged_blocks = total_executed_blocks;
268 last_logged_gas = total_executed_gas;
269 last_logged_time = Instant::now();
270 }
271 }
272 }
273
274 if invalid_blocks.is_empty() {
275 info!(
276 start_block = min_block,
277 end_block = max_block,
278 %total_executed_blocks,
279 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
280 "Re-executed successfully"
281 );
282 } else {
283 info!(
284 start_block = min_block,
285 end_block = max_block,
286 %total_executed_blocks,
287 invalid_block_count = invalid_blocks.len(),
288 ?invalid_blocks,
289 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
290 "Re-executed with invalid blocks"
291 );
292 }
293
294 Ok(())
295 }
296}