1use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16 bodies::bodies::BodiesDownloaderBuilder,
17 headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_exex::ExExManagerHandle;
20use reth_network::BlockDownloaderProvider;
21use reth_network_p2p::HeadersClient;
22use reth_node_core::{
23 args::{NetworkArgs, StageEnum},
24 version::version_metadata,
25};
26use reth_node_metrics::{
27 chain::ChainSpecInfo,
28 hooks::Hooks,
29 server::{MetricServer, MetricServerConfig},
30 version::VersionInfo,
31};
32use reth_provider::{
33 writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
34 StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
35};
36use reth_stages::{
37 stages::{
38 AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
39 IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
40 TransactionLookupStage,
41 },
42 ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
43};
44use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
45use tokio::sync::watch;
46use tracing::*;
47
48#[derive(Debug, Parser)]
50pub struct Command<C: ChainSpecParser> {
51 #[command(flatten)]
52 env: EnvironmentArgs<C>,
53
54 #[arg(long, value_name = "SOCKET")]
58 metrics: Option<SocketAddr>,
59
60 #[arg(value_enum)]
62 stage: StageEnum,
63
64 #[arg(long)]
66 from: u64,
67
68 #[arg(long, short)]
70 to: u64,
71
72 #[arg(long)]
74 batch_size: Option<u64>,
75
76 #[arg(long, short)]
82 skip_unwind: bool,
83
84 #[arg(long, short)]
90 commit: bool,
91
92 #[arg(long)]
94 checkpoints: bool,
95
96 #[command(flatten)]
97 network: NetworkArgs,
98}
99
100impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
101 pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
103 where
104 N: CliNodeTypes<ChainSpec = C::ChainSpec>,
105 Comp: CliNodeComponents<N>,
106 F: FnOnce(Arc<C::ChainSpec>) -> Comp,
107 {
108 let _ = fdlimit::raise_fd_limit();
111
112 let Environment { provider_factory, config, data_dir } =
113 self.env.init::<N>(AccessRights::RW)?;
114
115 let mut provider_rw = provider_factory.database_provider_rw()?;
116 let components = components(provider_factory.chain_spec());
117
118 if let Some(listen_addr) = self.metrics {
119 info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
120 let config = MetricServerConfig::new(
121 listen_addr,
122 VersionInfo {
123 version: version_metadata().cargo_pkg_version.as_ref(),
124 build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
125 cargo_features: version_metadata().vergen_cargo_features.as_ref(),
126 git_sha: version_metadata().vergen_git_sha.as_ref(),
127 target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
128 build_profile: version_metadata().build_profile_name.as_ref(),
129 },
130 ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
131 ctx.task_executor,
132 Hooks::builder()
133 .with_hook({
134 let db = provider_factory.db_ref().clone();
135 move || db.report_metrics()
136 })
137 .with_hook({
138 let sfp = provider_factory.static_file_provider();
139 move || {
140 if let Err(error) = sfp.report_metrics() {
141 error!(%error, "Failed to report metrics from static file provider");
142 }
143 }
144 })
145 .build(),
146 );
147
148 MetricServer::new(config).serve().await?;
149 }
150
151 let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
152
153 let etl_config = config.stages.etl.clone();
154 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
155
156 let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
157 match self.stage {
158 StageEnum::Headers => {
159 let consensus = Arc::new(components.consensus().clone());
160
161 let network_secret_path = self
162 .network
163 .p2p_secret_key
164 .clone()
165 .unwrap_or_else(|| data_dir.p2p_secret());
166 let p2p_secret_key = get_secret_key(&network_secret_path)?;
167
168 let default_peers_path = data_dir.known_peers();
169
170 let network = self
171 .network
172 .network_config::<N::NetworkPrimitives>(
173 &config,
174 provider_factory.chain_spec(),
175 p2p_secret_key,
176 default_peers_path,
177 )
178 .build(provider_factory.clone())
179 .start_network()
180 .await?;
181 let fetch_client = Arc::new(network.fetch_client().await?);
182
183 let tip = loop {
185 match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
186 Ok(header) => {
187 if let Some(header) = header.into_data() {
188 break header
189 }
190 }
191 Err(error) if error.is_retryable() => {
192 warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
193 }
194 Err(error) => return Err(error.into()),
195 }
196 };
197 let (_, rx) = watch::channel(tip.hash_slow());
198 (
199 Box::new(HeaderStage::new(
200 provider_factory.clone(),
201 ReverseHeadersDownloaderBuilder::new(config.stages.headers)
202 .build(fetch_client, consensus.clone()),
203 rx,
204 etl_config,
205 )),
206 None,
207 )
208 }
209 StageEnum::Bodies => {
210 let consensus = Arc::new(components.consensus().clone());
211
212 let mut config = config;
213 config.peers.trusted_nodes_only = self.network.trusted_only;
214 config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
215
216 let network_secret_path = self
217 .network
218 .p2p_secret_key
219 .clone()
220 .unwrap_or_else(|| data_dir.p2p_secret());
221 let p2p_secret_key = get_secret_key(&network_secret_path)?;
222
223 let default_peers_path = data_dir.known_peers();
224
225 let network = self
226 .network
227 .network_config::<N::NetworkPrimitives>(
228 &config,
229 provider_factory.chain_spec(),
230 p2p_secret_key,
231 default_peers_path,
232 )
233 .build(provider_factory.clone())
234 .start_network()
235 .await?;
236 let fetch_client = Arc::new(network.fetch_client().await?);
237
238 let stage = BodyStage::new(
239 BodiesDownloaderBuilder::default()
240 .with_stream_batch_size(batch_size as usize)
241 .with_request_limit(config.stages.bodies.downloader_request_limit)
242 .with_max_buffered_blocks_size_bytes(
243 config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
244 )
245 .with_concurrent_requests_range(
246 config.stages.bodies.downloader_min_concurrent_requests..=
247 config.stages.bodies.downloader_max_concurrent_requests,
248 )
249 .build(fetch_client, consensus.clone(), provider_factory.clone()),
250 );
251 (Box::new(stage), None)
252 }
253 StageEnum::Senders => (
254 Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
255 commit_threshold: batch_size,
256 })),
257 None,
258 ),
259 StageEnum::Execution => (
260 Box::new(ExecutionStage::new(
261 components.evm_config().clone(),
262 Arc::new(components.consensus().clone()),
263 ExecutionStageThresholds {
264 max_blocks: Some(batch_size),
265 max_changes: None,
266 max_cumulative_gas: None,
267 max_duration: None,
268 },
269 config.stages.merkle.incremental_threshold,
270 ExExManagerHandle::empty(),
271 )),
272 None,
273 ),
274 StageEnum::TxLookup => (
275 Box::new(TransactionLookupStage::new(
276 TransactionLookupConfig { chunk_size: batch_size },
277 etl_config,
278 prune_modes.transaction_lookup,
279 )),
280 None,
281 ),
282 StageEnum::AccountHashing => (
283 Box::new(AccountHashingStage::new(
284 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
285 etl_config,
286 )),
287 None,
288 ),
289 StageEnum::StorageHashing => (
290 Box::new(StorageHashingStage::new(
291 HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
292 etl_config,
293 )),
294 None,
295 ),
296 StageEnum::Merkle => (
297 Box::new(MerkleStage::new_execution(
298 config.stages.merkle.rebuild_threshold,
299 config.stages.merkle.incremental_threshold,
300 )),
301 Some(Box::new(MerkleStage::default_unwind())),
302 ),
303 StageEnum::AccountHistory => (
304 Box::new(IndexAccountHistoryStage::new(
305 config.stages.index_account_history,
306 etl_config,
307 prune_modes.account_history,
308 )),
309 None,
310 ),
311 StageEnum::StorageHistory => (
312 Box::new(IndexStorageHistoryStage::new(
313 config.stages.index_storage_history,
314 etl_config,
315 prune_modes.storage_history,
316 )),
317 None,
318 ),
319 _ => return Ok(()),
320 };
321 if let Some(unwind_stage) = &unwind_stage {
322 assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
323 }
324
325 let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
326
327 let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
328
329 let mut unwind = UnwindInput {
330 checkpoint: checkpoint.with_block_number(self.to),
331 unwind_to: self.from,
332 bad_block: None,
333 };
334
335 if !self.skip_unwind {
336 while unwind.checkpoint.block_number > self.from {
337 let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
338 unwind.checkpoint = checkpoint;
339
340 if self.checkpoints {
341 provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
342 }
343
344 if self.commit {
345 UnifiedStorageWriter::commit_unwind(provider_rw)?;
346 provider_rw = provider_factory.database_provider_rw()?;
347 }
348 }
349 }
350
351 let mut input = ExecInput {
352 target: Some(self.to),
353 checkpoint: Some(checkpoint.with_block_number(self.from)),
354 };
355
356 let start = Instant::now();
357 info!(target: "reth::cli", stage = %self.stage, "Executing stage");
358 loop {
359 exec_stage.execute_ready(input).await?;
360 let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
361
362 input.checkpoint = Some(checkpoint);
363
364 if self.checkpoints {
365 provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
366 }
367 if self.commit {
368 UnifiedStorageWriter::commit(provider_rw)?;
369 provider_rw = provider_factory.database_provider_rw()?;
370 }
371
372 if done {
373 break
374 }
375 }
376 info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
377
378 Ok(())
379 }
380}
381
382impl<C: ChainSpecParser> Command<C> {
383 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
385 Some(&self.env.chain)
386 }
387}