1use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
4use alloy_eips::BlockHashOrNumber;
5use alloy_primitives::B256;
6use clap::{Parser, Subcommand};
7use reth_chainspec::{EthChainSpec, EthereumHardforks};
8use reth_cli::chainspec::ChainSpecParser;
9use reth_config::Config;
10use reth_consensus::noop::NoopConsensus;
11use reth_db::DatabaseEnv;
12use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
13use reth_evm::noop::NoopBlockExecutorProvider;
14use reth_exex::ExExManagerHandle;
15use reth_provider::{
16 providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
17 ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StorageLocation,
18};
19use reth_stages::{
20 sets::{DefaultStages, OfflineStages},
21 stages::ExecutionStage,
22 ExecutionStageThresholds, Pipeline, StageSet,
23};
24use reth_static_file::StaticFileProducer;
25use std::sync::Arc;
26use tokio::sync::watch;
27use tracing::info;
28
29#[derive(Debug, Parser)]
31pub struct Command<C: ChainSpecParser> {
32 #[command(flatten)]
33 env: EnvironmentArgs<C>,
34
35 #[command(subcommand)]
36 command: Subcommands,
37
38 #[arg(long)]
41 offline: bool,
42}
43
44impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
45 pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
47 let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
48
49 let target = self.command.unwind_target(provider_factory.clone())?;
50
51 let highest_static_file_block = provider_factory
52 .static_file_provider()
53 .get_highest_static_files()
54 .max_block_num()
55 .filter(|highest_static_file_block| *highest_static_file_block > target);
56
57 if highest_static_file_block.is_some() || self.offline {
64 if self.offline {
65 info!(target: "reth::cli", "Performing an unwind for offline-only data!");
66 }
67
68 if let Some(highest_static_file_block) = highest_static_file_block {
69 info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
70 } else {
71 info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
72 }
73
74 let mut pipeline = self.build_pipeline(config, provider_factory)?;
76
77 pipeline.move_to_static_files()?;
79
80 pipeline.unwind(target, None)?;
81 } else {
82 info!(target: "reth::cli", ?target, "Executing a database unwind.");
83 let provider = provider_factory.provider_rw()?;
84
85 provider
86 .remove_block_and_execution_above(target, StorageLocation::Both)
87 .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
88
89 let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
91 if last_saved_finalized_block_number.is_none_or(|f| f > target) {
92 provider.save_finalized_block_number(target)?;
93 }
94
95 provider.commit()?;
96 }
97
98 info!(target: "reth::cli", ?target, "Unwound blocks");
99
100 Ok(())
101 }
102
103 fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes>(
104 self,
105 config: Config,
106 provider_factory: ProviderFactory<N>,
107 ) -> Result<Pipeline<N>, eyre::Error> {
108 let stage_conf = &config.stages;
109 let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
110
111 let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
112
113 let executor = NoopBlockExecutorProvider::<N::Primitives>::default();
115
116 let builder = if self.offline {
117 Pipeline::<N>::builder().add_stages(
118 OfflineStages::new(
119 executor,
120 NoopConsensus::arc(),
121 config.stages,
122 prune_modes.clone(),
123 )
124 .builder()
125 .disable(reth_stages::StageId::SenderRecovery),
126 )
127 } else {
128 Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
129 DefaultStages::new(
130 provider_factory.clone(),
131 tip_rx,
132 Arc::new(NoopConsensus::default()),
133 NoopHeaderDownloader::default(),
134 NoopBodiesDownloader::default(),
135 executor.clone(),
136 stage_conf.clone(),
137 prune_modes.clone(),
138 )
139 .set(ExecutionStage::new(
140 executor,
141 Arc::new(NoopConsensus::default()),
142 ExecutionStageThresholds {
143 max_blocks: None,
144 max_changes: None,
145 max_cumulative_gas: None,
146 max_duration: None,
147 },
148 stage_conf.execution_external_clean_threshold(),
149 ExExManagerHandle::empty(),
150 )),
151 )
152 };
153
154 let pipeline = builder.build(
155 provider_factory.clone(),
156 StaticFileProducer::new(provider_factory, prune_modes),
157 );
158 Ok(pipeline)
159 }
160}
161
162#[derive(Subcommand, Debug, Eq, PartialEq)]
164enum Subcommands {
165 #[command(name = "to-block")]
168 ToBlock { target: BlockHashOrNumber },
169 #[command(name = "num-blocks")]
172 NumBlocks { amount: u64 },
173}
174
175impl Subcommands {
176 fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
178 &self,
179 factory: ProviderFactory<N>,
180 ) -> eyre::Result<u64> {
181 let provider = factory.provider()?;
182 let last = provider.last_block_number()?;
183 let target = match self {
184 Self::ToBlock { target } => match target {
185 BlockHashOrNumber::Hash(hash) => provider
186 .block_number(*hash)?
187 .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
188 BlockHashOrNumber::Number(num) => *num,
189 },
190 Self::NumBlocks { amount } => last.saturating_sub(*amount),
191 };
192 if target > last {
193 eyre::bail!("Target block number is higher than the latest block number")
194 }
195 Ok(target)
196 }
197}
198
#[cfg(test)]
mod tests {
    use super::*;
    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;

    /// Each subcommand parses its positional argument into the expected variant.
    #[test]
    fn parse_unwind() {
        let cases = [
            ("to-block", Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) }),
            ("num-blocks", Subcommands::NumBlocks { amount: 100 }),
        ];

        for (subcommand, expected) in cases {
            let cmd = Command::<EthereumChainSpecParser>::parse_from([
                "reth",
                "--datadir",
                "dir",
                subcommand,
                "100",
            ]);
            assert_eq!(cmd.command, expected);
        }
    }
}