reth_cli_commands/
import.rs

1//! Command that initializes the node by importing a chain from a file.
2use crate::common::{AccessRights, CliNodeComponents, CliNodeTypes, Environment, EnvironmentArgs};
3use alloy_primitives::B256;
4use clap::Parser;
5use futures::{Stream, StreamExt};
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_config::Config;
9use reth_consensus::{ConsensusError, FullConsensus};
10use reth_db_api::{tables, transaction::DbTx};
11use reth_downloaders::{
12    bodies::bodies::BodiesDownloaderBuilder,
13    file_client::{ChunkedFileReader, FileClient, DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE},
14    headers::reverse_headers::ReverseHeadersDownloaderBuilder,
15};
16use reth_evm::execute::BlockExecutorProvider;
17use reth_network_p2p::{
18    bodies::downloader::BodyDownloader,
19    headers::downloader::{HeaderDownloader, SyncTarget},
20};
21use reth_node_api::BlockTy;
22use reth_node_core::version::SHORT_VERSION;
23use reth_node_events::node::NodeEvent;
24use reth_provider::{
25    providers::ProviderNodeTypes, BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError,
26    ProviderFactory, StageCheckpointReader,
27};
28use reth_prune::PruneModes;
29use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
30use reth_static_file::StaticFileProducer;
31use std::{path::PathBuf, sync::Arc};
32use tokio::sync::watch;
33use tracing::{debug, error, info};
34
35/// Syncs RLP encoded blocks from a file.
36#[derive(Debug, Parser)]
37pub struct ImportCommand<C: ChainSpecParser> {
38    #[command(flatten)]
39    env: EnvironmentArgs<C>,
40
41    /// Disables stages that require state.
42    #[arg(long, verbatim_doc_comment)]
43    no_state: bool,
44
45    /// Chunk byte length to read from file.
46    #[arg(long, value_name = "CHUNK_LEN", verbatim_doc_comment)]
47    chunk_len: Option<u64>,
48
49    /// The path to a block file for import.
50    ///
51    /// The online stages (headers and bodies) are replaced by a file import, after which the
52    /// remaining stages are executed.
53    #[arg(value_name = "IMPORT_PATH", verbatim_doc_comment)]
54    path: PathBuf,
55}
56
57impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> ImportCommand<C> {
58    /// Execute `import` command
59    pub async fn execute<N, Comp, F>(self, components: F) -> eyre::Result<()>
60    where
61        N: CliNodeTypes<ChainSpec = C::ChainSpec>,
62        Comp: CliNodeComponents<N>,
63        F: FnOnce(Arc<N::ChainSpec>) -> Comp,
64    {
65        info!(target: "reth::cli", "reth {} starting", SHORT_VERSION);
66
67        if self.no_state {
68            info!(target: "reth::cli", "Disabled stages requiring state");
69        }
70
71        debug!(target: "reth::cli",
72            chunk_byte_len=self.chunk_len.unwrap_or(DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE),
73            "Chunking chain import"
74        );
75
76        let Environment { provider_factory, config, .. } = self.env.init::<N>(AccessRights::RW)?;
77
78        let components = components(provider_factory.chain_spec());
79        let executor = components.executor().clone();
80        let consensus = Arc::new(components.consensus().clone());
81        info!(target: "reth::cli", "Consensus engine initialized");
82
83        // open file
84        let mut reader = ChunkedFileReader::new(&self.path, self.chunk_len).await?;
85
86        let mut total_decoded_blocks = 0;
87        let mut total_decoded_txns = 0;
88
89        let mut sealed_header = provider_factory
90            .sealed_header(provider_factory.last_block_number()?)?
91            .expect("should have genesis");
92
93        while let Some(file_client) =
94            reader.next_chunk::<BlockTy<N>>(consensus.clone(), Some(sealed_header)).await?
95        {
96            // create a new FileClient from chunk read from file
97            info!(target: "reth::cli",
98                "Importing chain file chunk"
99            );
100
101            let tip = file_client.tip().ok_or(eyre::eyre!("file client has no tip"))?;
102            info!(target: "reth::cli", "Chain file chunk read");
103
104            total_decoded_blocks += file_client.headers_len();
105            total_decoded_txns += file_client.total_transactions();
106
107            let (mut pipeline, events) = build_import_pipeline(
108                &config,
109                provider_factory.clone(),
110                &consensus,
111                Arc::new(file_client),
112                StaticFileProducer::new(provider_factory.clone(), PruneModes::default()),
113                self.no_state,
114                executor.clone(),
115            )?;
116
117            // override the tip
118            pipeline.set_tip(tip);
119            debug!(target: "reth::cli", ?tip, "Tip manually set");
120
121            let provider = provider_factory.provider()?;
122
123            let latest_block_number =
124                provider.get_stage_checkpoint(StageId::Finish)?.map(|ch| ch.block_number);
125            tokio::spawn(reth_node_events::node::handle_events(None, latest_block_number, events));
126
127            // Run pipeline
128            info!(target: "reth::cli", "Starting sync pipeline");
129            tokio::select! {
130                res = pipeline.run() => res?,
131                _ = tokio::signal::ctrl_c() => {},
132            }
133
134            sealed_header = provider_factory
135                .sealed_header(provider_factory.last_block_number()?)?
136                .expect("should have genesis");
137        }
138
139        let provider = provider_factory.provider()?;
140
141        let total_imported_blocks = provider.tx_ref().entries::<tables::HeaderNumbers>()?;
142        let total_imported_txns = provider.tx_ref().entries::<tables::TransactionHashNumbers>()?;
143
144        if total_decoded_blocks != total_imported_blocks ||
145            total_decoded_txns != total_imported_txns
146        {
147            error!(target: "reth::cli",
148                total_decoded_blocks,
149                total_imported_blocks,
150                total_decoded_txns,
151                total_imported_txns,
152                "Chain was partially imported"
153            );
154        }
155
156        info!(target: "reth::cli",
157            total_imported_blocks,
158            total_imported_txns,
159            "Chain file imported"
160        );
161
162        Ok(())
163    }
164    /// Returns the underlying chain being used to run this command
165    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
166        Some(&self.env.chain)
167    }
168}
169
170/// Builds import pipeline.
171///
172/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
173/// will run.
174pub fn build_import_pipeline<N, C, E>(
175    config: &Config,
176    provider_factory: ProviderFactory<N>,
177    consensus: &Arc<C>,
178    file_client: Arc<FileClient<BlockTy<N>>>,
179    static_file_producer: StaticFileProducer<ProviderFactory<N>>,
180    disable_exec: bool,
181    executor: E,
182) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent<N::Primitives>>)>
183where
184    N: ProviderNodeTypes + CliNodeTypes,
185    C: FullConsensus<N::Primitives, Error = ConsensusError> + 'static,
186    E: BlockExecutorProvider<Primitives = N::Primitives>,
187{
188    if !file_client.has_canonical_blocks() {
189        eyre::bail!("unable to import non canonical blocks");
190    }
191
192    // Retrieve latest header found in the database.
193    let last_block_number = provider_factory.last_block_number()?;
194    let local_head = provider_factory
195        .sealed_header(last_block_number)?
196        .ok_or_else(|| ProviderError::HeaderNotFound(last_block_number.into()))?;
197
198    let mut header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
199        .build(file_client.clone(), consensus.clone())
200        .into_task();
201    // TODO: The pipeline should correctly configure the downloader on its own.
202    // Find the possibility to remove unnecessary pre-configuration.
203    header_downloader.update_local_head(local_head);
204    header_downloader.update_sync_target(SyncTarget::Tip(file_client.tip().unwrap()));
205
206    let mut body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies)
207        .build(file_client.clone(), consensus.clone(), provider_factory.clone())
208        .into_task();
209    // TODO: The pipeline should correctly configure the downloader on its own.
210    // Find the possibility to remove unnecessary pre-configuration.
211    body_downloader
212        .set_download_range(file_client.min_block().unwrap()..=file_client.max_block().unwrap())
213        .expect("failed to set download range");
214
215    let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
216
217    let max_block = file_client.max_block().unwrap_or(0);
218
219    let pipeline = Pipeline::builder()
220        .with_tip_sender(tip_tx)
221        // we want to sync all blocks the file client provides or 0 if empty
222        .with_max_block(max_block)
223        .with_fail_on_unwind(true)
224        .add_stages(
225            DefaultStages::new(
226                provider_factory.clone(),
227                tip_rx,
228                consensus.clone(),
229                header_downloader,
230                body_downloader,
231                executor,
232                config.stages.clone(),
233                PruneModes::default(),
234            )
235            .builder()
236            .disable_all_if(&StageId::STATE_REQUIRED, || disable_exec),
237        )
238        .build(provider_factory, static_file_producer);
239
240    let events = pipeline.events().map(Into::into);
241
242    Ok((pipeline, events))
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248    use reth_ethereum_cli::chainspec::{EthereumChainSpecParser, SUPPORTED_CHAINS};
249
250    #[test]
251    fn parse_common_import_command_chain_args() {
252        for chain in SUPPORTED_CHAINS {
253            let args: ImportCommand<EthereumChainSpecParser> =
254                ImportCommand::parse_from(["reth", "--chain", chain, "."]);
255            assert_eq!(
256                Ok(args.env.chain.chain),
257                chain.parse::<reth_chainspec::Chain>(),
258                "failed to parse chain {chain}"
259            );
260        }
261    }
262}