Skip to main content

reth_cli_commands/
prune.rs

1//! Command that runs pruning.
2use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
3use clap::Parser;
4use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
5use reth_cli::chainspec::ChainSpecParser;
6use reth_cli_runner::CliContext;
7use reth_cli_util::cancellation::CancellationToken;
8use reth_node_builder::common::metrics_hooks;
9use reth_node_core::{args::MetricArgs, version::version_metadata};
10use reth_node_metrics::{
11    chain::ChainSpecInfo,
12    server::{MetricServer, MetricServerConfig},
13    version::VersionInfo,
14};
15use reth_provider::RocksDBProviderFactory;
16use reth_prune::PrunerBuilder;
17use reth_static_file::StaticFileProducer;
18use std::sync::Arc;
19use tracing::info;
20
21/// Prunes according to the configuration
22#[derive(Debug, Parser)]
23pub struct PruneCommand<C: ChainSpecParser> {
24    #[command(flatten)]
25    env: EnvironmentArgs<C>,
26
27    /// Prometheus metrics configuration.
28    #[command(flatten)]
29    metrics: MetricArgs,
30}
31
32impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneCommand<C> {
33    /// Execute the `prune` command
34    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
35        self,
36        ctx: CliContext,
37    ) -> eyre::Result<()> {
38        let env = self.env.init::<N>(AccessRights::RW, ctx.task_executor.clone())?;
39        let provider_factory = env.provider_factory;
40        let config = env.config.prune;
41        let data_dir = env.data_dir;
42
43        if let Some(listen_addr) = self.metrics.prometheus {
44            let config = MetricServerConfig::new(
45                listen_addr,
46                VersionInfo {
47                    version: version_metadata().cargo_pkg_version.as_ref(),
48                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
49                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
50                    git_sha: version_metadata().vergen_git_sha.as_ref(),
51                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
52                    build_profile: version_metadata().build_profile_name.as_ref(),
53                },
54                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
55                ctx.task_executor.clone(),
56                metrics_hooks(&provider_factory),
57                data_dir.pprof_dumps(),
58            );
59
60            MetricServer::new(config).serve().await?;
61        }
62
63        // Copy data from database to static files
64        info!(target: "reth::cli", "Copying data from database to static files...");
65        let static_file_producer =
66            StaticFileProducer::new(provider_factory.clone(), config.segments.clone());
67        let lowest_static_file_height =
68            static_file_producer.lock().copy_to_static_files()?.min_block_num();
69        info!(target: "reth::cli", ?lowest_static_file_height, "Copied data from database to static files");
70
71        // Delete data which has been copied to static files.
72        if let Some(prune_tip) = lowest_static_file_height {
73            info!(target: "reth::cli", ?prune_tip, ?config, "Pruning data from database...");
74
75            // Set up cancellation token for graceful shutdown on Ctrl+C
76            let cancellation = CancellationToken::new();
77            let cancellation_clone = cancellation.clone();
78            ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
79                tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
80                cancellation_clone.cancel();
81            });
82
83            // Use batched pruning with a limit to bound memory, running in a loop until complete.
84            //
85            // A limit of 20_000_000 results in a max memory usage of ~5G.
86            const DELETE_LIMIT: usize = 20_000_000;
87            let mut pruner = PrunerBuilder::new(config)
88                .delete_limit(DELETE_LIMIT)
89                .build_with_provider_factory(provider_factory.clone());
90
91            let mut total_pruned = 0usize;
92            loop {
93                if cancellation.is_cancelled() {
94                    info!(target: "reth::cli", total_pruned, "Pruning interrupted by user");
95                    break;
96                }
97
98                let output = pruner.run(prune_tip)?;
99                let batch_pruned: usize = output.segments.iter().map(|(_, seg)| seg.pruned).sum();
100                total_pruned = total_pruned.saturating_add(batch_pruned);
101
102                if output.progress.is_finished() {
103                    info!(target: "reth::cli", total_pruned, "Pruned data from database");
104                    break;
105                }
106
107                if batch_pruned == 0 {
108                    return Err(eyre::eyre!(
109                        "pruner made no progress but reported more data remaining; \
110                         aborting to prevent infinite loop"
111                    ));
112                }
113
114                info!(
115                    target: "reth::cli",
116                    batch_pruned,
117                    total_pruned,
118                    "Pruning batch complete, continuing..."
119                );
120            }
121        }
122
123        // Flush and compact RocksDB to reclaim disk space after pruning
124        {
125            info!(target: "reth::cli", "Flushing and compacting RocksDB...");
126            provider_factory.rocksdb_provider().flush_and_compact()?;
127            info!(target: "reth::cli", "RocksDB compaction complete");
128        }
129
130        Ok(())
131    }
132}
133
134impl<C: ChainSpecParser> PruneCommand<C> {
135    /// Returns the underlying chain being used to run this command
136    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
137        Some(&self.env.chain)
138    }
139}