reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_consensus::FullConsensus;
13use reth_evm::{execute::Executor, ConfigureEvm};
14use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected, SignedTransaction};
15use reth_provider::{
16    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
17    StaticFileProviderFactory, TransactionVariant,
18};
19use reth_revm::database::StateProviderDatabase;
20use reth_stages::stages::calculate_gas_used_from_headers;
21use std::{
22    sync::Arc,
23    time::{Duration, Instant},
24};
25use tokio::{sync::mpsc, task::JoinSet};
26use tracing::*;
27
28/// `reth re-execute` command
29///
30/// Re-execute blocks in parallel to verify historical sync correctness.
31#[derive(Debug, Parser)]
32pub struct Command<C: ChainSpecParser> {
33    #[command(flatten)]
34    env: EnvironmentArgs<C>,
35
36    /// The height to start at.
37    #[arg(long, default_value = "1")]
38    from: u64,
39
40    /// The height to end at. Defaults to the latest block.
41    #[arg(long)]
42    to: Option<u64>,
43
44    /// Number of tasks to run in parallel
45    #[arg(long, default_value = "10")]
46    num_tasks: u64,
47}
48
49impl<C: ChainSpecParser> Command<C> {
50    /// Returns the underlying chain being used to run this command
51    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
52        Some(&self.env.chain)
53    }
54}
55
56impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
57    /// Execute `re-execute` command
58    pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
59    where
60        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
61    {
62        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
63
64        let provider = provider_factory.database_provider_ro()?;
65        let components = components(provider_factory.chain_spec());
66
67        let min_block = self.from;
68        let max_block = self.to.unwrap_or(provider.best_block_number()?);
69
70        let total_blocks = max_block - min_block;
71        let total_gas = calculate_gas_used_from_headers(
72            &provider_factory.static_file_provider(),
73            min_block..=max_block,
74        )?;
75        let blocks_per_task = total_blocks / self.num_tasks;
76
77        let db_at = {
78            let provider_factory = provider_factory.clone();
79            move |block_number: u64| {
80                StateProviderDatabase(
81                    provider_factory.history_by_block_number(block_number).unwrap(),
82                )
83            }
84        };
85
86        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
87
88        let mut tasks = JoinSet::new();
89        for i in 0..self.num_tasks {
90            let start_block = min_block + i * blocks_per_task;
91            let end_block =
92                if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
93
94            // Spawn thread executing blocks
95            let provider_factory = provider_factory.clone();
96            let evm_config = components.evm_config().clone();
97            let consensus = components.consensus().clone();
98            let db_at = db_at.clone();
99            let stats_tx = stats_tx.clone();
100            tasks.spawn_blocking(move || {
101                let mut executor = evm_config.batch_executor(db_at(start_block - 1));
102                for block in start_block..end_block {
103                    let block = provider_factory
104                        .recovered_block(block.into(), TransactionVariant::NoHash)?
105                        .unwrap();
106                    let result = executor.execute_one(&block)?;
107
108                    if let Err(err) = consensus
109                        .validate_block_post_execution(&block, &result)
110                        .wrap_err_with(|| format!("Failed to validate block {}", block.number()))
111                    {
112                        let correct_receipts =
113                            provider_factory.receipts_by_block(block.number().into())?.unwrap();
114
115                        for (i, (receipt, correct_receipt)) in
116                            result.receipts.iter().zip(correct_receipts.iter()).enumerate()
117                        {
118                            if receipt != correct_receipt {
119                                let tx_hash = block.body().transactions()[i].tx_hash();
120                                error!(
121                                    ?receipt,
122                                    ?correct_receipt,
123                                    index = i,
124                                    ?tx_hash,
125                                    "Invalid receipt"
126                                );
127                                let expected_gas_used = correct_receipt.cumulative_gas_used() -
128                                    if i == 0 {
129                                        0
130                                    } else {
131                                        correct_receipts[i - 1].cumulative_gas_used()
132                                    };
133                                let got_gas_used = receipt.cumulative_gas_used() -
134                                    if i == 0 {
135                                        0
136                                    } else {
137                                        result.receipts[i - 1].cumulative_gas_used()
138                                    };
139                                if got_gas_used != expected_gas_used {
140                                    let mismatch = GotExpected {
141                                        expected: expected_gas_used,
142                                        got: got_gas_used,
143                                    };
144
145                                    error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
146                                    return Err(err);
147                                }
148                            } else {
149                                continue;
150                            }
151                        }
152
153                        return Err(err);
154                    }
155                    let _ = stats_tx.send(block.gas_used());
156
157                    // Reset DB once in a while to avoid OOM
158                    if executor.size_hint() > 1_000_000 {
159                        executor = evm_config.batch_executor(db_at(block.number()));
160                    }
161                }
162
163                eyre::Ok(())
164            });
165        }
166
167        let instant = Instant::now();
168        let mut total_executed_blocks = 0;
169        let mut total_executed_gas = 0;
170
171        let mut last_logged_gas = 0;
172        let mut last_logged_blocks = 0;
173        let mut last_logged_time = Instant::now();
174
175        let mut interval = tokio::time::interval(Duration::from_secs(10));
176
177        loop {
178            tokio::select! {
179                Some(gas_used) = stats_rx.recv() => {
180                    total_executed_blocks += 1;
181                    total_executed_gas += gas_used;
182                }
183                result = tasks.join_next() => {
184                    if let Some(result) = result {
185                        if matches!(result, Err(_) | Ok(Err(_))) {
186                            error!(?result);
187                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
188                        }
189                    } else {
190                        break;
191                    }
192                }
193                _ = interval.tick() => {
194                    let blocks_executed = total_executed_blocks - last_logged_blocks;
195                    let gas_executed = total_executed_gas - last_logged_gas;
196
197                    if blocks_executed > 0 {
198                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
199                        info!(
200                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
201                            progress=format!("{progress:.2}%"),
202                            "Executed {blocks_executed} blocks"
203                        );
204                    }
205
206                    last_logged_blocks = total_executed_blocks;
207                    last_logged_gas = total_executed_gas;
208                    last_logged_time = Instant::now();
209                }
210            }
211        }
212
213        info!(
214            start_block = min_block,
215            end_block = max_block,
216            throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
217            "Re-executed successfully"
218        );
219
220        Ok(())
221    }
222}