1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use alloy_primitives::{Address, B256, U256};
9use clap::Parser;
10use eyre::WrapErr;
11use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
12use reth_cli::chainspec::ChainSpecParser;
13use reth_cli_util::cancellation::CancellationToken;
14use reth_consensus::FullConsensus;
15use reth_evm::{execute::Executor, ConfigureEvm};
16use reth_primitives_traits::{format_gas_throughput, Account, BlockBody, GotExpected};
17use reth_provider::{
18 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
19 StaticFileProviderFactory, TransactionVariant,
20};
21use reth_revm::{
22 database::StateProviderDatabase,
23 db::{
24 states::reverts::{AccountInfoRevert, RevertToSlot},
25 BundleState,
26 },
27};
28use reth_stages::stages::calculate_gas_used_from_headers;
29use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
30use std::{
31 collections::HashMap,
32 sync::{
33 atomic::{AtomicU64, Ordering},
34 Arc,
35 },
36 time::{Duration, Instant},
37};
38use tokio::{sync::mpsc, task::JoinSet};
39use tracing::*;
40
/// `re-execute` command.
///
/// Re-executes a range of stored blocks on parallel worker tasks and compares the outcome with
/// the data already present in the database.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    /// Shared environment arguments (chain, datadir, database settings).
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Block height at which re-execution starts.
    #[arg(long, default_value = "1")]
    from: u64,

    /// Upper bound of the block range. Defaults to the best block in the database; a larger
    /// value is clamped to the best block with a warning.
    #[arg(long)]
    to: Option<u64>,

    /// Number of worker tasks to run in parallel. Defaults to the available parallelism
    /// reported by the system, or 10 if that cannot be determined.
    #[arg(long)]
    num_tasks: Option<u64>,

    /// Number of consecutive blocks a worker claims from the shared work counter at a time.
    #[arg(long, default_value = "5000")]
    blocks_per_chunk: u64,

    /// Keep going when a block fails to execute or shows a gas usage mismatch, collecting such
    /// blocks and listing them in the final summary instead of aborting.
    #[arg(long)]
    skip_invalid_blocks: bool,
}
69
70impl<C: ChainSpecParser> Command<C> {
71 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
73 Some(&self.env.chain)
74 }
75}
76
77impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
78 pub async fn execute<N>(
80 mut self,
81 components: impl CliComponentsBuilder<N>,
82 runtime: reth_tasks::Runtime,
83 ) -> eyre::Result<()>
84 where
85 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
86 {
87 if self.env.db.rocksdb_block_cache_size.is_none() {
89 self.env.db.rocksdb_block_cache_size = Some(4 << 30);
90 }
91
92 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
93
94 let components = components(provider_factory.chain_spec());
95
96 let min_block = self.from;
97 let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
98 .best_block_number()?;
99 let mut max_block = best_block;
100 if let Some(to) = self.to {
101 if to > best_block {
102 warn!(
103 requested = to,
104 best_block,
105 "Requested --to is beyond available chain head; clamping to best block"
106 );
107 } else {
108 max_block = to;
109 }
110 };
111
112 if min_block > max_block {
113 eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
114 }
115
116 let num_tasks = self.num_tasks.unwrap_or_else(|| {
117 std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
118 });
119
120 let total_gas = calculate_gas_used_from_headers(
121 &provider_factory.static_file_provider(),
122 min_block..=max_block,
123 )?;
124
125 let skip_invalid_blocks = self.skip_invalid_blocks;
126 let blocks_per_chunk = self.blocks_per_chunk;
127 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
128 let (info_tx, mut info_rx) = mpsc::unbounded_channel();
129 let cancellation = CancellationToken::new();
130 let _guard = cancellation.drop_guard();
131
132 let next_block = Arc::new(AtomicU64::new(min_block));
134
135 let mut tasks = JoinSet::new();
136 for _ in 0..num_tasks {
137 let provider_factory = provider_factory.clone();
138 let evm_config = components.evm_config().clone();
139 let consensus = components.consensus().clone();
140 let stats_tx = stats_tx.clone();
141 let info_tx = info_tx.clone();
142 let cancellation = cancellation.clone();
143 let next_block = Arc::clone(&next_block);
144 tasks.spawn_blocking(move || {
145 let executor_lifetime = Duration::from_secs(600);
146 let provider = provider_factory.database_provider_ro()?.disable_long_read_transaction_safety();
147
148 let db_at = {
149 |block_number: u64| {
150 StateProviderDatabase(
151 provider
152 .history_by_block_number(block_number)
153 .unwrap(),
154 )
155 }
156 };
157
158 loop {
159 if cancellation.is_cancelled() {
160 break;
161 }
162
163 let chunk_start =
165 next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
166 if chunk_start >= max_block {
167 break;
168 }
169 let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
170
171 let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
172 let mut executor_created = Instant::now();
173
174 'blocks: for block in chunk_start..chunk_end {
175 if cancellation.is_cancelled() {
176 break;
177 }
178
179 let block = provider_factory
180 .recovered_block(block.into(), TransactionVariant::NoHash)?
181 .unwrap();
182
183 let result = match executor.execute_one(&block) {
184 Ok(result) => result,
185 Err(err) => {
186 if skip_invalid_blocks {
187 executor =
188 evm_config.batch_executor(db_at(block.number()));
189 let _ =
190 info_tx.send((block, eyre::Report::new(err)));
191 continue
192 }
193 return Err(err.into())
194 }
195 };
196
197 if let Err(err) = consensus
198 .validate_block_post_execution(&block, &result, None,None)
199 .wrap_err_with(|| {
200 format!(
201 "Failed to validate block {} {}",
202 block.number(),
203 block.hash()
204 )
205 })
206 {
207 let correct_receipts = provider_factory
208 .receipts_by_block(block.number().into())?
209 .unwrap();
210
211 for (i, (receipt, correct_receipt)) in
212 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
213 {
214 if receipt != correct_receipt {
215 let tx_hash =
216 block.body().transactions()[i].tx_hash();
217 error!(
218 ?receipt,
219 ?correct_receipt,
220 index = i,
221 ?tx_hash,
222 "Invalid receipt"
223 );
224 let expected_gas_used =
225 correct_receipt.cumulative_gas_used() -
226 if i == 0 {
227 0
228 } else {
229 correct_receipts[i - 1]
230 .cumulative_gas_used()
231 };
232 let got_gas_used = receipt.cumulative_gas_used() -
233 if i == 0 {
234 0
235 } else {
236 result.receipts[i - 1].cumulative_gas_used()
237 };
238 if got_gas_used != expected_gas_used {
239 let mismatch = GotExpected {
240 expected: expected_gas_used,
241 got: got_gas_used,
242 };
243
244 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
245 if skip_invalid_blocks {
246 executor = evm_config
247 .batch_executor(db_at(block.number()));
248 let _ = info_tx.send((block, err));
249 continue 'blocks;
250 }
251 return Err(err);
252 }
253 } else {
254 continue;
255 }
256 }
257
258 return Err(err);
259 }
260 let _ = stats_tx.send(block.gas_used());
261
262 if executor.size_hint() > 5_000_000 ||
264 executor_created.elapsed() > executor_lifetime
265 {
266 let last_block = block.number();
267 let old_executor = std::mem::replace(
268 &mut executor,
269 evm_config.batch_executor(db_at(last_block)),
270 );
271 let bundle = old_executor.into_state().take_bundle();
272 verify_bundle_against_changesets(
273 &provider,
274 &bundle,
275 last_block,
276 )?;
277 executor_created = Instant::now();
278 }
279 }
280
281 let bundle = executor.into_state().take_bundle();
283 verify_bundle_against_changesets(
284 &provider,
285 &bundle,
286 chunk_end - 1,
287 )?;
288 }
289
290 eyre::Ok(())
291 });
292 }
293
294 let instant = Instant::now();
295 let mut total_executed_blocks = 0;
296 let mut total_executed_gas = 0;
297
298 let mut last_logged_gas = 0;
299 let mut last_logged_blocks = 0;
300 let mut last_logged_time = Instant::now();
301 let mut invalid_blocks = Vec::new();
302
303 let mut interval = tokio::time::interval(Duration::from_secs(10));
304
305 loop {
306 tokio::select! {
307 Some(gas_used) = stats_rx.recv() => {
308 total_executed_blocks += 1;
309 total_executed_gas += gas_used;
310 }
311 Some((block, err)) = info_rx.recv() => {
312 error!(?err, block=?block.num_hash(), "Invalid block");
313 invalid_blocks.push(block.num_hash());
314 }
315 result = tasks.join_next() => {
316 if let Some(result) = result {
317 if matches!(result, Err(_) | Ok(Err(_))) {
318 error!(?result);
319 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
320 }
321 } else {
322 break;
323 }
324 }
325 _ = interval.tick() => {
326 let blocks_executed = total_executed_blocks - last_logged_blocks;
327 let gas_executed = total_executed_gas - last_logged_gas;
328
329 if blocks_executed > 0 {
330 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
331 info!(
332 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
333 progress=format!("{progress:.2}%"),
334 "Executed {blocks_executed} blocks"
335 );
336 }
337
338 last_logged_blocks = total_executed_blocks;
339 last_logged_gas = total_executed_gas;
340 last_logged_time = Instant::now();
341 }
342 }
343 }
344
345 if invalid_blocks.is_empty() {
346 info!(
347 start_block = min_block,
348 end_block = max_block,
349 %total_executed_blocks,
350 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
351 "Re-executed successfully"
352 );
353 } else {
354 info!(
355 start_block = min_block,
356 end_block = max_block,
357 %total_executed_blocks,
358 invalid_block_count = invalid_blocks.len(),
359 ?invalid_blocks,
360 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
361 "Re-executed with invalid blocks"
362 );
363 }
364
365 Ok(())
366 }
367}
368
369fn verify_bundle_against_changesets<P>(
375 provider: &P,
376 bundle: &BundleState,
377 last_block: u64,
378) -> eyre::Result<()>
379where
380 P: ChangeSetReader + StorageChangeSetReader,
381{
382 for (i, block_reverts) in bundle.reverts.iter().rev().enumerate() {
384 let block_number = last_block - i as u64;
385
386 let mut cs_accounts: HashMap<Address, Option<Account>> = provider
387 .account_block_changeset(block_number)?
388 .into_iter()
389 .map(|cs| (cs.address, cs.info))
390 .collect();
391
392 let mut cs_storage: HashMap<Address, HashMap<B256, U256>> = HashMap::new();
393 for (bna, entry) in provider.storage_changeset(block_number)? {
394 cs_storage.entry(bna.address()).or_default().insert(entry.key, entry.value);
395 }
396
397 for (addr, revert) in block_reverts {
398 match &revert.account {
400 AccountInfoRevert::DoNothing => {
401 eyre::ensure!(
402 !cs_accounts.contains_key(addr),
403 "Block {block_number}: account {addr} in changeset but revert is DoNothing",
404 );
405 }
406 AccountInfoRevert::DeleteIt => {
407 let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
408 eyre::eyre!("Block {block_number}: account {addr} revert is DeleteIt but not in changeset")
409 })?;
410 eyre::ensure!(
411 cs_info.is_none(),
412 "Block {block_number}: account {addr} revert is DeleteIt but changeset has {cs_info:?}",
413 );
414 }
415 AccountInfoRevert::RevertTo(info) => {
416 let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
417 eyre::eyre!("Block {block_number}: account {addr} revert is RevertTo but not in changeset")
418 })?;
419 let revert_acct = Some(Account::from(info));
420 eyre::ensure!(
421 revert_acct == cs_info,
422 "Block {block_number}: account {addr} info mismatch: revert={revert_acct:?} cs={cs_info:?}",
423 );
424 }
425 }
426
427 let mut cs_slots = cs_storage.get_mut(addr);
429 for (slot_key, revert_slot) in &revert.storage {
430 let b256_key = B256::from(*slot_key);
431 let cs_value = cs_slots.as_mut().and_then(|s| s.remove(&b256_key));
432 match (revert_slot, cs_value) {
433 (RevertToSlot::Destroyed, _) => {}
438 (RevertToSlot::Some(prev), Some(cs_value)) => eyre::ensure!(
439 *prev == cs_value,
440 "Block {block_number}: {addr} slot {b256_key} mismatch: \
441 revert={prev} cs={cs_value}",
442 ),
443 (RevertToSlot::Some(_), None) => eyre::ensure!(
444 revert.wipe_storage,
445 "Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
446 ),
447 }
448 }
449
450 if let Some(remaining) = cs_slots.filter(|s| !s.is_empty()) {
452 eyre::ensure!(
453 revert.wipe_storage,
454 "Block {block_number}: {addr} has {} unmatched storage slots in changeset",
455 remaining.len(),
456 );
457 }
458 }
459
460 if let Some(addr) = cs_accounts.keys().next() {
462 eyre::bail!("Block {block_number}: account {addr} in changeset but not in reverts");
463 }
464 }
465
466 Ok(())
467}