Skip to main content

reth_cli_commands/download/
mod.rs

1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::Parser;
9use config_gen::{config_for_selections, write_config};
10use eyre::Result;
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14    ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15    SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_db::{init_db, Database};
21use reth_db_api::transaction::DbTx;
22use reth_fs_util as fs;
23use reth_node_core::args::DefaultPruningValues;
24use reth_prune_types::PruneMode;
25use std::{
26    borrow::Cow,
27    collections::BTreeMap,
28    fs::OpenOptions,
29    io::{self, BufWriter, Read, Write},
30    path::{Path, PathBuf},
31    sync::{
32        atomic::{AtomicBool, AtomicU64, Ordering},
33        Arc, OnceLock,
34    },
35    time::{Duration, Instant},
36};
37use tar::Archive;
38use tokio::task;
39use tracing::{info, warn};
40use tui::{run_selector, SelectorOutput};
41use url::Url;
42use zstd::stream::read::Decoder as ZstdDecoder;
43
/// Unit suffixes used by `DownloadProgress::format_size`, smallest first. Sizes that exceed
/// the last unit stay expressed in `GB`.
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
/// Base URL used as the built-in `DownloadDefaults::default_base_url`.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
// NOTE(review): the two extension constants are not referenced in the visible part of this
// file; presumably they are used to detect the archive compression format — confirm.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Name of the directory created inside the target data directory when `--resumable` is set.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";

/// Default value of `--download-concurrency` (number of concurrent modular archive workers).
///
/// This is only the default: a user-supplied value is used as-is (clamped to at least 1).
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
52
/// Named component-selection presets.
///
/// `DownloadCommand::resolve_components` reports which preset (if any) produced the final
/// selection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Chosen by `--minimal`, and by `--non-interactive` when no `--with-*` flag is given.
    Minimal,
    /// Chosen by `--full`.
    Full,
    /// Chosen by `--archive` (alias `--all`).
    Archive,
}
59
/// Result of `DownloadCommand::resolve_components`.
struct ResolvedComponents {
    /// Selection chosen for each component type.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    /// Preset the selections came from; `None` when they were built from explicit `--with-*`
    /// flags.
    preset: Option<SelectionPreset>,
}
64
/// Global static download defaults
///
/// Written at most once by [`DownloadDefaults::try_init`]; if still unset on first read,
/// [`DownloadDefaults::get_global`] fills it with
/// [`DownloadDefaults::default_download_defaults`].
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();

/// Download configuration defaults
///
/// Global defaults can be set via [`DownloadDefaults::try_init`].
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// List of available snapshot sources
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Default base URL for snapshots
    pub default_base_url: Cow<'static, str>,
    /// Default base URL for chain-aware snapshots.
    ///
    /// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
    /// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
    /// the resulting URL would be `https://snapshots.example.com/1`.
    ///
    /// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// Optional custom long help text that overrides the generated help
    pub long_help: Option<String>,
}
88
89impl DownloadDefaults {
90    /// Initialize the global download defaults with this configuration
91    pub fn try_init(self) -> Result<(), Self> {
92        DOWNLOAD_DEFAULTS.set(self)
93    }
94
95    /// Get a reference to the global download defaults
96    pub fn get_global() -> &'static DownloadDefaults {
97        DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
98    }
99
100    /// Default download configuration with defaults from merkle.io and publicnode
101    pub fn default_download_defaults() -> Self {
102        Self {
103            available_snapshots: vec![
104                Cow::Borrowed("https://www.merkle.io/snapshots (default, mainnet archive)"),
105                Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
106            ],
107            default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
108            default_chain_aware_base_url: None,
109            long_help: None,
110        }
111    }
112
113    /// Generates the long help text for the download URL argument using these defaults.
114    ///
115    /// If a custom long_help is set, it will be returned. Otherwise, help text is generated
116    /// from the available_snapshots list.
117    pub fn long_help(&self) -> String {
118        if let Some(ref custom_help) = self.long_help {
119            return custom_help.clone();
120        }
121
122        let mut help = String::from(
123            "Specify a snapshot URL or let the command propose a default one.\n\nAvailable snapshot sources:\n",
124        );
125
126        for source in &self.available_snapshots {
127            help.push_str("- ");
128            help.push_str(source);
129            help.push('\n');
130        }
131
132        help.push_str(
133            "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
134        );
135        help.push_str(
136            self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
137        );
138        help.push_str(
139            ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
140        );
141        help
142    }
143
144    /// Add a snapshot source to the list
145    pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
146        self.available_snapshots.push(source.into());
147        self
148    }
149
150    /// Replace all snapshot sources
151    pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
152        self.available_snapshots = sources;
153        self
154    }
155
156    /// Set the default base URL, e.g. `https://downloads.merkle.io`.
157    pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
158        self.default_base_url = url.into();
159        self
160    }
161
162    /// Set the default chain-aware base URL.
163    pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
164        self.default_chain_aware_base_url = Some(url.into());
165        self
166    }
167
168    /// Builder: Set custom long help text, overriding the generated help
169    pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
170        self.long_help = Some(help.into());
171        self
172    }
173}
174
175impl Default for DownloadDefaults {
176    fn default() -> Self {
177        Self::default_download_defaults()
178    }
179}
180
/// CLI command that downloads snapshot archives and configures a reth node from them.
//
// NOTE: clap's derive turns the `///` comments on this struct and its fields into the CLI help
// text, so editing them changes user-facing output.
//
// Selection precedence implemented by `resolve_components`: `--archive`, `--full`, `--minimal`,
// explicit `--with-*` flags, `--non-interactive`, and finally the interactive selector.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Custom URL to download a single snapshot archive (legacy mode).
    ///
    /// When provided, downloads and extracts a single archive without component selection.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    /// URL to a snapshot manifest.json for modular component downloads.
    ///
    /// When provided, fetches this manifest instead of discovering it from the default
    /// base URL. Useful for testing with custom or local manifests.
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    /// Local path to a snapshot manifest.json for modular component downloads.
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    /// Include transaction static files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_txs: bool,

    /// Include receipt static files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_receipts: bool,

    /// Include account and storage history static files.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
    with_state_history: bool,

    /// Download all available components (archive node, no pruning).
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "minimal", "full"])]
    archive: bool,

    /// Download the minimal component set (same default as --non-interactive).
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "full"])]
    minimal: bool,

    /// Download the full node component set (matches default full prune settings).
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "minimal"])]
    full: bool,

    /// Skip optional RocksDB indices even when archive components are selected.
    ///
    /// This affects `--archive`/`--all` and TUI archive preset (`a`).
    #[arg(long, conflicts_with = "url")]
    without_rocksdb: bool,

    /// Skip interactive component selection. Downloads the minimal set
    /// (state + headers + transactions + changesets) unless explicit --with-* flags narrow it.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    /// Use resumable two-phase downloads (download to disk first, then extract).
    ///
    /// Archives are downloaded to a .part file with HTTP Range resume support
    /// before extraction. Slower but tolerates network interruptions without
    /// restarting. By default, archives stream directly into the extractor.
    #[arg(long)]
    resumable: bool,

    /// Maximum number of concurrent modular archive workers.
    // A value of 0 is raised to 1 in `execute`.
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,
}
251
252impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Runs the download command.
    ///
    /// With `--url`, a single archive is streamed and extracted into the data directory and the
    /// function returns. Otherwise a snapshot manifest is fetched, components are resolved, the
    /// selected archives are processed concurrently, `reth.toml` is written, and prune / stage
    /// checkpoints are stored in the database.
    ///
    /// The type parameter `N` is not used by the body.
    ///
    /// # Errors
    ///
    /// Propagates failures from directory creation, manifest fetching, component resolution,
    /// archive processing, config writing and database access. Also fails when a planned archive
    /// has no output checksum metadata.
    pub async fn execute<N>(self) -> Result<()> {
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // Legacy single-URL mode: download one archive and extract it
        if let Some(url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(&url, data_dir.data_dir(), None, self.resumable).await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Modular download: fetch manifest and select components
        let manifest_source = self.resolve_manifest_source(chain_id);

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // The archive preset may pull in hidden archive-only components (transaction senders and,
        // unless `--without-rocksdb` is set, RocksDB indices).
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        // Collect all archive descriptors across selected components.
        let target_dir = data_dir.data_dir();
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Every archive must list its output files, otherwise the manifest is rejected.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // Order by component priority first, then by component and file name so the order is
        // deterministic.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // The on-disk cache directory is only created for resumable downloads.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::None => 0,
            })
            .sum();

        let startup_summary = summarize_download_startup(&all_downloads, target_dir)?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        let shared = SharedProgress::new(total_size, total_archives as u64);
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        let target = target_dir.to_path_buf();
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        // Guard against `--download-concurrency 0`.
        let download_concurrency = self.download_concurrency.max(1);
        // All archives are driven to completion; results are only inspected afterwards.
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                let dir = target.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                async move {
                    process_modular_archive(planned, &dir, cache.as_deref(), Some(sp), resumable)
                        .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Signal the progress task to stop and wait for it before reporting errors.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        // Check for errors
        for result in results {
            result?;
        }

        // Generate reth.toml and set prune checkpoints
        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        // Open the DB to write checkpoints
        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        // Write prune checkpoints to the DB so the pruner knows data before the
        // snapshot block is already in the expected pruned state
        let should_write_prune = config.prune.segments != Default::default();
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            // Reset stage checkpoints for history indexing stages only if RocksDB
            // indices weren't downloaded. When archive snapshots include the
            // optional RocksDB indices component, we preserve source checkpoints.
            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        info!(target: "reth::cli", "Snapshot download complete. Run `reth node` to start syncing.");

        Ok(())
    }
435
436    /// Determines which components to download based on CLI flags or interactive selection.
437    fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
438        let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();
439
440        // --archive/--all: everything available as All
441        if self.archive {
442            return Ok(ResolvedComponents {
443                selections: SnapshotComponentType::ALL
444                    .iter()
445                    .copied()
446                    .filter(|ty| available(*ty))
447                    .filter(|ty| {
448                        !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
449                    })
450                    .map(|ty| (ty, ComponentSelection::All))
451                    .collect(),
452                preset: Some(SelectionPreset::Archive),
453            });
454        }
455
456        if self.full {
457            return Ok(ResolvedComponents {
458                selections: self.full_preset_selections(manifest),
459                preset: Some(SelectionPreset::Full),
460            });
461        }
462
463        if self.minimal {
464            return Ok(ResolvedComponents {
465                selections: self.minimal_preset_selections(manifest),
466                preset: Some(SelectionPreset::Minimal),
467            });
468        }
469
470        let has_explicit_flags = self.with_txs || self.with_receipts || self.with_state_history;
471
472        if has_explicit_flags {
473            let mut selections = BTreeMap::new();
474            // Required components always All
475            if available(SnapshotComponentType::State) {
476                selections.insert(SnapshotComponentType::State, ComponentSelection::All);
477            }
478            if available(SnapshotComponentType::Headers) {
479                selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
480            }
481            if self.with_txs && available(SnapshotComponentType::Transactions) {
482                selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
483            }
484            if self.with_receipts && available(SnapshotComponentType::Receipts) {
485                selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
486            }
487            if self.with_state_history {
488                if available(SnapshotComponentType::AccountChangesets) {
489                    selections
490                        .insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
491                }
492                if available(SnapshotComponentType::StorageChangesets) {
493                    selections
494                        .insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
495                }
496            }
497            return Ok(ResolvedComponents { selections, preset: None });
498        }
499
500        if self.non_interactive {
501            return Ok(ResolvedComponents {
502                selections: self.minimal_preset_selections(manifest),
503                preset: Some(SelectionPreset::Minimal),
504            });
505        }
506
507        // Interactive TUI
508        let full_preset = self.full_preset_selections(manifest);
509        let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
510        let selected =
511            selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();
512
513        Ok(ResolvedComponents { selections: selected, preset })
514    }
515
516    fn minimal_preset_selections(
517        &self,
518        manifest: &SnapshotManifest,
519    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
520        SnapshotComponentType::ALL
521            .iter()
522            .copied()
523            .filter(|ty| manifest.component(*ty).is_some())
524            .map(|ty| (ty, ty.minimal_selection()))
525            .collect()
526    }
527
528    fn full_preset_selections(
529        &self,
530        manifest: &SnapshotManifest,
531    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
532        let mut selections = BTreeMap::new();
533
534        for ty in [
535            SnapshotComponentType::State,
536            SnapshotComponentType::Headers,
537            SnapshotComponentType::Transactions,
538            SnapshotComponentType::Receipts,
539            SnapshotComponentType::AccountChangesets,
540            SnapshotComponentType::StorageChangesets,
541            SnapshotComponentType::TransactionSenders,
542            SnapshotComponentType::RocksdbIndices,
543        ] {
544            if manifest.component(ty).is_none() {
545                continue;
546            }
547
548            let selection = self.full_selection_for_component(ty, manifest.block);
549            if selection != ComponentSelection::None {
550                selections.insert(ty, selection);
551            }
552        }
553
554        selections
555    }
556
557    fn full_selection_for_component(
558        &self,
559        ty: SnapshotComponentType,
560        snapshot_block: u64,
561    ) -> ComponentSelection {
562        let defaults = DefaultPruningValues::get_global();
563        match ty {
564            SnapshotComponentType::State | SnapshotComponentType::Headers => {
565                ComponentSelection::All
566            }
567            SnapshotComponentType::Transactions => {
568                if defaults.full_bodies_history_use_pre_merge {
569                    match self
570                        .env
571                        .chain
572                        .ethereum_fork_activation(EthereumHardfork::Paris)
573                        .block_number()
574                    {
575                        Some(paris) if snapshot_block >= paris => {
576                            ComponentSelection::Distance(snapshot_block - paris + 1)
577                        }
578                        Some(_) => ComponentSelection::None,
579                        None => ComponentSelection::All,
580                    }
581                } else {
582                    selection_from_prune_mode(
583                        defaults.full_prune_modes.bodies_history,
584                        snapshot_block,
585                    )
586                }
587            }
588            SnapshotComponentType::Receipts => {
589                selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
590            }
591            SnapshotComponentType::AccountChangesets => {
592                selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
593            }
594            SnapshotComponentType::StorageChangesets => {
595                selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
596            }
597            SnapshotComponentType::TransactionSenders => {
598                selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
599            }
600            // Keep hidden by default in full mode; if users want indices they can use archive.
601            SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
602        }
603    }
604
605    fn resolve_manifest_source(&self, chain_id: u64) -> String {
606        if let Some(path) = &self.manifest_path {
607            return path.display().to_string();
608        }
609
610        match &self.manifest_url {
611            Some(url) => url.clone(),
612            None => {
613                let base_url = get_base_url(chain_id);
614                format!("{base_url}/manifest.json")
615            }
616        }
617    }
618}
619
620fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
621    match mode {
622        None => ComponentSelection::All,
623        Some(PruneMode::Full) => ComponentSelection::None,
624        Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
625        Some(PruneMode::Before(block)) => {
626            if snapshot_block >= block {
627                ComponentSelection::Distance(snapshot_block - block + 1)
628            } else {
629                ComponentSelection::None
630            }
631        }
632    }
633}
634
635/// If all data components (txs, receipts, changesets) are `All`, automatically
636/// include hidden archive-only components when available in the manifest.
637fn inject_archive_only_components(
638    selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
639    manifest: &SnapshotManifest,
640    include_rocksdb: bool,
641) {
642    let is_all =
643        |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
644
645    let is_archive = is_all(SnapshotComponentType::Transactions) &&
646        is_all(SnapshotComponentType::Receipts) &&
647        is_all(SnapshotComponentType::AccountChangesets) &&
648        is_all(SnapshotComponentType::StorageChangesets);
649
650    if !is_archive {
651        return;
652    }
653
654    for component in
655        [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
656    {
657        if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
658            continue;
659        }
660
661        if manifest.component(component).is_some() {
662            selections.insert(component, ComponentSelection::All);
663        }
664    }
665}
666
667fn should_reset_index_stage_checkpoints(
668    selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
669) -> bool {
670    !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
671}
672
673impl<C: ChainSpecParser> DownloadCommand<C> {
674    /// Returns the underlying chain being used to run this command
675    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
676        Some(&self.env.chain)
677    }
678}
679
/// Tracks download progress and throttles display updates to every 100ms.
pub(crate) struct DownloadProgress {
    /// Bytes accounted for so far (sum of the chunk sizes passed to `update`).
    downloaded: u64,
    /// Expected total number of bytes.
    total_size: u64,
    /// When the progress line was last printed; used for throttling.
    last_displayed: Instant,
    /// When tracking started; used for the ETA estimate.
    started_at: Instant,
}
687
/// One archive queued for download, together with the component it belongs to.
#[derive(Debug, Clone)]
struct PlannedArchive {
    /// Component type the archive belongs to; drives the download priority.
    ty: SnapshotComponentType,
    /// Display name of the component (taken from `ty.display_name()`).
    component: String,
    /// Descriptor of the archive taken from the manifest.
    archive: ArchiveDescriptor,
}
694
695const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
696    match ty {
697        SnapshotComponentType::State => 0,
698        SnapshotComponentType::RocksdbIndices => 1,
699        _ => 2,
700    }
701}
702
/// Counts produced by `summarize_download_startup` before any download starts.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    /// Planned archives whose output files passed verification in the target directory.
    reusable: usize,
    /// Planned archives whose output files did not pass verification.
    needs_download: usize,
}
708
709fn summarize_download_startup(
710    all_downloads: &[PlannedArchive],
711    target_dir: &Path,
712) -> Result<DownloadStartupSummary> {
713    let mut summary = DownloadStartupSummary::default();
714
715    for planned in all_downloads {
716        if verify_output_files(target_dir, &planned.archive.output_files)? {
717            summary.reusable += 1;
718        } else {
719            summary.needs_download += 1;
720        }
721    }
722
723    Ok(summary)
724}
725
726impl DownloadProgress {
727    /// Creates new progress tracker with given total size
728    fn new(total_size: u64) -> Self {
729        let now = Instant::now();
730        Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
731    }
732
733    /// Converts bytes to human readable format (B, KB, MB, GB)
734    pub(crate) fn format_size(size: u64) -> String {
735        let mut size = size as f64;
736        let mut unit_index = 0;
737
738        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
739            size /= 1024.0;
740            unit_index += 1;
741        }
742
743        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
744    }
745
746    /// Format duration as human readable string
747    fn format_duration(duration: Duration) -> String {
748        let secs = duration.as_secs();
749        if secs < 60 {
750            format!("{secs}s")
751        } else if secs < 3600 {
752            format!("{}m {}s", secs / 60, secs % 60)
753        } else {
754            format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
755        }
756    }
757
758    /// Updates progress bar (for single-archive legacy downloads)
759    fn update(&mut self, chunk_size: u64) -> Result<()> {
760        self.downloaded += chunk_size;
761
762        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
763            let formatted_downloaded = Self::format_size(self.downloaded);
764            let formatted_total = Self::format_size(self.total_size);
765            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
766
767            let elapsed = self.started_at.elapsed();
768            let eta = if self.downloaded > 0 {
769                let remaining = self.total_size.saturating_sub(self.downloaded);
770                let speed = self.downloaded as f64 / elapsed.as_secs_f64();
771                if speed > 0.0 {
772                    Duration::from_secs_f64(remaining as f64 / speed)
773                } else {
774                    Duration::ZERO
775                }
776            } else {
777                Duration::ZERO
778            };
779            let eta_str = Self::format_duration(eta);
780
781            print!(
782                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str}     ",
783            );
784            io::stdout().flush()?;
785            self.last_displayed = Instant::now();
786        }
787
788        Ok(())
789    }
790}
791
/// Progress state shared between parallel archive downloads.
///
/// Workers bump the atomic counters; a separate display task reads them periodically and
/// prints one aggregated progress line until `done` is set.
struct SharedProgress {
    /// Bytes downloaded so far across all archives.
    downloaded: AtomicU64,
    /// Expected total number of bytes.
    total_size: u64,
    /// Number of archives planned.
    total_archives: u64,
    /// Number of archives reported as finished.
    archives_done: AtomicU64,
    /// Set once all work has finished, so the display task can stop.
    done: AtomicBool,
}

impl SharedProgress {
    /// Creates a zeroed, reference-counted progress tracker for the given totals.
    fn new(total_size: u64, total_archives: u64) -> Arc<Self> {
        let progress = Self {
            downloaded: AtomicU64::new(0),
            total_size,
            total_archives,
            archives_done: AtomicU64::new(0),
            done: AtomicBool::new(false),
        };
        Arc::new(progress)
    }

    /// Adds `bytes` to the downloaded counter.
    fn add(&self, bytes: u64) {
        let _previous = self.downloaded.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Marks one more archive as finished.
    fn archive_done(&self) {
        let _previous = self.archives_done.fetch_add(1, Ordering::Relaxed);
    }
}
824
825/// Spawns a background task that prints aggregated download progress.
826/// Returns a handle; drop it (or call `.abort()`) to stop.
827fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
828    tokio::spawn(async move {
829        let started_at = Instant::now();
830        let mut interval = tokio::time::interval(Duration::from_secs(3));
831        interval.tick().await; // first tick is immediate, skip it
832        loop {
833            interval.tick().await;
834
835            if progress.done.load(Ordering::Relaxed) {
836                break;
837            }
838
839            let downloaded = progress.downloaded.load(Ordering::Relaxed);
840            let total = progress.total_size;
841            if total == 0 {
842                continue;
843            }
844
845            let done = progress.archives_done.load(Ordering::Relaxed);
846            let all = progress.total_archives;
847            let pct = (downloaded as f64 / total as f64) * 100.0;
848            let dl = DownloadProgress::format_size(downloaded);
849            let tot = DownloadProgress::format_size(total);
850
851            let elapsed = started_at.elapsed();
852            let remaining = total.saturating_sub(downloaded);
853
854            if remaining == 0 {
855                // Downloads done, waiting for extraction
856                info!(target: "reth::cli",
857                    archives = format_args!("{done}/{all}"),
858                    downloaded = %dl,
859                    "Extracting remaining archives"
860                );
861            } else {
862                let eta = if downloaded > 0 {
863                    let speed = downloaded as f64 / elapsed.as_secs_f64();
864                    if speed > 0.0 {
865                        DownloadProgress::format_duration(Duration::from_secs_f64(
866                            remaining as f64 / speed,
867                        ))
868                    } else {
869                        "??".to_string()
870                    }
871                } else {
872                    "??".to_string()
873                };
874
875                info!(target: "reth::cli",
876                    archives = format_args!("{done}/{all}"),
877                    progress = format_args!("{pct:.1}%"),
878                    downloaded = %dl,
879                    total = %tot,
880                    eta = %eta,
881                    "Downloading"
882                );
883            }
884        }
885
886        // Final line
887        let downloaded = progress.downloaded.load(Ordering::Relaxed);
888        let dl = DownloadProgress::format_size(downloaded);
889        let tot = DownloadProgress::format_size(progress.total_size);
890        let elapsed = DownloadProgress::format_duration(started_at.elapsed());
891        info!(target: "reth::cli",
892            downloaded = %dl,
893            total = %tot,
894            elapsed = %elapsed,
895            "Downloads complete"
896        );
897    })
898}
899
900/// Adapter to track progress while reading (used for extraction in legacy path)
901struct ProgressReader<R> {
902    reader: R,
903    progress: DownloadProgress,
904}
905
906impl<R: Read> ProgressReader<R> {
907    fn new(reader: R, total_size: u64) -> Self {
908        Self { reader, progress: DownloadProgress::new(total_size) }
909    }
910}
911
912impl<R: Read> Read for ProgressReader<R> {
913    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
914        let bytes = self.reader.read(buf)?;
915        if bytes > 0 &&
916            let Err(e) = self.progress.update(bytes as u64)
917        {
918            return Err(io::Error::other(e));
919        }
920        Ok(bytes)
921    }
922}
923
924/// Supported compression formats for snapshots
925#[derive(Debug, Clone, Copy)]
926enum CompressionFormat {
927    Lz4,
928    Zstd,
929}
930
931impl CompressionFormat {
932    /// Detect compression format from file extension
933    fn from_url(url: &str) -> Result<Self> {
934        let path =
935            Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
936
937        if path.ends_with(EXTENSION_TAR_LZ4) {
938            Ok(Self::Lz4)
939        } else if path.ends_with(EXTENSION_TAR_ZSTD) {
940            Ok(Self::Zstd)
941        } else {
942            Err(eyre::eyre!(
943                "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
944                path
945            ))
946        }
947    }
948}
949
950/// Extracts a compressed tar archive to the target directory with progress tracking.
951fn extract_archive<R: Read>(
952    reader: R,
953    total_size: u64,
954    format: CompressionFormat,
955    target_dir: &Path,
956) -> Result<()> {
957    let progress_reader = ProgressReader::new(reader, total_size);
958
959    match format {
960        CompressionFormat::Lz4 => {
961            let decoder = Decoder::new(progress_reader)?;
962            Archive::new(decoder).unpack(target_dir)?;
963        }
964        CompressionFormat::Zstd => {
965            let decoder = ZstdDecoder::new(progress_reader)?;
966            Archive::new(decoder).unpack(target_dir)?;
967        }
968    }
969
970    println!();
971    Ok(())
972}
973
974/// Extracts a compressed tar archive without progress tracking.
975fn extract_archive_raw<R: Read>(
976    reader: R,
977    format: CompressionFormat,
978    target_dir: &Path,
979) -> Result<()> {
980    match format {
981        CompressionFormat::Lz4 => {
982            Archive::new(Decoder::new(reader)?).unpack(target_dir)?;
983        }
984        CompressionFormat::Zstd => {
985            Archive::new(ZstdDecoder::new(reader)?).unpack(target_dir)?;
986        }
987    }
988    Ok(())
989}
990
991/// Extracts a snapshot from a local file.
992fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
993    let file = std::fs::File::open(path)?;
994    let total_size = file.metadata()?.len();
995    info!(target: "reth::cli",
996        file = %path.display(),
997        size = %DownloadProgress::format_size(total_size),
998        "Extracting local archive"
999    );
1000    let start = Instant::now();
1001    extract_archive(file, total_size, format, target_dir)?;
1002    info!(target: "reth::cli",
1003        file = %path.display(),
1004        elapsed = %DownloadProgress::format_duration(start.elapsed()),
1005        "Local extraction complete"
1006    );
1007    Ok(())
1008}
1009
/// Upper bound on attempts made by the download and integrity-check retry loops.
1010const MAX_DOWNLOAD_RETRIES: u32 = 10;
/// Seconds slept between consecutive download attempts.
1011const RETRY_BACKOFF_SECS: u64 = 5;
1012
1013/// Wrapper that tracks download progress while writing data.
1014/// Used with [`io::copy`] to display progress during downloads.
1015struct ProgressWriter<W> {
1016    inner: W,
1017    progress: DownloadProgress,
1018}
1019
1020impl<W: Write> Write for ProgressWriter<W> {
1021    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1022        let n = self.inner.write(buf)?;
1023        let _ = self.progress.update(n as u64);
1024        Ok(n)
1025    }
1026
1027    fn flush(&mut self) -> io::Result<()> {
1028        self.inner.flush()
1029    }
1030}
1031
1032/// Wrapper that bumps a shared atomic counter while writing data.
1033/// Used for parallel downloads where a single display task shows aggregated progress.
1034struct SharedProgressWriter<W> {
1035    inner: W,
1036    progress: Arc<SharedProgress>,
1037}
1038
1039impl<W: Write> Write for SharedProgressWriter<W> {
1040    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1041        let n = self.inner.write(buf)?;
1042        self.progress.add(n as u64);
1043        Ok(n)
1044    }
1045
1046    fn flush(&mut self) -> io::Result<()> {
1047        self.inner.flush()
1048    }
1049}
1050
1051/// Wrapper that bumps a shared atomic counter while reading data.
1052/// Used for streaming downloads where a single display task shows aggregated progress.
1053struct SharedProgressReader<R> {
1054    inner: R,
1055    progress: Arc<SharedProgress>,
1056}
1057
1058impl<R: Read> Read for SharedProgressReader<R> {
1059    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1060        let n = self.inner.read(buf)?;
1061        self.progress.add(n as u64);
1062        Ok(n)
1063    }
1064}
1065
1066/// Downloads a file with resume support using HTTP Range requests.
1067/// Automatically retries on failure, resuming from where it left off.
1068/// Returns the path to the downloaded file and its total size.
1069///
1070/// When `shared` is provided, progress is reported to the shared counter
1071/// (for parallel downloads). Otherwise uses a local progress bar.
1072fn resumable_download(
1073    url: &str,
1074    target_dir: &Path,
1075    shared: Option<&Arc<SharedProgress>>,
1076) -> Result<(PathBuf, u64)> {
1077    let file_name = Url::parse(url)
1078        .ok()
1079        .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
1080        .unwrap_or_else(|| "snapshot.tar".to_string());
1081
1082    let final_path = target_dir.join(&file_name);
1083    let part_path = target_dir.join(format!("{file_name}.part"));
1084
1085    let quiet = shared.is_some();
1086
1087    if !quiet {
1088        info!(target: "reth::cli", file = %file_name, "Connecting to download server");
1089    }
1090    let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
1091
1092    let mut total_size: Option<u64> = None;
1093    let mut last_error: Option<eyre::Error> = None;
1094
1095    let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
1096        fs::rename(&part_path, &final_path)?;
1097        if !quiet {
1098            info!(target: "reth::cli", file = %file_name, "Download complete");
1099        }
1100        Ok((final_path.clone(), size))
1101    };
1102
1103    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1104        let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
1105
1106        if let Some(total) = total_size &&
1107            existing_size >= total
1108        {
1109            return finalize_download(total);
1110        }
1111
1112        if attempt > 1 {
1113            info!(target: "reth::cli",
1114                file = %file_name,
1115                "Retry attempt {}/{} - resuming from {} bytes",
1116                attempt, MAX_DOWNLOAD_RETRIES, existing_size
1117            );
1118        }
1119
1120        let mut request = client.get(url);
1121        if existing_size > 0 {
1122            request = request.header(RANGE, format!("bytes={existing_size}-"));
1123            if !quiet && attempt == 1 {
1124                info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
1125            }
1126        }
1127
1128        let response = match request.send().and_then(|r| r.error_for_status()) {
1129            Ok(r) => r,
1130            Err(e) => {
1131                last_error = Some(e.into());
1132                if attempt < MAX_DOWNLOAD_RETRIES {
1133                    info!(target: "reth::cli",
1134                        file = %file_name,
1135                        "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
1136                    );
1137                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1138                }
1139                continue;
1140            }
1141        };
1142
1143        let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
1144
1145        let size = if is_partial {
1146            response
1147                .headers()
1148                .get("Content-Range")
1149                .and_then(|v| v.to_str().ok())
1150                .and_then(|v| v.split('/').next_back())
1151                .and_then(|v| v.parse().ok())
1152        } else {
1153            response.content_length()
1154        };
1155
1156        if total_size.is_none() {
1157            total_size = size;
1158            if !quiet && let Some(s) = size {
1159                info!(target: "reth::cli",
1160                    file = %file_name,
1161                    size = %DownloadProgress::format_size(s),
1162                    "Downloading"
1163                );
1164            }
1165        }
1166
1167        let current_total = total_size.ok_or_else(|| {
1168            eyre::eyre!("Server did not provide Content-Length or Content-Range header")
1169        })?;
1170
1171        let file = if is_partial && existing_size > 0 {
1172            OpenOptions::new()
1173                .append(true)
1174                .open(&part_path)
1175                .map_err(|e| fs::FsPathError::open(e, &part_path))?
1176        } else {
1177            fs::create_file(&part_path)?
1178        };
1179
1180        let start_offset = if is_partial { existing_size } else { 0 };
1181        let mut reader = response;
1182
1183        let copy_result;
1184        let flush_result;
1185
1186        if let Some(sp) = shared {
1187            // Parallel path: bump shared atomic counter
1188            if start_offset > 0 {
1189                sp.add(start_offset);
1190            }
1191            let mut writer =
1192                SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
1193            copy_result = io::copy(&mut reader, &mut writer);
1194            flush_result = writer.inner.flush();
1195        } else {
1196            // Legacy single-download path: local progress bar
1197            let mut progress = DownloadProgress::new(current_total);
1198            progress.downloaded = start_offset;
1199            let mut writer = ProgressWriter { inner: BufWriter::new(file), progress };
1200            copy_result = io::copy(&mut reader, &mut writer);
1201            flush_result = writer.inner.flush();
1202            println!();
1203        }
1204
1205        if let Err(e) = copy_result.and(flush_result) {
1206            last_error = Some(e.into());
1207            if attempt < MAX_DOWNLOAD_RETRIES {
1208                info!(target: "reth::cli",
1209                    file = %file_name,
1210                    "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
1211                );
1212                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1213            }
1214            continue;
1215        }
1216
1217        return finalize_download(current_total);
1218    }
1219
1220    Err(last_error
1221        .unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
1222}
1223
1224/// Streams a remote archive directly into the extractor without writing to disk.
1225///
1226/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
1227fn streaming_download_and_extract(
1228    url: &str,
1229    format: CompressionFormat,
1230    target_dir: &Path,
1231    shared: Option<&Arc<SharedProgress>>,
1232) -> Result<()> {
1233    let quiet = shared.is_some();
1234    let mut last_error: Option<eyre::Error> = None;
1235
1236    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1237        if attempt > 1 {
1238            info!(target: "reth::cli",
1239                url = %url,
1240                attempt,
1241                max = MAX_DOWNLOAD_RETRIES,
1242                "Retrying streaming download from scratch"
1243            );
1244        }
1245
1246        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
1247
1248        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
1249            Ok(r) => r,
1250            Err(e) => {
1251                last_error = Some(e.into());
1252                if attempt < MAX_DOWNLOAD_RETRIES {
1253                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1254                }
1255                continue;
1256            }
1257        };
1258
1259        if !quiet && let Some(size) = response.content_length() {
1260            info!(target: "reth::cli",
1261                url = %url,
1262                size = %DownloadProgress::format_size(size),
1263                "Streaming archive"
1264            );
1265        }
1266
1267        let result = if let Some(sp) = shared {
1268            let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
1269            extract_archive_raw(reader, format, target_dir)
1270        } else {
1271            extract_archive_raw(response, format, target_dir)
1272        };
1273
1274        match result {
1275            Ok(()) => return Ok(()),
1276            Err(e) => {
1277                last_error = Some(e);
1278                if attempt < MAX_DOWNLOAD_RETRIES {
1279                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1280                }
1281            }
1282        }
1283    }
1284
1285    Err(last_error.unwrap_or_else(|| {
1286        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
1287    }))
1288}
1289
1290/// Fetches the snapshot from a remote URL with resume support, then extracts it.
1291fn download_and_extract(
1292    url: &str,
1293    format: CompressionFormat,
1294    target_dir: &Path,
1295    shared: Option<&Arc<SharedProgress>>,
1296) -> Result<()> {
1297    let quiet = shared.is_some();
1298    let (downloaded_path, total_size) = resumable_download(url, target_dir, shared)?;
1299
1300    let file_name =
1301        downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1302
1303    if !quiet {
1304        info!(target: "reth::cli",
1305            file = %file_name,
1306            size = %DownloadProgress::format_size(total_size),
1307            "Extracting archive"
1308        );
1309    }
1310    let file = fs::open(&downloaded_path)?;
1311
1312    if quiet {
1313        // Skip progress tracking for extraction in parallel mode
1314        extract_archive_raw(file, format, target_dir)?;
1315    } else {
1316        extract_archive(file, total_size, format, target_dir)?;
1317        info!(target: "reth::cli",
1318            file = %file_name,
1319            "Extraction complete"
1320        );
1321    }
1322
1323    fs::remove_file(&downloaded_path)?;
1324
1325    if let Some(sp) = shared {
1326        sp.archive_done();
1327    }
1328
1329    Ok(())
1330}
1331
1332/// Downloads and extracts a snapshot, blocking until finished.
1333///
1334/// Supports `file://` URLs for local files and HTTP(S) URLs for remote downloads.
1335/// When `resumable` is true, downloads to a `.part` file first with HTTP Range resume
1336/// support. Otherwise streams directly into the extractor.
1337fn blocking_download_and_extract(
1338    url: &str,
1339    target_dir: &Path,
1340    shared: Option<Arc<SharedProgress>>,
1341    resumable: bool,
1342) -> Result<()> {
1343    let format = CompressionFormat::from_url(url)?;
1344
1345    if let Ok(parsed_url) = Url::parse(url) &&
1346        parsed_url.scheme() == "file"
1347    {
1348        let file_path = parsed_url
1349            .to_file_path()
1350            .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1351        let result = extract_from_file(&file_path, format, target_dir);
1352        if result.is_ok() &&
1353            let Some(sp) = shared
1354        {
1355            sp.archive_done();
1356        }
1357        result
1358    } else if resumable {
1359        download_and_extract(url, format, target_dir, shared.as_ref())
1360    } else {
1361        let result = streaming_download_and_extract(url, format, target_dir, shared.as_ref());
1362        if result.is_ok() &&
1363            let Some(sp) = shared
1364        {
1365            sp.archive_done();
1366        }
1367        result
1368    }
1369}
1370
1371/// Downloads and extracts a snapshot archive asynchronously.
1372///
1373/// When `shared` is provided, download progress is reported to the shared
1374/// counter for aggregated display. Otherwise uses a local progress bar.
1375/// When `resumable` is true, uses two-phase download with `.part` files.
1376async fn stream_and_extract(
1377    url: &str,
1378    target_dir: &Path,
1379    shared: Option<Arc<SharedProgress>>,
1380    resumable: bool,
1381) -> Result<()> {
1382    let target_dir = target_dir.to_path_buf();
1383    let url = url.to_string();
1384    task::spawn_blocking(move || {
1385        blocking_download_and_extract(&url, &target_dir, shared, resumable)
1386    })
1387    .await??;
1388
1389    Ok(())
1390}
1391
1392async fn process_modular_archive(
1393    planned: PlannedArchive,
1394    target_dir: &Path,
1395    cache_dir: Option<&Path>,
1396    shared: Option<Arc<SharedProgress>>,
1397    resumable: bool,
1398) -> Result<()> {
1399    let target_dir = target_dir.to_path_buf();
1400    let cache_dir = cache_dir.map(Path::to_path_buf);
1401
1402    task::spawn_blocking(move || {
1403        blocking_process_modular_archive(
1404            &planned,
1405            &target_dir,
1406            cache_dir.as_deref(),
1407            shared,
1408            resumable,
1409        )
1410    })
1411    .await??;
1412
1413    Ok(())
1414}
1415
1416fn blocking_process_modular_archive(
1417    planned: &PlannedArchive,
1418    target_dir: &Path,
1419    cache_dir: Option<&Path>,
1420    shared: Option<Arc<SharedProgress>>,
1421    resumable: bool,
1422) -> Result<()> {
1423    let archive = &planned.archive;
1424    if verify_output_files(target_dir, &archive.output_files)? {
1425        if let Some(sp) = &shared {
1426            sp.add(archive.size);
1427            sp.archive_done();
1428        }
1429        info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
1430        return Ok(());
1431    }
1432
1433    let format = CompressionFormat::from_url(&archive.file_name)?;
1434    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1435        cleanup_output_files(target_dir, &archive.output_files);
1436
1437        if resumable {
1438            let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
1439            let archive_path = cache_dir.join(&archive.file_name);
1440            let part_path = cache_dir.join(format!("{}.part", archive.file_name));
1441            let (downloaded_path, _downloaded_size) =
1442                resumable_download(&archive.url, cache_dir, shared.as_ref())?;
1443            let file = fs::open(&downloaded_path)?;
1444            extract_archive_raw(file, format, target_dir)?;
1445            let _ = fs::remove_file(&archive_path);
1446            let _ = fs::remove_file(&part_path);
1447        } else {
1448            streaming_download_and_extract(&archive.url, format, target_dir, shared.as_ref())?;
1449        }
1450
1451        if verify_output_files(target_dir, &archive.output_files)? {
1452            if let Some(sp) = &shared {
1453                sp.archive_done();
1454            }
1455            return Ok(());
1456        }
1457
1458        warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
1459    }
1460
1461    eyre::bail!(
1462        "Failed integrity validation after {} attempts for {}",
1463        MAX_DOWNLOAD_RETRIES,
1464        archive.file_name
1465    )
1466}
1467
1468fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -> Result<bool> {
1469    if output_files.is_empty() {
1470        return Ok(false);
1471    }
1472
1473    for expected in output_files {
1474        let output_path = target_dir.join(&expected.path);
1475        let meta = match fs::metadata(&output_path) {
1476            Ok(meta) => meta,
1477            Err(_) => return Ok(false),
1478        };
1479        if meta.len() != expected.size {
1480            return Ok(false);
1481        }
1482
1483        let actual = file_blake3_hex(&output_path)?;
1484        if !actual.eq_ignore_ascii_case(&expected.blake3) {
1485            return Ok(false);
1486        }
1487    }
1488
1489    Ok(true)
1490}
1491
1492fn cleanup_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) {
1493    for output in output_files {
1494        let _ = fs::remove_file(target_dir.join(&output.path));
1495    }
1496}
1497
1498fn file_blake3_hex(path: &Path) -> Result<String> {
1499    let mut file = fs::open(path)?;
1500    let mut hasher = Hasher::new();
1501    let mut buf = [0_u8; 64 * 1024];
1502
1503    loop {
1504        let n = file.read(&mut buf)?;
1505        if n == 0 {
1506            break;
1507        }
1508        hasher.update(&buf[..n]);
1509    }
1510
1511    Ok(hasher.finalize().to_hex().to_string())
1512}
1513
1514/// Builds the base URL for the given chain ID using configured defaults.
1515fn get_base_url(chain_id: u64) -> String {
1516    let defaults = DownloadDefaults::get_global();
1517    match &defaults.default_chain_aware_base_url {
1518        Some(url) => format!("{url}/{chain_id}"),
1519        None => defaults.default_base_url.to_string(),
1520    }
1521}
1522
1523async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
1524    if let Ok(parsed) = Url::parse(source) {
1525        return match parsed.scheme() {
1526            "http" | "https" => {
1527                Ok(Client::new().get(source).send().await?.error_for_status()?.json().await?)
1528            }
1529            "file" => {
1530                let path = parsed
1531                    .to_file_path()
1532                    .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
1533                let content = fs::read_to_string(path)?;
1534                Ok(serde_json::from_str(&content)?)
1535            }
1536            _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
1537        };
1538    }
1539
1540    let content = fs::read_to_string(source)?;
1541    Ok(serde_json::from_str(&content)?)
1542}
1543
1544fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
1545    if let Some(base_url) = manifest.base_url.as_deref() &&
1546        !base_url.is_empty()
1547    {
1548        return Ok(base_url.trim_end_matches('/').to_string());
1549    }
1550
1551    if let Ok(mut url) = Url::parse(source) {
1552        if url.scheme() == "file" {
1553            let mut path = url
1554                .to_file_path()
1555                .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
1556            path.pop();
1557            let mut base = Url::from_directory_path(path)
1558                .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
1559                .to_string();
1560            if base.ends_with('/') {
1561                base.pop();
1562            }
1563            return Ok(base);
1564        }
1565
1566        {
1567            let mut segments = url
1568                .path_segments_mut()
1569                .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
1570            segments.pop_if_empty();
1571            segments.pop();
1572        }
1573        return Ok(url.as_str().trim_end_matches('/').to_string());
1574    }
1575
1576    let path = Path::new(source);
1577    let manifest_dir = if path.is_absolute() {
1578        path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
1579    } else {
1580        let joined = std::env::current_dir()?.join(path);
1581        joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
1582    };
1583    let mut base = Url::from_directory_path(&manifest_dir)
1584        .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
1585        .to_string();
1586    if base.ends_with('/') {
1587        base.pop();
1588    }
1589    Ok(base)
1590}
1591
1592/// Builds default URL for latest mainnet archive snapshot using configured defaults.
1593///
1594/// Used by the legacy single-archive download flow when no manifest is available.
1595#[allow(dead_code)]
1596async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
1597    let base_url = get_base_url(chain_id);
1598    let latest_url = format!("{base_url}/latest.txt");
1599    let filename = Client::new()
1600        .get(latest_url)
1601        .send()
1602        .await?
1603        .error_for_status()?
1604        .text()
1605        .await?
1606        .trim()
1607        .to_string();
1608
1609    Ok(format!("{base_url}/{filename}"))
1610}
1611
1612#[cfg(test)]
1613mod tests {
1614    use super::*;
1615    use manifest::{ComponentManifest, SingleArchive};
1616    use tempfile::tempdir;
1617
1618    fn manifest_with_archive_only_components() -> SnapshotManifest {
1619        let mut components = BTreeMap::new();
1620        components.insert(
1621            SnapshotComponentType::TransactionSenders.key().to_string(),
1622            ComponentManifest::Single(SingleArchive {
1623                file: "transaction_senders.tar.zst".to_string(),
1624                size: 1,
1625                blake3: None,
1626                output_files: vec![],
1627            }),
1628        );
1629        components.insert(
1630            SnapshotComponentType::RocksdbIndices.key().to_string(),
1631            ComponentManifest::Single(SingleArchive {
1632                file: "rocksdb_indices.tar.zst".to_string(),
1633                size: 1,
1634                blake3: None,
1635                output_files: vec![],
1636            }),
1637        );
1638        SnapshotManifest {
1639            block: 0,
1640            chain_id: 1,
1641            storage_version: 2,
1642            timestamp: 0,
1643            base_url: Some("https://example.com".to_string()),
1644            components,
1645        }
1646    }
1647
1648    #[test]
1649    fn test_download_defaults_builder() {
1650        let defaults = DownloadDefaults::default()
1651            .with_snapshot("https://example.com/snapshots (example)")
1652            .with_base_url("https://example.com");
1653
1654        assert_eq!(defaults.default_base_url, "https://example.com");
1655        assert_eq!(defaults.available_snapshots.len(), 3); // 2 defaults + 1 added
1656    }
1657
1658    #[test]
1659    fn test_download_defaults_replace_snapshots() {
1660        let defaults = DownloadDefaults::default().with_snapshots(vec![
1661            Cow::Borrowed("https://custom1.com"),
1662            Cow::Borrowed("https://custom2.com"),
1663        ]);
1664
1665        assert_eq!(defaults.available_snapshots.len(), 2);
1666        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
1667    }
1668
1669    #[test]
1670    fn test_long_help_generation() {
1671        let defaults = DownloadDefaults::default();
1672        let help = defaults.long_help();
1673
1674        assert!(help.contains("Available snapshot sources:"));
1675        assert!(help.contains("merkle.io"));
1676        assert!(help.contains("publicnode.com"));
1677        assert!(help.contains("file://"));
1678    }
1679
1680    #[test]
1681    fn test_long_help_override() {
1682        let custom_help = "This is custom help text for downloading snapshots.";
1683        let defaults = DownloadDefaults::default().with_long_help(custom_help);
1684
1685        let help = defaults.long_help();
1686        assert_eq!(help, custom_help);
1687        assert!(!help.contains("Available snapshot sources:"));
1688    }
1689
/// All builder methods can be chained in one expression and each one leaves
/// its effect on the resulting `DownloadDefaults`.
#[test]
fn test_builder_chaining() {
    let built = DownloadDefaults::default()
        .with_base_url("https://custom.example.com")
        .with_snapshot("https://snapshot1.com")
        .with_snapshot("https://snapshot2.com")
        .with_long_help("Custom help for snapshots");

    assert_eq!(built.default_base_url, "https://custom.example.com");

    // 2 defaults + 2 added
    let source_count = built.available_snapshots.len();
    assert_eq!(source_count, 4);

    // Compare through a borrowed view so no extra `String` has to be allocated.
    assert_eq!(built.long_help.as_deref(), Some("Custom help for snapshots"));
}
1702
/// `CompressionFormat::from_url` recognises `.tar.lz4` and `.tar.zst` for both
/// `https://` and `file://` URLs and returns an error for `.tar.gz`.
#[test]
fn test_compression_format_detection() {
    let lz4_urls =
        ["https://example.com/snapshot.tar.lz4", "file:///path/to/snapshot.tar.lz4"];
    let zstd_urls =
        ["https://example.com/snapshot.tar.zst", "file:///path/to/snapshot.tar.zst"];

    // Walk remote/local pairs so the checks run in the same order as a flat list would.
    for (lz4_url, zstd_url) in lz4_urls.into_iter().zip(zstd_urls) {
        let lz4_result = CompressionFormat::from_url(lz4_url);
        assert!(matches!(lz4_result, Ok(CompressionFormat::Lz4)));

        let zstd_result = CompressionFormat::from_url(zstd_url);
        assert!(matches!(zstd_result, Ok(CompressionFormat::Zstd)));
    }

    let unsupported = CompressionFormat::from_url("https://example.com/snapshot.tar.gz");
    assert!(unsupported.is_err());
}
1723
/// Starting from a selection with transactions, receipts and both changeset
/// components set to `All`, calling `inject_archive_only_components` with
/// `true` as the last argument adds `TransactionSenders` and `RocksdbIndices`.
#[test]
fn inject_archive_only_components_for_archive_selection() {
    let manifest = manifest_with_archive_only_components();
    let mut selections = BTreeMap::from([
        (SnapshotComponentType::Transactions, ComponentSelection::All),
        (SnapshotComponentType::Receipts, ComponentSelection::All),
        (SnapshotComponentType::AccountChangesets, ComponentSelection::All),
        (SnapshotComponentType::StorageChangesets, ComponentSelection::All),
    ]);

    inject_archive_only_components(&mut selections, &manifest, true);

    let senders = selections.get(&SnapshotComponentType::TransactionSenders);
    assert_eq!(senders, Some(&ComponentSelection::All));

    let rocksdb = selections.get(&SnapshotComponentType::RocksdbIndices);
    assert_eq!(rocksdb, Some(&ComponentSelection::All));
}
1744
/// With `false` as the last argument, `inject_archive_only_components` still
/// adds `TransactionSenders` but leaves `RocksdbIndices` out of the selection.
#[test]
fn inject_archive_only_components_without_rocksdb() {
    let manifest = manifest_with_archive_only_components();
    let mut selections = BTreeMap::from([
        (SnapshotComponentType::Transactions, ComponentSelection::All),
        (SnapshotComponentType::Receipts, ComponentSelection::All),
        (SnapshotComponentType::AccountChangesets, ComponentSelection::All),
        (SnapshotComponentType::StorageChangesets, ComponentSelection::All),
    ]);

    inject_archive_only_components(&mut selections, &manifest, false);

    let senders = selections.get(&SnapshotComponentType::TransactionSenders);
    assert_eq!(senders, Some(&ComponentSelection::All));

    assert!(!selections.contains_key(&SnapshotComponentType::RocksdbIndices));
}
1762
/// `should_reset_index_stage_checkpoints` is true while `RocksdbIndices` is
/// absent from the selection and false once it has been added.
#[test]
fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
    let mut selections =
        BTreeMap::from([(SnapshotComponentType::Transactions, ComponentSelection::All)]);

    let reset_without_indices = should_reset_index_stage_checkpoints(&selections);
    assert!(reset_without_indices);

    selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);

    let reset_with_indices = should_reset_index_stage_checkpoints(&selections);
    assert!(!reset_with_indices);
}
1772
/// Feeds `summarize_download_startup` three planned archives and checks the
/// resulting counters:
///
/// * one whose single output file exists in the target directory with the
///   recorded size and the hash reported by `file_blake3_hex`,
/// * one whose output file was never created,
/// * one that lists no output files at all.
///
/// The summary is expected to report 1 reusable archive and 2 that need a download.
#[test]
fn summarize_download_startup_counts_reusable_and_needs_download() {
    /// Builds a planned archive of size 10 without an archive-level checksum.
    fn planned_archive(
        ty: SnapshotComponentType,
        component: &str,
        url: &str,
        file_name: &str,
        output_files: Vec<OutputFileChecksum>,
    ) -> PlannedArchive {
        PlannedArchive {
            ty,
            component: component.to_string(),
            archive: ArchiveDescriptor {
                url: url.to_string(),
                file_name: file_name.to_string(),
                size: 10,
                blake3: None,
                output_files,
            },
        }
    }

    let scratch = tempdir().unwrap();
    let target_dir = scratch.path();

    // Materialise the one output file that should be found on disk.
    let present_file = target_dir.join("ok.bin");
    std::fs::write(&present_file, [1_u8; 4]).unwrap();
    let present_hash = file_blake3_hex(&present_file).unwrap();

    let planned = vec![
        planned_archive(
            SnapshotComponentType::State,
            "State",
            "https://example.com/ok.tar.zst",
            "ok.tar.zst",
            vec![OutputFileChecksum {
                path: "ok.bin".to_string(),
                size: 4,
                blake3: present_hash,
            }],
        ),
        planned_archive(
            SnapshotComponentType::Headers,
            "Headers",
            "https://example.com/missing.tar.zst",
            "missing.tar.zst",
            vec![OutputFileChecksum {
                path: "missing.bin".to_string(),
                size: 1,
                blake3: "deadbeef".to_string(),
            }],
        ),
        planned_archive(
            SnapshotComponentType::Transactions,
            "Transactions",
            "https://example.com/bad-size.tar.zst",
            "bad-size.tar.zst",
            Vec::new(),
        ),
    ];

    let summary = summarize_download_startup(&planned, target_dir).unwrap();
    assert_eq!(summary.reusable, 1);
    assert_eq!(summary.needs_download, 2);
}
1829
/// Sorting planned archives by `archive_priority_rank`, then component name,
/// then archive file name puts `State` first, `RocksdbIndices` second and
/// `Transactions` last for this input.
#[test]
fn archive_priority_prefers_state_then_rocksdb() {
    /// Builds a size-1 planned archive with a single size-1 output file.
    fn entry(
        ty: SnapshotComponentType,
        component: &str,
        url: &str,
        file_name: &str,
        out_path: &str,
        out_hash: &str,
    ) -> PlannedArchive {
        PlannedArchive {
            ty,
            component: component.to_string(),
            archive: ArchiveDescriptor {
                url: url.to_string(),
                file_name: file_name.to_string(),
                size: 1,
                blake3: None,
                output_files: vec![OutputFileChecksum {
                    path: out_path.to_string(),
                    size: 1,
                    blake3: out_hash.to_string(),
                }],
            },
        }
    }

    // Deliberately listed in the reverse of the expected order.
    let mut planned = [
        entry(SnapshotComponentType::Transactions, "Transactions", "u3", "t.tar.zst", "a", "x"),
        entry(
            SnapshotComponentType::RocksdbIndices,
            "RocksDB Indices",
            "u2",
            "rocksdb_indices.tar.zst",
            "b",
            "y",
        ),
        entry(SnapshotComponentType::State, "State (mdbx)", "u1", "state.tar.zst", "c", "z"),
    ];

    // Tuples compare lexicographically: rank first, then component, then file name.
    planned.sort_by(|lhs, rhs| {
        let lhs_key = (archive_priority_rank(lhs.ty), &lhs.component, &lhs.archive.file_name);
        let rhs_key = (archive_priority_rank(rhs.ty), &rhs.component, &rhs.archive.file_name);
        lhs_key.cmp(&rhs_key)
    });

    let expected_order = [
        SnapshotComponentType::State,
        SnapshotComponentType::RocksdbIndices,
        SnapshotComponentType::Transactions,
    ];
    for (archive, expected_ty) in planned.iter().zip(expected_order) {
        assert_eq!(archive.ty, expected_ty);
    }
}
1891}