use crate::common::{AccessRights, CliNodeTypes, Environment, EnvironmentArgs};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use clap::{Parser, Subcommand};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_config::Config;
use reth_consensus::noop::NoopConsensus;
use reth_db::DatabaseEnv;
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_evm::noop::NoopBlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_node_core::args::NetworkArgs;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader,
ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StorageLocation,
};
use reth_prune::PruneModes;
use reth_stages::{
sets::{DefaultStages, OfflineStages},
stages::ExecutionStage,
ExecutionStageThresholds, Pipeline, StageSet,
};
use reth_static_file::StaticFileProducer;
use std::sync::Arc;
use tokio::sync::watch;
use tracing::info;
#[derive(Debug, Parser)]
pub struct Command<C: ChainSpecParser> {
#[command(flatten)]
env: EnvironmentArgs<C>,
#[command(flatten)]
network: NetworkArgs,
#[command(subcommand)]
command: Subcommands,
#[arg(long)]
offline: bool,
}
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C> {
pub async fn execute<N: CliNodeTypes<ChainSpec = C::ChainSpec>>(self) -> eyre::Result<()> {
let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
let target = self.command.unwind_target(provider_factory.clone())?;
let highest_static_file_block = provider_factory
.static_file_provider()
.get_highest_static_files()
.max_block_num()
.filter(|highest_static_file_block| *highest_static_file_block > target);
if highest_static_file_block.is_some() || self.offline {
if self.offline {
info!(target: "reth::cli", "Performing an unwind for offline-only data!");
}
if let Some(highest_static_file_block) = highest_static_file_block {
info!(target: "reth::cli", ?target, ?highest_static_file_block, "Executing a pipeline unwind.");
} else {
info!(target: "reth::cli", ?target, "Executing a pipeline unwind.");
}
let mut pipeline = self.build_pipeline(config, provider_factory)?;
pipeline.move_to_static_files()?;
pipeline.unwind(target, None)?;
} else {
info!(target: "reth::cli", ?target, "Executing a database unwind.");
let provider = provider_factory.provider_rw()?;
provider
.remove_block_and_execution_above(target, StorageLocation::Both)
.map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?;
let last_saved_finalized_block_number = provider.last_finalized_block_number()?;
if last_saved_finalized_block_number.is_none_or(|f| f > target) {
provider.save_finalized_block_number(target)?;
}
provider.commit()?;
}
info!(target: "reth::cli", ?target, "Unwound blocks");
Ok(())
}
fn build_pipeline<N: ProviderNodeTypes<ChainSpec = C::ChainSpec> + CliNodeTypes>(
self,
config: Config,
provider_factory: ProviderFactory<N>,
) -> Result<Pipeline<N>, eyre::Error> {
let stage_conf = &config.stages;
let prune_modes = config.prune.clone().map(|prune| prune.segments).unwrap_or_default();
let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = NoopBlockExecutorProvider::<N::Primitives>::default();
let builder = if self.offline {
Pipeline::<N>::builder().add_stages(
OfflineStages::new(executor, config.stages, PruneModes::default())
.builder()
.disable(reth_stages::StageId::SenderRecovery),
)
} else {
Pipeline::<N>::builder().with_tip_sender(tip_tx).add_stages(
DefaultStages::new(
provider_factory.clone(),
tip_rx,
Arc::new(NoopConsensus::default()),
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
executor.clone(),
stage_conf.clone(),
prune_modes.clone(),
)
.set(ExecutionStage::new(
executor,
ExecutionStageThresholds {
max_blocks: None,
max_changes: None,
max_cumulative_gas: None,
max_duration: None,
},
stage_conf.execution_external_clean_threshold(),
prune_modes,
ExExManagerHandle::empty(),
)),
)
};
let pipeline = builder.build(
provider_factory.clone(),
StaticFileProducer::new(provider_factory, PruneModes::default()),
);
Ok(pipeline)
}
}
#[derive(Subcommand, Debug, Eq, PartialEq)]
enum Subcommands {
#[command(name = "to-block")]
ToBlock { target: BlockHashOrNumber },
#[command(name = "num-blocks")]
NumBlocks { amount: u64 },
}
impl Subcommands {
fn unwind_target<N: ProviderNodeTypes<DB = Arc<DatabaseEnv>>>(
&self,
factory: ProviderFactory<N>,
) -> eyre::Result<u64> {
let provider = factory.provider()?;
let last = provider.last_block_number()?;
let target = match self {
Self::ToBlock { target } => match target {
BlockHashOrNumber::Hash(hash) => provider
.block_number(*hash)?
.ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?,
BlockHashOrNumber::Number(num) => *num,
},
Self::NumBlocks { amount } => last.saturating_sub(*amount),
};
if target > last {
eyre::bail!("Target block number is higher than the latest block number")
}
Ok(target)
}
}
#[cfg(test)]
mod tests {
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use super::*;
#[test]
fn parse_unwind() {
let cmd = Command::<EthereumChainSpecParser>::parse_from([
"reth",
"--datadir",
"dir",
"to-block",
"100",
]);
assert_eq!(cmd.command, Subcommands::ToBlock { target: BlockHashOrNumber::Number(100) });
let cmd = Command::<EthereumChainSpecParser>::parse_from([
"reth",
"--datadir",
"dir",
"num-blocks",
"100",
]);
assert_eq!(cmd.command, Subcommands::NumBlocks { amount: 100 });
}
}