reth_cli_commands/stage/
unwind.rs

1//! Unwinding a certain block range
2
3use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::B256;
6use clap::{Parser, Subcommand};
7use reth_chainspec::{EthChainSpec, EthereumHardforks};
8use reth_cli::chainspec::ChainSpecParser;
9use reth_config::Config;
10use reth_consensus::noop::NoopConsensus;
11use reth_db::DatabaseEnv;
12use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
13use reth_evm::noop::NoopBlockExecutorProvider;
14use reth_exex::ExExManagerHandle;
15use reth_provider::{
16    providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
17    ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StorageLocation,
18};
19use reth_stages::{
20    sets::{DefaultStages, OfflineStages},
21    stages::ExecutionStage,
22    ExecutionStageThresholds, Pipeline, StageSet,
23};
24use reth_static_file::StaticFileProducer;
25use std::sync::Arc;
26use tokio::sync::watch;
27use tracing::info;
28
29/// `reth stage unwind` command
30#[derive(Debug, Parser)]
31pub struct Command<C: ChainSpecParser> {
32    #[command(flatten)]
33    env: EnvironmentArgs<C>,
34
35    #[command(subcommand)]
36    command: Subcommands,
37
38    /// If this is enabled, then all stages except headers, bodies, and sender recovery will be
39    /// unwound.
40    #[arg(long)]
41    offline: bool,
42}
43
44impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
45    /// Execute `db stage unwind` command
46    pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
47        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
48
49        let target = self.command.unwind_target(provider_factory.clone())?;
50
51        let highest_static_file_block = provider_factory
52            .static_file_provider()
53            .get_highest_static_files()
54            .max_block_num()
55            .filter(|highest_static_file_block| *highest_static_file_block > target);
56
57        // Execute a pipeline unwind if the start of the range overlaps the existing static
58        // files. If that's the case, then copy all available data from MDBX to static files, and
59        // only then, proceed with the unwind.
60        //
61        // We also execute a pipeline unwind if `offline` is specified, because we need to only
62        // unwind the data associated with offline stages.
63        if highest_static_file_block.is_some() || self.offline {
64            if self.offline {
65                info!(target: "reth::cli", "Performing an unwind for offline-only data!");
66            }
67
68            if let Some(highest_static_file_block) = highest_static_file_block {
69                info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
70            } else {
71                info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
72            }
73
74            // This will build an offline-only pipeline if the `offline` flag is enabled
75            let mut pipeline = self.build_pipeline(config, provider_factory)?;
76
77            // Move all applicable data from database to static files.
78            pipeline.move_to_static_files()?;
79
80            pipeline.unwind(target, None)?;
81        } else {
82            info!(target: "reth::cli", ?target, "Executing a database unwind.");
83            let provider = provider_factory.provider_rw()?;
84
85            provider
86                .remove_block_and_execution_above(target, StorageLocation::Both)
87                .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
88
89            // update finalized block if needed
90            let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
91            if last_saved_finalized_block_number.is_none_or(|f| f > target) {
92                provider.save_finalized_block_number(target)?;
93            }
94
95            provider.commit()?;
96        }
97
98        info!(target: "reth::cli", ?target, "Unwound blocks");
99
100        Ok(())
101    }
102
103    fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes>(
104        self,
105        config: Config,
106        provider_factory: ProviderFactory<N>,
107    ) -> Result<Pipeline<N>, eyre::Error> {
108        let stage_conf = &config.stages;
109        let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
110
111        let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
112
113        // Unwinding does not require a valid executor
114        let executor = NoopBlockExecutorProvider::<N::Primitives>::default();
115
116        let builder = if self.offline {
117            Pipeline::<N>::builder().add_stages(
118                OfflineStages::new(
119                    executor,
120                    NoopConsensus::arc(),
121                    config.stages,
122                    prune_modes.clone(),
123                )
124                .builder()
125                .disable(reth_stages::StageId::SenderRecovery),
126            )
127        } else {
128            Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
129                DefaultStages::new(
130                    provider_factory.clone(),
131                    tip_rx,
132                    Arc::new(NoopConsensus::default()),
133                    NoopHeaderDownloader::default(),
134                    NoopBodiesDownloader::default(),
135                    executor.clone(),
136                    stage_conf.clone(),
137                    prune_modes.clone(),
138                )
139                .set(ExecutionStage::new(
140                    executor,
141                    Arc::new(NoopConsensus::default()),
142                    ExecutionStageThresholds {
143                        max_blocks: None,
144                        max_changes: None,
145                        max_cumulative_gas: None,
146                        max_duration: None,
147                    },
148                    stage_conf.execution_external_clean_threshold(),
149                    ExExManagerHandle::empty(),
150                )),
151            )
152        };
153
154        let pipeline = builder.build(
155            provider_factory.clone(),
156            StaticFileProducer::new(provider_factory, prune_modes),
157        );
158        Ok(pipeline)
159    }
160}
161
162/// `reth stage unwind` subcommand
163#[derive(Subcommand, Debug, Eq, PartialEq)]
164enum Subcommands {
165    /// Unwinds the database from the latest block, until the given block number or hash has been
166    /// reached, that block is not included.
167    #[command(name = "to-block")]
168    ToBlock { target: BlockHashOrNumber },
169    /// Unwinds the database from the latest block, until the given number of blocks have been
170    /// reached.
171    #[command(name = "num-blocks")]
172    NumBlocks { amount: u64 },
173}
174
175impl Subcommands {
176    /// Returns the block to unwind to. The returned block will stay in database.
177    fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
178        &self,
179        factory: ProviderFactory<N>,
180    ) -> eyre::Result<u64> {
181        let provider = factory.provider()?;
182        let last = provider.last_block_number()?;
183        let target = match self {
184            Self::ToBlock { target } => match target {
185                BlockHashOrNumber::Hash(hash) => provider
186                    .block_number(*hash)?
187                    .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
188                BlockHashOrNumber::Number(num) => *num,
189            },
190            Self::NumBlocks { amount } => last.saturating_sub(*amount),
191        };
192        if target > last {
193            eyre::bail!("Target block number is higher than the latest block number")
194        }
195        Ok(target)
196    }
197}
198
199#[cfg(test)]
200mod tests {
201    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
202
203    use super::*;
204
205    #[test]
206    fn parse_unwind() {
207        let cmd = Command::<EthereumChainSpecParser>::parse_from([
208            "reth",
209            "--datadir",
210            "dir",
211            "to-block",
212            "100",
213        ]);
214        assert_eq!(cmd.command, Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) });
215
216        let cmd = Command::<EthereumChainSpecParser>::parse_from([
217            "reth",
218            "--datadir",
219            "dir",
220            "num-blocks",
221            "100",
222        ]);
223        assert_eq!(cmd.command, Subcommands::NumBlocks { amount: 100 });
224    }
225}