reth_cli_commands/stage/run.rs

//! Main `stage` command
//!
//! Stage debugging tool
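//!
//! Illustrative invocation (the `stage run` subcommand path and exact binary name are assumed
//! here; the flags are the ones defined in this file):
//! `reth stage run execution --from 1000 --to 2000 --batch-size 100 --commit`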

use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::Sealable;
use clap::Parser;
use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_runner::CliContext;
use reth_cli_util::get_secret_key;
use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_downloaders::{
    bodies::bodies::BodiesDownloaderBuilder,
    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_exex::ExExManagerHandle;
use reth_network::BlockDownloaderProvider;
use reth_network_p2p::HeadersClient;
use reth_node_core::{
    args::{NetworkArgs, StageEnum},
    version::version_metadata,
};
use reth_node_metrics::{
    chain::ChainSpecInfo,
    hooks::Hooks,
    server::{MetricServer, MetricServerConfig},
    version::VersionInfo,
};
use reth_provider::{
    ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
    StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_stages::{
    stages::{
        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
        TransactionLookupStage,
    },
    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
};
use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
use tokio::sync::watch;
use tracing::*;

/// `reth stage` command
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Enable Prometheus metrics.
    ///
    /// The metrics will be served at the given interface and port.
    #[arg(long, value_name = "SOCKET")]
    metrics: Option<SocketAddr>,

    /// The name of the stage to run
    #[arg(value_enum)]
    stage: StageEnum,

    /// The height to start at
    #[arg(long)]
    from: u64,

    /// The height to end at
    #[arg(long, short)]
    to: u64,

    /// Batch size for stage execution and unwind
    #[arg(long)]
    batch_size: Option<u64>,

    /// Normally, running a stage that has already been run requires unwinding first,
    /// so that the same database slots are not rewritten.
    ///
    /// You can optionally skip the unwinding phase if you're syncing a block
    /// range that has not been synced before.
    #[arg(long, short)]
    skip_unwind: bool,

    /// Commits the changes in the database. WARNING: potentially destructive.
    ///
    /// Useful when you want to run diagnostics on the database.
    ///
    /// NOTE: This flag is currently required for the headers, bodies, and execution stages because
    /// they use static files and must commit to properly unwind and run.
    // TODO: We should consider allowing hooks to be run at the end of the stage run,
    // e.g. to query the DB size or any table data.
    #[arg(long, short)]
    commit: bool,

    /// Save stage checkpoints
    #[arg(long)]
    checkpoints: bool,

    #[command(flatten)]
    network: NetworkArgs,
}

impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
    /// Execute `stage` command
    pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
    where
        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
        Comp: CliNodeComponents<N>,
        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
    {
        // Quit early if the stage requires a commit and `--commit` is not provided.
        if self.requires_commit() && !self.commit {
            return Err(eyre::eyre!(
                "The stage {} requires overwriting existing static files and must commit, but `--commit` was not provided. Please pass `--commit` and try again.",
                self.stage.to_string()
            ));
        }

        // Raise the fd limit of the process.
        // Does not do anything on windows.
        let _ = fdlimit::raise_fd_limit();

        let Environment { provider_factory, config, data_dir } =
            self.env.init::<N>(AccessRights::RW)?;

        let mut provider_rw = provider_factory.database_provider_rw()?;
        let components = components(provider_factory.chain_spec());

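        // If a metrics endpoint was requested, serve Prometheus metrics with hooks that report
        // database and static file provider metrics.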
        if let Some(listen_addr) = self.metrics {
            let config = MetricServerConfig::new(
                listen_addr,
                VersionInfo {
                    version: version_metadata().cargo_pkg_version.as_ref(),
                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
                    git_sha: version_metadata().vergen_git_sha.as_ref(),
                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
                    build_profile: version_metadata().build_profile_name.as_ref(),
                },
                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
                ctx.task_executor,
                Hooks::builder()
                    .with_hook({
                        let db = provider_factory.db_ref().clone();
                        move || db.report_metrics()
                    })
                    .with_hook({
                        let sfp = provider_factory.static_file_provider();
                        move || {
                            if let Err(error) = sfp.report_metrics() {
                                error!(%error, "Failed to report metrics from static file provider");
                            }
                        }
                    })
                    .build(),
            );

            MetricServer::new(config).serve().await?;
        }

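        // Default the batch size to the full `from..=to` range when none is given.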
        let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);

        let etl_config = config.stages.etl.clone();
        let prune_modes = config.prune.segments.clone();

        let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
            match self.stage {
                StageEnum::Headers => {
                    let consensus = Arc::new(components.consensus().clone());

                    let network_secret_path = self
                        .network
                        .p2p_secret_key
                        .clone()
                        .unwrap_or_else(|| data_dir.p2p_secret());
                    let p2p_secret_key = get_secret_key(&network_secret_path)?;

                    let default_peers_path = data_dir.known_peers();

                    let network = self
                        .network
                        .network_config::<N::NetworkPrimitives>(
                            &config,
                            provider_factory.chain_spec(),
                            p2p_secret_key,
                            default_peers_path,
                        )
                        .build(provider_factory.clone())
                        .start_network()
                        .await?;
                    let fetch_client = Arc::new(network.fetch_client().await?);

                    // Use `to` as the tip for the stage
                    let tip = loop {
                        match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
                            Ok(header) => {
                                if let Some(header) = header.into_data() {
                                    break header
                                }
                            }
                            Err(error) if error.is_retryable() => {
                                warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
                            }
                            Err(error) => return Err(error.into()),
                        }
                    };
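                    // The header stage receives the tip through a watch channel; seed it with the
                    // hash of the target header. The sender half is unused since the tip is fixed
                    // for this run.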
                    let (_, rx) = watch::channel(tip.hash_slow());
                    (
                        Box::new(HeaderStage::new(
                            provider_factory.clone(),
                            ReverseHeadersDownloaderBuilder::new(config.stages.headers)
                                .build(fetch_client, consensus.clone()),
                            rx,
                            etl_config,
                        )),
                        None,
                    )
                }
                StageEnum::Bodies => {
                    let consensus = Arc::new(components.consensus().clone());

                    let mut config = config;
                    config.peers.trusted_nodes_only = self.network.trusted_only;
                    config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());

                    let network_secret_path = self
                        .network
                        .p2p_secret_key
                        .clone()
                        .unwrap_or_else(|| data_dir.p2p_secret());
                    let p2p_secret_key = get_secret_key(&network_secret_path)?;

                    let default_peers_path = data_dir.known_peers();

                    let network = self
                        .network
                        .network_config::<N::NetworkPrimitives>(
                            &config,
                            provider_factory.chain_spec(),
                            p2p_secret_key,
                            default_peers_path,
                        )
                        .build(provider_factory.clone())
                        .start_network()
                        .await?;
                    let fetch_client = Arc::new(network.fetch_client().await?);

                    let stage = BodyStage::new(
                        BodiesDownloaderBuilder::default()
                            .with_stream_batch_size(batch_size as usize)
                            .with_request_limit(config.stages.bodies.downloader_request_limit)
                            .with_max_buffered_blocks_size_bytes(
                                config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
                            )
                            .with_concurrent_requests_range(
                                config.stages.bodies.downloader_min_concurrent_requests..=
                                    config.stages.bodies.downloader_max_concurrent_requests,
                            )
                            .build(fetch_client, consensus.clone(), provider_factory.clone()),
                    );
                    (Box::new(stage), None)
                }
                StageEnum::Senders => (
                    Box::new(SenderRecoveryStage::new(SenderRecoveryConfig {
                        commit_threshold: batch_size,
                    })),
                    None,
                ),
                StageEnum::Execution => (
                    Box::new(ExecutionStage::new(
                        components.evm_config().clone(),
                        Arc::new(components.consensus().clone()),
                        ExecutionStageThresholds {
                            max_blocks: Some(batch_size),
                            max_changes: None,
                            max_cumulative_gas: None,
                            max_duration: None,
                        },
                        config.stages.merkle.incremental_threshold,
                        ExExManagerHandle::empty(),
                    )),
                    None,
                ),
                StageEnum::TxLookup => (
                    Box::new(TransactionLookupStage::new(
                        TransactionLookupConfig { chunk_size: batch_size },
                        etl_config,
                        prune_modes.transaction_lookup,
                    )),
                    None,
                ),
                StageEnum::AccountHashing => (
                    Box::new(AccountHashingStage::new(
                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
                        etl_config,
                    )),
                    None,
                ),
                StageEnum::StorageHashing => (
                    Box::new(StorageHashingStage::new(
                        HashingConfig { clean_threshold: 1, commit_threshold: batch_size },
                        etl_config,
                    )),
                    None,
                ),
                StageEnum::Merkle => (
                    Box::new(MerkleStage::new_execution(
                        config.stages.merkle.rebuild_threshold,
                        config.stages.merkle.incremental_threshold,
                    )),
                    Some(Box::new(MerkleStage::default_unwind())),
                ),
                StageEnum::AccountHistory => (
                    Box::new(IndexAccountHistoryStage::new(
                        config.stages.index_account_history,
                        etl_config,
                        prune_modes.account_history,
                    )),
                    None,
                ),
                StageEnum::StorageHistory => (
                    Box::new(IndexStorageHistoryStage::new(
                        config.stages.index_storage_history,
                        etl_config,
                        prune_modes.storage_history,
                    )),
                    None,
                ),
                _ => return Ok(()),
            };
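        // If a dedicated unwind stage was constructed, it must be the same stage type as the
        // execution stage.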
        if let Some(unwind_stage) = &unwind_stage {
            assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
        }

        let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();

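        // Unwind with the dedicated unwind stage if one was provided, otherwise reuse the
        // execution stage.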
        let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);

        let mut unwind = UnwindInput {
            checkpoint: checkpoint.with_block_number(self.to),
            unwind_to: self.from,
            bad_block: None,
        };

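        // Unless `--skip-unwind` is set, unwind from `to` down to `from` in steps, optionally
        // saving checkpoints and committing after each step.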
        if !self.skip_unwind {
            while unwind.checkpoint.block_number > self.from {
                let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
                unwind.checkpoint = checkpoint;

                if self.checkpoints {
                    provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
                }

                if self.commit {
                    provider_rw.commit()?;
                    provider_rw = provider_factory.database_provider_rw()?;
                }
            }
        }

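        // Execute the stage from `from` towards `to` in batches, persisting checkpoints and
        // committing after each batch when requested, until the stage reports it is done.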
        let mut input = ExecInput {
            target: Some(self.to),
            checkpoint: Some(checkpoint.with_block_number(self.from)),
        };

        let start = Instant::now();
        info!(target: "reth::cli", stage = %self.stage, "Executing stage");
        loop {
            exec_stage.execute_ready(input).await?;
            let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;

            input.checkpoint = Some(checkpoint);

            if self.checkpoints {
                provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
            }
            if self.commit {
                provider_rw.commit()?;
                provider_rw = provider_factory.database_provider_rw()?;
            }

            if done {
                break
            }
        }
        info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");

        Ok(())
    }
}

impl<C: ChainSpecParser> Command<C> {
    /// Returns the underlying chain being used to run this command
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        Some(&self.env.chain)
    }

    /// Returns whether the configured stage requires committing.
    ///
    /// This is the case for stages that mainly modify static files, as there is no way to unwind
    /// these stages without committing anyway. This is because static files do not have
    /// transactions and we cannot change the view of headers without writing.
    pub fn requires_commit(&self) -> bool {
        matches!(self.stage, StageEnum::Headers | StageEnum::Bodies | StageEnum::Execution)
    }
}