Skip to main content

reth_cli_commands/stage/
run.rs

//! Main `stage` command
//!
//! Stage debugging tool: unwinds and re-runs a single stage over a block range.
4
5use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
6use alloy_eips::BlockHashOrNumber;
7use alloy_primitives::Sealable;
8use clap::Parser;
9use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks};
10use reth_cli::chainspec::ChainSpecParser;
11use reth_cli_runner::CliContext;
12use reth_cli_util::get_secret_key;
13use reth_config::config::{HashingConfig, SenderRecoveryConfig, TransactionLookupConfig};
14use reth_downloaders::{
15    bodies::bodies::BodiesDownloaderBuilder,
16    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
17};
18use reth_exex::ExExManagerHandle;
19use reth_network::BlockDownloaderProvider;
20use reth_network_p2p::HeadersClient;
21use reth_node_builder::common::metrics_hooks;
22use reth_node_core::{
23    args::{NetworkArgs, StageEnum},
24    version::version_metadata,
25};
26use reth_node_metrics::{
27    chain::ChainSpecInfo,
28    server::{MetricServer, MetricServerConfig},
29    version::VersionInfo,
30};
31use reth_provider::{
32    ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader,
33    StageCheckpointWriter,
34};
35use reth_stages::{
36    stages::{
37        AccountHashingStage, BodyStage, ExecutionStage, HeaderStage, IndexAccountHistoryStage,
38        IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage,
39        TransactionLookupStage,
40    },
41    ExecInput, ExecOutput, ExecutionStageThresholds, Stage, StageExt, UnwindInput, UnwindOutput,
42};
43use std::{any::Any, net::SocketAddr, sync::Arc, time::Instant};
44use tokio::sync::watch;
45use tracing::*;
46
47/// `reth stage` command
48#[derive(Debug, Parser)]
49pub struct Command<C: ChainSpecParser> {
50    #[command(flatten)]
51    env: EnvironmentArgs<C>,
52
53    /// Enable Prometheus metrics.
54    ///
55    /// The metrics will be served at the given interface and port.
56    #[arg(long, value_name = "SOCKET")]
57    metrics: Option<SocketAddr>,
58
59    /// The name of the stage to run
60    #[arg(value_enum)]
61    stage: StageEnum,
62
63    /// The height to start at
64    #[arg(long)]
65    from: u64,
66
67    /// The end of the stage
68    #[arg(long, short)]
69    to: u64,
70
71    /// Batch size for stage execution and unwind
72    #[arg(long)]
73    batch_size: Option<u64>,
74
75    /// Normally, running the stage requires unwinding for stages that already
76    /// have been run, in order to not rewrite to the same database slots.
77    ///
78    /// You can optionally skip the unwinding phase if you're syncing a block
79    /// range that has not been synced before.
80    #[arg(long, short)]
81    skip_unwind: bool,
82
83    /// Commits the changes in the database. WARNING: potentially destructive.
84    ///
85    /// Useful when you want to run diagnostics on the database.
86    ///
87    /// NOTE: This flag is currently required for the headers, bodies, and execution stages because
88    /// they use static files and must commit to properly unwind and run.
89    // TODO: We should consider allowing to run hooks at the end of the stage run,
90    // e.g. query the DB size, or any table data.
91    #[arg(long, short)]
92    commit: bool,
93
94    /// Save stage checkpoints
95    #[arg(long)]
96    checkpoints: bool,
97
98    #[command(flatten)]
99    network: NetworkArgs,
100}
101
102impl<C: ChainSpecParser<ChainSpec: EthChainSpec + Hardforks + EthereumHardforks>> Command<C> {
103    /// Execute `stage` command
104    pub async fn execute<N, Comp, F>(self, ctx: CliContext, components: F) -> eyre::Result<()>
105    where
106        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
107        Comp: CliNodeComponents<N>,
108        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
109    {
110        // Quit early if the stages requires a commit and `--commit` is not provided.
111        if self.requires_commit() && !self.commit {
112            return Err(eyre::eyre!(
113                "The stage {} requires overwriting existing static files and must commit, but `--commit` was not provided. Please pass `--commit` and try again.",
114                self.stage.to_string()
115            ));
116        }
117
118        // Raise the fd limit of the process.
119        // Does not do anything on windows.
120        let _ = fdlimit::raise_fd_limit();
121
122        let runtime = ctx.task_executor.clone();
123        let Environment { provider_factory, config, data_dir } =
124            self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
125
126        let mut provider_rw = provider_factory.database_provider_rw()?;
127        let components = components(provider_factory.chain_spec());
128
129        if let Some(listen_addr) = self.metrics {
130            let config = MetricServerConfig::new(
131                listen_addr,
132                VersionInfo {
133                    version: version_metadata().cargo_pkg_version.as_ref(),
134                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
135                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
136                    git_sha: version_metadata().vergen_git_sha.as_ref(),
137                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
138                    build_profile: version_metadata().build_profile_name.as_ref(),
139                },
140                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
141                ctx.task_executor,
142                metrics_hooks(&provider_factory),
143                data_dir.pprof_dumps(),
144            );
145
146            MetricServer::new(config).serve().await?;
147        }
148
149        let batch_size = self.batch_size.unwrap_or(self.to.saturating_sub(self.from) + 1);
150
151        let etl_config = config.stages.etl.clone();
152        let prune_modes = config.prune.segments.clone();
153
154        let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
155            match self.stage {
156                StageEnum::Headers => {
157                    let consensus = Arc::new(components.consensus().clone());
158
159                    let network_secret_path = self
160                        .network
161                        .p2p_secret_key
162                        .clone()
163                        .unwrap_or_else(|| data_dir.p2p_secret());
164                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
165
166                    let default_peers_path = data_dir.known_peers();
167
168                    let network = self
169                        .network
170                        .network_config::<N::NetworkPrimitives>(
171                            &config,
172                            provider_factory.chain_spec(),
173                            p2p_secret_key,
174                            default_peers_path,
175                            runtime.clone(),
176                        )
177                        .build(provider_factory.clone())
178                        .start_network()
179                        .await?;
180                    let fetch_client = Arc::new(network.fetch_client().await?);
181
182                    // Use `to` as the tip for the stage
183                    let tip = loop {
184                        match fetch_client.get_header(BlockHashOrNumber::Number(self.to)).await {
185                            Ok(header) => {
186                                if let Some(header) = header.into_data() {
187                                    break header
188                                }
189                            }
190                            Err(error) if error.is_retryable() => {
191                                warn!(target: "reth::cli", "Error requesting header: {error}. Retrying...")
192                            }
193                            Err(error) => return Err(error.into()),
194                        }
195                    };
196                    let (_, rx) = watch::channel(tip.hash_slow());
197                    (
198                        Box::new(HeaderStage::new(
199                            provider_factory.clone(),
200                            ReverseHeadersDownloaderBuilder::new(config.stages.headers)
201                                .build(fetch_client, consensus.clone()),
202                            rx,
203                            etl_config,
204                        )),
205                        None,
206                    )
207                }
208                StageEnum::Bodies => {
209                    let consensus = Arc::new(components.consensus().clone());
210
211                    let mut config = config;
212                    config.peers.trusted_nodes_only = self.network.trusted_only;
213                    config.peers.trusted_nodes.extend(self.network.trusted_peers.clone());
214
215                    let network_secret_path = self
216                        .network
217                        .p2p_secret_key
218                        .clone()
219                        .unwrap_or_else(|| data_dir.p2p_secret());
220                    let p2p_secret_key = get_secret_key(&network_secret_path)?;
221
222                    let default_peers_path = data_dir.known_peers();
223
224                    let network = self
225                        .network
226                        .network_config::<N::NetworkPrimitives>(
227                            &config,
228                            provider_factory.chain_spec(),
229                            p2p_secret_key,
230                            default_peers_path,
231                            runtime.clone(),
232                        )
233                        .build(provider_factory.clone())
234                        .start_network()
235                        .await?;
236                    let fetch_client = Arc::new(network.fetch_client().await?);
237
238                    let stage = BodyStage::new(
239                        BodiesDownloaderBuilder::default()
240                            .with_stream_batch_size(batch_size as usize)
241                            .with_request_limit(config.stages.bodies.downloader_request_limit)
242                            .with_max_buffered_blocks_size_bytes(
243                                config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
244                            )
245                            .with_concurrent_requests_range(
246                                config.stages.bodies.downloader_min_concurrent_requests..=
247                                    config.stages.bodies.downloader_max_concurrent_requests,
248                            )
249                            .build(fetch_client, consensus.clone(), provider_factory.clone()),
250                    );
251                    (Box::new(stage), None)
252                }
253                StageEnum::Senders => (
254                    Box::new(SenderRecoveryStage::new(
255                        SenderRecoveryConfig { commit_threshold: batch_size },
256                        None,
257                    )),
258                    None,
259                ),
260                StageEnum::Execution => (
261                    Box::new(ExecutionStage::new(
262                        components.evm_config().clone(),
263                        Arc::new(components.consensus().clone()),
264                        ExecutionStageThresholds {
265                            max_blocks: Some(batch_size),
266                            max_changes: None,
267                            max_cumulative_gas: None,
268                            max_duration: None,
269                        },
270                        config.stages.merkle.incremental_threshold,
271                        ExExManagerHandle::empty(),
272                    )),
273                    None,
274                ),
275                StageEnum::TxLookup => (
276                    Box::new(TransactionLookupStage::new(
277                        TransactionLookupConfig { chunk_size: batch_size },
278                        etl_config,
279                        prune_modes.transaction_lookup,
280                    )),
281                    None,
282                ),
283                StageEnum::AccountHashing => (
284                    Box::new(AccountHashingStage::new(
285                        HashingConfig {
286                            clean_threshold: 1,
287                            commit_threshold: batch_size,
288                            commit_entries: u64::MAX,
289                        },
290                        etl_config,
291                    )),
292                    None,
293                ),
294                StageEnum::StorageHashing => (
295                    Box::new(StorageHashingStage::new(
296                        HashingConfig {
297                            clean_threshold: 1,
298                            commit_threshold: batch_size,
299                            commit_entries: u64::MAX,
300                        },
301                        etl_config,
302                    )),
303                    None,
304                ),
305                StageEnum::Merkle => (
306                    Box::new(MerkleStage::new_execution(
307                        config.stages.merkle.rebuild_threshold,
308                        config.stages.merkle.incremental_threshold,
309                    )),
310                    Some(Box::new(MerkleStage::default_unwind())),
311                ),
312                StageEnum::AccountHistory => (
313                    Box::new(IndexAccountHistoryStage::new(
314                        config.stages.index_account_history,
315                        etl_config,
316                        prune_modes.account_history,
317                    )),
318                    None,
319                ),
320                StageEnum::StorageHistory => (
321                    Box::new(IndexStorageHistoryStage::new(
322                        config.stages.index_storage_history,
323                        etl_config,
324                        prune_modes.storage_history,
325                    )),
326                    None,
327                ),
328                _ => return Ok(()),
329            };
330        if let Some(unwind_stage) = &unwind_stage {
331            assert_eq!((*exec_stage).type_id(), (**unwind_stage).type_id());
332        }
333
334        let checkpoint = provider_rw.get_stage_checkpoint(exec_stage.id())?.unwrap_or_default();
335
336        let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage);
337
338        let mut unwind = UnwindInput {
339            checkpoint: checkpoint.with_block_number(self.to),
340            unwind_to: self.from,
341            bad_block: None,
342        };
343
344        if !self.skip_unwind {
345            while unwind.checkpoint.block_number > self.from {
346                let UnwindOutput { checkpoint } = unwind_stage.unwind(&provider_rw, unwind)?;
347                unwind.checkpoint = checkpoint;
348
349                if self.checkpoints {
350                    provider_rw.save_stage_checkpoint(unwind_stage.id(), checkpoint)?;
351                }
352
353                if self.commit {
354                    provider_rw.commit()?;
355                    provider_rw = provider_factory.database_provider_rw()?;
356                }
357            }
358        }
359
360        let mut input = ExecInput {
361            target: Some(self.to),
362            checkpoint: Some(checkpoint.with_block_number(self.from)),
363        };
364
365        let start = Instant::now();
366        info!(target: "reth::cli", stage = %self.stage, "Executing stage");
367        loop {
368            exec_stage.execute_ready(input).await?;
369            let ExecOutput { checkpoint, done } = exec_stage.execute(&provider_rw, input)?;
370
371            input.checkpoint = Some(checkpoint);
372
373            if self.checkpoints {
374                provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
375            }
376            if self.commit {
377                provider_rw.commit()?;
378                provider_rw = provider_factory.database_provider_rw()?;
379            }
380
381            if done {
382                break
383            }
384        }
385        info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");
386
387        Ok(())
388    }
389}
390
391impl<C: ChainSpecParser> Command<C> {
392    /// Returns the underlying chain being used to run this command
393    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
394        Some(&self.env.chain)
395    }
396
397    /// Returns whether or not the configured stage requires committing.
398    ///
399    /// This is the case for stages that mainly modify static files, as there is no way to unwind
400    /// these stages without committing anyways. This is because static files do not have
401    /// transactions and we cannot change the view of headers without writing.
402    pub fn requires_commit(&self) -> bool {
403        matches!(self.stage, StageEnum::Headers | StageEnum::Bodies | StageEnum::Execution)
404    }
405}