Skip to main content

reth_cli_commands/
prune.rs

1//! Command that runs pruning.
2use crate::common::{AccessRights, CliNodeTypes, EnvironmentArgs};
3use clap::Parser;
4use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
5use reth_cli::chainspec::ChainSpecParser;
6use reth_cli_runner::CliContext;
7use reth_cli_util::cancellation::CancellationToken;
8use reth_node_builder::common::metrics_hooks;
9use reth_node_core::{args::MetricArgs, version::version_metadata};
10use reth_node_metrics::{
11    chain::ChainSpecInfo,
12    server::{MetricServer, MetricServerConfig},
13    version::VersionInfo,
14};
15#[cfg(all(unix, feature = "rocksdb"))]
16use reth_provider::RocksDBProviderFactory;
17use reth_prune::PrunerBuilder;
18use reth_static_file::StaticFileProducer;
19use std::sync::Arc;
20use tracing::info;
21
22/// Prunes according to the configuration
23#[derive(Debug, Parser)]
24pub struct PruneCommand<C: ChainSpecParser> {
25    #[command(flatten)]
26    env: EnvironmentArgs<C>,
27
28    /// Prometheus metrics configuration.
29    #[command(flatten)]
30    metrics: MetricArgs,
31}
32
33impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> PruneCommand<C> {
34    /// Execute the `prune` command
35    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(
36        self,
37        ctx: CliContext,
38    ) -> eyre::Result<()> {
39        let env = self.env.init::<N>(AccessRights::RW)?;
40        let provider_factory = env.provider_factory;
41        let config = env.config.prune;
42        let data_dir = env.data_dir;
43
44        if let Some(listen_addr) = self.metrics.prometheus {
45            let config = MetricServerConfig::new(
46                listen_addr,
47                VersionInfo {
48                    version: version_metadata().cargo_pkg_version.as_ref(),
49                    build_timestamp: version_metadata().vergen_build_timestamp.as_ref(),
50                    cargo_features: version_metadata().vergen_cargo_features.as_ref(),
51                    git_sha: version_metadata().vergen_git_sha.as_ref(),
52                    target_triple: version_metadata().vergen_cargo_target_triple.as_ref(),
53                    build_profile: version_metadata().build_profile_name.as_ref(),
54                },
55                ChainSpecInfo { name: provider_factory.chain_spec().chain().to_string() },
56                ctx.task_executor.clone(),
57                metrics_hooks(&provider_factory),
58                data_dir.pprof_dumps(),
59            );
60
61            MetricServer::new(config).serve().await?;
62        }
63
64        // Copy data from database to static files
65        info!(target: "reth::cli", "Copying data from database to static files...");
66        let static_file_producer =
67            StaticFileProducer::new(provider_factory.clone(), config.segments.clone());
68        let lowest_static_file_height =
69            static_file_producer.lock().copy_to_static_files()?.min_block_num();
70        info!(target: "reth::cli", ?lowest_static_file_height, "Copied data from database to static files");
71
72        // Delete data which has been copied to static files.
73        if let Some(prune_tip) = lowest_static_file_height {
74            info!(target: "reth::cli", ?prune_tip, ?config, "Pruning data from database...");
75
76            // Set up cancellation token for graceful shutdown on Ctrl+C
77            let cancellation = CancellationToken::new();
78            let cancellation_clone = cancellation.clone();
79            ctx.task_executor.spawn_critical_task("prune-ctrl-c", async move {
80                tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
81                cancellation_clone.cancel();
82            });
83
84            // Use batched pruning with a limit to bound memory, running in a loop until complete.
85            //
86            // A limit of 20_000_000 results in a max memory usage of ~5G.
87            const DELETE_LIMIT: usize = 20_000_000;
88            let mut pruner = PrunerBuilder::new(config)
89                .delete_limit(DELETE_LIMIT)
90                .build_with_provider_factory(provider_factory.clone());
91
92            let mut total_pruned = 0usize;
93            loop {
94                if cancellation.is_cancelled() {
95                    info!(target: "reth::cli", total_pruned, "Pruning interrupted by user");
96                    break;
97                }
98
99                let output = pruner.run(prune_tip)?;
100                let batch_pruned: usize = output.segments.iter().map(|(_, seg)| seg.pruned).sum();
101                total_pruned = total_pruned.saturating_add(batch_pruned);
102
103                if output.progress.is_finished() {
104                    info!(target: "reth::cli", total_pruned, "Pruned data from database");
105                    break;
106                }
107
108                if batch_pruned == 0 {
109                    return Err(eyre::eyre!(
110                        "pruner made no progress but reported more data remaining; \
111                         aborting to prevent infinite loop"
112                    ));
113                }
114
115                info!(
116                    target: "reth::cli",
117                    batch_pruned,
118                    total_pruned,
119                    "Pruning batch complete, continuing..."
120                );
121            }
122        }
123
124        // Flush and compact RocksDB to reclaim disk space after pruning
125        #[cfg(all(unix, feature = "rocksdb"))]
126        {
127            info!(target: "reth::cli", "Flushing and compacting RocksDB...");
128            provider_factory.rocksdb_provider().flush_and_compact()?;
129            info!(target: "reth::cli", "RocksDB compaction complete");
130        }
131
132        Ok(())
133    }
134}
135
136impl<C: ChainSpecParser> PruneCommand<C> {
137    /// Returns the underlying chain being used to run this command
138    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
139        Some(&self.env.chain)
140    }
141}