1use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_db_api::database_metrics::DatabaseMetrics;
15use reth_downloaders::{
16    bodies::bodies::BodiesDownloaderBuilder,
17    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
18};
19use reth_exex::ExExManagerHandle;
20use reth_network::BlockDownloaderProvider;
21use reth_network_p2p::HeadersClient;
22use reth_node_core::{
23    args::{NetworkArgs, StageEnum},
24    version::version_metadata,
25};
26use reth_node_metrics::{
27    chain::ChainSpecInfo,
28    hooks::Hooks,
29    server::{MetricServer, MetricServerConfig},
30    version::VersionInfo,
31};
32use reth_provider::{
33    ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
34    StageCheckpointWriter, StaticFileProviderFactory,
35};
36use reth_stages::{
37    stages::{
38        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
39        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
40        TransactionLookupStage,
41    },
42    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
43};
44use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
45use tokio::sync::watch;
46use tracing::*;
47
/// Arguments for running a single sync stage over the block range `from..=to`.
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    // Shared environment arguments; `execute` initializes the node environment from
    // these with read-write access, and `chain_spec` reads the chain from them.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Socket address to serve metrics on.
    ///
    /// When set, a metrics server is started on this address before the stage runs.
    #[arg(long, value_name = "SOCKET")]
    metrics: Option<SocketAddr>,

    /// The stage to run.
    #[arg(value_enum)]
    stage: StageEnum,

    /// Block number to start from.
    ///
    /// Unless unwinding is skipped, the stage is first unwound down to this block.
    #[arg(long)]
    from: u64,

    /// Target block number the stage is executed up to.
    #[arg(long, short)]
    to: u64,

    /// Batch size handed to the stages that accept one (commit threshold, chunk size,
    /// maximum blocks or stream batch size, depending on the stage).
    ///
    /// Defaults to the number of blocks in `from..=to`.
    #[arg(long)]
    batch_size: Option<u64>,

    /// Skip unwinding the stage to `from` before executing it.
    #[arg(long, short)]
    skip_unwind: bool,

    /// Commit the database provider after every unwind and execution step.
    ///
    /// Without this flag the command never commits the read-write provider itself.
    #[arg(long, short)]
    commit: bool,

    /// Save the stage checkpoint after every unwind and execution step.
    #[arg(long)]
    checkpoints: bool,

    // Networking arguments; only used to start the network for the `Headers` and
    // `Bodies` stages.
    #[command(flatten)]
    network: NetworkArgs,
}
99
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
    /// Runs the selected stage over the block range `from..=to`.
    ///
    /// The command proceeds in this order:
    /// 1. Opens the node environment with read-write access.
    /// 2. Optionally starts a metrics server when `--metrics` is set.
    /// 3. Builds the stage to execute (and, for `Merkle`, a separate unwind stage).
    /// 4. Unless `--skip-unwind` is set, unwinds the stage down to `from`.
    /// 5. Executes the stage repeatedly until it reports `done`.
    ///
    /// `components` is invoked once with the chain spec of the provider factory; the
    /// returned value supplies the consensus and EVM configuration used by the stages.
    /// `ctx` is only used for its task executor, which is passed to the metrics server.
    ///
    /// Stage checkpoints are persisted only with `--checkpoints`, and the provider is
    /// committed only with `--commit`.
    ///
    /// # Errors
    ///
    /// Returns an error if the environment cannot be initialized, the metrics server or
    /// the network fails to start, the tip header request fails with a non-retryable
    /// error, or any unwind, execute, checkpoint or commit step fails.
    ///
    /// # Panics
    ///
    /// Panics if the `type_id` check between the execution and unwind stage fails.
    pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
    where
        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
        Comp: CliNodeComponents<N>,
        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
    {
        // Best effort: the outcome of raising the fd limit is deliberately ignored.
        let _ = fdlimit::raise_fd_limit();

        let Environment { provider_factory, config, data_dir } =
            self.env.init::<N>(AccessRights::RW)?;

        let mut provider_rw = provider_factory.database_provider_rw()?;
        let components = components(provider_factory.chain_spec());

        if let Some(listen_addr) = self.metrics {
            // This `config` shadows the node config only inside this block.
            let config = MetricServerConfig::new(
                listen_addr,
                VersionInfo {
                    version: version_metadata().cargo_pkg_version.as_ref(),
                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
                    git_sha: version_metadata().vergen_git_sha.as_ref(),
                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
                    build_profile: version_metadata().build_profile_name.as_ref(),
                },
                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
                ctx.task_executor,
                // Two hooks: one reports database metrics, the other static file metrics.
                Hooks::builder()
                    .with_hook({
                        let db = provider_factory.db_ref().clone();
                        move || db.report_metrics()
                    })
                    .with_hook({
                        let sfp = provider_factory.static_file_provider();
                        move || {
                            // A failing static file report is logged, not propagated.
                            if let Err(error) = sfp.report_metrics() {
                                error!(%error, "Failed to report metrics from static file provider");
                            }
                        }
                    })
                    .build(),
            );

            // A failure to start the metrics server aborts the command.
            MetricServer::new(config).serve().await?;
        }

        // Default batch size: the number of blocks in `from..=to` (0 blocks if `from > to`,
        // due to the saturating subtraction, plus one).
        // NOTE(review): `+ 1` is a plain add; it would overflow only for `from = 0` and
        // `to = u64::MAX` — confirm whether that input needs guarding.
        let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);

        let etl_config = config.stages.etl.clone();
        let prune_modes = config.prune.segments.clone();

        // Build the stage to execute together with an optional dedicated unwind stage.
        // `None` for the second element means the execution stage is also used to unwind.
        let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
            match self.stage {
                StageEnum::Headers => {
                    let consensus = Arc::new(components.consensus().clone());

                    // Prefer the secret key path from the CLI, else the data dir default.
                    let network_secret_path = self
                        .network
                        .p2p_secret_key
                        .clone()
                        .unwrap_or_else(|| data_dir.p2p_secret());
                    let p2p_secret_key = get_secret_key(&network_secret_path)?;

                    let default_peers_path = data_dir.known_peers();

                    let network = self
                        .network
                        .network_config::<N::NetworkPrimitives>(
                            &config,
                            provider_factory.chain_spec(),
                            p2p_secret_key,
                            default_peers_path,
                        )
                        .build(provider_factory.clone())
                        .start_network()
                        .await?;
                    let fetch_client = Arc::new(network.fetch_client().await?);

                    // Fetch the header at block `to` from the network; it acts as the tip.
                    // Retryable errors are logged and retried, other errors abort.
                    // NOTE(review): a successful response without a header also loops again,
                    // without a log line or delay — confirm this is intended.
                    let tip = loop {
                        match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
                            Ok(header) => {
                                if let Some(header) = header.into_data() {
                                    break header
                                }
                            }
                            Err(error) if error.is_retryable() => {
                                warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
                            }
                            Err(error) => return Err(error.into()),
                        }
                    };
                    // The sender half is not kept, so the receiver only ever holds the
                    // initial tip hash.
                    let (_, rx) = watch::channel(tip.hash_slow());
                    (
                        Box::new(HeaderStage::new(
                            provider_factory.clone(),
                            ReverseHeadersDownloaderBuilder::new(config.stages.headers)
                                .build(fetch_client, consensus.clone()),
                            rx,
                            etl_config,
                        )),
                        None,
                    )
                }
                StageEnum::Bodies => {
                    let consensus = Arc::new(components.consensus().clone());

                    // Override the peer settings of the node config with the CLI values
                    // before the network is configured.
                    let mut config = config;
                    config.peers.trusted_nodes_only = self.network.trusted_only;
                    config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());

                    // Prefer the secret key path from the CLI, else the data dir default.
                    let network_secret_path = self
                        .network
                        .p2p_secret_key
                        .clone()
                        .unwrap_or_else(|| data_dir.p2p_secret());
                    let p2p_secret_key = get_secret_key(&network_secret_path)?;

                    let default_peers_path = data_dir.known_peers();

                    let network = self
                        .network
                        .network_config::<N::NetworkPrimitives>(
                            &config,
                            provider_factory.chain_spec(),
                            p2p_secret_key,
                            default_peers_path,
                        )
                        .build(provider_factory.clone())
                        .start_network()
                        .await?;
                    let fetch_client = Arc::new(network.fetch_client().await?);

                    // The stream batch size comes from the CLI batch size; the remaining
                    // downloader limits come from the node config.
                    let stage = BodyStage::new(
                        BodiesDownloaderBuilder::default()
                            .with_stream_batch_size(batch_size as usize)
                            .with_request_limit(config.stages.bodies.downloader_request_limit)
                            .with_max_buffered_blocks_size_bytes(
                                config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
                            )
                            .with_concurrent_requests_range(
                                config.stages.bodies.downloader_min_concurrent_requests..=
                                    config.stages.bodies.downloader_max_concurrent_requests,
                            )
                            .build(fetch_client, consensus.clone(), provider_factory.clone()),
                    );
                    (Box::new(stage), None)
                }
                StageEnum::Senders => (
                    Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
                        commit_threshold: batch_size,
                    })),
                    None,
                ),
                StageEnum::Execution => (
                    Box::new(ExecutionStage::new(
                        components.evm_config().clone(),
                        Arc::new(components.consensus().clone()),
                        // Only the block count is limited; all other thresholds are disabled.
                        ExecutionStageThresholds {
                            max_blocks: Some(batch_size),
                            max_changes: None,
                            max_cumulative_gas: None,
                            max_duration: None,
                        },
                        config.stages.merkle.incremental_threshold,
                        ExExManagerHandle::empty(),
                    )),
                    None,
                ),
                StageEnum::TxLookup => (
                    Box::new(TransactionLookupStage::new(
                        TransactionLookupConfig { chunk_size: batch_size },
                        etl_config,
                        prune_modes.transaction_lookup,
                    )),
                    None,
                ),
                StageEnum::AccountHashing => (
                    Box::new(AccountHashingStage::new(
                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
                        etl_config,
                    )),
                    None,
                ),
                StageEnum::StorageHashing => (
                    Box::new(StorageHashingStage::new(
                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
                        etl_config,
                    )),
                    None,
                ),
                // The only stage here that uses a separate stage instance for unwinding.
                StageEnum::Merkle => (
                    Box::new(MerkleStage::new_execution(
                        config.stages.merkle.rebuild_threshold,
                        config.stages.merkle.incremental_threshold,
                    )),
                    Some(Box::new(MerkleStage::default_unwind())),
                ),
                StageEnum::AccountHistory => (
                    Box::new(IndexAccountHistoryStage::new(
                        config.stages.index_account_history,
                        etl_config,
                        prune_modes.account_history,
                    )),
                    None,
                ),
                StageEnum::StorageHistory => (
                    Box::new(IndexStorageHistoryStage::new(
                        config.stages.index_storage_history,
                        etl_config,
                        prune_modes.storage_history,
                    )),
                    None,
                ),
                // Any other stage is silently a no-op that reports success.
                _ => return Ok(()),
            };
        // Sanity check that a dedicated unwind stage matches the execution stage.
        // NOTE(review): unless `Stage` provides its own `type_id`, this presumably
        // resolves to `Any::type_id` on `dyn Stage<_>` itself, which is identical for
        // both sides and would make the check always pass — confirm.
        if let Some(unwind_stage) = &unwind_stage {
            assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
        }

        // Stored checkpoint of the stage, or the default checkpoint if none is stored.
        let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();

        // Without a dedicated unwind stage, the execution stage performs the unwind.
        let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);

        // Treat the stage as if it had reached `to` and request an unwind down to `from`.
        let mut unwind = UnwindInput {
            checkpoint: checkpoint.with_block_number(self.to),
            unwind_to: self.from,
            bad_block: None,
        };

        if !self.skip_unwind {
            // Keep unwinding until the reported checkpoint is no longer above `from`.
            while unwind.checkpoint.block_number > self.from {
                let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
                unwind.checkpoint = checkpoint;

                if self.checkpoints {
                    provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
                }

                if self.commit {
                    provider_rw.commit()?;
                    // Open a fresh read-write provider for the next iteration.
                    provider_rw = provider_factory.database_provider_rw()?;
                }
            }
        }

        // Execute starting from a checkpoint at `from`, targeting `to`.
        let mut input = ExecInput {
            target: Some(self.to),
            checkpoint: Some(checkpoint.with_block_number(self.from)),
        };

        let start = Instant::now();
        info!(target: "reth::cli", stage = %self.stage, "Executing stage");
        // Run the stage in steps until it reports completion, feeding each returned
        // checkpoint into the next step.
        loop {
            exec_stage.execute_ready(input).await?;
            let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;

            input.checkpoint = Some(checkpoint);

            if self.checkpoints {
                provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
            }
            if self.commit {
                provider_rw.commit()?;
                // Open a fresh read-write provider for the next iteration.
                provider_rw = provider_factory.database_provider_rw()?;
            }

            if done {
                break
            }
        }
        info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");

        Ok(())
    }
}
380
381impl<C: ChainSpecParser> Command<C> {
382    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
384        Some(&self.env.chain)
385    }
386}