// reth_cli_commands/download/mod.rs

1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::Parser;
9use config_gen::{config_for_selections, write_config};
10use eyre::{Result, WrapErr};
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14    ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15    SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_cli_util::cancellation::CancellationToken;
21use reth_db::{init_db, Database};
22use reth_db_api::transaction::DbTx;
23use reth_fs_util as fs;
24use reth_node_core::args::DefaultPruningValues;
25use reth_prune_types::PruneMode;
26use std::{
27    borrow::Cow,
28    collections::BTreeMap,
29    fs::OpenOptions,
30    io::{self, BufWriter, Read, Write},
31    path::{Path, PathBuf},
32    sync::{
33        atomic::{AtomicBool, AtomicU64, Ordering},
34        Arc, OnceLock,
35    },
36    time::{Duration, Instant},
37};
38use tar::Archive;
39use tokio::task;
40use tracing::{info, warn};
41use tui::{run_selector, SelectorOutput};
42use url::Url;
43use zstd::stream::read::Decoder as ZstdDecoder;
44
/// Unit suffixes used when formatting byte counts for human-readable display.
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
/// Default base URL hosting reth snapshot archives.
const RETH_SNAPSHOTS_BASE_URL: &str = "https://snapshots-r2.reth.rs";
/// API endpoint that lists available snapshots (queried by `--list`).
const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
/// File extension for LZ4-compressed tar archives.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// File extension for zstd-compressed tar archives.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Directory (created inside the target data dir) holding resumable download state.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";

/// Maximum number of concurrent archive downloads.
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
54
/// Named component-selection presets, chosen via CLI flags or the interactive TUI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Minimal component set (same selection as `--non-interactive`).
    Minimal,
    /// Full-node component set (matches default full prune settings).
    Full,
    /// All available components (archive node, no pruning).
    Archive,
}
61
/// Result of component resolution: which components to download and, if a
/// preset produced the selection, which one.
struct ResolvedComponents {
    /// Selection per component type.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    /// The preset that generated the selections; `None` when the user picked
    /// components via explicit `--with-*` flags.
    preset: Option<SelectionPreset>,
}
66
/// Global static download defaults; set at most once via
/// [`DownloadDefaults::try_init`], otherwise lazily filled with the built-ins.
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
69
70/// Download configuration defaults
71///
72/// Global defaults can be set via [`DownloadDefaults::try_init`].
/// Download configuration defaults
///
/// Global defaults can be set via [`DownloadDefaults::try_init`].
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// List of available snapshot sources (rendered in the `--url` long help)
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Default base URL for snapshots
    pub default_base_url: Cow<'static, str>,
    /// Default base URL for chain-aware snapshots.
    ///
    /// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
    /// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
    /// the resulting URL would be `https://snapshots.example.com/1`.
    ///
    /// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// Optional custom long help text that overrides the generated help
    pub long_help: Option<String>,
}
90
91impl DownloadDefaults {
92    /// Initialize the global download defaults with this configuration
93    pub fn try_init(self) -> Result<(), Self> {
94        DOWNLOAD_DEFAULTS.set(self)
95    }
96
97    /// Get a reference to the global download defaults
98    pub fn get_global() -> &'static DownloadDefaults {
99        DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
100    }
101
102    /// Default download configuration with defaults from snapshots.reth.rs and publicnode
103    pub fn default_download_defaults() -> Self {
104        Self {
105            available_snapshots: vec![
106                Cow::Borrowed("https://snapshots.reth.rs (default)"),
107                Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
108            ],
109            default_base_url: Cow::Borrowed(RETH_SNAPSHOTS_BASE_URL),
110            default_chain_aware_base_url: None,
111            long_help: None,
112        }
113    }
114
115    /// Generates the long help text for the download URL argument using these defaults.
116    ///
117    /// If a custom long_help is set, it will be returned. Otherwise, help text is generated
118    /// from the available_snapshots list.
119    pub fn long_help(&self) -> String {
120        if let Some(ref custom_help) = self.long_help {
121            return custom_help.clone();
122        }
123
124        let mut help = String::from(
125            "Specify a snapshot URL or let the command propose a default one.\n\n\
126             Browse available snapshots at https://snapshots.reth.rs\n\
127             or use --list-snapshots to see them from the CLI.\n\nAvailable snapshot sources:\n",
128        );
129
130        for source in &self.available_snapshots {
131            help.push_str("- ");
132            help.push_str(source);
133            help.push('\n');
134        }
135
136        help.push_str(
137            "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
138        );
139        help.push_str(
140            self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
141        );
142        help.push_str(
143            ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
144        );
145        help
146    }
147
148    /// Add a snapshot source to the list
149    pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
150        self.available_snapshots.push(source.into());
151        self
152    }
153
154    /// Replace all snapshot sources
155    pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
156        self.available_snapshots = sources;
157        self
158    }
159
160    /// Set the default base URL, e.g. `https://downloads.merkle.io`.
161    pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
162        self.default_base_url = url.into();
163        self
164    }
165
166    /// Set the default chain-aware base URL.
167    pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
168        self.default_chain_aware_base_url = Some(url.into());
169        self
170    }
171
172    /// Builder: Set custom long help text, overriding the generated help
173    pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
174        self.long_help = Some(help.into());
175        self
176    }
177}
178
impl Default for DownloadDefaults {
    /// Delegates to [`DownloadDefaults::default_download_defaults`].
    fn default() -> Self {
        Self::default_download_defaults()
    }
}
184
/// CLI command that downloads snapshot archives and configures a reth node from them.
// NOTE(review): the `///` docs on this struct and its fields double as clap
// help text shown to users at runtime; they are deliberately left untouched.
// Only `//` comments (invisible to clap) are added below.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    // Shared environment arguments: datadir, chain spec and database settings
    // (used by `execute` via `self.env.datadir` / `self.env.chain` / `self.env.db`).
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Custom URL to download a single snapshot archive (legacy mode).
    ///
    /// When provided, downloads and extracts a single archive without component selection.
    /// Browse available snapshots at <https://snapshots.reth.rs> or use --list-snapshots.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    /// URL to a snapshot manifest.json for modular component downloads.
    ///
    /// When provided, fetches this manifest instead of discovering it from the default
    /// base URL. Useful for testing with custom or local manifests.
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    /// Local path to a snapshot manifest.json for modular component downloads.
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    // The --with-* flags below compose an explicit component selection and are
    // therefore mutually exclusive with the preset flags (--minimal/--full/--archive).

    /// Include transaction static files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_txs: bool,

    /// Include receipt static files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_receipts: bool,

    /// Include account and storage history static files.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
    with_state_history: bool,

    /// Include transaction sender static files. Requires `--with-txs`.
    #[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
    with_senders: bool,

    /// Include RocksDB index files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive", "without_rocksdb"])]
    with_rocksdb: bool,

    /// Download all available components (archive node, no pruning).
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "minimal", "full"])]
    archive: bool,

    /// Download the minimal component set (same default as --non-interactive).
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "full"])]
    minimal: bool,

    /// Download the full node component set (matches default full prune settings).
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "minimal"])]
    full: bool,

    /// Skip optional RocksDB indices even when archive components are selected.
    ///
    /// This affects `--archive`/`--all` and TUI archive preset (`a`).
    #[arg(long, conflicts_with_all = ["url", "with_rocksdb"])]
    without_rocksdb: bool,

    /// Skip interactive component selection. Downloads the minimal set
    /// (state + headers + transactions + changesets) unless explicit --with-* flags narrow it.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    /// Use resumable two-phase downloads (download to disk first, then extract).
    ///
    /// Archives are downloaded to a .part file with HTTP Range resume support
    /// before extraction. Slower but tolerates network interruptions without
    /// restarting. By default, archives stream directly into the extractor.
    #[arg(long)]
    resumable: bool,

    /// Maximum number of concurrent modular archive workers.
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,

    /// List available snapshots from snapshots.reth.rs and exit.
    ///
    /// Queries the snapshots API and prints all available snapshots for the selected chain,
    /// including block number, size, and manifest URL.
    #[arg(long, alias = "list-snapshots", conflicts_with_all = ["url", "manifest_url", "manifest_path"])]
    list: bool,
}
271
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Runs the download command end to end.
    ///
    /// Flow: resolve the data dir; then either print the snapshot listing
    /// (`--list`), download a single legacy archive (`--url`), or perform a
    /// modular manifest-driven download: fetch the manifest, resolve component
    /// selections, download all archives concurrently, write `reth.toml`, and
    /// record prune/stage checkpoints in the database.
    ///
    /// NOTE(review): the type parameter `N` is unused in this body —
    /// presumably kept for call-site compatibility; confirm before removing.
    pub async fn execute<N>(self) -> Result<()> {
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // Guard cancels the token when dropped (e.g. on early return or error),
        // signalling in-flight downloads to stop.
        let cancel_token = CancellationToken::new();
        let _cancel_guard = cancel_token.drop_guard();

        // --list: print available snapshots and exit
        if self.list {
            let entries = fetch_snapshot_api_entries(chain_id).await?;
            print_snapshot_listing(&entries, chain_id);
            return Ok(());
        }

        // Legacy single-URL mode: download one archive and extract it
        if let Some(ref url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(
                url,
                data_dir.data_dir(),
                None,
                self.resumable,
                cancel_token.clone(),
            )
            .await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Modular download: fetch manifest and select components
        let manifest_source = self.resolve_manifest_source(chain_id).await?;

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // Archive preset pulls in hidden archive-only components (senders, and
        // RocksDB indices unless --without-rocksdb).
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        // Collect all archive descriptors across selected components.
        let target_dir = data_dir.data_dir();
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Checksums of the extracted (plain) output files are required
                // to decide reuse vs. re-download; a manifest without them is invalid.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // Order by component priority (state first), then by component name and
        // file name for a deterministic download order.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // Resumable mode stages partial downloads in a cache dir inside the data dir.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::None => 0,
            })
            .sum();

        let startup_summary = summarize_download_startup(&all_downloads, target_dir)?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        let shared = SharedProgress::new(total_size, total_archives as u64, cancel_token.clone());
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        // Download/extract archives concurrently, capped by --download-concurrency
        // (clamped to at least 1).
        let target = target_dir.to_path_buf();
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        let download_concurrency = self.download_concurrency.max(1);
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                let dir = target.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                let ct = cancel_token.clone();
                async move {
                    process_modular_archive(
                        planned,
                        &dir,
                        cache.as_deref(),
                        Some(sp),
                        resumable,
                        ct,
                    )
                    .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Stop the progress display task before reporting errors.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        // Check for errors
        for result in results {
            result?;
        }

        // Generate reth.toml and set prune checkpoints
        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        // Open the DB to write checkpoints
        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        // Write prune checkpoints to the DB so the pruner knows data before the
        // snapshot block is already in the expected pruned state
        let should_write_prune = config.prune.segments != Default::default();
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            // Reset stage checkpoints for history indexing stages only if RocksDB
            // indices weren't downloaded. When archive snapshots include the
            // optional RocksDB indices component, we preserve source checkpoints.
            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        info!(target: "reth::cli", "Snapshot download complete. Run `reth node` to start syncing.");

        Ok(())
    }

    /// Determines which components to download based on CLI flags or interactive selection.
    ///
    /// Resolution order: `--archive` > `--full` > `--minimal` > explicit
    /// `--with-*` flags > `--non-interactive` (minimal) > interactive TUI.
    fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
        let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();

        // --archive/--all: everything available as All
        if self.archive {
            return Ok(ResolvedComponents {
                selections: SnapshotComponentType::ALL
                    .iter()
                    .copied()
                    .filter(|ty| available(*ty))
                    .filter(|ty| {
                        !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
                    })
                    .map(|ty| (ty, ComponentSelection::All))
                    .collect(),
                preset: Some(SelectionPreset::Archive),
            });
        }

        if self.full {
            return Ok(ResolvedComponents {
                selections: self.full_preset_selections(manifest),
                preset: Some(SelectionPreset::Full),
            });
        }

        if self.minimal {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        let has_explicit_flags = self.with_txs ||
            self.with_receipts ||
            self.with_state_history ||
            self.with_senders ||
            self.with_rocksdb;

        if has_explicit_flags {
            let mut selections = BTreeMap::new();
            // Required components always All
            if available(SnapshotComponentType::State) {
                selections.insert(SnapshotComponentType::State, ComponentSelection::All);
            }
            if available(SnapshotComponentType::Headers) {
                selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
            }
            if self.with_txs && available(SnapshotComponentType::Transactions) {
                selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
            }
            if self.with_receipts && available(SnapshotComponentType::Receipts) {
                selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
            }
            if self.with_state_history {
                if available(SnapshotComponentType::AccountChangesets) {
                    selections
                        .insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
                }
                if available(SnapshotComponentType::StorageChangesets) {
                    selections
                        .insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
                }
            }
            if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
                selections
                    .insert(SnapshotComponentType::TransactionSenders, ComponentSelection::All);
            }
            if self.with_rocksdb && available(SnapshotComponentType::RocksdbIndices) {
                selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
            }
            return Ok(ResolvedComponents { selections, preset: None });
        }

        if self.non_interactive {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Interactive TUI
        let full_preset = self.full_preset_selections(manifest);
        let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
        // Drop components the user deselected entirely.
        let selected =
            selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();

        Ok(ResolvedComponents { selections: selected, preset })
    }

    /// Minimal preset: every component present in the manifest, each with its
    /// own component-defined minimal selection.
    fn minimal_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        SnapshotComponentType::ALL
            .iter()
            .copied()
            .filter(|ty| manifest.component(*ty).is_some())
            .map(|ty| (ty, ty.minimal_selection()))
            .collect()
    }

    /// Full-node preset: derives each component's selection from the default
    /// full-node prune settings, skipping components absent from the manifest
    /// or resolved to `None`.
    fn full_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        let mut selections = BTreeMap::new();

        for ty in [
            SnapshotComponentType::State,
            SnapshotComponentType::Headers,
            SnapshotComponentType::Transactions,
            SnapshotComponentType::Receipts,
            SnapshotComponentType::AccountChangesets,
            SnapshotComponentType::StorageChangesets,
            SnapshotComponentType::TransactionSenders,
            SnapshotComponentType::RocksdbIndices,
        ] {
            if manifest.component(ty).is_none() {
                continue;
            }

            let selection = self.full_selection_for_component(ty, manifest.block);
            if selection != ComponentSelection::None {
                selections.insert(ty, selection);
            }
        }

        selections
    }

    /// Maps one component type to its full-node selection using the global
    /// default prune modes; transaction bodies may instead be bounded by the
    /// Paris (merge) activation block when configured.
    fn full_selection_for_component(
        &self,
        ty: SnapshotComponentType,
        snapshot_block: u64,
    ) -> ComponentSelection {
        let defaults = DefaultPruningValues::get_global();
        match ty {
            SnapshotComponentType::State | SnapshotComponentType::Headers => {
                ComponentSelection::All
            }
            SnapshotComponentType::Transactions => {
                if defaults.full_bodies_history_use_pre_merge {
                    // Keep bodies back to the merge block: distance is the
                    // number of blocks from Paris activation to the snapshot tip.
                    match self
                        .env
                        .chain
                        .ethereum_fork_activation(EthereumHardfork::Paris)
                        .block_number()
                    {
                        Some(paris) if snapshot_block >= paris => {
                            ComponentSelection::Distance(snapshot_block - paris + 1)
                        }
                        Some(_) => ComponentSelection::None,
                        None => ComponentSelection::All,
                    }
                } else {
                    selection_from_prune_mode(
                        defaults.full_prune_modes.bodies_history,
                        snapshot_block,
                    )
                }
            }
            SnapshotComponentType::Receipts => {
                selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
            }
            SnapshotComponentType::AccountChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
            }
            SnapshotComponentType::StorageChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
            }
            SnapshotComponentType::TransactionSenders => {
                selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
            }
            // Keep hidden by default in full mode; if users want indices they can use archive.
            SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
        }
    }

    /// Resolves where to fetch the manifest from.
    ///
    /// Priority: `--manifest-path` (local file), then `--manifest-url`, then
    /// discovery via the snapshots API for the given chain.
    async fn resolve_manifest_source(&self, chain_id: u64) -> Result<String> {
        if let Some(path) = &self.manifest_path {
            return Ok(path.display().to_string());
        }

        match &self.manifest_url {
            Some(url) => Ok(url.clone()),
            None => discover_manifest_url(chain_id).await,
        }
    }
}
672
673fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
674    match mode {
675        None => ComponentSelection::All,
676        Some(PruneMode::Full) => ComponentSelection::None,
677        Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
678        Some(PruneMode::Before(block)) => {
679            if snapshot_block >= block {
680                ComponentSelection::Distance(snapshot_block - block + 1)
681            } else {
682                ComponentSelection::None
683            }
684        }
685    }
686}
687
688/// If all data components (txs, receipts, changesets) are `All`, automatically
689/// include hidden archive-only components when available in the manifest.
690fn inject_archive_only_components(
691    selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
692    manifest: &SnapshotManifest,
693    include_rocksdb: bool,
694) {
695    let is_all =
696        |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
697
698    let is_archive = is_all(SnapshotComponentType::Transactions) &&
699        is_all(SnapshotComponentType::Receipts) &&
700        is_all(SnapshotComponentType::AccountChangesets) &&
701        is_all(SnapshotComponentType::StorageChangesets);
702
703    if !is_archive {
704        return;
705    }
706
707    for component in
708        [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
709    {
710        if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
711            continue;
712        }
713
714        if manifest.component(component).is_some() {
715            selections.insert(component, ComponentSelection::All);
716        }
717    }
718}
719
720fn should_reset_index_stage_checkpoints(
721    selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
722) -> bool {
723    !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
724}
725
impl<C: ChainSpecParser> DownloadCommand<C> {
    /// Returns the underlying chain being used to run this command
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        // Always `Some` here; the `Option` presumably matches a shared command
        // accessor signature — NOTE(review): confirm against the command trait.
        Some(&self.env.chain)
    }
}
732
/// Tracks download progress and throttles display updates to every 100ms.
pub(crate) struct DownloadProgress {
    /// Bytes downloaded so far.
    downloaded: u64,
    /// Expected total size in bytes.
    total_size: u64,
    /// When the progress line was last printed (used to throttle to 100ms).
    last_displayed: Instant,
    /// When the download started (used for speed/ETA computation).
    started_at: Instant,
}
740
/// One archive queued for download, tagged with the component it belongs to.
#[derive(Debug, Clone)]
struct PlannedArchive {
    /// Component type this archive belongs to (drives download priority).
    ty: SnapshotComponentType,
    /// Human-readable component name, used for logging and sorting.
    component: String,
    /// Manifest descriptor for the archive file.
    archive: ArchiveDescriptor,
}
747
748const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
749    match ty {
750        SnapshotComponentType::State => 0,
751        SnapshotComponentType::RocksdbIndices => 1,
752        _ => 2,
753    }
754}
755
/// Startup integrity tally: how many planned archives already have verified
/// output files on disk versus how many must still be downloaded.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    /// Archives whose plain output files already verify on disk.
    reusable: usize,
    /// Archives that still need to be downloaded.
    needs_download: usize,
}
761
762fn summarize_download_startup(
763    all_downloads: &[PlannedArchive],
764    target_dir: &Path,
765) -> Result<DownloadStartupSummary> {
766    let mut summary = DownloadStartupSummary::default();
767
768    for planned in all_downloads {
769        if verify_output_files(target_dir, &planned.archive.output_files)? {
770            summary.reusable += 1;
771        } else {
772            summary.needs_download += 1;
773        }
774    }
775
776    Ok(summary)
777}
778
779impl DownloadProgress {
780    /// Creates new progress tracker with given total size
781    fn new(total_size: u64) -> Self {
782        let now = Instant::now();
783        Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
784    }
785
786    /// Converts bytes to human readable format (B, KB, MB, GB)
787    pub(crate) fn format_size(size: u64) -> String {
788        let mut size = size as f64;
789        let mut unit_index = 0;
790
791        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
792            size /= 1024.0;
793            unit_index += 1;
794        }
795
796        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
797    }
798
799    /// Format duration as human readable string
800    fn format_duration(duration: Duration) -> String {
801        let secs = duration.as_secs();
802        if secs < 60 {
803            format!("{secs}s")
804        } else if secs < 3600 {
805            format!("{}m {}s", secs / 60, secs % 60)
806        } else {
807            format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
808        }
809    }
810
811    /// Updates progress bar (for single-archive legacy downloads)
812    fn update(&mut self, chunk_size: u64) -> Result<()> {
813        self.downloaded += chunk_size;
814
815        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
816            let formatted_downloaded = Self::format_size(self.downloaded);
817            let formatted_total = Self::format_size(self.total_size);
818            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
819
820            let elapsed = self.started_at.elapsed();
821            let eta = if self.downloaded > 0 {
822                let remaining = self.total_size.saturating_sub(self.downloaded);
823                let speed = self.downloaded as f64 / elapsed.as_secs_f64();
824                if speed > 0.0 {
825                    Duration::from_secs_f64(remaining as f64 / speed)
826                } else {
827                    Duration::ZERO
828                }
829            } else {
830                Duration::ZERO
831            };
832            let eta_str = Self::format_duration(eta);
833
834            print!(
835                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str}     ",
836            );
837            io::stdout().flush()?;
838            self.last_displayed = Instant::now();
839        }
840
841        Ok(())
842    }
843}
844
/// Shared progress counter for parallel downloads.
///
/// Each download thread atomically increments `downloaded`. A single display
/// task on the main thread reads the counter periodically and prints one
/// aggregated progress line.
struct SharedProgress {
    // Total bytes downloaded across all threads (relaxed atomic counter).
    downloaded: AtomicU64,
    // Combined size of all archives, in bytes.
    total_size: u64,
    // Number of archives in the whole batch.
    total_archives: u64,
    // How many archives have finished (downloaded, extracted, verified).
    archives_done: AtomicU64,
    // Flag polled by the display task to know when to exit its loop.
    done: AtomicBool,
    // Cooperative cancellation; checked by the per-chunk reader/writer wrappers.
    cancel_token: CancellationToken,
}
858
859impl SharedProgress {
860    fn new(total_size: u64, total_archives: u64, cancel_token: CancellationToken) -> Arc<Self> {
861        Arc::new(Self {
862            downloaded: AtomicU64::new(0),
863            total_size,
864            total_archives,
865            archives_done: AtomicU64::new(0),
866            done: AtomicBool::new(false),
867            cancel_token,
868        })
869    }
870
871    fn is_cancelled(&self) -> bool {
872        self.cancel_token.is_cancelled()
873    }
874
875    fn add(&self, bytes: u64) {
876        self.downloaded.fetch_add(bytes, Ordering::Relaxed);
877    }
878
879    fn archive_done(&self) {
880        self.archives_done.fetch_add(1, Ordering::Relaxed);
881    }
882}
883
884/// Spawns a background task that prints aggregated download progress.
885/// Returns a handle; drop it (or call `.abort()`) to stop.
886fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
887    tokio::spawn(async move {
888        let started_at = Instant::now();
889        let mut interval = tokio::time::interval(Duration::from_secs(3));
890        interval.tick().await; // first tick is immediate, skip it
891        loop {
892            interval.tick().await;
893
894            if progress.done.load(Ordering::Relaxed) {
895                break;
896            }
897
898            let downloaded = progress.downloaded.load(Ordering::Relaxed);
899            let total = progress.total_size;
900            if total == 0 {
901                continue;
902            }
903
904            let done = progress.archives_done.load(Ordering::Relaxed);
905            let all = progress.total_archives;
906            let pct = (downloaded as f64 / total as f64) * 100.0;
907            let dl = DownloadProgress::format_size(downloaded);
908            let tot = DownloadProgress::format_size(total);
909
910            let elapsed = started_at.elapsed();
911            let remaining = total.saturating_sub(downloaded);
912
913            if remaining == 0 {
914                // Downloads done, waiting for extraction
915                info!(target: "reth::cli",
916                    archives = format_args!("{done}/{all}"),
917                    downloaded = %dl,
918                    "Extracting remaining archives"
919                );
920            } else {
921                let eta = if downloaded > 0 {
922                    let speed = downloaded as f64 / elapsed.as_secs_f64();
923                    if speed > 0.0 {
924                        DownloadProgress::format_duration(Duration::from_secs_f64(
925                            remaining as f64 / speed,
926                        ))
927                    } else {
928                        "??".to_string()
929                    }
930                } else {
931                    "??".to_string()
932                };
933
934                info!(target: "reth::cli",
935                    archives = format_args!("{done}/{all}"),
936                    progress = format_args!("{pct:.1}%"),
937                    downloaded = %dl,
938                    total = %tot,
939                    eta = %eta,
940                    "Downloading"
941                );
942            }
943        }
944
945        // Final line
946        let downloaded = progress.downloaded.load(Ordering::Relaxed);
947        let dl = DownloadProgress::format_size(downloaded);
948        let tot = DownloadProgress::format_size(progress.total_size);
949        let elapsed = DownloadProgress::format_duration(started_at.elapsed());
950        info!(target: "reth::cli",
951            downloaded = %dl,
952            total = %tot,
953            elapsed = %elapsed,
954            "Downloads complete"
955        );
956    })
957}
958
/// Adapter to track progress while reading (used for extraction in legacy path)
struct ProgressReader<R> {
    // Underlying byte source being measured.
    reader: R,
    // Local (non-shared) progress state, printed to stdout.
    progress: DownloadProgress,
    // Checked on every read; aborts with `Interrupted` once cancelled.
    cancel_token: CancellationToken,
}
965
966impl<R: Read> ProgressReader<R> {
967    fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
968        Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
969    }
970}
971
972impl<R: Read> Read for ProgressReader<R> {
973    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
974        if self.cancel_token.is_cancelled() {
975            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
976        }
977        let bytes = self.reader.read(buf)?;
978        if bytes > 0 &&
979            let Err(e) = self.progress.update(bytes as u64)
980        {
981            return Err(io::Error::other(e));
982        }
983        Ok(bytes)
984    }
985}
986
/// Supported compression formats for snapshots
#[derive(Debug, Clone, Copy)]
enum CompressionFormat {
    // `.tar.lz4` archives.
    Lz4,
    // `.tar.zst` archives.
    Zstd,
}
993
994impl CompressionFormat {
995    /// Detect compression format from file extension
996    fn from_url(url: &str) -> Result<Self> {
997        let path =
998            Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
999
1000        if path.ends_with(EXTENSION_TAR_LZ4) {
1001            Ok(Self::Lz4)
1002        } else if path.ends_with(EXTENSION_TAR_ZSTD) {
1003            Ok(Self::Zstd)
1004        } else {
1005            Err(eyre::eyre!(
1006                "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
1007                path
1008            ))
1009        }
1010    }
1011}
1012
1013/// Extracts a compressed tar archive to the target directory with progress tracking.
1014fn extract_archive<R: Read>(
1015    reader: R,
1016    total_size: u64,
1017    format: CompressionFormat,
1018    target_dir: &Path,
1019    cancel_token: CancellationToken,
1020) -> Result<()> {
1021    let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
1022
1023    match format {
1024        CompressionFormat::Lz4 => {
1025            let decoder = Decoder::new(progress_reader)?;
1026            Archive::new(decoder).unpack(target_dir)?;
1027        }
1028        CompressionFormat::Zstd => {
1029            let decoder = ZstdDecoder::new(progress_reader)?;
1030            Archive::new(decoder).unpack(target_dir)?;
1031        }
1032    }
1033
1034    println!();
1035    Ok(())
1036}
1037
1038/// Extracts a compressed tar archive without progress tracking.
1039fn extract_archive_raw<R: Read>(
1040    reader: R,
1041    format: CompressionFormat,
1042    target_dir: &Path,
1043) -> Result<()> {
1044    match format {
1045        CompressionFormat::Lz4 => {
1046            Archive::new(Decoder::new(reader)?).unpack(target_dir)?;
1047        }
1048        CompressionFormat::Zstd => {
1049            Archive::new(ZstdDecoder::new(reader)?).unpack(target_dir)?;
1050        }
1051    }
1052    Ok(())
1053}
1054
1055/// Extracts a snapshot from a local file.
1056fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
1057    let file = std::fs::File::open(path)?;
1058    let total_size = file.metadata()?.len();
1059    info!(target: "reth::cli",
1060        file = %path.display(),
1061        size = %DownloadProgress::format_size(total_size),
1062        "Extracting local archive"
1063    );
1064    let start = Instant::now();
1065    extract_archive(file, total_size, format, target_dir, CancellationToken::new())?;
1066    info!(target: "reth::cli",
1067        file = %path.display(),
1068        elapsed = %DownloadProgress::format_duration(start.elapsed()),
1069        "Local extraction complete"
1070    );
1071    Ok(())
1072}
1073
/// Maximum attempts per download before giving up.
const MAX_DOWNLOAD_RETRIES: u32 = 10;
/// Delay between retry attempts, in seconds.
const RETRY_BACKOFF_SECS: u64 = 5;
1076
/// Wrapper that tracks download progress while writing data.
/// Used with [`io::copy`] to display progress during downloads.
struct ProgressWriter<W> {
    // Destination sink (typically a `BufWriter` around the part file).
    inner: W,
    // Local progress bar state, updated on each write.
    progress: DownloadProgress,
    // Checked on every write; aborts with `Interrupted` once cancelled.
    cancel_token: CancellationToken,
}
1084
1085impl<W: Write> Write for ProgressWriter<W> {
1086    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1087        if self.cancel_token.is_cancelled() {
1088            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1089        }
1090        let n = self.inner.write(buf)?;
1091        let _ = self.progress.update(n as u64);
1092        Ok(n)
1093    }
1094
1095    fn flush(&mut self) -> io::Result<()> {
1096        self.inner.flush()
1097    }
1098}
1099
/// Wrapper that bumps a shared atomic counter while writing data.
/// Used for parallel downloads where a single display task shows aggregated progress.
struct SharedProgressWriter<W> {
    // Destination sink (typically a `BufWriter` around the part file).
    inner: W,
    // Shared counters credited per write; also carries the cancel token.
    progress: Arc<SharedProgress>,
}
1106
1107impl<W: Write> Write for SharedProgressWriter<W> {
1108    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1109        if self.progress.is_cancelled() {
1110            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1111        }
1112        let n = self.inner.write(buf)?;
1113        self.progress.add(n as u64);
1114        Ok(n)
1115    }
1116
1117    fn flush(&mut self) -> io::Result<()> {
1118        self.inner.flush()
1119    }
1120}
1121
/// Wrapper that bumps a shared atomic counter while reading data.
/// Used for streaming downloads where a single display task shows aggregated progress.
struct SharedProgressReader<R> {
    // Underlying byte source (e.g. the HTTP response body).
    inner: R,
    // Shared counters credited per read; also carries the cancel token.
    progress: Arc<SharedProgress>,
}
1128
1129impl<R: Read> Read for SharedProgressReader<R> {
1130    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1131        if self.progress.is_cancelled() {
1132            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1133        }
1134        let n = self.inner.read(buf)?;
1135        self.progress.add(n as u64);
1136        Ok(n)
1137    }
1138}
1139
/// Downloads a file with resume support using HTTP Range requests.
/// Automatically retries on failure, resuming from where it left off.
/// Returns the path to the downloaded file and its total size.
///
/// When `shared` is provided, progress is reported to the shared counter
/// (for parallel downloads). Otherwise uses a local progress bar.
fn resumable_download(
    url: &str,
    target_dir: &Path,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<(PathBuf, u64)> {
    // Derive the on-disk file name from the last URL path segment.
    let file_name = Url::parse(url)
        .ok()
        .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
        .unwrap_or_else(|| "snapshot.tar".to_string());

    let final_path = target_dir.join(&file_name);
    // In-progress data goes to a `.part` file, renamed only on success.
    let part_path = target_dir.join(format!("{file_name}.part"));

    // In parallel mode the shared display task owns all console output.
    let quiet = shared.is_some();

    if !quiet {
        info!(target: "reth::cli", file = %file_name, "Connecting to download server");
    }
    let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;

    // The total size is learned from the first successful response and
    // cached across retry attempts.
    let mut total_size: Option<u64> = None;
    let mut last_error: Option<eyre::Error> = None;

    // Promotes the `.part` file to its final name once all bytes are in.
    let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
        fs::rename(&part_path, &final_path)?;
        if !quiet {
            info!(target: "reth::cli", file = %file_name, "Download complete");
        }
        Ok((final_path.clone(), size))
    };

    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);

        // A previous attempt may have written everything before failing
        // (e.g. on flush); if the full file is on disk, just finalize.
        if let Some(total) = total_size &&
            existing_size >= total
        {
            return finalize_download(total);
        }

        if attempt > 1 {
            info!(target: "reth::cli",
                file = %file_name,
                "Retry attempt {}/{} - resuming from {} bytes",
                attempt, MAX_DOWNLOAD_RETRIES, existing_size
            );
        }

        // Ask the server to continue from the bytes we already have.
        let mut request = client.get(url);
        if existing_size > 0 {
            request = request.header(RANGE, format!("bytes={existing_size}-"));
            if !quiet && attempt == 1 {
                info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
            }
        }

        let response = match request.send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                last_error = Some(e.into());
                if attempt < MAX_DOWNLOAD_RETRIES {
                    info!(target: "reth::cli",
                        file = %file_name,
                        "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                    );
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        };

        // 206 means the server honored the Range header; any other success
        // status means it is sending the whole file from the start.
        let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;

        // For partial responses the full size lives after the '/' in
        // `Content-Range: bytes start-end/total`.
        let size = if is_partial {
            response
                .headers()
                .get("Content-Range")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.split('/').next_back())
                .and_then(|v| v.parse().ok())
        } else {
            response.content_length()
        };

        if total_size.is_none() {
            total_size = size;
            if !quiet && let Some(s) = size {
                info!(target: "reth::cli",
                    file = %file_name,
                    size = %DownloadProgress::format_size(s),
                    "Downloading"
                );
            }
        }

        let current_total = total_size.ok_or_else(|| {
            eyre::eyre!("Server did not provide Content-Length or Content-Range header")
        })?;

        // Append to the existing partial data only when the server actually
        // resumed; a non-partial response restarts the file from scratch.
        let file = if is_partial && existing_size > 0 {
            OpenOptions::new()
                .append(true)
                .open(&part_path)
                .map_err(|e| fs::FsPathError::open(e, &part_path))?
        } else {
            fs::create_file(&part_path)?
        };

        let start_offset = if is_partial { existing_size } else { 0 };
        let mut reader = response;

        let copy_result;
        let flush_result;

        if let Some(sp) = shared {
            // Parallel path: bump shared atomic counter
            // NOTE(review): on a resumed retry, `start_offset` covers bytes
            // the writer already credited during the previous attempt, so
            // the shared counter can over-report aggregate progress —
            // verify whether this double count is intended.
            if start_offset > 0 {
                sp.add(start_offset);
            }
            let mut writer =
                SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
        } else {
            // Legacy single-download path: local progress bar
            let mut progress = DownloadProgress::new(current_total);
            // Seed the bar so resumed downloads start at the right percent.
            progress.downloaded = start_offset;
            let mut writer = ProgressWriter {
                inner: BufWriter::new(file),
                progress,
                cancel_token: cancel_token.clone(),
            };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
            // Terminate the `\r`-based progress line.
            println!();
        }

        if let Err(e) = copy_result.and(flush_result) {
            last_error = Some(e.into());
            if attempt < MAX_DOWNLOAD_RETRIES {
                info!(target: "reth::cli",
                    file = %file_name,
                    "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
            }
            continue;
        }

        return finalize_download(current_total);
    }

    Err(last_error
        .unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
}
1302
1303/// Streams a remote archive directly into the extractor without writing to disk.
1304///
1305/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
1306fn streaming_download_and_extract(
1307    url: &str,
1308    format: CompressionFormat,
1309    target_dir: &Path,
1310    shared: Option<&Arc<SharedProgress>>,
1311    cancel_token: CancellationToken,
1312) -> Result<()> {
1313    let quiet = shared.is_some();
1314    let mut last_error: Option<eyre::Error> = None;
1315
1316    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1317        if attempt > 1 {
1318            info!(target: "reth::cli",
1319                url = %url,
1320                attempt,
1321                max = MAX_DOWNLOAD_RETRIES,
1322                "Retrying streaming download from scratch"
1323            );
1324        }
1325
1326        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
1327
1328        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
1329            Ok(r) => r,
1330            Err(e) => {
1331                last_error = Some(e.into());
1332                if attempt < MAX_DOWNLOAD_RETRIES {
1333                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1334                }
1335                continue;
1336            }
1337        };
1338
1339        if !quiet && let Some(size) = response.content_length() {
1340            info!(target: "reth::cli",
1341                url = %url,
1342                size = %DownloadProgress::format_size(size),
1343                "Streaming archive"
1344            );
1345        }
1346
1347        let result = if let Some(sp) = shared {
1348            let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
1349            extract_archive_raw(reader, format, target_dir)
1350        } else {
1351            let total_size = response.content_length().unwrap_or(0);
1352            extract_archive(response, total_size, format, target_dir, cancel_token.clone())
1353        };
1354
1355        match result {
1356            Ok(()) => return Ok(()),
1357            Err(e) => {
1358                last_error = Some(e);
1359                if attempt < MAX_DOWNLOAD_RETRIES {
1360                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1361                }
1362            }
1363        }
1364    }
1365
1366    Err(last_error.unwrap_or_else(|| {
1367        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
1368    }))
1369}
1370
1371/// Fetches the snapshot from a remote URL with resume support, then extracts it.
1372fn download_and_extract(
1373    url: &str,
1374    format: CompressionFormat,
1375    target_dir: &Path,
1376    shared: Option<&Arc<SharedProgress>>,
1377    cancel_token: CancellationToken,
1378) -> Result<()> {
1379    let quiet = shared.is_some();
1380    let (downloaded_path, total_size) =
1381        resumable_download(url, target_dir, shared, cancel_token.clone())?;
1382
1383    let file_name =
1384        downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1385
1386    if !quiet {
1387        info!(target: "reth::cli",
1388            file = %file_name,
1389            size = %DownloadProgress::format_size(total_size),
1390            "Extracting archive"
1391        );
1392    }
1393    let file = fs::open(&downloaded_path)?;
1394
1395    if quiet {
1396        // Skip progress tracking for extraction in parallel mode
1397        extract_archive_raw(file, format, target_dir)?;
1398    } else {
1399        extract_archive(file, total_size, format, target_dir, cancel_token)?;
1400        info!(target: "reth::cli",
1401            file = %file_name,
1402            "Extraction complete"
1403        );
1404    }
1405
1406    fs::remove_file(&downloaded_path)?;
1407
1408    if let Some(sp) = shared {
1409        sp.archive_done();
1410    }
1411
1412    Ok(())
1413}
1414
1415/// Downloads and extracts a snapshot, blocking until finished.
1416///
1417/// Supports `file://` URLs for local files and HTTP(S) URLs for remote downloads.
1418/// When `resumable` is true, downloads to a `.part` file first with HTTP Range resume
1419/// support. Otherwise streams directly into the extractor.
1420fn blocking_download_and_extract(
1421    url: &str,
1422    target_dir: &Path,
1423    shared: Option<Arc<SharedProgress>>,
1424    resumable: bool,
1425    cancel_token: CancellationToken,
1426) -> Result<()> {
1427    let format = CompressionFormat::from_url(url)?;
1428
1429    if let Ok(parsed_url) = Url::parse(url) &&
1430        parsed_url.scheme() == "file"
1431    {
1432        let file_path = parsed_url
1433            .to_file_path()
1434            .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1435        let result = extract_from_file(&file_path, format, target_dir);
1436        if result.is_ok() &&
1437            let Some(sp) = shared
1438        {
1439            sp.archive_done();
1440        }
1441        result
1442    } else if resumable {
1443        download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token)
1444    } else {
1445        let result =
1446            streaming_download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token);
1447        if result.is_ok() &&
1448            let Some(sp) = shared
1449        {
1450            sp.archive_done();
1451        }
1452        result
1453    }
1454}
1455
1456/// Downloads and extracts a snapshot archive asynchronously.
1457///
1458/// When `shared` is provided, download progress is reported to the shared
1459/// counter for aggregated display. Otherwise uses a local progress bar.
1460/// When `resumable` is true, uses two-phase download with `.part` files.
1461async fn stream_and_extract(
1462    url: &str,
1463    target_dir: &Path,
1464    shared: Option<Arc<SharedProgress>>,
1465    resumable: bool,
1466    cancel_token: CancellationToken,
1467) -> Result<()> {
1468    let target_dir = target_dir.to_path_buf();
1469    let url = url.to_string();
1470    task::spawn_blocking(move || {
1471        blocking_download_and_extract(&url, &target_dir, shared, resumable, cancel_token)
1472    })
1473    .await??;
1474
1475    Ok(())
1476}
1477
1478async fn process_modular_archive(
1479    planned: PlannedArchive,
1480    target_dir: &Path,
1481    cache_dir: Option<&Path>,
1482    shared: Option<Arc<SharedProgress>>,
1483    resumable: bool,
1484    cancel_token: CancellationToken,
1485) -> Result<()> {
1486    let target_dir = target_dir.to_path_buf();
1487    let cache_dir = cache_dir.map(Path::to_path_buf);
1488
1489    task::spawn_blocking(move || {
1490        blocking_process_modular_archive(
1491            &planned,
1492            &target_dir,
1493            cache_dir.as_deref(),
1494            shared,
1495            resumable,
1496            cancel_token,
1497        )
1498    })
1499    .await??;
1500
1501    Ok(())
1502}
1503
/// Downloads, extracts and integrity-checks one planned archive, blocking
/// until it succeeds or all retries are exhausted.
///
/// Already-verified output files are skipped entirely. In resumable mode the
/// compressed archive is staged in `cache_dir` and removed after extraction;
/// otherwise the HTTP response is streamed straight into the extractor.
fn blocking_process_modular_archive(
    planned: &PlannedArchive,
    target_dir: &Path,
    cache_dir: Option<&Path>,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let archive = &planned.archive;
    // Fast path: all output files already on disk with matching checksums.
    if verify_output_files(target_dir, &archive.output_files)? {
        if let Some(sp) = &shared {
            // Credit the skipped bytes so the aggregate display stays accurate.
            sp.add(archive.size);
            sp.archive_done();
        }
        info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
        return Ok(());
    }

    let format = CompressionFormat::from_url(&archive.file_name)?;
    let mut last_error: Option<eyre::Error> = None;
    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        // Remove any stale/partial outputs before (re-)extracting.
        cleanup_output_files(target_dir, &archive.output_files);

        if resumable {
            let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
            let archive_path = cache_dir.join(&archive.file_name);
            let part_path = cache_dir.join(format!("{}.part", archive.file_name));
            // Download into the cache, then extract from the cached file.
            let result =
                resumable_download(&archive.url, cache_dir, shared.as_ref(), cancel_token.clone())
                    .and_then(|(downloaded_path, _)| {
                        let file = fs::open(&downloaded_path)?;
                        extract_archive_raw(file, format, target_dir)
                    });
            // Best-effort cleanup of the cached archive either way.
            let _ = fs::remove_file(&archive_path);
            let _ = fs::remove_file(&part_path);

            if let Err(e) = result {
                warn!(target: "reth::cli",
                    file = %archive.file_name,
                    component = %planned.component,
                    attempt,
                    err = %e,
                    "Download or extraction failed, retrying"
                );
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        } else {
            // streaming_download_and_extract already has its own internal retry loop
            streaming_download_and_extract(
                &archive.url,
                format,
                target_dir,
                shared.as_ref(),
                cancel_token.clone(),
            )?;
        }

        // Extraction succeeded; accept the archive only if the produced
        // files pass size and checksum validation.
        if verify_output_files(target_dir, &archive.output_files)? {
            if let Some(sp) = &shared {
                sp.archive_done();
            }
            return Ok(());
        }

        warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
    }

    // Prefer the last transport/extraction error; if every attempt extracted
    // fine but failed validation, report the integrity failure instead.
    if let Some(e) = last_error {
        return Err(e.wrap_err(format!(
            "Failed after {} attempts for {}",
            MAX_DOWNLOAD_RETRIES, archive.file_name
        )));
    }

    eyre::bail!(
        "Failed integrity validation after {} attempts for {}",
        MAX_DOWNLOAD_RETRIES,
        archive.file_name
    )
}
1588
1589fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -> Result<bool> {
1590    if output_files.is_empty() {
1591        return Ok(false);
1592    }
1593
1594    for expected in output_files {
1595        let output_path = target_dir.join(&expected.path);
1596        let meta = match fs::metadata(&output_path) {
1597            Ok(meta) => meta,
1598            Err(_) => return Ok(false),
1599        };
1600        if meta.len() != expected.size {
1601            return Ok(false);
1602        }
1603
1604        let actual = file_blake3_hex(&output_path)?;
1605        if !actual.eq_ignore_ascii_case(&expected.blake3) {
1606            return Ok(false);
1607        }
1608    }
1609
1610    Ok(true)
1611}
1612
1613fn cleanup_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) {
1614    for output in output_files {
1615        let _ = fs::remove_file(target_dir.join(&output.path));
1616    }
1617}
1618
1619fn file_blake3_hex(path: &Path) -> Result<String> {
1620    let mut file = fs::open(path)?;
1621    let mut hasher = Hasher::new();
1622    let mut buf = [0_u8; 64 * 1024];
1623
1624    loop {
1625        let n = file.read(&mut buf)?;
1626        if n == 0 {
1627            break;
1628        }
1629        hasher.update(&buf[..n]);
1630    }
1631
1632    Ok(hasher.finalize().to_hex().to_string())
1633}
1634
/// Discovers the latest snapshot manifest URL for the given chain from the snapshots API.
///
/// Queries `snapshots.reth.rs/api/snapshots` and returns the manifest URL for the most
/// recent modular snapshot matching the requested chain.
async fn discover_manifest_url(chain_id: u64) -> Result<String> {
    let api_url = RETH_SNAPSHOTS_API_URL;

    info!(target: "reth::cli", %api_url, %chain_id, "Discovering latest snapshot manifest");

    // Entries are already filtered to the requested chain by the fetch helper.
    let entries = fetch_snapshot_api_entries(chain_id).await?;

    // Pick the modular snapshot with the highest block number.
    let entry =
        entries.iter().filter(|s| s.is_modular()).max_by_key(|s| s.block).ok_or_else(|| {
            eyre::eyre!(
                "No modular snapshot manifest found for chain \
                 {chain_id} at {api_url}\n\n\
                 You can provide a manifest URL directly with --manifest-url, or\n\
                 use a direct snapshot URL with -u from:\n\
                 \t- https://snapshots.reth.rs\n\n\
                 Use --list to see all available snapshots."
            )
        })?;

    info!(target: "reth::cli",
        block = entry.block,
        url = %entry.metadata_url,
        "Found latest snapshot manifest"
    );

    Ok(entry.metadata_url.clone())
}
1666
1667/// Deserializes a JSON value that may be either a number or a string-encoded number.
1668fn deserialize_string_or_u64<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
1669where
1670    D: serde::Deserializer<'de>,
1671{
1672    use serde::Deserialize;
1673    let value = serde_json::Value::deserialize(deserializer)?;
1674    match &value {
1675        serde_json::Value::Number(n) => {
1676            n.as_u64().ok_or_else(|| serde::de::Error::custom("expected u64"))
1677        }
1678        serde_json::Value::String(s) => {
1679            s.parse::<u64>().map_err(|_| serde::de::Error::custom("expected numeric string"))
1680        }
1681        _ => Err(serde::de::Error::custom("expected number or string")),
1682    }
1683}
1684
/// An entry from the `snapshots.reth.rs/api/snapshots` listing.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SnapshotApiEntry {
    /// Chain this snapshot belongs to; the API encodes this as either a JSON
    /// number or a string, hence the custom deserializer.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    chain_id: u64,
    /// Block height the snapshot was taken at (number or string in the API).
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    block: u64,
    /// Human-readable snapshot date, if the API provides one.
    #[serde(default)]
    date: Option<String>,
    /// Snapshot profile label (e.g. pruning profile), if provided.
    #[serde(default)]
    profile: Option<String>,
    /// URL of the snapshot's metadata file; modular snapshots point at a
    /// `manifest.json`.
    metadata_url: String,
    /// Total snapshot size in bytes; `0` when the API omits it.
    #[serde(default)]
    size: u64,
}
1701
1702impl SnapshotApiEntry {
1703    fn is_modular(&self) -> bool {
1704        self.metadata_url.ends_with("manifest.json")
1705    }
1706}
1707
1708/// Fetches the full snapshot listing from the snapshots API, filtered by chain ID.
1709async fn fetch_snapshot_api_entries(chain_id: u64) -> Result<Vec<SnapshotApiEntry>> {
1710    let api_url = RETH_SNAPSHOTS_API_URL;
1711
1712    let entries: Vec<SnapshotApiEntry> = Client::new()
1713        .get(api_url)
1714        .send()
1715        .await
1716        .and_then(|r| r.error_for_status())
1717        .wrap_err_with(|| format!("Failed to fetch snapshot listing from {api_url}"))?
1718        .json()
1719        .await?;
1720
1721    Ok(entries.into_iter().filter(|e| e.chain_id == chain_id).collect())
1722}
1723
1724/// Prints a formatted table of available modular snapshots.
1725fn print_snapshot_listing(entries: &[SnapshotApiEntry], chain_id: u64) {
1726    let modular: Vec<_> = entries.iter().filter(|e| e.is_modular()).collect();
1727
1728    println!("Available snapshots for chain {chain_id} (https://snapshots.reth.rs):\n");
1729    println!("{:<12}  {:>10}  {:<10}  {:>10}  MANIFEST URL", "DATE", "BLOCK", "PROFILE", "SIZE");
1730    println!("{}", "-".repeat(100));
1731
1732    for entry in &modular {
1733        let date = entry.date.as_deref().unwrap_or("-");
1734        let profile = entry.profile.as_deref().unwrap_or("-");
1735        let size = if entry.size > 0 {
1736            DownloadProgress::format_size(entry.size)
1737        } else {
1738            "-".to_string()
1739        };
1740
1741        println!(
1742            "{date:<12}  {:>10}  {profile:<10}  {size:>10}  {}",
1743            entry.block, entry.metadata_url
1744        );
1745    }
1746
1747    if modular.is_empty() {
1748        println!("  (no modular snapshots found)");
1749    }
1750
1751    println!(
1752        "\nTo download a specific snapshot, copy its manifest URL and run:\n  \
1753         reth download --manifest-url <URL>"
1754    );
1755}
1756
1757async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
1758    if let Ok(parsed) = Url::parse(source) {
1759        return match parsed.scheme() {
1760            "http" | "https" => {
1761                let response = Client::new()
1762                    .get(source)
1763                    .send()
1764                    .await
1765                    .and_then(|r| r.error_for_status())
1766                    .wrap_err_with(|| {
1767                        format!(
1768                            "Failed to fetch snapshot manifest from {source}\n\n\
1769                             The manifest endpoint may not be available for this snapshot source.\n\
1770                             You can use a direct snapshot URL instead:\n\n\
1771                             \treth download -u <snapshot-url>\n\n\
1772                             Available snapshot sources:\n\
1773                             \t- https://snapshots.reth.rs\n\
1774                             \t- https://publicnode.com/snapshots"
1775                        )
1776                    })?;
1777                Ok(response.json().await?)
1778            }
1779            "file" => {
1780                let path = parsed
1781                    .to_file_path()
1782                    .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
1783                let content = fs::read_to_string(path)?;
1784                Ok(serde_json::from_str(&content)?)
1785            }
1786            _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
1787        };
1788    }
1789
1790    let content = fs::read_to_string(source)?;
1791    Ok(serde_json::from_str(&content)?)
1792}
1793
/// Resolves the base URL that archive file names in `manifest` are relative to.
///
/// Resolution order:
/// 1. an explicit, non-empty `base_url` in the manifest itself;
/// 2. the parent of `source` when `source` is a URL (`file://` or remote);
/// 3. the parent directory of `source` interpreted as a filesystem path,
///    converted to a `file://` URL.
///
/// The returned string never has a trailing `/`.
fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
    if let Some(base_url) = manifest.base_url.as_deref() &&
        !base_url.is_empty()
    {
        // Manifest-provided base URL takes precedence over anything derived
        // from the source location.
        return Ok(base_url.trim_end_matches('/').to_string());
    }

    if let Ok(mut url) = Url::parse(source) {
        if url.scheme() == "file" {
            // file:// URLs: drop the manifest file name and rebuild a
            // directory URL from the remaining path.
            let mut path = url
                .to_file_path()
                .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
            path.pop();
            let mut base = Url::from_directory_path(path)
                .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
                .to_string();
            // `from_directory_path` always appends a trailing slash; strip it.
            if base.ends_with('/') {
                base.pop();
            }
            return Ok(base);
        }

        // Remote URL: remove the last path segment (the manifest file name).
        // The inner scope drops the `segments` borrow before `url` is read.
        {
            let mut segments = url
                .path_segments_mut()
                .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
            // Discard a trailing empty segment first so "a/b/" and "a/b"
            // both resolve to "a".
            segments.pop_if_empty();
            segments.pop();
        }
        return Ok(url.as_str().trim_end_matches('/').to_string());
    }

    // Plain filesystem path: resolve relative paths against the current
    // working directory, then take the parent directory.
    let path = Path::new(source);
    let manifest_dir = if path.is_absolute() {
        path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    } else {
        let joined = std::env::current_dir()?.join(path);
        joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    };
    let mut base = Url::from_directory_path(&manifest_dir)
        .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
        .to_string();
    if base.ends_with('/') {
        base.pop();
    }
    Ok(base)
}
1841
#[cfg(test)]
mod tests {
    use super::*;
    use manifest::{ComponentManifest, SingleArchive};
    use tempfile::tempdir;

    /// Builds a minimal manifest containing only the two "archive-only"
    /// components (transaction senders and RocksDB indices) used by the
    /// injection tests below.
    fn manifest_with_archive_only_components() -> SnapshotManifest {
        let mut components = BTreeMap::new();
        components.insert(
            SnapshotComponentType::TransactionSenders.key().to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "transaction_senders.tar.zst".to_string(),
                size: 1,
                blake3: None,
                output_files: vec![],
            }),
        );
        components.insert(
            SnapshotComponentType::RocksdbIndices.key().to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "rocksdb_indices.tar.zst".to_string(),
                size: 1,
                blake3: None,
                output_files: vec![],
            }),
        );
        SnapshotManifest {
            block: 0,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        }
    }

    /// `with_snapshot` appends to the default list; `with_base_url` replaces
    /// the default base URL.
    #[test]
    fn test_download_defaults_builder() {
        let defaults = DownloadDefaults::default()
            .with_snapshot("https://example.com/snapshots (example)")
            .with_base_url("https://example.com");

        assert_eq!(defaults.default_base_url, "https://example.com");
        assert_eq!(defaults.available_snapshots.len(), 3); // 2 defaults + 1 added
    }

    /// `with_snapshots` replaces the whole list rather than appending.
    #[test]
    fn test_download_defaults_replace_snapshots() {
        let defaults = DownloadDefaults::default().with_snapshots(vec![
            Cow::Borrowed("https://custom1.com"),
            Cow::Borrowed("https://custom2.com"),
        ]);

        assert_eq!(defaults.available_snapshots.len(), 2);
        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
    }

    /// The generated long help mentions all default snapshot sources.
    #[test]
    fn test_long_help_generation() {
        let defaults = DownloadDefaults::default();
        let help = defaults.long_help();

        assert!(help.contains("Available snapshot sources:"));
        assert!(help.contains("snapshots.reth.rs"));
        assert!(help.contains("publicnode.com"));
        assert!(help.contains("file://"));
    }

    /// A custom long help completely replaces the generated text.
    #[test]
    fn test_long_help_override() {
        let custom_help = "This is custom help text for downloading snapshots.";
        let defaults = DownloadDefaults::default().with_long_help(custom_help);

        let help = defaults.long_help();
        assert_eq!(help, custom_help);
        assert!(!help.contains("Available snapshot sources:"));
    }

    /// Builder methods can be chained in any combination.
    #[test]
    fn test_builder_chaining() {
        let defaults = DownloadDefaults::default()
            .with_base_url("https://custom.example.com")
            .with_snapshot("https://snapshot1.com")
            .with_snapshot("https://snapshot2.com")
            .with_long_help("Custom help for snapshots");

        assert_eq!(defaults.default_base_url, "https://custom.example.com");
        assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
        assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
    }

    /// Compression format is detected from the URL extension; unsupported
    /// extensions (e.g. `.tar.gz`) are rejected.
    #[test]
    fn test_compression_format_detection() {
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
    }

    /// With `include_rocksdb = true`, both archive-only components are added
    /// to the selection map.
    #[test]
    fn inject_archive_only_components_for_archive_selection() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);

        inject_archive_only_components(&mut selections, &manifest, true);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        assert_eq!(
            selections.get(&SnapshotComponentType::RocksdbIndices),
            Some(&ComponentSelection::All)
        );
    }

    /// With `include_rocksdb = false`, only transaction senders are injected;
    /// RocksDB indices stay unselected.
    #[test]
    fn inject_archive_only_components_without_rocksdb() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);

        inject_archive_only_components(&mut selections, &manifest, false);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        assert_eq!(selections.get(&SnapshotComponentType::RocksdbIndices), None);
    }

    /// Index stage checkpoints are reset exactly when RocksDB indices are NOT
    /// part of the selection.
    #[test]
    fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        assert!(should_reset_index_stage_checkpoints(&selections));

        selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
        assert!(!should_reset_index_stage_checkpoints(&selections));
    }

    /// One archive whose output file exists with a matching hash is reusable;
    /// a missing output file and an archive without output checksums both
    /// count as needing download.
    #[test]
    fn summarize_download_startup_counts_reusable_and_needs_download() {
        let dir = tempdir().unwrap();
        let target_dir = dir.path();
        // Create the one output file whose checksum will match.
        let ok_file = target_dir.join("ok.bin");
        std::fs::write(&ok_file, vec![1_u8; 4]).unwrap();
        let ok_hash = file_blake3_hex(&ok_file).unwrap();

        let planned = vec![
            PlannedArchive {
                ty: SnapshotComponentType::State,
                component: "State".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/ok.tar.zst".to_string(),
                    file_name: "ok.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "ok.bin".to_string(),
                        size: 4,
                        blake3: ok_hash,
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::Headers,
                component: "Headers".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/missing.tar.zst".to_string(),
                    file_name: "missing.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "missing.bin".to_string(),
                        size: 1,
                        blake3: "deadbeef".to_string(),
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::Transactions,
                component: "Transactions".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/bad-size.tar.zst".to_string(),
                    file_name: "bad-size.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![],
                },
            },
        ];

        let summary = summarize_download_startup(&planned, target_dir).unwrap();
        assert_eq!(summary.reusable, 1);
        assert_eq!(summary.needs_download, 2);
    }

    /// Sorting with `archive_priority_rank` puts State first, then RocksDB
    /// indices, then everything else.
    #[test]
    fn archive_priority_prefers_state_then_rocksdb() {
        let mut planned = [
            PlannedArchive {
                ty: SnapshotComponentType::Transactions,
                component: "Transactions".to_string(),
                archive: ArchiveDescriptor {
                    url: "u3".to_string(),
                    file_name: "t.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "a".to_string(),
                        size: 1,
                        blake3: "x".to_string(),
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::RocksdbIndices,
                component: "RocksDB Indices".to_string(),
                archive: ArchiveDescriptor {
                    url: "u2".to_string(),
                    file_name: "rocksdb_indices.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "b".to_string(),
                        size: 1,
                        blake3: "y".to_string(),
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::State,
                component: "State (mdbx)".to_string(),
                archive: ArchiveDescriptor {
                    url: "u1".to_string(),
                    file_name: "state.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "c".to_string(),
                        size: 1,
                        blake3: "z".to_string(),
                    }],
                },
            },
        ];

        // Same sort key as the production download ordering: priority rank,
        // then component name, then file name as tie-breakers.
        planned.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        assert_eq!(planned[0].ty, SnapshotComponentType::State);
        assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
        assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
    }
}