Skip to main content

reth_cli_commands/
re_execute.rs

1//! Re-execute blocks from database in parallel.
2
3use crate::common::{
4    AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5    EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17    BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18    StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23    sync::{
24        atomic::{AtomicU64, Ordering},
25        Arc,
26    },
27    time::{Duration, Instant},
28};
29use tokio::{sync::mpsc, task::JoinSet};
30use tracing::*;
31
32/// `reth re-execute` command
33///
34/// Re-execute blocks in parallel to verify historical sync correctness.
35#[derive(Debug, Parser)]
36pub struct Command<C: ChainSpecParser> {
37    #[command(flatten)]
38    env: EnvironmentArgs<C>,
39
40    /// The height to start at.
41    #[arg(long, default_value = "1")]
42    from: u64,
43
44    /// The height to end at. Defaults to the latest block.
45    #[arg(long)]
46    to: Option<u64>,
47
48    /// Number of tasks to run in parallel. Defaults to the number of available CPUs.
49    #[arg(long)]
50    num_tasks: Option<u64>,
51
52    /// Number of blocks each worker processes before grabbing the next chunk.
53    #[arg(long, default_value = "5000")]
54    blocks_per_chunk: u64,
55
56    /// Continues with execution when an invalid block is encountered and collects these blocks.
57    #[arg(long)]
58    skip_invalid_blocks: bool,
59}
60
61impl<C: ChainSpecParser> Command<C> {
62    /// Returns the underlying chain being used to run this command
63    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
64        Some(&self.env.chain)
65    }
66}
67
68impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
69    /// Execute `re-execute` command
70    pub async fn execute<N>(
71        self,
72        components: impl CliComponentsBuilder<N>,
73        runtime: reth_tasks::Runtime,
74    ) -> eyre::Result<()>
75    where
76        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
77    {
78        let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
79
80        let components = components(provider_factory.chain_spec());
81
82        let min_block = self.from;
83        let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
84            .best_block_number()?;
85        let mut max_block = best_block;
86        if let Some(to) = self.to {
87            if to > best_block {
88                warn!(
89                    requested = to,
90                    best_block,
91                    "Requested --to is beyond available chain head; clamping to best block"
92                );
93            } else {
94                max_block = to;
95            }
96        };
97
98        if min_block > max_block {
99            eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
100        }
101
102        let num_tasks = self.num_tasks.unwrap_or_else(|| {
103            std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
104        });
105
106        let total_gas = calculate_gas_used_from_headers(
107            &provider_factory.static_file_provider(),
108            min_block..=max_block,
109        )?;
110
111        let db_at = {
112            let provider_factory = provider_factory.clone();
113            move |block_number: u64| {
114                StateProviderDatabase(
115                    provider_factory.history_by_block_number(block_number).unwrap(),
116                )
117            }
118        };
119
120        let skip_invalid_blocks = self.skip_invalid_blocks;
121        let blocks_per_chunk = self.blocks_per_chunk;
122        let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
123        let (info_tx, mut info_rx) = mpsc::unbounded_channel();
124        let cancellation = CancellationToken::new();
125        let _guard = cancellation.drop_guard();
126
127        // Shared counter for work stealing: workers atomically grab the next chunk of blocks.
128        let next_block = Arc::new(AtomicU64::new(min_block));
129
130        let mut tasks = JoinSet::new();
131        for _ in 0..num_tasks {
132            let provider_factory = provider_factory.clone();
133            let evm_config = components.evm_config().clone();
134            let consensus = components.consensus().clone();
135            let db_at = db_at.clone();
136            let stats_tx = stats_tx.clone();
137            let info_tx = info_tx.clone();
138            let cancellation = cancellation.clone();
139            let next_block = Arc::clone(&next_block);
140            tasks.spawn_blocking(move || {
141                let executor_lifetime = Duration::from_secs(120);
142
143                loop {
144                    if cancellation.is_cancelled() {
145                        break;
146                    }
147
148                    // Atomically grab the next chunk of blocks.
149                    let chunk_start =
150                        next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
151                    if chunk_start >= max_block {
152                        break;
153                    }
154                    let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
155
156                    let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
157                    let mut executor_created = Instant::now();
158
159                    'blocks: for block in chunk_start..chunk_end {
160                        if cancellation.is_cancelled() {
161                            break;
162                        }
163
164                        let block = provider_factory
165                            .recovered_block(block.into(), TransactionVariant::NoHash)?
166                            .unwrap();
167
168                        let result = match executor.execute_one(&block) {
169                            Ok(result) => result,
170                            Err(err) => {
171                                if skip_invalid_blocks {
172                                    executor =
173                                        evm_config.batch_executor(db_at(block.number()));
174                                    let _ =
175                                        info_tx.send((block, eyre::Report::new(err)));
176                                    continue
177                                }
178                                return Err(err.into())
179                            }
180                        };
181
182                        if let Err(err) = consensus
183                            .validate_block_post_execution(&block, &result, None)
184                            .wrap_err_with(|| {
185                                format!(
186                                    "Failed to validate block {} {}",
187                                    block.number(),
188                                    block.hash()
189                                )
190                            })
191                        {
192                            let correct_receipts = provider_factory
193                                .receipts_by_block(block.number().into())?
194                                .unwrap();
195
196                            for (i, (receipt, correct_receipt)) in
197                                result.receipts.iter().zip(correct_receipts.iter()).enumerate()
198                            {
199                                if receipt != correct_receipt {
200                                    let tx_hash =
201                                        block.body().transactions()[i].tx_hash();
202                                    error!(
203                                        ?receipt,
204                                        ?correct_receipt,
205                                        index = i,
206                                        ?tx_hash,
207                                        "Invalid receipt"
208                                    );
209                                    let expected_gas_used =
210                                        correct_receipt.cumulative_gas_used() -
211                                            if i == 0 {
212                                                0
213                                            } else {
214                                                correct_receipts[i - 1]
215                                                    .cumulative_gas_used()
216                                            };
217                                    let got_gas_used = receipt.cumulative_gas_used() -
218                                        if i == 0 {
219                                            0
220                                        } else {
221                                            result.receipts[i - 1].cumulative_gas_used()
222                                        };
223                                    if got_gas_used != expected_gas_used {
224                                        let mismatch = GotExpected {
225                                            expected: expected_gas_used,
226                                            got: got_gas_used,
227                                        };
228
229                                        error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
230                                        if skip_invalid_blocks {
231                                            executor = evm_config
232                                                .batch_executor(db_at(block.number()));
233                                            let _ = info_tx.send((block, err));
234                                            continue 'blocks;
235                                        }
236                                        return Err(err);
237                                    }
238                                } else {
239                                    continue;
240                                }
241                            }
242
243                            return Err(err);
244                        }
245                        let _ = stats_tx.send(block.gas_used());
246
247                        // Reset DB once in a while to avoid OOM or read tx timeouts
248                        if executor.size_hint() > 1_000_000 ||
249                            executor_created.elapsed() > executor_lifetime
250                        {
251                            executor =
252                                evm_config.batch_executor(db_at(block.number()));
253                            executor_created = Instant::now();
254                        }
255                    }
256                }
257
258                eyre::Ok(())
259            });
260        }
261
262        let instant = Instant::now();
263        let mut total_executed_blocks = 0;
264        let mut total_executed_gas = 0;
265
266        let mut last_logged_gas = 0;
267        let mut last_logged_blocks = 0;
268        let mut last_logged_time = Instant::now();
269        let mut invalid_blocks = Vec::new();
270
271        let mut interval = tokio::time::interval(Duration::from_secs(10));
272
273        loop {
274            tokio::select! {
275                Some(gas_used) = stats_rx.recv() => {
276                    total_executed_blocks += 1;
277                    total_executed_gas += gas_used;
278                }
279                Some((block, err)) = info_rx.recv() => {
280                    error!(?err, block=?block.num_hash(), "Invalid block");
281                    invalid_blocks.push(block.num_hash());
282                }
283                result = tasks.join_next() => {
284                    if let Some(result) = result {
285                        if matches!(result, Err(_) | Ok(Err(_))) {
286                            error!(?result);
287                            return Err(eyre::eyre!("Re-execution failed: {result:?}"));
288                        }
289                    } else {
290                        break;
291                    }
292                }
293                _ = interval.tick() => {
294                    let blocks_executed = total_executed_blocks - last_logged_blocks;
295                    let gas_executed = total_executed_gas - last_logged_gas;
296
297                    if blocks_executed > 0 {
298                        let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
299                        info!(
300                            throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
301                            progress=format!("{progress:.2}%"),
302                            "Executed {blocks_executed} blocks"
303                        );
304                    }
305
306                    last_logged_blocks = total_executed_blocks;
307                    last_logged_gas = total_executed_gas;
308                    last_logged_time = Instant::now();
309                }
310            }
311        }
312
313        if invalid_blocks.is_empty() {
314            info!(
315                start_block = min_block,
316                end_block = max_block,
317                %total_executed_blocks,
318                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
319                "Re-executed successfully"
320            );
321        } else {
322            info!(
323                start_block = min_block,
324                end_block = max_block,
325                %total_executed_blocks,
326                invalid_block_count = invalid_blocks.len(),
327                ?invalid_blocks,
328                throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
329                "Re-executed with invalid blocks"
330            );
331        }
332
333        Ok(())
334    }
335}