Skip to main content

reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use alloy_primitives::{Address, B256, U256};
9use clap::Parser;
10use eyre::WrapErr;
11use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
12use reth_cli::chainspec::ChainSpecParser;
13use reth_cli_util::cancellation::CancellationToken;
14use reth_consensus::FullConsensus;
15use reth_evm::{execute::Executor, ConfigureEvm};
16use reth_primitives_traits::{format_gas_throughput, Account, BlockBody, GotExpected};
17use reth_provider::{
18    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
19    StaticFileProviderFactory, TransactionVariant,
20};
21use reth_revm::{
22    database::StateProviderDatabase,
23    db::{
24        states::reverts::{AccountInfoRevert, RevertToSlot},
25        BundleState,
26    },
27};
28use reth_stages::stages::calculate_gas_used_from_headers;
29use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
30use std::{
31    collections::HashMap,
32    sync::{
33        atomic::{AtomicU64, Ordering},
34        Arc,
35    },
36    time::{Duration, Instant},
37};
38use tokio::{sync::mpsc, task::JoinSet};
39use tracing::*;
40
41/// `reth re-execute` command
42///
43/// Re-execute blocks in parallel to verify historical sync correctness.
44#[derive(Debug, Parser)]
45pub struct Command<C: ChainSpecParser> {
46    #[command(flatten)]
47    env: EnvironmentArgs<C>,
48
49    /// The height to start at.
50    #[arg(long, default_value = "1")]
51    from: u64,
52
53    /// The height to end at. Defaults to the latest block.
54    #[arg(long)]
55    to: Option<u64>,
56
57    /// Number of tasks to run in parallel. Defaults to the number of available CPUs.
58    #[arg(long)]
59    num_tasks: Option<u64>,
60
61    /// Number of blocks each worker processes before grabbing the next chunk.
62    #[arg(long, default_value = "5000")]
63    blocks_per_chunk: u64,
64
65    /// Continues with execution when an invalid block is encountered and collects these blocks.
66    #[arg(long)]
67    skip_invalid_blocks: bool,
68}
69
70impl<C: ChainSpecParser> Command<C> {
71    /// Returns the underlying chain being used to run this command
72    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
73        Some(&self.env.chain)
74    }
75}
76
77impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
78    /// Execute `re-execute` command
79    pub async fn execute<N>(
80        mut self,
81        components: impl CliComponentsBuilder<N>,
82        runtime: reth_tasks::Runtime,
83    ) -> eyre::Result<()>
84    where
85        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
86    {
87        // Default to 4GB RocksDB block cache for re-execute unless explicitly set.
88        if self.env.db.rocksdb_block_cache_size.is_none() {
89            self.env.db.rocksdb_block_cache_size = Some(4 << 30);
90        }
91
92        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
93
94        let components = components(provider_factory.chain_spec());
95
96        let min_block = self.from;
97        let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
98            .best_block_number()?;
99        let mut max_block = best_block;
100        if let Some(to) = self.to {
101            if to > best_block {
102                warn!(
103                    requested = to,
104                    best_block,
105                    "Requested --to is beyond available chain head; clamping to best block"
106                );
107            } else {
108                max_block = to;
109            }
110        };
111
112        if min_block > max_block {
113            eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
114        }
115
116        let num_tasks = self.num_tasks.unwrap_or_else(|| {
117            std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
118        });
119
120        let total_gas = calculate_gas_used_from_headers(
121            &provider_factory.static_file_provider(),
122            min_block..=max_block,
123        )?;
124
125        let skip_invalid_blocks = self.skip_invalid_blocks;
126        let blocks_per_chunk = self.blocks_per_chunk;
127        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
128        let (info_tx, mut info_rx) = mpsc::unbounded_channel();
129        let cancellation = CancellationToken::new();
130        let _guard = cancellation.drop_guard();
131
132        // Shared counter for work stealing: workers atomically grab the next chunk of blocks.
133        let next_block = Arc::new(AtomicU64::new(min_block));
134
135        let mut tasks = JoinSet::new();
136        for _ in 0..num_tasks {
137            let provider_factory = provider_factory.clone();
138            let evm_config = components.evm_config().clone();
139            let consensus = components.consensus().clone();
140            let stats_tx = stats_tx.clone();
141            let info_tx = info_tx.clone();
142            let cancellation = cancellation.clone();
143            let next_block = Arc::clone(&next_block);
144            tasks.spawn_blocking(move || {
145                let executor_lifetime = Duration::from_secs(600);
146                let provider = provider_factory.database_provider_ro()?.disable_long_read_transaction_safety();
147
148                let db_at = {
149                    |block_number: u64| {
150                        StateProviderDatabase(
151                            provider
152                                .history_by_block_number(block_number)
153                                .unwrap(),
154                        )
155                    }
156                };
157
158                loop {
159                    if cancellation.is_cancelled() {
160                        break;
161                    }
162
163                    // Atomically grab the next chunk of blocks.
164                    let chunk_start =
165                        next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
166                    if chunk_start >= max_block {
167                        break;
168                    }
169                    let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
170
171                    let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
172                    let mut executor_created = Instant::now();
173
174                    'blocks: for block in chunk_start..chunk_end {
175                        if cancellation.is_cancelled() {
176                            break;
177                        }
178
179                        let block = provider_factory
180                            .recovered_block(block.into(), TransactionVariant::NoHash)?
181                            .unwrap();
182
183                        let result = match executor.execute_one(&block) {
184                            Ok(result) => result,
185                            Err(err) => {
186                                if skip_invalid_blocks {
187                                    executor =
188                                        evm_config.batch_executor(db_at(block.number()));
189                                    let _ =
190                                        info_tx.send((block, eyre::Report::new(err)));
191                                    continue
192                                }
193                                return Err(err.into())
194                            }
195                        };
196
197                        if let Err(err) = consensus
198                            .validate_block_post_execution(&block, &result, None,None)
199                            .wrap_err_with(|| {
200                                format!(
201                                    "Failed to validate block {} {}",
202                                    block.number(),
203                                    block.hash()
204                                )
205                            })
206                        {
207                            let correct_receipts = provider_factory
208                                .receipts_by_block(block.number().into())?
209                                .unwrap();
210
211                            for (i, (receipt, correct_receipt)) in
212                                result.receipts.iter().zip(correct_receipts.iter()).enumerate()
213                            {
214                                if receipt != correct_receipt {
215                                    let tx_hash =
216                                        block.body().transactions()[i].tx_hash();
217                                    error!(
218                                        ?receipt,
219                                        ?correct_receipt,
220                                        index = i,
221                                        ?tx_hash,
222                                        "Invalid receipt"
223                                    );
224                                    let expected_gas_used =
225                                        correct_receipt.cumulative_gas_used() -
226                                            if i == 0 {
227                                                0
228                                            } else {
229                                                correct_receipts[i - 1]
230                                                    .cumulative_gas_used()
231                                            };
232                                    let got_gas_used = receipt.cumulative_gas_used() -
233                                        if i == 0 {
234                                            0
235                                        } else {
236                                            result.receipts[i - 1].cumulative_gas_used()
237                                        };
238                                    if got_gas_used != expected_gas_used {
239                                        let mismatch = GotExpected {
240                                            expected: expected_gas_used,
241                                            got: got_gas_used,
242                                        };
243
244                                        error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
245                                        if skip_invalid_blocks {
246                                            executor = evm_config
247                                                .batch_executor(db_at(block.number()));
248                                            let _ = info_tx.send((block, err));
249                                            continue 'blocks;
250                                        }
251                                        return Err(err);
252                                    }
253                                } else {
254                                    continue;
255                                }
256                            }
257
258                            return Err(err);
259                        }
260                        let _ = stats_tx.send(block.gas_used());
261
262                        // Reset DB once in a while to avoid OOM or read tx timeouts
263                        if executor.size_hint() > 5_000_000 ||
264                            executor_created.elapsed() > executor_lifetime
265                        {
266                            let last_block = block.number();
267                            let old_executor = std::mem::replace(
268                                &mut executor,
269                                evm_config.batch_executor(db_at(last_block)),
270                            );
271                            let bundle = old_executor.into_state().take_bundle();
272                            verify_bundle_against_changesets(
273                                &provider,
274                                &bundle,
275                                last_block,
276                            )?;
277                            executor_created = Instant::now();
278                        }
279                    }
280
281                    // Full verification at chunk end for remaining unverified blocks
282                    let bundle = executor.into_state().take_bundle();
283                    verify_bundle_against_changesets(
284                        &provider,
285                        &bundle,
286                        chunk_end - 1,
287                    )?;
288                }
289
290                eyre::Ok(())
291            });
292        }
293
294        let instant = Instant::now();
295        let mut total_executed_blocks = 0;
296        let mut total_executed_gas = 0;
297
298        let mut last_logged_gas = 0;
299        let mut last_logged_blocks = 0;
300        let mut last_logged_time = Instant::now();
301        let mut invalid_blocks = Vec::new();
302
303        let mut interval = tokio::time::interval(Duration::from_secs(10));
304
305        loop {
306            tokio::select! {
307                Some(gas_used) = stats_rx.recv() => {
308                    total_executed_blocks += 1;
309                    total_executed_gas += gas_used;
310                }
311                Some((block, err)) = info_rx.recv() => {
312                    error!(?err, block=?block.num_hash(), "Invalid block");
313                    invalid_blocks.push(block.num_hash());
314                }
315                result = tasks.join_next() => {
316                    if let Some(result) = result {
317                        if matches!(result, Err(_) | Ok(Err(_))) {
318                            error!(?result);
319                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
320                        }
321                    } else {
322                        break;
323                    }
324                }
325                _ = interval.tick() => {
326                    let blocks_executed = total_executed_blocks - last_logged_blocks;
327                    let gas_executed = total_executed_gas - last_logged_gas;
328
329                    if blocks_executed > 0 {
330                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
331                        info!(
332                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
333                            progress=format!("{progress:.2}%"),
334                            "Executed {blocks_executed} blocks"
335                        );
336                    }
337
338                    last_logged_blocks = total_executed_blocks;
339                    last_logged_gas = total_executed_gas;
340                    last_logged_time = Instant::now();
341                }
342            }
343        }
344
345        if invalid_blocks.is_empty() {
346            info!(
347                start_block = min_block,
348                end_block = max_block,
349                %total_executed_blocks,
350                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
351                "Re-executed successfully"
352            );
353        } else {
354            info!(
355                start_block = min_block,
356                end_block = max_block,
357                %total_executed_blocks,
358                invalid_block_count = invalid_blocks.len(),
359                ?invalid_blocks,
360                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
361                "Re-executed with invalid blocks"
362            );
363        }
364
365        Ok(())
366    }
367}
368
369/// Verifies reverts against database changesets.
370///
371/// For each block, reverts must match changeset entries exactly. No extra slots/accounts
372/// in reverts for non-destroyed accounts. Destroyed accounts may have extra changeset slots
373/// (from DB storage wipe) absent from reverts.
374fn verify_bundle_against_changesets<P>(
375    provider: &P,
376    bundle: &BundleState,
377    last_block: u64,
378) -> eyre::Result<()>
379where
380    P: ChangeSetReader + StorageChangeSetReader,
381{
382    // Verify reverts against changesets per block
383    for (i, block_reverts) in bundle.reverts.iter().rev().enumerate() {
384        let block_number = last_block - i as u64;
385
386        let mut cs_accounts: HashMap<Address, Option<Account>> = provider
387            .account_block_changeset(block_number)?
388            .into_iter()
389            .map(|cs| (cs.address, cs.info))
390            .collect();
391
392        let mut cs_storage: HashMap<Address, HashMap<B256, U256>> = HashMap::new();
393        for (bna, entry) in provider.storage_changeset(block_number)? {
394            cs_storage.entry(bna.address()).or_default().insert(entry.key, entry.value);
395        }
396
397        for (addr, revert) in block_reverts {
398            // Verify account info
399            match &revert.account {
400                AccountInfoRevert::DoNothing => {
401                    eyre::ensure!(
402                        !cs_accounts.contains_key(addr),
403                        "Block {block_number}: account {addr} in changeset but revert is DoNothing",
404                    );
405                }
406                AccountInfoRevert::DeleteIt => {
407                    let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
408                        eyre::eyre!("Block {block_number}: account {addr} revert is DeleteIt but not in changeset")
409                    })?;
410                    eyre::ensure!(
411                        cs_info.is_none(),
412                        "Block {block_number}: account {addr} revert is DeleteIt but changeset has {cs_info:?}",
413                    );
414                }
415                AccountInfoRevert::RevertTo(info) => {
416                    let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
417                        eyre::eyre!("Block {block_number}: account {addr} revert is RevertTo but not in changeset")
418                    })?;
419                    let revert_acct = Some(Account::from(info));
420                    eyre::ensure!(
421                        revert_acct == cs_info,
422                        "Block {block_number}: account {addr} info mismatch: revert={revert_acct:?} cs={cs_info:?}",
423                    );
424                }
425            }
426
427            // Verify storage slots — remove matched changeset entries as we go
428            let mut cs_slots = cs_storage.get_mut(addr);
429            for (slot_key, revert_slot) in &revert.storage {
430                let b256_key = B256::from(*slot_key);
431                let cs_value = cs_slots.as_mut().and_then(|s| s.remove(&b256_key));
432                match (revert_slot, cs_value) {
433                    // When a contract is selfdestructed and re-created at the same address
434                    // within the same block, revm marks slots touched by the new contract
435                    // as `Destroyed` and never reads the original DB value, so
436                    // `to_previous_value()` would resolve to zero, which might be wrong.
437                    (RevertToSlot::Destroyed, _) => {}
438                    (RevertToSlot::Some(prev), Some(cs_value)) => eyre::ensure!(
439                        *prev == cs_value,
440                        "Block {block_number}: {addr} slot {b256_key} mismatch: \
441                         revert={prev} cs={cs_value}",
442                    ),
443                    (RevertToSlot::Some(_), None) => eyre::ensure!(
444                        revert.wipe_storage,
445                        "Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
446                    ),
447                }
448            }
449
450            // Any remaining cs_storage slots for this address must be from a destroyed account
451            if let Some(remaining) = cs_slots.filter(|s| !s.is_empty()) {
452                eyre::ensure!(
453                    revert.wipe_storage,
454                    "Block {block_number}: {addr} has {} unmatched storage slots in changeset",
455                    remaining.len(),
456                );
457            }
458        }
459
460        // Any remaining cs_accounts entries had no corresponding revert
461        if let Some(addr) = cs_accounts.keys().next() {
462            eyre::bail!("Block {block_number}: account {addr} in changeset but not in reverts");
463        }
464    }
465
466    Ok(())
467}