1use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_downloaders::{
15 bodies::bodies::BodiesDownloaderBuilder,
16 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
17};
18use reth_exex::ExExManagerHandle;
19use reth_network::BlockDownloaderProvider;
20use reth_network_p2p::HeadersClient;
21use reth_node_builder::common::metrics_hooks;
22use reth_node_core::{
23 args::{NetworkArgs, StageEnum},
24 version::version_metadata,
25};
26use reth_node_metrics::{
27 chain::ChainSpecInfo,
28 server::{MetricServer, MetricServerConfig},
29 version::VersionInfo,
30};
31use reth_provider::{
32 ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
33 StageCheckpointWriter,
34};
35use reth_stages::{
36 stages::{
37 AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
38 IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
39 TransactionLookupStage,
40 },
41 ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
42};
43use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
44use tokio::sync::watch;
45use tracing::*;
46
/// CLI arguments for running a single stage between two block numbers.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    // Shared environment arguments, flattened into this command's own flags. They are used to
    // initialise the provider factory, the config and the data directory.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Socket address on which to serve metrics.
    ///
    /// When omitted, no metrics server is started.
    #[arg(long, value_name = "SOCKET")]
    metrics: Option<SocketAddr>,

    /// The stage to run.
    #[arg(value_enum)]
    stage: StageEnum,

    /// The block number to start from.
    ///
    /// This is both the block the unwind goes back to and the checkpoint execution starts at.
    #[arg(long)]
    from: u64,

    /// The block number to run the stage up to.
    ///
    /// The headers stage also requests this block from peers and uses it as its tip.
    #[arg(long, short)]
    to: u64,

    /// Batch size passed to stages that take a batch or commit threshold.
    ///
    /// Defaults to the size of the requested range (`to - from + 1`).
    #[arg(long)]
    batch_size: Option<u64>,

    /// Skip the unwind that is otherwise performed before the stage is executed.
    #[arg(long, short)]
    skip_unwind: bool,

    /// Commit the database changes after every unwind and execution step.
    ///
    /// This flag is required for the headers, bodies and execution stages.
    #[arg(long, short)]
    commit: bool,

    /// Save the stage checkpoint after every unwind and execution step.
    #[arg(long)]
    checkpoints: bool,

    // Network arguments, flattened into this command's own flags. They are only read when the
    // headers or bodies stage is selected, to start the network those stages download from.
    #[command(flatten)]
    network: NetworkArgs,
}
101
102impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
103 pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
105 where
106 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
107 Comp: CliNodeComponents<N>,
108 F: FnOnce(Arc<C::ChainSpec>) -> Comp,
109 {
110 if self.requires_commit() && !self.commit {
112 return Err(eyre::eyre!(
113 "The stage {} requires overwriting existing static files and must commit, but `--commit` was not provided. Please pass `--commit` and try again.",
114 self.stage.to_string()
115 ));
116 }
117
118 let _ = fdlimit::raise_fd_limit();
121
122 let runtime = ctx.task_executor.clone();
123 let Environment { provider_factory, config, data_dir } =
124 self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
125
126 let mut provider_rw = provider_factory.database_provider_rw()?;
127 let components = components(provider_factory.chain_spec());
128
129 if let Some(listen_addr) = self.metrics {
130 let config = MetricServerConfig::new(
131 listen_addr,
132 VersionInfo {
133 version: version_metadata().cargo_pkg_version.as_ref(),
134 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
135 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
136 git_sha: version_metadata().vergen_git_sha.as_ref(),
137 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
138 build_profile: version_metadata().build_profile_name.as_ref(),
139 },
140 ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
141 ctx.task_executor,
142 metrics_hooks(&provider_factory),
143 data_dir.pprof_dumps(),
144 );
145
146 MetricServer::new(config).serve().await?;
147 }
148
149 let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
150
151 let etl_config = config.stages.etl.clone();
152 let prune_modes = config.prune.segments.clone();
153
154 let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
155 match self.stage {
156 StageEnum::Headers => {
157 let consensus = Arc::new(components.consensus().clone());
158
159 let network_secret_path = self
160 .network
161 .p2p_secret_key
162 .clone()
163 .unwrap_or_else(|| data_dir.p2p_secret());
164 let p2p_secret_key = get_secret_key(&network_secret_path)?;
165
166 let default_peers_path = data_dir.known_peers();
167
168 let network = self
169 .network
170 .network_config::<N::NetworkPrimitives>(
171 &config,
172 provider_factory.chain_spec(),
173 p2p_secret_key,
174 default_peers_path,
175 runtime.clone(),
176 )
177 .build(provider_factory.clone())
178 .start_network()
179 .await?;
180 let fetch_client = Arc::new(network.fetch_client().await?);
181
182 let tip = loop {
184 match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
185 Ok(header) => {
186 if let Some(header) = header.into_data() {
187 break header
188 }
189 }
190 Err(error) if error.is_retryable() => {
191 warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
192 }
193 Err(error) => return Err(error.into()),
194 }
195 };
196 let (_, rx) = watch::channel(tip.hash_slow());
197 (
198 Box::new(HeaderStage::new(
199 provider_factory.clone(),
200 ReverseHeadersDownloaderBuilder::new(config.stages.headers)
201 .build(fetch_client, consensus.clone()),
202 rx,
203 etl_config,
204 )),
205 None,
206 )
207 }
208 StageEnum::Bodies => {
209 let consensus = Arc::new(components.consensus().clone());
210
211 let mut config = config;
212 config.peers.trusted_nodes_only = self.network.trusted_only;
213 config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
214
215 let network_secret_path = self
216 .network
217 .p2p_secret_key
218 .clone()
219 .unwrap_or_else(|| data_dir.p2p_secret());
220 let p2p_secret_key = get_secret_key(&network_secret_path)?;
221
222 let default_peers_path = data_dir.known_peers();
223
224 let network = self
225 .network
226 .network_config::<N::NetworkPrimitives>(
227 &config,
228 provider_factory.chain_spec(),
229 p2p_secret_key,
230 default_peers_path,
231 runtime.clone(),
232 )
233 .build(provider_factory.clone())
234 .start_network()
235 .await?;
236 let fetch_client = Arc::new(network.fetch_client().await?);
237
238 let stage = BodyStage::new(
239 BodiesDownloaderBuilder::default()
240 .with_stream_batch_size(batch_size as usize)
241 .with_request_limit(config.stages.bodies.downloader_request_limit)
242 .with_max_buffered_blocks_size_bytes(
243 config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
244 )
245 .with_concurrent_requests_range(
246 config.stages.bodies.downloader_min_concurrent_requests..=
247 config.stages.bodies.downloader_max_concurrent_requests,
248 )
249 .build(fetch_client, consensus.clone(), provider_factory.clone()),
250 );
251 (Box::new(stage), None)
252 }
253 StageEnum::Senders => (
254 Box::new(SenderRecoveryStage::new(
255 SenderRecoveryConfig { commit_threshold: batch_size },
256 None,
257 )),
258 None,
259 ),
260 StageEnum::Execution => (
261 Box::new(ExecutionStage::new(
262 components.evm_config().clone(),
263 Arc::new(components.consensus().clone()),
264 ExecutionStageThresholds {
265 max_blocks: Some(batch_size),
266 max_changes: None,
267 max_cumulative_gas: None,
268 max_duration: None,
269 },
270 config.stages.merkle.incremental_threshold,
271 ExExManagerHandle::empty(),
272 )),
273 None,
274 ),
275 StageEnum::TxLookup => (
276 Box::new(TransactionLookupStage::new(
277 TransactionLookupConfig { chunk_size: batch_size },
278 etl_config,
279 prune_modes.transaction_lookup,
280 )),
281 None,
282 ),
283 StageEnum::AccountHashing => (
284 Box::new(AccountHashingStage::new(
285 HashingConfig {
286 clean_threshold: 1,
287 commit_threshold: batch_size,
288 commit_entries: u64::MAX,
289 },
290 etl_config,
291 )),
292 None,
293 ),
294 StageEnum::StorageHashing => (
295 Box::new(StorageHashingStage::new(
296 HashingConfig {
297 clean_threshold: 1,
298 commit_threshold: batch_size,
299 commit_entries: u64::MAX,
300 },
301 etl_config,
302 )),
303 None,
304 ),
305 StageEnum::Merkle => (
306 Box::new(MerkleStage::new_execution(
307 config.stages.merkle.rebuild_threshold,
308 config.stages.merkle.incremental_threshold,
309 )),
310 Some(Box::new(MerkleStage::default_unwind())),
311 ),
312 StageEnum::AccountHistory => (
313 Box::new(IndexAccountHistoryStage::new(
314 config.stages.index_account_history,
315 etl_config,
316 prune_modes.account_history,
317 )),
318 None,
319 ),
320 StageEnum::StorageHistory => (
321 Box::new(IndexStorageHistoryStage::new(
322 config.stages.index_storage_history,
323 etl_config,
324 prune_modes.storage_history,
325 )),
326 None,
327 ),
328 _ => return Ok(()),
329 };
330 if let Some(unwind_stage) = &unwind_stage {
331 assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
332 }
333
334 let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
335
336 let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
337
338 let mut unwind = UnwindInput {
339 checkpoint: checkpoint.with_block_number(self.to),
340 unwind_to: self.from,
341 bad_block: None,
342 };
343
344 if !self.skip_unwind {
345 while unwind.checkpoint.block_number > self.from {
346 let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
347 unwind.checkpoint = checkpoint;
348
349 if self.checkpoints {
350 provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
351 }
352
353 if self.commit {
354 provider_rw.commit()?;
355 provider_rw = provider_factory.database_provider_rw()?;
356 }
357 }
358 }
359
360 let mut input = ExecInput {
361 target: Some(self.to),
362 checkpoint: Some(checkpoint.with_block_number(self.from)),
363 };
364
365 let start = Instant::now();
366 info!(target: "reth::cli", stage = %self.stage, "Executing stage");
367 loop {
368 exec_stage.execute_ready(input).await?;
369 let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
370
371 input.checkpoint = Some(checkpoint);
372
373 if self.checkpoints {
374 provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
375 }
376 if self.commit {
377 provider_rw.commit()?;
378 provider_rw = provider_factory.database_provider_rw()?;
379 }
380
381 if done {
382 break
383 }
384 }
385 info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
386
387 Ok(())
388 }
389}
390
391impl<C: ChainSpecParser> Command<C> {
392 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
394 Some(&self.env.chain)
395 }
396
397 pub fn requires_commit(&self) -> bool {
403 matches!(self.stage, StageEnum::Headers | StageEnum::Bodies | StageEnum::Execution)
404 }
405}