1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use alloy_primitives::{Address, B256, U256};
9use clap::Parser;
10use eyre::WrapErr;
11use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
12use reth_cli::chainspec::ChainSpecParser;
13use reth_cli_util::cancellation::CancellationToken;
14use reth_consensus::FullConsensus;
15use reth_evm::{execute::Executor, ConfigureEvm};
16use reth_node_core::args::JitArgs;
17use reth_primitives_traits::{format_gas_throughput, Account, BlockBody, GotExpected};
18use reth_provider::{
19 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
20 StaticFileProviderFactory, TransactionVariant,
21};
22use reth_revm::{
23 database::StateProviderDatabase,
24 db::{
25 states::reverts::{AccountInfoRevert, RevertToSlot},
26 BundleState,
27 },
28};
29use reth_stages::stages::calculate_gas_used_from_headers;
30use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
31use std::{
32 collections::HashMap,
33 sync::{
34 atomic::{AtomicU64, Ordering},
35 Arc,
36 },
37 time::{Duration, Instant},
38};
39use tokio::{sync::mpsc, task::JoinSet};
40use tracing::*;
41
42#[derive(Debug, Parser)]
46pub struct Command<C: ChainSpecParser> {
47 #[command(flatten)]
48 env: EnvironmentArgs<C>,
49
50 #[arg(long, default_value = "1")]
52 from: u64,
53
54 #[arg(long)]
56 to: Option<u64>,
57
58 #[arg(long)]
60 num_tasks: Option<u64>,
61
62 #[arg(long, default_value = "5000")]
64 blocks_per_chunk: u64,
65
66 #[arg(long)]
68 skip_invalid_blocks: bool,
69
70 #[command(flatten)]
71 pub jit: JitArgs,
72}
73
74impl<C: ChainSpecParser> Command<C> {
75 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
77 Some(&self.env.chain)
78 }
79}
80
81impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
82 pub async fn execute<N>(
84 mut self,
85 components: impl CliComponentsBuilder<N>,
86 runtime: reth_tasks::Runtime,
87 ) -> eyre::Result<()>
88 where
89 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
90 {
91 if self.env.db.rocksdb_block_cache_size.is_none() {
93 self.env.db.rocksdb_block_cache_size = Some(4 << 30);
94 }
95
96 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
97
98 let components = components(provider_factory.chain_spec());
99
100 let min_block = self.from;
101 let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
102 .best_block_number()?;
103 let mut max_block = best_block;
104 if let Some(to) = self.to {
105 if to > best_block {
106 warn!(
107 requested = to,
108 best_block,
109 "Requested --to is beyond available chain head; clamping to best block"
110 );
111 } else {
112 max_block = to;
113 }
114 };
115
116 if min_block > max_block {
117 eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
118 }
119
120 let num_tasks = self.num_tasks.unwrap_or_else(|| {
121 std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
122 });
123
124 let total_gas = calculate_gas_used_from_headers(
125 &provider_factory.static_file_provider(),
126 min_block..=max_block,
127 )?;
128
129 let skip_invalid_blocks = self.skip_invalid_blocks;
130 let blocks_per_chunk = self.blocks_per_chunk;
131 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
132 let (info_tx, mut info_rx) = mpsc::unbounded_channel();
133 let cancellation = CancellationToken::new();
134 let _guard = cancellation.drop_guard();
135
136 let next_block = Arc::new(AtomicU64::new(min_block));
138
139 let mut tasks = JoinSet::new();
140 for _ in 0..num_tasks {
141 let provider_factory = provider_factory.clone();
142 let evm_config = components.evm_config().clone();
143 let consensus = components.consensus().clone();
144 let stats_tx = stats_tx.clone();
145 let info_tx = info_tx.clone();
146 let cancellation = cancellation.clone();
147 let next_block = Arc::clone(&next_block);
148 tasks.spawn_blocking(move || {
149 let evm_config = evm_config.with_jit_support();
150 let executor_lifetime = Duration::from_secs(600);
151 let provider = provider_factory.database_provider_ro()?.disable_long_read_transaction_safety();
152
153 let db_at = {
154 |block_number: u64| {
155 StateProviderDatabase(
156 provider
157 .history_by_block_number(block_number)
158 .unwrap(),
159 )
160 }
161 };
162
163 loop {
164 if cancellation.is_cancelled() {
165 break;
166 }
167
168 let chunk_start =
170 next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
171 if chunk_start >= max_block {
172 break;
173 }
174 let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
175
176 let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
177 let mut executor_created = Instant::now();
178
179 'blocks: for block in chunk_start..chunk_end {
180 if cancellation.is_cancelled() {
181 break;
182 }
183
184 let block = provider_factory
185 .recovered_block(block.into(), TransactionVariant::NoHash)?
186 .unwrap();
187
188 let result = match executor.execute_one(&block) {
189 Ok(result) => result,
190 Err(err) => {
191 if skip_invalid_blocks {
192 executor =
193 evm_config.batch_executor(db_at(block.number()));
194 let _ =
195 info_tx.send((block, eyre::Report::new(err)));
196 continue
197 }
198 return Err(err.into())
199 }
200 };
201
202 if let Err(err) = consensus
203 .validate_block_post_execution(&block, &result, None,None)
204 .wrap_err_with(|| {
205 format!(
206 "Failed to validate block {} {}",
207 block.number(),
208 block.hash()
209 )
210 })
211 {
212 let correct_receipts = provider_factory
213 .receipts_by_block(block.number().into())?
214 .unwrap();
215
216 for (i, (receipt, correct_receipt)) in
217 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
218 {
219 if receipt != correct_receipt {
220 let tx_hash =
221 block.body().transactions()[i].tx_hash();
222 error!(
223 ?receipt,
224 ?correct_receipt,
225 index = i,
226 ?tx_hash,
227 "Invalid receipt"
228 );
229 let expected_gas_used =
230 correct_receipt.cumulative_gas_used() -
231 if i == 0 {
232 0
233 } else {
234 correct_receipts[i - 1]
235 .cumulative_gas_used()
236 };
237 let got_gas_used = receipt.cumulative_gas_used() -
238 if i == 0 {
239 0
240 } else {
241 result.receipts[i - 1].cumulative_gas_used()
242 };
243 if got_gas_used != expected_gas_used {
244 let mismatch = GotExpected {
245 expected: expected_gas_used,
246 got: got_gas_used,
247 };
248
249 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
250 if skip_invalid_blocks {
251 executor = evm_config
252 .batch_executor(db_at(block.number()));
253 let _ = info_tx.send((block, err));
254 continue 'blocks;
255 }
256 return Err(err);
257 }
258 } else {
259 continue;
260 }
261 }
262
263 return Err(err);
264 }
265 let _ = stats_tx.send((block.number(), block.gas_used()));
266
267 if executor.size_hint() > 5_000_000 ||
269 executor_created.elapsed() > executor_lifetime
270 {
271 let last_block = block.number();
272 let old_executor = std::mem::replace(
273 &mut executor,
274 evm_config.batch_executor(db_at(last_block)),
275 );
276 let bundle = old_executor.into_state().take_bundle();
277 verify_bundle_against_changesets(
278 &provider,
279 &bundle,
280 last_block,
281 )?;
282 executor_created = Instant::now();
283 }
284 }
285
286 let bundle = executor.into_state().take_bundle();
288 verify_bundle_against_changesets(
289 &provider,
290 &bundle,
291 chunk_end - 1,
292 )?;
293 }
294
295 eyre::Ok(())
296 });
297 }
298
299 let instant = Instant::now();
300 let mut total_executed_blocks = 0;
301 let mut total_executed_gas = 0;
302 let mut latest_executed_block = None;
303
304 let mut last_logged_gas = 0;
305 let mut last_logged_blocks = 0;
306 let mut last_logged_time = Instant::now();
307 let mut invalid_blocks = Vec::new();
308
309 let mut interval = tokio::time::interval(Duration::from_secs(10));
310
311 loop {
312 tokio::select! {
313 Some((block_number, gas_used)) = stats_rx.recv() => {
314 total_executed_blocks += 1;
315 total_executed_gas += gas_used;
316 latest_executed_block =
317 Some(latest_executed_block.unwrap_or(block_number).max(block_number));
318 }
319 Some((block, err)) = info_rx.recv() => {
320 error!(?err, block=?block.num_hash(), "Invalid block");
321 invalid_blocks.push(block.num_hash());
322 }
323 result = tasks.join_next() => {
324 if let Some(result) = result {
325 if matches!(result, Err(_) | Ok(Err(_))) {
326 error!(?result);
327 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
328 }
329 } else {
330 break;
331 }
332 }
333 _ = interval.tick() => {
334 let blocks_executed = total_executed_blocks - last_logged_blocks;
335 let gas_executed = total_executed_gas - last_logged_gas;
336
337 if blocks_executed > 0 {
338 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
339 info!(
340 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
341 progress=format!("{progress:.2}%"),
342 ?latest_executed_block,
343 "Executed {blocks_executed} blocks"
344 );
345 }
346
347 last_logged_blocks = total_executed_blocks;
348 last_logged_gas = total_executed_gas;
349 last_logged_time = Instant::now();
350 }
351 }
352 }
353
354 if invalid_blocks.is_empty() {
355 info!(
356 start_block = min_block,
357 end_block = max_block,
358 %total_executed_blocks,
359 ?latest_executed_block,
360 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
361 "Re-executed successfully"
362 );
363 } else {
364 info!(
365 start_block = min_block,
366 end_block = max_block,
367 %total_executed_blocks,
368 ?latest_executed_block,
369 invalid_block_count = invalid_blocks.len(),
370 ?invalid_blocks,
371 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
372 "Re-executed with invalid blocks"
373 );
374 }
375
376 Ok(())
377 }
378}
379
380fn verify_bundle_against_changesets<P>(
386 provider: &P,
387 bundle: &BundleState,
388 last_block: u64,
389) -> eyre::Result<()>
390where
391 P: ChangeSetReader + StorageChangeSetReader,
392{
393 for (i, block_reverts) in bundle.reverts.iter().rev().enumerate() {
395 let block_number = last_block - i as u64;
396
397 let mut cs_accounts: HashMap<Address, Option<Account>> = provider
398 .account_block_changeset(block_number)?
399 .into_iter()
400 .map(|cs| (cs.address, cs.info))
401 .collect();
402
403 let mut cs_storage: HashMap<Address, HashMap<B256, U256>> = HashMap::new();
404 for (bna, entry) in provider.storage_changeset(block_number)? {
405 cs_storage.entry(bna.address()).or_default().insert(entry.key, entry.value);
406 }
407
408 for (addr, revert) in block_reverts {
409 match &revert.account {
411 AccountInfoRevert::DoNothing => {
412 eyre::ensure!(
413 !cs_accounts.contains_key(addr),
414 "Block {block_number}: account {addr} in changeset but revert is DoNothing",
415 );
416 }
417 AccountInfoRevert::DeleteIt => {
418 let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
419 eyre::eyre!("Block {block_number}: account {addr} revert is DeleteIt but not in changeset")
420 })?;
421 eyre::ensure!(
422 cs_info.is_none(),
423 "Block {block_number}: account {addr} revert is DeleteIt but changeset has {cs_info:?}",
424 );
425 }
426 AccountInfoRevert::RevertTo(info) => {
427 let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
428 eyre::eyre!("Block {block_number}: account {addr} revert is RevertTo but not in changeset")
429 })?;
430 let revert_acct = Some(Account::from(info));
431 eyre::ensure!(
432 revert_acct == cs_info,
433 "Block {block_number}: account {addr} info mismatch: revert={revert_acct:?} cs={cs_info:?}",
434 );
435 }
436 }
437
438 let mut cs_slots = cs_storage.get_mut(addr);
440 for (slot_key, revert_slot) in &revert.storage {
441 let b256_key = B256::from(*slot_key);
442 let cs_value = cs_slots.as_mut().and_then(|s| s.remove(&b256_key));
443 match (revert_slot, cs_value) {
444 (RevertToSlot::Destroyed, _) => {}
449 (RevertToSlot::Some(prev), Some(cs_value)) => eyre::ensure!(
450 *prev == cs_value,
451 "Block {block_number}: {addr} slot {b256_key} mismatch: \
452 revert={prev} cs={cs_value}",
453 ),
454 (RevertToSlot::Some(_), None) => eyre::ensure!(
455 revert.wipe_storage,
456 "Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
457 ),
458 }
459 }
460
461 if let Some(remaining) = cs_slots.filter(|s| !s.is_empty()) {
463 eyre::ensure!(
464 revert.wipe_storage,
465 "Block {block_number}: {addr} has {} unmatched storage slots in changeset",
466 remaining.len(),
467 );
468 }
469 }
470
471 if let Some(addr) = cs_accounts.keys().next() {
473 eyre::bail!("Block {block_number}: account {addr} in changeset but not in reverts");
474 }
475 }
476
477 Ok(())
478}