reth_cli_commands/stage/
unwind.rs

1//! Unwinding a certain block range
2
3use crate::{
4    common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs},
5    stage::CliNodeComponents,
6};
7use alloy_eips::BlockHashOrNumber;
8use alloy_primitives::B256;
9use clap::{Parser, Subcommand};
10use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks};
11use reth_cli::chainspec::ChainSpecParser;
12use reth_config::Config;
13use reth_consensus::noop::NoopConsensus;
14use reth_db::DatabaseEnv;
15use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
16use reth_evm::ConfigureEvm;
17use reth_exex::ExExManagerHandle;
18use reth_provider::{
19    providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
20    ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory,
21};
22use reth_stages::{
23    sets::{DefaultStages, OfflineStages},
24    stages::ExecutionStage,
25    ExecutionStageThresholds, Pipeline, StageSet,
26};
27use reth_static_file::StaticFileProducer;
28use std::sync::Arc;
29use tokio::sync::watch;
30use tracing::info;
31
32/// `reth stage unwind` command
33#[derive(Debug, Parser)]
34pub struct Command<C: ChainSpecParser> {
35    #[command(flatten)]
36    env: EnvironmentArgs<C>,
37
38    #[command(subcommand)]
39    command: Subcommands,
40
41    /// If this is enabled, then all stages except headers, bodies, and sender recovery will be
42    /// unwound.
43    #[arg(long)]
44    offline: bool,
45}
46
47impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
48    /// Execute `db stage unwind` command
49    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>, F, Comp>(
50        self,
51        components: F,
52    ) -> eyre::Result<()>
53    where
54        Comp: CliNodeComponents<N>,
55        F: FnOnce(Arc<C::ChainSpec>) -> Comp,
56    {
57        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
58
59        let target = self.command.unwind_target(provider_factory.clone())?;
60
61        let components = components(provider_factory.chain_spec());
62
63        let highest_static_file_block = provider_factory
64            .static_file_provider()
65            .get_highest_static_files()
66            .max_block_num()
67            .filter(|highest_static_file_block| *highest_static_file_block > target);
68
69        // Execute a pipeline unwind if the start of the range overlaps the existing static
70        // files. If that's the case, then copy all available data from MDBX to static files, and
71        // only then, proceed with the unwind.
72        //
73        // We also execute a pipeline unwind if `offline` is specified, because we need to only
74        // unwind the data associated with offline stages.
75        if highest_static_file_block.is_some() || self.offline {
76            if self.offline {
77                info!(target: "reth::cli", "Performing an unwind for offline-only data!");
78            }
79
80            if let Some(highest_static_file_block) = highest_static_file_block {
81                info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
82            } else {
83                info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
84            }
85            info!(target: "reth::cli", prune_config=?config.prune, "Using prune settings");
86
87            // This will build an offline-only pipeline if the `offline` flag is enabled
88            let mut pipeline =
89                self.build_pipeline(config, provider_factory, components.evm_config().clone())?;
90
91            // Move all applicable data from database to static files.
92            pipeline.move_to_static_files()?;
93
94            pipeline.unwind(target, None)?;
95        } else {
96            info!(target: "reth::cli", ?target, "Executing a database unwind.");
97            let provider = provider_factory.provider_rw()?;
98
99            provider
100                .remove_block_and_execution_above(target)
101                .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
102
103            // update finalized block if needed
104            let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
105            if last_saved_finalized_block_number.is_none_or(|f| f > target) {
106                provider.save_finalized_block_number(target)?;
107            }
108
109            provider.commit()?;
110        }
111
112        info!(target: "reth::cli", ?target, "Unwound blocks");
113
114        Ok(())
115    }
116
117    fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec>>(
118        self,
119        config: Config,
120        provider_factory: ProviderFactory<N>,
121        evm_config: impl ConfigureEvm<Primitives = N::Primitives> + 'static,
122    ) -> Result<Pipeline<N>, eyre::Error> {
123        let stage_conf = &config.stages;
124        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
125
126        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
127
128        let builder = if self.offline {
129            Pipeline::<N>::builder().add_stages(
130                OfflineStages::new(
131                    evm_config,
132                    NoopConsensus::arc(),
133                    config.stages,
134                    prune_modes.clone(),
135                )
136                .builder()
137                .disable(reth_stages::StageId::SenderRecovery),
138            )
139        } else {
140            Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
141                DefaultStages::new(
142                    provider_factory.clone(),
143                    tip_rx,
144                    Arc::new(NoopConsensus::default()),
145                    NoopHeaderDownloader::default(),
146                    NoopBodiesDownloader::default(),
147                    evm_config.clone(),
148                    stage_conf.clone(),
149                    prune_modes.clone(),
150                    None,
151                )
152                .set(ExecutionStage::new(
153                    evm_config,
154                    Arc::new(NoopConsensus::default()),
155                    ExecutionStageThresholds {
156                        max_blocks: None,
157                        max_changes: None,
158                        max_cumulative_gas: None,
159                        max_duration: None,
160                    },
161                    stage_conf.execution_external_clean_threshold(),
162                    ExExManagerHandle::empty(),
163                )),
164            )
165        };
166
167        let pipeline = builder.build(
168            provider_factory.clone(),
169            StaticFileProducer::new(provider_factory, prune_modes),
170        );
171        Ok(pipeline)
172    }
173}
174
175impl<C: ChainSpecParser> Command<C> {
176    /// Return the underlying chain being used to run this command
177    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
178        Some(&self.env.chain)
179    }
180}
181
182/// `reth stage unwind` subcommand
183#[derive(Subcommand, Debug, Eq, PartialEq)]
184enum Subcommands {
185    /// Unwinds the database from the latest block, until the given block number or hash has been
186    /// reached, that block is not included.
187    #[command(name = "to-block")]
188    ToBlock { target: BlockHashOrNumber },
189    /// Unwinds the database from the latest block, until the given number of blocks have been
190    /// reached.
191    #[command(name = "num-blocks")]
192    NumBlocks { amount: u64 },
193}
194
195impl Subcommands {
196    /// Returns the block to unwind to. The returned block will stay in database.
197    fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
198        &self,
199        factory: ProviderFactory<N>,
200    ) -> eyre::Result<u64> {
201        let provider = factory.provider()?;
202        let last = provider.last_block_number()?;
203        let target = match self {
204            Self::ToBlock { target } => match target {
205                BlockHashOrNumber::Hash(hash) => provider
206                    .block_number(*hash)?
207                    .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
208                BlockHashOrNumber::Number(num) => *num,
209            },
210            Self::NumBlocks { amount } => last.saturating_sub(*amount),
211        };
212        if target > last {
213            eyre::bail!(
214                "Target block number {target} is higher than the latest block number {last}"
215            )
216        }
217        Ok(target)
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::SEPOLIA;
    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;

    /// The unwind command parameterized with the Ethereum chain spec parser.
    type UnwindCommand = Command<EthereumChainSpecParser>;

    /// Both subcommands parse their single positional argument.
    #[test]
    fn parse_unwind() {
        let to_block_args = ["reth", "--datadir", "dir", "to-block", "100"];
        let parsed = UnwindCommand::parse_from(to_block_args);
        let expected = Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) };
        assert_eq!(parsed.command, expected);

        let num_blocks_args = ["reth", "--datadir", "dir", "num-blocks", "100"];
        let parsed = UnwindCommand::parse_from(num_blocks_args);
        let expected = Subcommands::NumBlocks { amount: 100 };
        assert_eq!(parsed.command, expected);
    }

    /// The `--chain` argument selects the chain spec used by the command.
    #[test]
    fn parse_unwind_chain() {
        let args = ["reth", "--chain", "sepolia", "to-block", "100"];
        let parsed = UnwindCommand::parse_from(args);

        let expected = Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) };
        assert_eq!(parsed.command, expected);
        assert_eq!(parsed.env.chain.chain_id(), SEPOLIA.chain_id());
    }
}