reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18    StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23    sync::Arc,
24    time::{Duration, Instant},
25};
26use tokio::{sync::mpsc, task::JoinSet};
27use tracing::*;
28
29/// `reth re-execute` command
30///
31/// Re-execute blocks in parallel to verify historical sync correctness.
32#[derive(Debug, Parser)]
33pub struct Command<C: ChainSpecParser> {
34    #[command(flatten)]
35    env: EnvironmentArgs<C>,
36
37    /// The height to start at.
38    #[arg(long, default_value = "1")]
39    from: u64,
40
41    /// The height to end at. Defaults to the latest block.
42    #[arg(long)]
43    to: Option<u64>,
44
45    /// Number of tasks to run in parallel
46    #[arg(long, default_value = "10")]
47    num_tasks: u64,
48
49    /// Continues with execution when an invalid block is encountered and collects these blocks.
50    #[arg(long)]
51    skip_invalid_blocks: bool,
52}
53
54impl<C: ChainSpecParser> Command<C> {
55    /// Returns the underlying chain being used to run this command
56    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
57        Some(&self.env.chain)
58    }
59}
60
61impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
62    /// Execute `re-execute` command
63    pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
64    where
65        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
66    {
67        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
68
69        let components = components(provider_factory.chain_spec());
70
71        let min_block = self.from;
72        let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
73            .best_block_number()?;
74        let mut max_block = best_block;
75        if let Some(to) = self.to {
76            if to > best_block {
77                warn!(
78                    requested = to,
79                    best_block,
80                    "Requested --to is beyond available chain head; clamping to best block"
81                );
82            } else {
83                max_block = to;
84            }
85        };
86
87        let total_blocks = max_block - min_block;
88        let total_gas = calculate_gas_used_from_headers(
89            &provider_factory.static_file_provider(),
90            min_block..=max_block,
91        )?;
92        let blocks_per_task = total_blocks / self.num_tasks;
93
94        let db_at = {
95            let provider_factory = provider_factory.clone();
96            move |block_number: u64| {
97                StateProviderDatabase(
98                    provider_factory.history_by_block_number(block_number).unwrap(),
99                )
100            }
101        };
102
103        let skip_invalid_blocks = self.skip_invalid_blocks;
104        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
105        let (info_tx, mut info_rx) = mpsc::unbounded_channel();
106        let cancellation = CancellationToken::new();
107        let _guard = cancellation.drop_guard();
108
109        let mut tasks = JoinSet::new();
110        for i in 0..self.num_tasks {
111            let start_block = min_block + i * blocks_per_task;
112            let end_block =
113                if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
114
115            // Spawn thread executing blocks
116            let provider_factory = provider_factory.clone();
117            let evm_config = components.evm_config().clone();
118            let consensus = components.consensus().clone();
119            let db_at = db_at.clone();
120            let stats_tx = stats_tx.clone();
121            let info_tx = info_tx.clone();
122            let cancellation = cancellation.clone();
123            tasks.spawn_blocking(move || {
124                let mut executor = evm_config.batch_executor(db_at(start_block - 1));
125                let mut executor_created = Instant::now();
126                let executor_lifetime = Duration::from_secs(120);
127
128                'blocks: for block in start_block..end_block {
129                    if cancellation.is_cancelled() {
130                        // exit if the program is being terminated
131                        break
132                    }
133
134                    let block = provider_factory
135                        .recovered_block(block.into(), TransactionVariant::NoHash)?
136                        .unwrap();
137
138                    let result = match executor.execute_one(&block) {
139                        Ok(result) => result,
140                        Err(err) => {
141                            if skip_invalid_blocks {
142                                executor = evm_config.batch_executor(db_at(block.number()));
143                                let _ = info_tx.send((block, eyre::Report::new(err)));
144                                continue
145                            }
146                            return Err(err.into())
147                        }
148                    };
149
150                    if let Err(err) = consensus
151                        .validate_block_post_execution(&block, &result)
152                        .wrap_err_with(|| {
153                            format!("Failed to validate block {} {}", block.number(), block.hash())
154                        })
155                    {
156                        let correct_receipts =
157                            provider_factory.receipts_by_block(block.number().into())?.unwrap();
158
159                        for (i, (receipt, correct_receipt)) in
160                            result.receipts.iter().zip(correct_receipts.iter()).enumerate()
161                        {
162                            if receipt != correct_receipt {
163                                let tx_hash = block.body().transactions()[i].tx_hash();
164                                error!(
165                                    ?receipt,
166                                    ?correct_receipt,
167                                    index = i,
168                                    ?tx_hash,
169                                    "Invalid receipt"
170                                );
171                                let expected_gas_used = correct_receipt.cumulative_gas_used() -
172                                    if i == 0 {
173                                        0
174                                    } else {
175                                        correct_receipts[i - 1].cumulative_gas_used()
176                                    };
177                                let got_gas_used = receipt.cumulative_gas_used() -
178                                    if i == 0 {
179                                        0
180                                    } else {
181                                        result.receipts[i - 1].cumulative_gas_used()
182                                    };
183                                if got_gas_used != expected_gas_used {
184                                    let mismatch = GotExpected {
185                                        expected: expected_gas_used,
186                                        got: got_gas_used,
187                                    };
188
189                                    error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
190                                    if skip_invalid_blocks {
191                                        executor = evm_config.batch_executor(db_at(block.number()));
192                                        let _ = info_tx.send((block, err));
193                                        continue 'blocks;
194                                    }
195                                    return Err(err);
196                                }
197                            } else {
198                                continue;
199                            }
200                        }
201
202                        return Err(err);
203                    }
204                    let _ = stats_tx.send(block.gas_used());
205
206                    // Reset DB once in a while to avoid OOM or read tx timeouts
207                    if executor.size_hint() > 1_000_000 ||
208                        executor_created.elapsed() > executor_lifetime
209                    {
210                        executor = evm_config.batch_executor(db_at(block.number()));
211                        executor_created = Instant::now();
212                    }
213                }
214
215                eyre::Ok(())
216            });
217        }
218
219        let instant = Instant::now();
220        let mut total_executed_blocks = 0;
221        let mut total_executed_gas = 0;
222
223        let mut last_logged_gas = 0;
224        let mut last_logged_blocks = 0;
225        let mut last_logged_time = Instant::now();
226        let mut invalid_blocks = Vec::new();
227
228        let mut interval = tokio::time::interval(Duration::from_secs(10));
229
230        loop {
231            tokio::select! {
232                Some(gas_used) = stats_rx.recv() => {
233                    total_executed_blocks += 1;
234                    total_executed_gas += gas_used;
235                }
236                Some((block, err)) = info_rx.recv() => {
237                    error!(?err, block=?block.num_hash(), "Invalid block");
238                    invalid_blocks.push(block.num_hash());
239                }
240                result = tasks.join_next() => {
241                    if let Some(result) = result {
242                        if matches!(result, Err(_) | Ok(Err(_))) {
243                            error!(?result);
244                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
245                        }
246                    } else {
247                        break;
248                    }
249                }
250                _ = interval.tick() => {
251                    let blocks_executed = total_executed_blocks - last_logged_blocks;
252                    let gas_executed = total_executed_gas - last_logged_gas;
253
254                    if blocks_executed > 0 {
255                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
256                        info!(
257                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
258                            progress=format!("{progress:.2}%"),
259                            "Executed {blocks_executed} blocks"
260                        );
261                    }
262
263                    last_logged_blocks = total_executed_blocks;
264                    last_logged_gas = total_executed_gas;
265                    last_logged_time = Instant::now();
266                }
267            }
268        }
269
270        if invalid_blocks.is_empty() {
271            info!(
272                start_block = min_block,
273                end_block = max_block,
274                %total_executed_blocks,
275                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
276                "Re-executed successfully"
277            );
278        } else {
279            info!(
280                start_block = min_block,
281                end_block = max_block,
282                %total_executed_blocks,
283                invalid_block_count = invalid_blocks.len(),
284                ?invalid_blocks,
285                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
286                "Re-executed with invalid blocks"
287            );
288        }
289
290        Ok(())
291    }
292}