1use crate::{args::NetworkArgs, providers::ExecutionOutcome, utils::get_single_header};
3use alloy_consensus::BlockHeader;
4use alloy_eips::BlockHashOrNumber;
5use backon::{ConstantBuilder, Retryable};
6use clap::Parser;
7use reth_chainspec::ChainSpec;
8use reth_cli::chainspec::ChainSpecParser;
9use reth_cli_commands::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
10use reth_cli_runner::CliContext;
11use reth_cli_util::get_secret_key;
12use reth_config::Config;
13use reth_consensus::{Consensus, ConsensusError};
14use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx};
15use reth_ethereum_primitives::EthPrimitives;
16use reth_evm::execute::{BlockExecutorProvider, Executor};
17use reth_network::{BlockDownloaderProvider, NetworkHandle};
18use reth_network_api::NetworkInfo;
19use reth_network_p2p::full_block::FullBlockClient;
20use reth_node_api::{BlockTy, NodePrimitives};
21use reth_node_ethereum::{consensus::EthBeaconConsensus, EthExecutorProvider};
22use reth_provider::{
23 providers::ProviderNodeTypes, BlockNumReader, BlockWriter, ChainSpecProvider,
24 DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
25 ProviderError, ProviderFactory, StateWriter, StorageLocation,
26};
27use reth_revm::database::StateProviderDatabase;
28use reth_stages::{
29 stages::{AccountHashingStage, MerkleStage, StorageHashingStage},
30 ExecInput, Stage, StageCheckpoint,
31};
32use reth_tasks::TaskExecutor;
33use std::{path::PathBuf, sync::Arc};
34use tracing::*;
35
36#[derive(Debug, Parser)]
38pub struct Command<C: ChainSpecParser> {
39 #[command(flatten)]
40 env: EnvironmentArgs<C>,
41
42 #[command(flatten)]
43 network: NetworkArgs,
44
45 #[arg(long, default_value = "5")]
47 retries: usize,
48
49 #[arg(long)]
51 to: u64,
52
53 #[arg(long)]
55 skip_node_depth: Option<usize>,
56}
57
58impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
59 async fn build_network<
60 N: ProviderNodeTypes<
61 ChainSpec = C::ChainSpec,
62 Primitives: NodePrimitives<
63 Block = reth_ethereum_primitives::Block,
64 Receipt = reth_ethereum_primitives::Receipt,
65 BlockHeader = alloy_consensus::Header,
66 >,
67 >,
68 >(
69 &self,
70 config: &Config,
71 task_executor: TaskExecutor,
72 provider_factory: ProviderFactory<N>,
73 network_secret_path: PathBuf,
74 default_peers_path: PathBuf,
75 ) -> eyre::Result<NetworkHandle> {
76 let secret_key = get_secret_key(&network_secret_path)?;
77 let network = self
78 .network
79 .network_config(config, provider_factory.chain_spec(), secret_key, default_peers_path)
80 .with_task_executor(Box::new(task_executor))
81 .build(provider_factory)
82 .start_network()
83 .await?;
84 info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
85 debug!(target: "reth::cli", peer_id = ?network.peer_id(), "Full peer ID");
86 Ok(network)
87 }
88
89 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec, Primitives = EthPrimitives>>(
91 self,
92 ctx: CliContext,
93 ) -> eyre::Result<()> {
94 let Environment { provider_factory, config, data_dir } =
95 self.env.init::<N>(AccessRights::RW)?;
96
97 let provider_rw = provider_factory.database_provider_rw()?;
98
99 let network_secret_path =
101 self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
102 let network = self
103 .build_network(
104 &config,
105 ctx.task_executor.clone(),
106 provider_factory.clone(),
107 network_secret_path,
108 data_dir.known_peers(),
109 )
110 .await?;
111
112 let executor_provider = EthExecutorProvider::ethereum(provider_factory.chain_spec());
113
114 info!(target: "reth::cli", target_block_number = self.to, "Downloading tip of block range");
116 let fetch_client = network.fetch_client().await?;
117
118 let retries = self.retries.max(1);
120 let backoff = ConstantBuilder::default().with_max_times(retries);
121 let client = fetch_client.clone();
122 let to_header = (move || {
123 get_single_header(client.clone(), BlockHashOrNumber::Number(self.to))
124 })
125 .retry(backoff)
126 .notify(|err, _| warn!(target: "reth::cli", "Error requesting header: {err}. Retrying..."))
127 .await?;
128 info!(target: "reth::cli", target_block_number=self.to, "Finished downloading tip of block range");
129
130 let consensus: Arc<dyn Consensus<BlockTy<N>, Error = ConsensusError>> =
132 Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));
133 let block_range_client = FullBlockClient::new(fetch_client, consensus);
134
135 let best_block_number = provider_rw.best_block_number()?;
137 assert!(best_block_number < self.to, "Nothing to run");
138
139 let block_range = best_block_number + 1..=self.to;
141 info!(target: "reth::cli", ?block_range, "Downloading range of blocks");
142 let blocks = block_range_client
143 .get_full_block_range(to_header.hash_slow(), self.to - best_block_number)
144 .await;
145
146 let mut td = provider_rw
147 .header_td_by_number(best_block_number)?
148 .ok_or(ProviderError::TotalDifficultyNotFound(best_block_number))?;
149
150 let mut account_hashing_stage = AccountHashingStage::default();
151 let mut storage_hashing_stage = StorageHashingStage::default();
152 let mut merkle_stage = MerkleStage::default_execution();
153
154 for block in blocks.into_iter().rev() {
155 let block_number = block.number;
156 let sealed_block =
157 block.try_recover().map_err(|_| eyre::eyre!("Error sealing block with senders"))?;
158 trace!(target: "reth::cli", block_number, "Executing block");
159
160 provider_rw.insert_block(sealed_block.clone(), StorageLocation::Database)?;
161
162 td += sealed_block.difficulty();
163 let executor = executor_provider
164 .executor(StateProviderDatabase::new(LatestStateProviderRef::new(&provider_rw)));
165 let output = executor.execute(&sealed_block)?;
166
167 provider_rw.write_state(
168 &ExecutionOutcome::single(block_number, output),
169 OriginalValuesKnown::Yes,
170 StorageLocation::Database,
171 )?;
172
173 let checkpoint = Some(StageCheckpoint::new(
174 block_number
175 .checked_sub(1)
176 .ok_or_else(|| eyre::eyre!("GenesisBlockHasNoParent"))?,
177 ));
178
179 let mut account_hashing_done = false;
180 while !account_hashing_done {
181 let output = account_hashing_stage
182 .execute(&provider_rw, ExecInput { target: Some(block_number), checkpoint })?;
183 account_hashing_done = output.done;
184 }
185
186 let mut storage_hashing_done = false;
187 while !storage_hashing_done {
188 let output = storage_hashing_stage
189 .execute(&provider_rw, ExecInput { target: Some(block_number), checkpoint })?;
190 storage_hashing_done = output.done;
191 }
192
193 let incremental_result = merkle_stage
194 .execute(&provider_rw, ExecInput { target: Some(block_number), checkpoint });
195
196 if incremental_result.is_ok() {
197 debug!(target: "reth::cli", block_number, "Successfully computed incremental root");
198 continue
199 }
200
201 warn!(target: "reth::cli", block_number, "Incremental calculation failed, retrying from scratch");
202 let incremental_account_trie = provider_rw
203 .tx_ref()
204 .cursor_read::<tables::AccountsTrie>()?
205 .walk_range(..)?
206 .collect::<Result<Vec<_>, _>>()?;
207 let incremental_storage_trie = provider_rw
208 .tx_ref()
209 .cursor_dup_read::<tables::StoragesTrie>()?
210 .walk_range(..)?
211 .collect::<Result<Vec<_>, _>>()?;
212
213 let clean_input = ExecInput { target: Some(sealed_block.number), checkpoint: None };
214 loop {
215 let clean_result = merkle_stage
216 .execute(&provider_rw, clean_input)
217 .map_err(|e| eyre::eyre!("Clean state root calculation failed: {}", e))?;
218 if clean_result.done {
219 break;
220 }
221 }
222
223 let clean_account_trie = provider_rw
224 .tx_ref()
225 .cursor_read::<tables::AccountsTrie>()?
226 .walk_range(..)?
227 .collect::<Result<Vec<_>, _>>()?;
228 let clean_storage_trie = provider_rw
229 .tx_ref()
230 .cursor_dup_read::<tables::StoragesTrie>()?
231 .walk_range(..)?
232 .collect::<Result<Vec<_>, _>>()?;
233
234 info!(target: "reth::cli", block_number, "Comparing incremental trie vs clean trie");
235
236 let mut incremental_account_mismatched = Vec::new();
238 let mut clean_account_mismatched = Vec::new();
239 let mut incremental_account_trie_iter = incremental_account_trie.into_iter().peekable();
240 let mut clean_account_trie_iter = clean_account_trie.into_iter().peekable();
241 while incremental_account_trie_iter.peek().is_some() ||
242 clean_account_trie_iter.peek().is_some()
243 {
244 match (incremental_account_trie_iter.next(), clean_account_trie_iter.next()) {
245 (Some(incremental), Some(clean)) => {
246 similar_asserts::assert_eq!(incremental.0, clean.0, "Nibbles don't match");
247 if incremental.1 != clean.1 &&
248 clean.0 .0.len() > self.skip_node_depth.unwrap_or_default()
249 {
250 incremental_account_mismatched.push(incremental);
251 clean_account_mismatched.push(clean);
252 }
253 }
254 (Some(incremental), None) => {
255 warn!(target: "reth::cli", next = ?incremental, "Incremental account trie has more entries");
256 }
257 (None, Some(clean)) => {
258 warn!(target: "reth::cli", next = ?clean, "Clean account trie has more entries");
259 }
260 (None, None) => {
261 info!(target: "reth::cli", "Exhausted all account trie entries");
262 }
263 }
264 }
265
266 let mut first_mismatched_storage = None;
268 let mut incremental_storage_trie_iter = incremental_storage_trie.into_iter().peekable();
269 let mut clean_storage_trie_iter = clean_storage_trie.into_iter().peekable();
270 while incremental_storage_trie_iter.peek().is_some() ||
271 clean_storage_trie_iter.peek().is_some()
272 {
273 match (incremental_storage_trie_iter.next(), clean_storage_trie_iter.next()) {
274 (Some(incremental), Some(clean)) => {
275 if incremental != clean &&
276 clean.1.nibbles.len() > self.skip_node_depth.unwrap_or_default()
277 {
278 first_mismatched_storage = Some((incremental, clean));
279 break
280 }
281 }
282 (Some(incremental), None) => {
283 warn!(target: "reth::cli", next = ?incremental, "Incremental storage trie has more entries");
284 }
285 (None, Some(clean)) => {
286 warn!(target: "reth::cli", next = ?clean, "Clean storage trie has more entries")
287 }
288 (None, None) => {
289 info!(target: "reth::cli", "Exhausted all storage trie entries.")
290 }
291 }
292 }
293
294 similar_asserts::assert_eq!(
295 (
296 incremental_account_mismatched,
297 first_mismatched_storage.as_ref().map(|(incremental, _)| incremental)
298 ),
299 (
300 clean_account_mismatched,
301 first_mismatched_storage.as_ref().map(|(_, clean)| clean)
302 ),
303 "Mismatched trie nodes"
304 );
305 }
306
307 info!(target: "reth::cli", ?block_range, "Successfully validated incremental roots");
308
309 Ok(())
310 }
311}