1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18 StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23 sync::{
24 atomic::{AtomicU64, Ordering},
25 Arc,
26 },
27 time::{Duration, Instant},
28};
29use tokio::{sync::mpsc, task::JoinSet};
30use tracing::*;
31
32#[derive(Debug, Parser)]
36pub struct Command<C: ChainSpecParser> {
37 #[command(flatten)]
38 env: EnvironmentArgs<C>,
39
40 #[arg(long, default_value = "1")]
42 from: u64,
43
44 #[arg(long)]
46 to: Option<u64>,
47
48 #[arg(long)]
50 num_tasks: Option<u64>,
51
52 #[arg(long, default_value = "5000")]
54 blocks_per_chunk: u64,
55
56 #[arg(long)]
58 skip_invalid_blocks: bool,
59}
60
61impl<C: ChainSpecParser> Command<C> {
62 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
64 Some(&self.env.chain)
65 }
66}
67
68impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
69 pub async fn execute<N>(
71 self,
72 components: impl CliComponentsBuilder<N>,
73 runtime: reth_tasks::Runtime,
74 ) -> eyre::Result<()>
75 where
76 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
77 {
78 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO, runtime)?;
79
80 let components = components(provider_factory.chain_spec());
81
82 let min_block = self.from;
83 let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
84 .best_block_number()?;
85 let mut max_block = best_block;
86 if let Some(to) = self.to {
87 if to > best_block {
88 warn!(
89 requested = to,
90 best_block,
91 "Requested --to is beyond available chain head; clamping to best block"
92 );
93 } else {
94 max_block = to;
95 }
96 };
97
98 if min_block > max_block {
99 eyre::bail!("--from ({min_block}) is beyond --to ({max_block}), nothing to re-execute");
100 }
101
102 let num_tasks = self.num_tasks.unwrap_or_else(|| {
103 std::thread::available_parallelism().map(|n| n.get() as u64).unwrap_or(10)
104 });
105
106 let total_gas = calculate_gas_used_from_headers(
107 &provider_factory.static_file_provider(),
108 min_block..=max_block,
109 )?;
110
111 let db_at = {
112 let provider_factory = provider_factory.clone();
113 move |block_number: u64| {
114 StateProviderDatabase(
115 provider_factory.history_by_block_number(block_number).unwrap(),
116 )
117 }
118 };
119
120 let skip_invalid_blocks = self.skip_invalid_blocks;
121 let blocks_per_chunk = self.blocks_per_chunk;
122 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
123 let (info_tx, mut info_rx) = mpsc::unbounded_channel();
124 let cancellation = CancellationToken::new();
125 let _guard = cancellation.drop_guard();
126
127 let next_block = Arc::new(AtomicU64::new(min_block));
129
130 let mut tasks = JoinSet::new();
131 for _ in 0..num_tasks {
132 let provider_factory = provider_factory.clone();
133 let evm_config = components.evm_config().clone();
134 let consensus = components.consensus().clone();
135 let db_at = db_at.clone();
136 let stats_tx = stats_tx.clone();
137 let info_tx = info_tx.clone();
138 let cancellation = cancellation.clone();
139 let next_block = Arc::clone(&next_block);
140 tasks.spawn_blocking(move || {
141 let executor_lifetime = Duration::from_secs(120);
142
143 loop {
144 if cancellation.is_cancelled() {
145 break;
146 }
147
148 let chunk_start =
150 next_block.fetch_add(blocks_per_chunk, Ordering::Relaxed);
151 if chunk_start >= max_block {
152 break;
153 }
154 let chunk_end = (chunk_start + blocks_per_chunk).min(max_block);
155
156 let mut executor = evm_config.batch_executor(db_at(chunk_start - 1));
157 let mut executor_created = Instant::now();
158
159 'blocks: for block in chunk_start..chunk_end {
160 if cancellation.is_cancelled() {
161 break;
162 }
163
164 let block = provider_factory
165 .recovered_block(block.into(), TransactionVariant::NoHash)?
166 .unwrap();
167
168 let result = match executor.execute_one(&block) {
169 Ok(result) => result,
170 Err(err) => {
171 if skip_invalid_blocks {
172 executor =
173 evm_config.batch_executor(db_at(block.number()));
174 let _ =
175 info_tx.send((block, eyre::Report::new(err)));
176 continue
177 }
178 return Err(err.into())
179 }
180 };
181
182 if let Err(err) = consensus
183 .validate_block_post_execution(&block, &result, None)
184 .wrap_err_with(|| {
185 format!(
186 "Failed to validate block {} {}",
187 block.number(),
188 block.hash()
189 )
190 })
191 {
192 let correct_receipts = provider_factory
193 .receipts_by_block(block.number().into())?
194 .unwrap();
195
196 for (i, (receipt, correct_receipt)) in
197 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
198 {
199 if receipt != correct_receipt {
200 let tx_hash =
201 block.body().transactions()[i].tx_hash();
202 error!(
203 ?receipt,
204 ?correct_receipt,
205 index = i,
206 ?tx_hash,
207 "Invalid receipt"
208 );
209 let expected_gas_used =
210 correct_receipt.cumulative_gas_used() -
211 if i == 0 {
212 0
213 } else {
214 correct_receipts[i - 1]
215 .cumulative_gas_used()
216 };
217 let got_gas_used = receipt.cumulative_gas_used() -
218 if i == 0 {
219 0
220 } else {
221 result.receipts[i - 1].cumulative_gas_used()
222 };
223 if got_gas_used != expected_gas_used {
224 let mismatch = GotExpected {
225 expected: expected_gas_used,
226 got: got_gas_used,
227 };
228
229 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
230 if skip_invalid_blocks {
231 executor = evm_config
232 .batch_executor(db_at(block.number()));
233 let _ = info_tx.send((block, err));
234 continue 'blocks;
235 }
236 return Err(err);
237 }
238 } else {
239 continue;
240 }
241 }
242
243 return Err(err);
244 }
245 let _ = stats_tx.send(block.gas_used());
246
247 if executor.size_hint() > 1_000_000 ||
249 executor_created.elapsed() > executor_lifetime
250 {
251 executor =
252 evm_config.batch_executor(db_at(block.number()));
253 executor_created = Instant::now();
254 }
255 }
256 }
257
258 eyre::Ok(())
259 });
260 }
261
262 let instant = Instant::now();
263 let mut total_executed_blocks = 0;
264 let mut total_executed_gas = 0;
265
266 let mut last_logged_gas = 0;
267 let mut last_logged_blocks = 0;
268 let mut last_logged_time = Instant::now();
269 let mut invalid_blocks = Vec::new();
270
271 let mut interval = tokio::time::interval(Duration::from_secs(10));
272
273 loop {
274 tokio::select! {
275 Some(gas_used) = stats_rx.recv() => {
276 total_executed_blocks += 1;
277 total_executed_gas += gas_used;
278 }
279 Some((block, err)) = info_rx.recv() => {
280 error!(?err, block=?block.num_hash(), "Invalid block");
281 invalid_blocks.push(block.num_hash());
282 }
283 result = tasks.join_next() => {
284 if let Some(result) = result {
285 if matches!(result, Err(_) | Ok(Err(_))) {
286 error!(?result);
287 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
288 }
289 } else {
290 break;
291 }
292 }
293 _ = interval.tick() => {
294 let blocks_executed = total_executed_blocks - last_logged_blocks;
295 let gas_executed = total_executed_gas - last_logged_gas;
296
297 if blocks_executed > 0 {
298 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
299 info!(
300 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
301 progress=format!("{progress:.2}%"),
302 "Executed {blocks_executed} blocks"
303 );
304 }
305
306 last_logged_blocks = total_executed_blocks;
307 last_logged_gas = total_executed_gas;
308 last_logged_time = Instant::now();
309 }
310 }
311 }
312
313 if invalid_blocks.is_empty() {
314 info!(
315 start_block = min_block,
316 end_block = max_block,
317 %total_executed_blocks,
318 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
319 "Re-executed successfully"
320 );
321 } else {
322 info!(
323 start_block = min_block,
324 end_block = max_block,
325 %total_executed_blocks,
326 invalid_block_count = invalid_blocks.len(),
327 ?invalid_blocks,
328 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
329 "Re-executed with invalid blocks"
330 );
331 }
332
333 Ok(())
334 }
335}