1use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16 bodies::bodies::BodiesDownloaderBuilder,
17 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_exex::ExExManagerHandle;
20use reth_network::BlockDownloaderProvider;
21use reth_network_p2p::HeadersClient;
22use reth_node_core::{
23 args::{NetworkArgs, StageEnum},
24 version::version_metadata,
25};
26use reth_node_metrics::{
27 chain::ChainSpecInfo,
28 hooks::Hooks,
29 server::{MetricServer, MetricServerConfig},
30 version::VersionInfo,
31};
32use reth_provider::{
33 ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
34 StageCheckpointWriter, StaticFileProviderFactory,
35};
36use reth_stages::{
37 stages::{
38 AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
39 IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
40 TransactionLookupStage,
41 },
42 ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
43};
44use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
45use tokio::sync::watch;
46use tracing::*;
47
48#[derive(Debug, Parser)]
50pub struct Command<C: ChainSpecParser> {
51 #[command(flatten)]
52 env: EnvironmentArgs<C>,
53
54 #[arg(long, value_name = "SOCKET")]
58 metrics: Option<SocketAddr>,
59
60 #[arg(value_enum)]
62 stage: StageEnum,
63
64 #[arg(long)]
66 from: u64,
67
68 #[arg(long, short)]
70 to: u64,
71
72 #[arg(long)]
74 batch_size: Option<u64>,
75
76 #[arg(long, short)]
82 skip_unwind: bool,
83
84 #[arg(long, short)]
93 commit: bool,
94
95 #[arg(long)]
97 checkpoints: bool,
98
99 #[command(flatten)]
100 network: NetworkArgs,
101}
102
103impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
104 pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
106 where
107 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
108 Comp: CliNodeComponents<N>,
109 F: FnOnce(Arc<C::ChainSpec>) -> Comp,
110 {
111 if self.requires_commit() && !self.commit {
113 return Err(eyre::eyre!(
114 "The stage {} requires overwriting existing static files and must commit, but `--commit` was not provided. Please pass `--commit` and try again.",
115 self.stage.to_string()
116 ));
117 }
118
119 let _ = fdlimit::raise_fd_limit();
122
123 let Environment { provider_factory, config, data_dir } =
124 self.env.init::<N>(AccessRights::RW)?;
125
126 let mut provider_rw = provider_factory.database_provider_rw()?;
127 let components = components(provider_factory.chain_spec());
128
129 if let Some(listen_addr) = self.metrics {
130 let config = MetricServerConfig::new(
131 listen_addr,
132 VersionInfo {
133 version: version_metadata().cargo_pkg_version.as_ref(),
134 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
135 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
136 git_sha: version_metadata().vergen_git_sha.as_ref(),
137 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
138 build_profile: version_metadata().build_profile_name.as_ref(),
139 },
140 ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
141 ctx.task_executor,
142 Hooks::builder()
143 .with_hook({
144 let db = provider_factory.db_ref().clone();
145 move || db.report_metrics()
146 })
147 .with_hook({
148 let sfp = provider_factory.static_file_provider();
149 move || {
150 if let Err(error) = sfp.report_metrics() {
151 error!(%error, "Failed to report metrics from static file provider");
152 }
153 }
154 })
155 .build(),
156 );
157
158 MetricServer::new(config).serve().await?;
159 }
160
161 let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
162
163 let etl_config = config.stages.etl.clone();
164 let prune_modes = config.prune.segments.clone();
165
166 let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
167 match self.stage {
168 StageEnum::Headers => {
169 let consensus = Arc::new(components.consensus().clone());
170
171 let network_secret_path = self
172 .network
173 .p2p_secret_key
174 .clone()
175 .unwrap_or_else(|| data_dir.p2p_secret());
176 let p2p_secret_key = get_secret_key(&network_secret_path)?;
177
178 let default_peers_path = data_dir.known_peers();
179
180 let network = self
181 .network
182 .network_config::<N::NetworkPrimitives>(
183 &config,
184 provider_factory.chain_spec(),
185 p2p_secret_key,
186 default_peers_path,
187 )
188 .build(provider_factory.clone())
189 .start_network()
190 .await?;
191 let fetch_client = Arc::new(network.fetch_client().await?);
192
193 let tip = loop {
195 match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
196 Ok(header) => {
197 if let Some(header) = header.into_data() {
198 break header
199 }
200 }
201 Err(error) if error.is_retryable() => {
202 warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
203 }
204 Err(error) => return Err(error.into()),
205 }
206 };
207 let (_, rx) = watch::channel(tip.hash_slow());
208 (
209 Box::new(HeaderStage::new(
210 provider_factory.clone(),
211 ReverseHeadersDownloaderBuilder::new(config.stages.headers)
212 .build(fetch_client, consensus.clone()),
213 rx,
214 etl_config,
215 )),
216 None,
217 )
218 }
219 StageEnum::Bodies => {
220 let consensus = Arc::new(components.consensus().clone());
221
222 let mut config = config;
223 config.peers.trusted_nodes_only = self.network.trusted_only;
224 config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
225
226 let network_secret_path = self
227 .network
228 .p2p_secret_key
229 .clone()
230 .unwrap_or_else(|| data_dir.p2p_secret());
231 let p2p_secret_key = get_secret_key(&network_secret_path)?;
232
233 let default_peers_path = data_dir.known_peers();
234
235 let network = self
236 .network
237 .network_config::<N::NetworkPrimitives>(
238 &config,
239 provider_factory.chain_spec(),
240 p2p_secret_key,
241 default_peers_path,
242 )
243 .build(provider_factory.clone())
244 .start_network()
245 .await?;
246 let fetch_client = Arc::new(network.fetch_client().await?);
247
248 let stage = BodyStage::new(
249 BodiesDownloaderBuilder::default()
250 .with_stream_batch_size(batch_size as usize)
251 .with_request_limit(config.stages.bodies.downloader_request_limit)
252 .with_max_buffered_blocks_size_bytes(
253 config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
254 )
255 .with_concurrent_requests_range(
256 config.stages.bodies.downloader_min_concurrent_requests..=
257 config.stages.bodies.downloader_max_concurrent_requests,
258 )
259 .build(fetch_client, consensus.clone(), provider_factory.clone()),
260 );
261 (Box::new(stage), None)
262 }
263 StageEnum::Senders => (
264 Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
265 commit_threshold: batch_size,
266 })),
267 None,
268 ),
269 StageEnum::Execution => (
270 Box::new(ExecutionStage::new(
271 components.evm_config().clone(),
272 Arc::new(components.consensus().clone()),
273 ExecutionStageThresholds {
274 max_blocks: Some(batch_size),
275 max_changes: None,
276 max_cumulative_gas: None,
277 max_duration: None,
278 },
279 config.stages.merkle.incremental_threshold,
280 ExExManagerHandle::empty(),
281 )),
282 None,
283 ),
284 StageEnum::TxLookup => (
285 Box::new(TransactionLookupStage::new(
286 TransactionLookupConfig { chunk_size: batch_size },
287 etl_config,
288 prune_modes.transaction_lookup,
289 )),
290 None,
291 ),
292 StageEnum::AccountHashing => (
293 Box::new(AccountHashingStage::new(
294 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
295 etl_config,
296 )),
297 None,
298 ),
299 StageEnum::StorageHashing => (
300 Box::new(StorageHashingStage::new(
301 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
302 etl_config,
303 )),
304 None,
305 ),
306 StageEnum::Merkle => (
307 Box::new(MerkleStage::new_execution(
308 config.stages.merkle.rebuild_threshold,
309 config.stages.merkle.incremental_threshold,
310 )),
311 Some(Box::new(MerkleStage::default_unwind())),
312 ),
313 StageEnum::AccountHistory => (
314 Box::new(IndexAccountHistoryStage::new(
315 config.stages.index_account_history,
316 etl_config,
317 prune_modes.account_history,
318 )),
319 None,
320 ),
321 StageEnum::StorageHistory => (
322 Box::new(IndexStorageHistoryStage::new(
323 config.stages.index_storage_history,
324 etl_config,
325 prune_modes.storage_history,
326 )),
327 None,
328 ),
329 _ => return Ok(()),
330 };
331 if let Some(unwind_stage) = &unwind_stage {
332 assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
333 }
334
335 let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
336
337 let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
338
339 let mut unwind = UnwindInput {
340 checkpoint: checkpoint.with_block_number(self.to),
341 unwind_to: self.from,
342 bad_block: None,
343 };
344
345 if !self.skip_unwind {
346 while unwind.checkpoint.block_number > self.from {
347 let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
348 unwind.checkpoint = checkpoint;
349
350 if self.checkpoints {
351 provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
352 }
353
354 if self.commit {
355 provider_rw.commit()?;
356 provider_rw = provider_factory.database_provider_rw()?;
357 }
358 }
359 }
360
361 let mut input = ExecInput {
362 target: Some(self.to),
363 checkpoint: Some(checkpoint.with_block_number(self.from)),
364 };
365
366 let start = Instant::now();
367 info!(target: "reth::cli", stage = %self.stage, "Executing stage");
368 loop {
369 exec_stage.execute_ready(input).await?;
370 let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
371
372 input.checkpoint = Some(checkpoint);
373
374 if self.checkpoints {
375 provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
376 }
377 if self.commit {
378 provider_rw.commit()?;
379 provider_rw = provider_factory.database_provider_rw()?;
380 }
381
382 if done {
383 break
384 }
385 }
386 info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
387
388 Ok(())
389 }
390}
391
392impl<C: ChainSpecParser> Command<C> {
393 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
395 Some(&self.env.chain)
396 }
397
398 pub fn requires_commit(&self) -> bool {
404 matches!(self.stage, StageEnum::Headers | StageEnum::Bodies | StageEnum::Execution)
405 }
406}