reth_cli_commands/
re_execute.rs1use crate::common::{
4 AccessRights, CliComponentsBuilder, CliNodeComponents, CliNodeTypes, Environment,
5 EnvironmentArgs,
6};
7use alloy_consensus::{transaction::TxHashRef, BlockHeader, TxReceipt};
8use clap::Parser;
9use eyre::WrapErr;
10use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_cli_util::cancellation::CancellationToken;
13use reth_consensus::FullConsensus;
14use reth_evm::{execute::Executor, ConfigureEvm};
15use reth_primitives_traits::{format_gas_throughput, BlockBody, GotExpected};
16use reth_provider::{
17 BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, ReceiptProvider,
18 StaticFileProviderFactory, TransactionVariant,
19};
20use reth_revm::database::StateProviderDatabase;
21use reth_stages::stages::calculate_gas_used_from_headers;
22use std::{
23 sync::Arc,
24 time::{Duration, Instant},
25};
26use tokio::{sync::mpsc, task::JoinSet};
27use tracing::*;
28
29#[derive(Debug, Parser)]
33pub struct Command<C: ChainSpecParser> {
34 #[command(flatten)]
35 env: EnvironmentArgs<C>,
36
37 #[arg(long, default_value = "1")]
39 from: u64,
40
41 #[arg(long)]
43 to: Option<u64>,
44
45 #[arg(long, default_value = "10")]
47 num_tasks: u64,
48
49 #[arg(long)]
51 skip_invalid_blocks: bool,
52}
53
54impl<C: ChainSpecParser> Command<C> {
55 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
57 Some(&self.env.chain)
58 }
59}
60
61impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
62 pub async fn execute<N>(self, components: impl CliComponentsBuilder<N>) -> eyre::Result<()>
64 where
65 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
66 {
67 let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RO)?;
68
69 let components = components(provider_factory.chain_spec());
70
71 let min_block = self.from;
72 let best_block = DatabaseProviderFactory::database_provider_ro(&provider_factory)?
73 .best_block_number()?;
74 let mut max_block = best_block;
75 if let Some(to) = self.to {
76 if to > best_block {
77 warn!(
78 requested = to,
79 best_block,
80 "Requested --to is beyond available chain head; clamping to best block"
81 );
82 } else {
83 max_block = to;
84 }
85 };
86
87 let total_blocks = max_block - min_block;
88 let total_gas = calculate_gas_used_from_headers(
89 &provider_factory.static_file_provider(),
90 min_block..=max_block,
91 )?;
92 let blocks_per_task = total_blocks / self.num_tasks;
93
94 let db_at = {
95 let provider_factory = provider_factory.clone();
96 move |block_number: u64| {
97 StateProviderDatabase(
98 provider_factory.history_by_block_number(block_number).unwrap(),
99 )
100 }
101 };
102
103 let skip_invalid_blocks = self.skip_invalid_blocks;
104 let (stats_tx, mut stats_rx) = mpsc::unbounded_channel();
105 let (info_tx, mut info_rx) = mpsc::unbounded_channel();
106 let cancellation = CancellationToken::new();
107 let _guard = cancellation.drop_guard();
108
109 let mut tasks = JoinSet::new();
110 for i in 0..self.num_tasks {
111 let start_block = min_block + i * blocks_per_task;
112 let end_block =
113 if i == self.num_tasks - 1 { max_block } else { start_block + blocks_per_task };
114
115 let provider_factory = provider_factory.clone();
117 let evm_config = components.evm_config().clone();
118 let consensus = components.consensus().clone();
119 let db_at = db_at.clone();
120 let stats_tx = stats_tx.clone();
121 let info_tx = info_tx.clone();
122 let cancellation = cancellation.clone();
123 tasks.spawn_blocking(move || {
124 let mut executor = evm_config.batch_executor(db_at(start_block - 1));
125 let mut executor_created = Instant::now();
126 let executor_lifetime = Duration::from_secs(120);
127
128 'blocks: for block in start_block..end_block {
129 if cancellation.is_cancelled() {
130 break
132 }
133
134 let block = provider_factory
135 .recovered_block(block.into(), TransactionVariant::NoHash)?
136 .unwrap();
137
138 let result = match executor.execute_one(&block) {
139 Ok(result) => result,
140 Err(err) => {
141 if skip_invalid_blocks {
142 executor = evm_config.batch_executor(db_at(block.number()));
143 let _ = info_tx.send((block, eyre::Report::new(err)));
144 continue
145 }
146 return Err(err.into())
147 }
148 };
149
150 if let Err(err) = consensus
151 .validate_block_post_execution(&block, &result)
152 .wrap_err_with(|| {
153 format!("Failed to validate block {} {}", block.number(), block.hash())
154 })
155 {
156 let correct_receipts =
157 provider_factory.receipts_by_block(block.number().into())?.unwrap();
158
159 for (i, (receipt, correct_receipt)) in
160 result.receipts.iter().zip(correct_receipts.iter()).enumerate()
161 {
162 if receipt != correct_receipt {
163 let tx_hash = block.body().transactions()[i].tx_hash();
164 error!(
165 ?receipt,
166 ?correct_receipt,
167 index = i,
168 ?tx_hash,
169 "Invalid receipt"
170 );
171 let expected_gas_used = correct_receipt.cumulative_gas_used() -
172 if i == 0 {
173 0
174 } else {
175 correct_receipts[i - 1].cumulative_gas_used()
176 };
177 let got_gas_used = receipt.cumulative_gas_used() -
178 if i == 0 {
179 0
180 } else {
181 result.receipts[i - 1].cumulative_gas_used()
182 };
183 if got_gas_used != expected_gas_used {
184 let mismatch = GotExpected {
185 expected: expected_gas_used,
186 got: got_gas_used,
187 };
188
189 error!(number=?block.number(), ?mismatch, "Gas usage mismatch");
190 if skip_invalid_blocks {
191 executor = evm_config.batch_executor(db_at(block.number()));
192 let _ = info_tx.send((block, err));
193 continue 'blocks;
194 }
195 return Err(err);
196 }
197 } else {
198 continue;
199 }
200 }
201
202 return Err(err);
203 }
204 let _ = stats_tx.send(block.gas_used());
205
206 if executor.size_hint() > 1_000_000 ||
208 executor_created.elapsed() > executor_lifetime
209 {
210 executor = evm_config.batch_executor(db_at(block.number()));
211 executor_created = Instant::now();
212 }
213 }
214
215 eyre::Ok(())
216 });
217 }
218
219 let instant = Instant::now();
220 let mut total_executed_blocks = 0;
221 let mut total_executed_gas = 0;
222
223 let mut last_logged_gas = 0;
224 let mut last_logged_blocks = 0;
225 let mut last_logged_time = Instant::now();
226 let mut invalid_blocks = Vec::new();
227
228 let mut interval = tokio::time::interval(Duration::from_secs(10));
229
230 loop {
231 tokio::select! {
232 Some(gas_used) = stats_rx.recv() => {
233 total_executed_blocks += 1;
234 total_executed_gas += gas_used;
235 }
236 Some((block, err)) = info_rx.recv() => {
237 error!(?err, block=?block.num_hash(), "Invalid block");
238 invalid_blocks.push(block.num_hash());
239 }
240 result = tasks.join_next() => {
241 if let Some(result) = result {
242 if matches!(result, Err(_) | Ok(Err(_))) {
243 error!(?result);
244 return Err(eyre::eyre!("Re-execution failed: {result:?}"));
245 }
246 } else {
247 break;
248 }
249 }
250 _ = interval.tick() => {
251 let blocks_executed = total_executed_blocks - last_logged_blocks;
252 let gas_executed = total_executed_gas - last_logged_gas;
253
254 if blocks_executed > 0 {
255 let progress = 100.0 * total_executed_gas as f64 / total_gas as f64;
256 info!(
257 throughput=?format_gas_throughput(gas_executed, last_logged_time.elapsed()),
258 progress=format!("{progress:.2}%"),
259 "Executed {blocks_executed} blocks"
260 );
261 }
262
263 last_logged_blocks = total_executed_blocks;
264 last_logged_gas = total_executed_gas;
265 last_logged_time = Instant::now();
266 }
267 }
268 }
269
270 if invalid_blocks.is_empty() {
271 info!(
272 start_block = min_block,
273 end_block = max_block,
274 %total_executed_blocks,
275 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
276 "Re-executed successfully"
277 );
278 } else {
279 info!(
280 start_block = min_block,
281 end_block = max_block,
282 %total_executed_blocks,
283 invalid_block_count = invalid_blocks.len(),
284 ?invalid_blocks,
285 throughput=?format_gas_throughput(total_executed_gas, instant.elapsed()),
286 "Re-executed with invalid blocks"
287 );
288 }
289
290 Ok(())
291 }
292}