Skip to main content

reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use alloy_primitives::{Address, B256, U256};
9use clap::Parser;
10use eyre::WrapErr;
11use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
12use reth_cli::chainspec::ChainSpecParser;
13use reth_cli_util::cancellation::CancellationToken;
14use reth_consensus::FullConsensus;
15use reth_evm::{execute::Executor, ConfigureEvm};
16use reth_node_core::args::JitArgs;
17use reth_primitives_traits::{format_gas_throughput, Account, BlockBody, GotExpected};
18use reth_provider::{
19    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
20    StaticFileProviderFactory, TransactionVariant,
21};
22use reth_revm::{
23    database::StateProviderDatabase,
24    db::{
25        states::reverts::{AccountInfoRevert, RevertToSlot},
26        BundleState,
27    },
28};
29use reth_stages::stages::calculate_gas_used_from_headers;
30use reth_storage_api::{ChangeSetReader, DBProvider, StorageChangeSetReader};
31use std::{
32    collections::HashMap,
33    sync::{
34        atomic::{AtomicU64, Ordering},
35        Arc,
36    },
37    time::{Duration, Instant},
38};
39use tokio::{sync::mpsc, task::JoinSet};
40use tracing::*;
41
42/// `reth re-execute` command
43///
44/// Re-execute blocks in parallel to verify historical sync correctness.
45#[derive(Debug, Parser)]
46pub struct Command<C: ChainSpecParser> {
47    #[command(flatten)]
48    env: EnvironmentArgs<C>,
49
50    /// The height to start at.
51    #[arg(long, default_value = "1")]
52    from: u64,
53
54    /// The height to end at. Defaults to the latest block.
55    #[arg(long)]
56    to: Option<u64>,
57
58    /// Number of tasks to run in parallel. Defaults to the number of available CPUs.
59    #[arg(long)]
60    num_tasks: Option<u64>,
61
62    /// Number of blocks each worker processes before grabbing the next chunk.
63    #[arg(long, default_value = "5000")]
64    blocks_per_chunk: u64,
65
66    /// Continues with execution when an invalid block is encountered and collects these blocks.
67    #[arg(long)]
68    skip_invalid_blocks: bool,
69
70    #[command(flatten)]
71    pub jit: JitArgs,
72}
73
74impl<C: ChainSpecParser> Command<C> {
75    /// Returns the underlying chain being used to run this command
76    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
77        Some(&self.env.chain)
78    }
79}
80
81impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
82    /// Execute `re-execute` command
83    pub async fn execute<N>(
84        mut self,
85        components: impl CliComponentsBuilder<N>,
86        runtime: reth_tasks::Runtime,
87    ) -> eyre::Result<()>
88    where
89        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
90    {
91        // Default to 4GB RocksDB block cache for re-execute unless explicitly set.
92        if self.env.db.rocksdb_block_cache_size.is_none() {
93            self.env.db.rocksdb_block_cache_size = Some(4 << 30);
94        }
95
96        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
97
98        let components = components(provider_factory.chain_spec());
99
100        let min_block = self.from;
101        let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
102            .best_block_number()?;
103        let mut max_block = best_block;
104        if let Some(to) = self.to {
105            if to > best_block {
106                warn!(
107                    requested = to,
108                    best_block,
109                    "Requested --to is beyond available chain head; clamping to best block"
110                );
111            } else {
112                max_block = to;
113            }
114        };
115
116        if min_block > max_block {
117            eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
118        }
119
120        let num_tasks = self.num_tasks.unwrap_or_else(|| {
121            std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
122        });
123
124        let total_gas = calculate_gas_used_from_headers(
125            &provider_factory.static_file_provider(),
126            min_block..=max_block,
127        )?;
128
129        let skip_invalid_blocks = self.skip_invalid_blocks;
130        let blocks_per_chunk = self.blocks_per_chunk;
131        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
132        let (info_tx, mut info_rx) = mpsc::unbounded_channel();
133        let cancellation = CancellationToken::new();
134        let _guard = cancellation.drop_guard();
135
136        // Shared counter for work stealing: workers atomically grab the next chunk of blocks.
137        let next_block = Arc::new(AtomicU64::new(min_block));
138
139        let mut tasks = JoinSet::new();
140        for _ in 0..num_tasks {
141            let provider_factory = provider_factory.clone();
142            let evm_config = components.evm_config().clone();
143            let consensus = components.consensus().clone();
144            let stats_tx = stats_tx.clone();
145            let info_tx = info_tx.clone();
146            let cancellation = cancellation.clone();
147            let next_block = Arc::clone(&next_block);
148            tasks.spawn_blocking(move || {
149                let evm_config = evm_config.with_jit_support();
150                let executor_lifetime = Duration::from_secs(600);
151                let provider = provider_factory.database_provider_ro()?.disable_long_read_transaction_safety();
152
153                let db_at = {
154                    |block_number: u64| {
155                        StateProviderDatabase(
156                            provider
157                                .history_by_block_number(block_number)
158                                .unwrap(),
159                        )
160                    }
161                };
162
163                loop {
164                    if cancellation.is_cancelled() {
165                        break;
166                    }
167
168                    // Atomically grab the next chunk of blocks.
169                    let chunk_start =
170                        next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
171                    if chunk_start >= max_block {
172                        break;
173                    }
174                    let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
175
176                    let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
177                    let mut executor_created = Instant::now();
178
179                    'blocks: for block in chunk_start..chunk_end {
180                        if cancellation.is_cancelled() {
181                            break;
182                        }
183
184                        let block = provider_factory
185                            .recovered_block(block.into(), TransactionVariant::NoHash)?
186                            .unwrap();
187
188                        let result = match executor.execute_one(&block) {
189                            Ok(result) => result,
190                            Err(err) => {
191                                if skip_invalid_blocks {
192                                    executor =
193                                        evm_config.batch_executor(db_at(block.number()));
194                                    let _ =
195                                        info_tx.send((block, eyre::Report::new(err)));
196                                    continue
197                                }
198                                return Err(err.into())
199                            }
200                        };
201
202                        if let Err(err) = consensus
203                            .validate_block_post_execution(&block, &result, None,None)
204                            .wrap_err_with(|| {
205                                format!(
206                                    "Failed to validate block {} {}",
207                                    block.number(),
208                                    block.hash()
209                                )
210                            })
211                        {
212                            let correct_receipts = provider_factory
213                                .receipts_by_block(block.number().into())?
214                                .unwrap();
215
216                            for (i, (receipt, correct_receipt)) in
217                                result.receipts.iter().zip(correct_receipts.iter()).enumerate()
218                            {
219                                if receipt != correct_receipt {
220                                    let tx_hash =
221                                        block.body().transactions()[i].tx_hash();
222                                    error!(
223                                        ?receipt,
224                                        ?correct_receipt,
225                                        index = i,
226                                        ?tx_hash,
227                                        "Invalid receipt"
228                                    );
229                                    let expected_gas_used =
230                                        correct_receipt.cumulative_gas_used() -
231                                            if i == 0 {
232                                                0
233                                            } else {
234                                                correct_receipts[i - 1]
235                                                    .cumulative_gas_used()
236                                            };
237                                    let got_gas_used = receipt.cumulative_gas_used() -
238                                        if i == 0 {
239                                            0
240                                        } else {
241                                            result.receipts[i - 1].cumulative_gas_used()
242                                        };
243                                    if got_gas_used != expected_gas_used {
244                                        let mismatch = GotExpected {
245                                            expected: expected_gas_used,
246                                            got: got_gas_used,
247                                        };
248
249                                        error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
250                                        if skip_invalid_blocks {
251                                            executor = evm_config
252                                                .batch_executor(db_at(block.number()));
253                                            let _ = info_tx.send((block, err));
254                                            continue 'blocks;
255                                        }
256                                        return Err(err);
257                                    }
258                                } else {
259                                    continue;
260                                }
261                            }
262
263                            return Err(err);
264                        }
265                        let _ = stats_tx.send((block.number(), block.gas_used()));
266
267                        // Reset DB once in a while to avoid OOM or read tx timeouts
268                        if executor.size_hint() > 5_000_000 ||
269                            executor_created.elapsed() > executor_lifetime
270                        {
271                            let last_block = block.number();
272                            let old_executor = std::mem::replace(
273                                &mut executor,
274                                evm_config.batch_executor(db_at(last_block)),
275                            );
276                            let bundle = old_executor.into_state().take_bundle();
277                            verify_bundle_against_changesets(
278                                &provider,
279                                &bundle,
280                                last_block,
281                            )?;
282                            executor_created = Instant::now();
283                        }
284                    }
285
286                    // Full verification at chunk end for remaining unverified blocks
287                    let bundle = executor.into_state().take_bundle();
288                    verify_bundle_against_changesets(
289                        &provider,
290                        &bundle,
291                        chunk_end - 1,
292                    )?;
293                }
294
295                eyre::Ok(())
296            });
297        }
298
299        let instant = Instant::now();
300        let mut total_executed_blocks = 0;
301        let mut total_executed_gas = 0;
302        let mut latest_executed_block = None;
303
304        let mut last_logged_gas = 0;
305        let mut last_logged_blocks = 0;
306        let mut last_logged_time = Instant::now();
307        let mut invalid_blocks = Vec::new();
308
309        let mut interval = tokio::time::interval(Duration::from_secs(10));
310
311        loop {
312            tokio::select! {
313                Some((block_number, gas_used)) = stats_rx.recv() => {
314                    total_executed_blocks += 1;
315                    total_executed_gas += gas_used;
316                    latest_executed_block =
317                        Some(latest_executed_block.unwrap_or(block_number).max(block_number));
318                }
319                Some((block, err)) = info_rx.recv() => {
320                    error!(?err, block=?block.num_hash(), "Invalid block");
321                    invalid_blocks.push(block.num_hash());
322                }
323                result = tasks.join_next() => {
324                    if let Some(result) = result {
325                        if matches!(result, Err(_) | Ok(Err(_))) {
326                            error!(?result);
327                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
328                        }
329                    } else {
330                        break;
331                    }
332                }
333                _ = interval.tick() => {
334                    let blocks_executed = total_executed_blocks - last_logged_blocks;
335                    let gas_executed = total_executed_gas - last_logged_gas;
336
337                    if blocks_executed > 0 {
338                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
339                        info!(
340                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
341                            progress=format!("{progress:.2}%"),
342                            ?latest_executed_block,
343                            "Executed {blocks_executed} blocks"
344                        );
345                    }
346
347                    last_logged_blocks = total_executed_blocks;
348                    last_logged_gas = total_executed_gas;
349                    last_logged_time = Instant::now();
350                }
351            }
352        }
353
354        if invalid_blocks.is_empty() {
355            info!(
356                start_block = min_block,
357                end_block = max_block,
358                %total_executed_blocks,
359                ?latest_executed_block,
360                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
361                "Re-executed successfully"
362            );
363        } else {
364            info!(
365                start_block = min_block,
366                end_block = max_block,
367                %total_executed_blocks,
368                ?latest_executed_block,
369                invalid_block_count = invalid_blocks.len(),
370                ?invalid_blocks,
371                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
372                "Re-executed with invalid blocks"
373            );
374        }
375
376        Ok(())
377    }
378}
379
380/// Verifies reverts against database changesets.
381///
382/// For each block, reverts must match changeset entries exactly. No extra slots/accounts
383/// in reverts for non-destroyed accounts. Destroyed accounts may have extra changeset slots
384/// (from DB storage wipe) absent from reverts.
385fn verify_bundle_against_changesets<P>(
386    provider: &P,
387    bundle: &BundleState,
388    last_block: u64,
389) -> eyre::Result<()>
390where
391    P: ChangeSetReader + StorageChangeSetReader,
392{
393    // Verify reverts against changesets per block
394    for (i, block_reverts) in bundle.reverts.iter().rev().enumerate() {
395        let block_number = last_block - i as u64;
396
397        let mut cs_accounts: HashMap<Address, Option<Account>> = provider
398            .account_block_changeset(block_number)?
399            .into_iter()
400            .map(|cs| (cs.address, cs.info))
401            .collect();
402
403        let mut cs_storage: HashMap<Address, HashMap<B256, U256>> = HashMap::new();
404        for (bna, entry) in provider.storage_changeset(block_number)? {
405            cs_storage.entry(bna.address()).or_default().insert(entry.key, entry.value);
406        }
407
408        for (addr, revert) in block_reverts {
409            // Verify account info
410            match &revert.account {
411                AccountInfoRevert::DoNothing => {
412                    eyre::ensure!(
413                        !cs_accounts.contains_key(addr),
414                        "Block {block_number}: account {addr} in changeset but revert is DoNothing",
415                    );
416                }
417                AccountInfoRevert::DeleteIt => {
418                    let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
419                        eyre::eyre!("Block {block_number}: account {addr} revert is DeleteIt but not in changeset")
420                    })?;
421                    eyre::ensure!(
422                        cs_info.is_none(),
423                        "Block {block_number}: account {addr} revert is DeleteIt but changeset has {cs_info:?}",
424                    );
425                }
426                AccountInfoRevert::RevertTo(info) => {
427                    let cs_info = cs_accounts.remove(addr).ok_or_else(|| {
428                        eyre::eyre!("Block {block_number}: account {addr} revert is RevertTo but not in changeset")
429                    })?;
430                    let revert_acct = Some(Account::from(info));
431                    eyre::ensure!(
432                        revert_acct == cs_info,
433                        "Block {block_number}: account {addr} info mismatch: revert={revert_acct:?} cs={cs_info:?}",
434                    );
435                }
436            }
437
438            // Verify storage slots — remove matched changeset entries as we go
439            let mut cs_slots = cs_storage.get_mut(addr);
440            for (slot_key, revert_slot) in &revert.storage {
441                let b256_key = B256::from(*slot_key);
442                let cs_value = cs_slots.as_mut().and_then(|s| s.remove(&b256_key));
443                match (revert_slot, cs_value) {
444                    // When a contract is selfdestructed and re-created at the same address
445                    // within the same block, revm marks slots touched by the new contract
446                    // as `Destroyed` and never reads the original DB value, so
447                    // `to_previous_value()` would resolve to zero, which might be wrong.
448                    (RevertToSlot::Destroyed, _) => {}
449                    (RevertToSlot::Some(prev), Some(cs_value)) => eyre::ensure!(
450                        *prev == cs_value,
451                        "Block {block_number}: {addr} slot {b256_key} mismatch: \
452                         revert={prev} cs={cs_value}",
453                    ),
454                    (RevertToSlot::Some(_), None) => eyre::ensure!(
455                        revert.wipe_storage,
456                        "Block {block_number}: {addr} slot {b256_key} in reverts but not in changeset",
457                    ),
458                }
459            }
460
461            // Any remaining cs_storage slots for this address must be from a destroyed account
462            if let Some(remaining) = cs_slots.filter(|s| !s.is_empty()) {
463                eyre::ensure!(
464                    revert.wipe_storage,
465                    "Block {block_number}: {addr} has {} unmatched storage slots in changeset",
466                    remaining.len(),
467                );
468            }
469        }
470
471        // Any remaining cs_accounts entries had no corresponding revert
472        if let Some(addr) = cs_accounts.keys().next() {
473            eyre::bail!("Block {block_number}: account {addr} in changeset but not in reverts");
474        }
475    }
476
477    Ok(())
478}