reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18    StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23    sync::Arc,
24    time::{Duration, Instant},
25};
26use tokio::{sync::mpsc, task::JoinSet};
27use tracing::*;
28
// NOTE: the `///` comments on this struct and its fields are consumed by the `Parser` derive
// (clap uses them as help text), so maintainer-only remarks are kept in plain `//` comments.
/// `reth re-execute` command
///
/// Re-execute blocks in parallel to verify historical sync correctness.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    // Shared environment arguments; `execute` calls `init` on them with read-only access
    // rights to obtain the provider factory, and `chain_spec` reads `env.chain`.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // First block of the range. The first worker opens historical state at `from - 1`.
    /// The height to start at.
    #[arg(long, default_value = "1")]
    from: u64,

    // When the value is above the database's best block, `execute` logs a warning and uses
    // the best block instead.
    /// The height to end at. Defaults to the latest block.
    #[arg(long)]
    to: Option<u64>,

    // When unset, `execute` uses `std::thread::available_parallelism()` and falls back to 10
    // if that call fails.
    /// Number of tasks to run in parallel. Defaults to the number of available CPUs.
    #[arg(long)]
    num_tasks: Option<u64>,

    // When set, a worker that hits an execution error reports the block over a channel,
    // recreates its executor at that block and carries on instead of returning the error.
    /// Continues with execution when an invalid block is encountered and collects these blocks.
    #[arg(long)]
    skip_invalid_blocks: bool,
}
53
54impl<C: ChainSpecParser> Command<C> {
55    /// Returns the underlying chain being used to run this command
56    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
57        Some(&self.env.chain)
58    }
59}
60
61impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
62    /// Execute `re-execute` command
63    pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
64    where
65        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
66    {
67        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
68
69        let components = components(provider_factory.chain_spec());
70
71        let min_block = self.from;
72        let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
73            .best_block_number()?;
74        let mut max_block = best_block;
75        if let Some(to) = self.to {
76            if to > best_block {
77                warn!(
78                    requested = to,
79                    best_block,
80                    "Requested --to is beyond available chain head; clamping to best block"
81                );
82            } else {
83                max_block = to;
84            }
85        };
86
87        let num_tasks = self.num_tasks.unwrap_or_else(|| {
88            std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
89        });
90
91        let total_blocks = max_block - min_block;
92        let total_gas = calculate_gas_used_from_headers(
93            &provider_factory.static_file_provider(),
94            min_block..=max_block,
95        )?;
96        let blocks_per_task = total_blocks / num_tasks;
97
98        let db_at = {
99            let provider_factory = provider_factory.clone();
100            move |block_number: u64| {
101                StateProviderDatabase(
102                    provider_factory.history_by_block_number(block_number).unwrap(),
103                )
104            }
105        };
106
107        let skip_invalid_blocks = self.skip_invalid_blocks;
108        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
109        let (info_tx, mut info_rx) = mpsc::unbounded_channel();
110        let cancellation = CancellationToken::new();
111        let _guard = cancellation.drop_guard();
112
113        let mut tasks = JoinSet::new();
114        for i in 0..num_tasks {
115            let start_block = min_block + i * blocks_per_task;
116            let end_block =
117                if i == num_tasks - 1 { max_block } else { start_block + blocks_per_task };
118
119            // Spawn thread executing blocks
120            let provider_factory = provider_factory.clone();
121            let evm_config = components.evm_config().clone();
122            let consensus = components.consensus().clone();
123            let db_at = db_at.clone();
124            let stats_tx = stats_tx.clone();
125            let info_tx = info_tx.clone();
126            let cancellation = cancellation.clone();
127            tasks.spawn_blocking(move || {
128                let mut executor = evm_config.batch_executor(db_at(start_block - 1));
129                let mut executor_created = Instant::now();
130                let executor_lifetime = Duration::from_secs(120);
131
132                'blocks: for block in start_block..end_block {
133                    if cancellation.is_cancelled() {
134                        // exit if the program is being terminated
135                        break
136                    }
137
138                    let block = provider_factory
139                        .recovered_block(block.into(), TransactionVariant::NoHash)?
140                        .unwrap();
141
142                    let result = match executor.execute_one(&block) {
143                        Ok(result) => result,
144                        Err(err) => {
145                            if skip_invalid_blocks {
146                                executor = evm_config.batch_executor(db_at(block.number()));
147                                let _ = info_tx.send((block, eyre::Report::new(err)));
148                                continue
149                            }
150                            return Err(err.into())
151                        }
152                    };
153
154                    if let Err(err) = consensus
155                        .validate_block_post_execution(&block, &result, None)
156                        .wrap_err_with(|| {
157                            format!("Failed to validate block {} {}", block.number(), block.hash())
158                        })
159                    {
160                        let correct_receipts =
161                            provider_factory.receipts_by_block(block.number().into())?.unwrap();
162
163                        for (i, (receipt, correct_receipt)) in
164                            result.receipts.iter().zip(correct_receipts.iter()).enumerate()
165                        {
166                            if receipt != correct_receipt {
167                                let tx_hash = block.body().transactions()[i].tx_hash();
168                                error!(
169                                    ?receipt,
170                                    ?correct_receipt,
171                                    index = i,
172                                    ?tx_hash,
173                                    "Invalid receipt"
174                                );
175                                let expected_gas_used = correct_receipt.cumulative_gas_used() -
176                                    if i == 0 {
177                                        0
178                                    } else {
179                                        correct_receipts[i - 1].cumulative_gas_used()
180                                    };
181                                let got_gas_used = receipt.cumulative_gas_used() -
182                                    if i == 0 {
183                                        0
184                                    } else {
185                                        result.receipts[i - 1].cumulative_gas_used()
186                                    };
187                                if got_gas_used != expected_gas_used {
188                                    let mismatch = GotExpected {
189                                        expected: expected_gas_used,
190                                        got: got_gas_used,
191                                    };
192
193                                    error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
194                                    if skip_invalid_blocks {
195                                        executor = evm_config.batch_executor(db_at(block.number()));
196                                        let _ = info_tx.send((block, err));
197                                        continue 'blocks;
198                                    }
199                                    return Err(err);
200                                }
201                            } else {
202                                continue;
203                            }
204                        }
205
206                        return Err(err);
207                    }
208                    let _ = stats_tx.send(block.gas_used());
209
210                    // Reset DB once in a while to avoid OOM or read tx timeouts
211                    if executor.size_hint() > 1_000_000 ||
212                        executor_created.elapsed() > executor_lifetime
213                    {
214                        executor = evm_config.batch_executor(db_at(block.number()));
215                        executor_created = Instant::now();
216                    }
217                }
218
219                eyre::Ok(())
220            });
221        }
222
223        let instant = Instant::now();
224        let mut total_executed_blocks = 0;
225        let mut total_executed_gas = 0;
226
227        let mut last_logged_gas = 0;
228        let mut last_logged_blocks = 0;
229        let mut last_logged_time = Instant::now();
230        let mut invalid_blocks = Vec::new();
231
232        let mut interval = tokio::time::interval(Duration::from_secs(10));
233
234        loop {
235            tokio::select! {
236                Some(gas_used) = stats_rx.recv() => {
237                    total_executed_blocks += 1;
238                    total_executed_gas += gas_used;
239                }
240                Some((block, err)) = info_rx.recv() => {
241                    error!(?err, block=?block.num_hash(), "Invalid block");
242                    invalid_blocks.push(block.num_hash());
243                }
244                result = tasks.join_next() => {
245                    if let Some(result) = result {
246                        if matches!(result, Err(_) | Ok(Err(_))) {
247                            error!(?result);
248                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
249                        }
250                    } else {
251                        break;
252                    }
253                }
254                _ = interval.tick() => {
255                    let blocks_executed = total_executed_blocks - last_logged_blocks;
256                    let gas_executed = total_executed_gas - last_logged_gas;
257
258                    if blocks_executed > 0 {
259                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
260                        info!(
261                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
262                            progress=format!("{progress:.2}%"),
263                            "Executed {blocks_executed} blocks"
264                        );
265                    }
266
267                    last_logged_blocks = total_executed_blocks;
268                    last_logged_gas = total_executed_gas;
269                    last_logged_time = Instant::now();
270                }
271            }
272        }
273
274        if invalid_blocks.is_empty() {
275            info!(
276                start_block = min_block,
277                end_block = max_block,
278                %total_executed_blocks,
279                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
280                "Re-executed successfully"
281            );
282        } else {
283            info!(
284                start_block = min_block,
285                end_block = max_block,
286                %total_executed_blocks,
287                invalid_block_count = invalid_blocks.len(),
288                ?invalid_blocks,
289                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
290                "Re-executed with invalid blocks"
291            );
292        }
293
294        Ok(())
295    }
296}