// reth_cli_commands/download/mod.rs
1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::{builder::RangedU64ValueParser, Parser};
9use config_gen::{config_for_selections, write_config};
10use eyre::{Result, WrapErr};
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14    ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15    SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_cli_util::cancellation::CancellationToken;
21use reth_db::{init_db, Database};
22use reth_db_api::transaction::DbTx;
23use reth_fs_util as fs;
24use reth_node_core::args::DefaultPruningValues;
25use reth_prune_types::PruneMode;
26use std::{
27    borrow::Cow,
28    collections::BTreeMap,
29    fs::OpenOptions,
30    io::{self, BufWriter, Read, Write},
31    path::{Path, PathBuf},
32    sync::{
33        atomic::{AtomicBool, AtomicU64, Ordering},
34        Arc, OnceLock,
35    },
36    time::{Duration, Instant},
37};
38use tar::Archive;
39use tokio::task;
40use tracing::{info, warn};
41use tui::{run_selector, SelectorOutput};
42use url::Url;
43use zstd::stream::read::Decoder as ZstdDecoder;
44
/// Unit suffixes for pretty-printing byte sizes (B through GB).
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
/// Default bucket hosting reth snapshot archives.
const RETH_SNAPSHOTS_BASE_URL: &str = "https://snapshots-r2.reth.rs";
/// Discovery API endpoint listing available snapshots per chain.
const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
/// Archive extension for LZ4-compressed tarballs.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// Archive extension for zstd-compressed tarballs.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Directory under the datadir used for resumable download state.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
/// Path prefix identifying static-file entries inside archives
/// (presumably matched during extraction — defined outside this view; confirm).
const STATIC_FILES_PREFIX: &str = "static_files/";

/// Maximum number of concurrent archive downloads.
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
55
/// Named component-selection presets mirroring the `--minimal`, `--full` and
/// `--archive` CLI flags (also reachable from the interactive TUI).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Minimal component set (same default as `--non-interactive`).
    Minimal,
    /// Full-node component set (matches default full prune settings).
    Full,
    /// Every available component (archive node, no pruning).
    Archive,
}
62
/// Outcome of component resolution: the concrete per-component selections and,
/// when a preset produced them, which preset that was.
#[derive(Debug)]
struct ResolvedComponents {
    /// Per-component download selections, keyed by component type.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    /// Preset that produced `selections`; `None` when the user supplied
    /// explicit `--with-*` flags or made a custom TUI selection.
    preset: Option<SelectionPreset>,
}

/// Global static download defaults
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
71
/// Download configuration defaults
///
/// Global defaults can be set via [`DownloadDefaults::try_init`].
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// List of available snapshot sources
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Default base URL for snapshots
    pub default_base_url: Cow<'static, str>,
    /// Default base URL for chain-aware snapshots.
    ///
    /// When set, the chain ID is appended to form the full URL: `{base_url}/{chain_id}`.
    /// For example, given a base URL of `https://snapshots.example.com` and chain ID `1`,
    /// the resulting URL would be `https://snapshots.example.com/1`.
    ///
    /// Falls back to [`default_base_url`](Self::default_base_url) when `None`.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// URL for the snapshot discovery API that lists available snapshots.
    ///
    /// Defaults to `https://snapshots.reth.rs/api/snapshots`.
    pub snapshot_api_url: Cow<'static, str>,
    /// Optional custom long help text that overrides the generated help
    /// produced by [`DownloadDefaults::long_help`].
    pub long_help: Option<String>,
}
96
97impl DownloadDefaults {
98    /// Initialize the global download defaults with this configuration
99    pub fn try_init(self) -> Result<(), Self> {
100        DOWNLOAD_DEFAULTS.set(self)
101    }
102
103    /// Get a reference to the global download defaults
104    pub fn get_global() -> &'static DownloadDefaults {
105        DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
106    }
107
108    /// Default download configuration with defaults from snapshots.reth.rs and publicnode
109    pub fn default_download_defaults() -> Self {
110        Self {
111            available_snapshots: vec![
112                Cow::Borrowed("https://snapshots.reth.rs (default)"),
113                Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
114            ],
115            default_base_url: Cow::Borrowed(RETH_SNAPSHOTS_BASE_URL),
116            default_chain_aware_base_url: None,
117            snapshot_api_url: Cow::Borrowed(RETH_SNAPSHOTS_API_URL),
118            long_help: None,
119        }
120    }
121
122    /// Generates the long help text for the download URL argument using these defaults.
123    ///
124    /// If a custom long_help is set, it will be returned. Otherwise, help text is generated
125    /// from the available_snapshots list.
126    pub fn long_help(&self) -> String {
127        if let Some(ref custom_help) = self.long_help {
128            return custom_help.clone();
129        }
130
131        let mut help = format!(
132            "Specify a snapshot URL or let the command propose a default one.\n\n\
133             Browse available snapshots at {}\n\
134             or use --list-snapshots to see them from the CLI.\n\nAvailable snapshot sources:\n",
135            self.snapshot_api_url.trim_end_matches("/api/snapshots"),
136        );
137
138        for source in &self.available_snapshots {
139            help.push_str("- ");
140            help.push_str(source);
141            help.push('\n');
142        }
143
144        help.push_str(
145            "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
146        );
147        help.push_str(
148            self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
149        );
150        help.push_str(
151            ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
152        );
153        help
154    }
155
156    /// Add a snapshot source to the list
157    pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
158        self.available_snapshots.push(source.into());
159        self
160    }
161
162    /// Replace all snapshot sources
163    pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
164        self.available_snapshots = sources;
165        self
166    }
167
168    /// Set the default base URL, e.g. `https://downloads.merkle.io`.
169    pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
170        self.default_base_url = url.into();
171        self
172    }
173
174    /// Set the default chain-aware base URL.
175    pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
176        self.default_chain_aware_base_url = Some(url.into());
177        self
178    }
179
180    /// Set the snapshot discovery API URL.
181    pub fn with_snapshot_api_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
182        self.snapshot_api_url = url.into();
183        self
184    }
185
186    /// Builder: Set custom long help text, overriding the generated help
187    pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
188        self.long_help = Some(help.into());
189        self
190    }
191}
192
impl Default for DownloadDefaults {
    /// Delegates to [`DownloadDefaults::default_download_defaults`].
    fn default() -> Self {
        Self::default_download_defaults()
    }
}
198
199/// CLI command that downloads snapshot archives and configures a reth node from them.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    /// Shared environment arguments (datadir, chain spec and database settings).
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    /// Custom URL to download a single snapshot archive (legacy mode).
    ///
    /// When provided, downloads and extracts a single archive without component selection.
    /// Browse available snapshots with --list-snapshots.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    /// URL to a snapshot manifest.json for modular component downloads.
    ///
    /// When provided, fetches this manifest instead of discovering it from the default
    /// base URL. Useful for testing with custom or local manifests.
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    /// Local path to a snapshot manifest.json for modular component downloads.
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    // Component-selection flags. Each `--with-*` family (all / since / distance)
    // is mutually exclusive with the other variants of the same family and with
    // the preset flags (`--minimal`, `--full`, `--archive`).

    /// Include all transaction static files.
    #[arg(long, conflicts_with_all = ["with_txs_since", "with_txs_distance", "minimal", "full", "archive"])]
    with_txs: bool,

    /// Include transaction static files starting at the specified block.
    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_txs", "with_txs_distance", "minimal", "full", "archive"])]
    with_txs_since: Option<u64>,

    /// Include transaction static files covering the last N blocks.
    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_txs", "with_txs_since", "minimal", "full", "archive"])]
    with_txs_distance: Option<u64>,

    /// Include all receipt static files.
    #[arg(long, conflicts_with_all = ["with_receipts_since", "with_receipts_distance", "minimal", "full", "archive"])]
    with_receipts: bool,

    /// Include receipt static files starting at the specified block.
    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_receipts", "with_receipts_distance", "minimal", "full", "archive"])]
    with_receipts_since: Option<u64>,

    /// Include receipt static files covering the last N blocks.
    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_receipts", "with_receipts_since", "minimal", "full", "archive"])]
    with_receipts_distance: Option<u64>,

    /// Include all account and storage history static files.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["with_state_history_since", "with_state_history_distance", "minimal", "full", "archive"])]
    with_state_history: bool,

    /// Include account and storage history static files starting at the specified block.
    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_state_history", "with_state_history_distance", "minimal", "full", "archive"])]
    with_state_history_since: Option<u64>,

    /// Include account and storage history static files covering the last N blocks.
    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_state_history", "with_state_history_since", "minimal", "full", "archive"])]
    with_state_history_distance: Option<u64>,

    /// Include transaction sender static files. Requires `--with-txs`.
    #[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
    with_senders: bool,

    /// Include RocksDB index files.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive", "without_rocksdb"])]
    with_rocksdb: bool,

    /// Download all available components (archive node, no pruning).
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "minimal", "full"])]
    archive: bool,

    /// Download the minimal component set (same default as --non-interactive).
    #[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "full"])]
    minimal: bool,

    /// Download the full node component set (matches default full prune settings).
    #[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "minimal"])]
    full: bool,

    /// Skip optional RocksDB indices even when archive components are selected.
    ///
    /// This affects `--archive`/`--all` and TUI archive preset (`a`).
    #[arg(long, conflicts_with_all = ["url", "with_rocksdb"])]
    without_rocksdb: bool,

    /// Skip interactive component selection. Downloads the minimal set
    /// (state + headers + transactions + changesets) unless explicit --with-* flags narrow it.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    /// Enable resumable two-phase downloads (download to disk first, then extract).
    ///
    /// Archives are downloaded to a `.part` file with HTTP Range resume support
    /// before extraction. This is enabled by default because it tolerates
    /// network interruptions without restarting. Pass `--resumable=false` to
    /// stream archives directly into the extractor instead.
    #[arg(long, default_value_t = true, num_args = 0..=1, default_missing_value = "true")]
    resumable: bool,

    /// Maximum number of concurrent modular archive workers.
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,

    /// List available snapshots and exit.
    ///
    /// Queries the snapshots API and prints all available snapshots for the selected chain,
    /// including block number, size, and manifest URL.
    #[arg(long, alias = "list-snapshots", conflicts_with_all = ["url", "manifest_url", "manifest_path"])]
    list: bool,
}
310
311impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Runs the snapshot download command end to end.
    ///
    /// Flow: handle `--list`, then legacy single-archive `--url` mode; otherwise
    /// fetch a manifest, resolve the component selection, download all selected
    /// archives concurrently, and finish by writing `reth.toml` plus prune and
    /// stage checkpoints so the node resumes from a consistent state.
    pub async fn execute<N>(self) -> Result<()> {
        // NOTE(review): the `N` type parameter is unused in this body —
        // presumably required by caller-side wiring; confirm before removing.
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // Cancellation token; its drop guard presumably cancels outstanding
        // work when this scope exits — confirm against CancellationToken docs.
        let cancel_token = CancellationToken::new();
        let _cancel_guard = cancel_token.drop_guard();

        // --list: print available snapshots and exit
        if self.list {
            let entries = fetch_snapshot_api_entries(chain_id).await?;
            print_snapshot_listing(&entries, chain_id);
            return Ok(());
        }

        // Resolve custom static files directory (None when using the default).
        let default_sf = data_dir.data_dir().join("static_files");
        let custom_sf = data_dir.static_files();
        let static_files_dir = if custom_sf != default_sf { Some(custom_sf) } else { None };

        // Legacy single-URL mode: download one archive and extract it
        if let Some(ref url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(
                url,
                data_dir.data_dir(),
                static_files_dir,
                None,
                self.resumable,
                cancel_token.clone(),
            )
            .await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Modular download: fetch manifest and select components
        let manifest_source = self.resolve_manifest_source(chain_id).await?;

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // The archive preset pulls in extra archive-only components, honoring
        // the explicit RocksDB opt-out.
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        // Collect all archive descriptors across selected components.
        let target_dir = data_dir.data_dir();
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                // NOTE(review): assumes `block <= manifest.block`; otherwise this
                // subtraction underflows — presumably validated where the
                // selection was built; confirm.
                ComponentSelection::Since(block) => Some(manifest.block - block + 1),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Plain-output checksums are mandatory: without them extracted
                // files cannot be integrity-checked or reused.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // Deterministic download order: priority rank, then component name,
        // then archive file name.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // Resumable mode keeps partially downloaded archives in a cache
        // directory under the datadir so interrupted downloads can resume.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        // Total byte estimate as reported by the manifest for each selection.
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::Since(block) => {
                    manifest.size_for_distance(*ty, Some(manifest.block - block + 1))
                }
                ComponentSelection::None => 0,
            })
            .sum();

        let startup_summary =
            summarize_download_startup(&all_downloads, target_dir, static_files_dir.as_deref())?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        let shared = SharedProgress::new(total_size, total_archives as u64, cancel_token.clone());
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        // Download and extract all planned archives with bounded concurrency.
        let target = target_dir.to_path_buf();
        let sf_dir = static_files_dir;
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        let download_concurrency = self.download_concurrency.max(1);
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                let dir = target.clone();
                let sf = sf_dir.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                let ct = cancel_token.clone();
                async move {
                    process_modular_archive(
                        planned,
                        &dir,
                        sf.as_deref(),
                        cache.as_deref(),
                        Some(sp),
                        resumable,
                        ct,
                    )
                    .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Stop the progress display before surfacing any worker errors.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        // Check for errors
        for result in results {
            result?;
        }

        // Generate reth.toml and set prune checkpoints
        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        // Open the DB to write checkpoints
        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        // Write prune checkpoints to the DB so the pruner knows data before the
        // snapshot block is already in the expected pruned state
        let should_write_prune = config.prune.segments != Default::default();
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            // Reset stage checkpoints for history indexing stages only if RocksDB
            // indices weren't downloaded. When archive snapshots include the
            // optional RocksDB indices component, we preserve source checkpoints.
            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        let start_command = startup_node_command::<C>(self.env.chain.as_ref());
        info!(target: "reth::cli", "Snapshot download complete. Run `{}` to start syncing.", start_command);

        Ok(())
    }
534
    /// Determines which components to download based on CLI flags or interactive selection.
    ///
    /// Precedence: `--archive`/`--all`, then `--full`, then `--minimal`, then
    /// explicit `--with-*` flags, then `--non-interactive` (minimal preset),
    /// otherwise the interactive TUI selector.
    fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
        // Only components the manifest actually provides can be selected.
        let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();

        // --archive/--all: everything available as All
        if self.archive {
            return Ok(ResolvedComponents {
                selections: SnapshotComponentType::ALL
                    .iter()
                    .copied()
                    .filter(|ty| available(*ty))
                    // Honor --without-rocksdb even within the archive preset.
                    .filter(|ty| {
                        !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
                    })
                    .map(|ty| (ty, ComponentSelection::All))
                    .collect(),
                preset: Some(SelectionPreset::Archive),
            });
        }

        if self.full {
            return Ok(ResolvedComponents {
                selections: self.full_preset_selections(manifest),
                preset: Some(SelectionPreset::Full),
            });
        }

        if self.minimal {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Any explicit --with-* flag switches to a custom (preset-less) selection.
        let has_explicit_flags = self.with_txs ||
            self.with_txs_since.is_some() ||
            self.with_txs_distance.is_some() ||
            self.with_receipts ||
            self.with_receipts_since.is_some() ||
            self.with_receipts_distance.is_some() ||
            self.with_state_history ||
            self.with_state_history_since.is_some() ||
            self.with_state_history_distance.is_some() ||
            self.with_senders ||
            self.with_rocksdb;

        if has_explicit_flags {
            let mut selections = BTreeMap::new();
            let tx_selection = explicit_component_selection(
                "--with-txs-since",
                self.with_txs,
                self.with_txs_since,
                self.with_txs_distance,
                manifest.block,
            )?;
            let receipt_selection = explicit_component_selection(
                "--with-receipts-since",
                self.with_receipts,
                self.with_receipts_since,
                self.with_receipts_distance,
                manifest.block,
            )?;
            let state_history_selection = explicit_component_selection(
                "--with-state-history-since",
                self.with_state_history,
                self.with_state_history_since,
                self.with_state_history_distance,
                manifest.block,
            )?;

            // Required components always All
            if available(SnapshotComponentType::State) {
                selections.insert(SnapshotComponentType::State, ComponentSelection::All);
            }
            if available(SnapshotComponentType::Headers) {
                selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
            }
            if let Some(selection) = tx_selection &&
                available(SnapshotComponentType::Transactions)
            {
                selections.insert(SnapshotComponentType::Transactions, selection);
            }
            if let Some(selection) = receipt_selection &&
                available(SnapshotComponentType::Receipts)
            {
                selections.insert(SnapshotComponentType::Receipts, selection);
            }
            // One state-history flag drives both changeset components.
            if let Some(selection) = state_history_selection {
                if available(SnapshotComponentType::AccountChangesets) {
                    selections.insert(SnapshotComponentType::AccountChangesets, selection);
                }
                if available(SnapshotComponentType::StorageChangesets) {
                    selections.insert(SnapshotComponentType::StorageChangesets, selection);
                }
            }
            if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
                selections
                    .insert(SnapshotComponentType::TransactionSenders, ComponentSelection::All);
            }
            if self.with_rocksdb && available(SnapshotComponentType::RocksdbIndices) {
                selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
            }
            return Ok(ResolvedComponents { selections, preset: None });
        }

        if self.non_interactive {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Interactive TUI
        let full_preset = self.full_preset_selections(manifest);
        let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
        // Drop components the user deselected in the TUI.
        let selected =
            selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();

        Ok(ResolvedComponents { selections: selected, preset })
    }
655
656    fn minimal_preset_selections(
657        &self,
658        manifest: &SnapshotManifest,
659    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
660        SnapshotComponentType::ALL
661            .iter()
662            .copied()
663            .filter(|ty| manifest.component(*ty).is_some())
664            .map(|ty| (ty, ty.minimal_selection()))
665            .collect()
666    }
667
668    fn full_preset_selections(
669        &self,
670        manifest: &SnapshotManifest,
671    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
672        let mut selections = BTreeMap::new();
673
674        for ty in [
675            SnapshotComponentType::State,
676            SnapshotComponentType::Headers,
677            SnapshotComponentType::Transactions,
678            SnapshotComponentType::Receipts,
679            SnapshotComponentType::AccountChangesets,
680            SnapshotComponentType::StorageChangesets,
681            SnapshotComponentType::TransactionSenders,
682            SnapshotComponentType::RocksdbIndices,
683        ] {
684            if manifest.component(ty).is_none() {
685                continue;
686            }
687
688            let selection = self.full_selection_for_component(ty, manifest.block);
689            if selection != ComponentSelection::None {
690                selections.insert(ty, selection);
691            }
692        }
693
694        selections
695    }
696
    /// Maps a component type to the selection implied by the `--full` preset.
    ///
    /// Selections derive from the globally configured default full-node prune
    /// modes; state and headers are always downloaded in full.
    fn full_selection_for_component(
        &self,
        ty: SnapshotComponentType,
        snapshot_block: u64,
    ) -> ComponentSelection {
        let defaults = DefaultPruningValues::get_global();
        match ty {
            SnapshotComponentType::State | SnapshotComponentType::Headers => {
                ComponentSelection::All
            }
            SnapshotComponentType::Transactions => {
                // When full nodes keep pre-merge bodies, include transactions
                // from the Paris (merge) activation block onward.
                if defaults.full_bodies_history_use_pre_merge {
                    match self
                        .env
                        .chain
                        .ethereum_fork_activation(EthereumHardfork::Paris)
                        .block_number()
                    {
                        Some(paris) if snapshot_block >= paris => ComponentSelection::Since(paris),
                        // Paris activates after the snapshot block: nothing to keep yet.
                        Some(_) => ComponentSelection::None,
                        // No known Paris block number for this chain: keep everything.
                        None => ComponentSelection::All,
                    }
                } else {
                    selection_from_prune_mode(
                        defaults.full_prune_modes.bodies_history,
                        snapshot_block,
                    )
                }
            }
            SnapshotComponentType::Receipts => {
                selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
            }
            SnapshotComponentType::AccountChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
            }
            SnapshotComponentType::StorageChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
            }
            SnapshotComponentType::TransactionSenders => {
                selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
            }
            // Keep hidden by default in full mode; if users want indices they can use archive.
            SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
        }
    }
742
743    async fn resolve_manifest_source(&self, chain_id: u64) -> Result<String> {
744        if let Some(path) = &self.manifest_path {
745            return Ok(path.display().to_string());
746        }
747
748        match &self.manifest_url {
749            Some(url) => Ok(url.clone()),
750            None => discover_manifest_url(chain_id).await,
751        }
752    }
753}
754
755fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
756    match mode {
757        None => ComponentSelection::All,
758        Some(PruneMode::Full) => ComponentSelection::None,
759        Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
760        Some(PruneMode::Before(block)) => {
761            if snapshot_block >= block {
762                ComponentSelection::Since(block)
763            } else {
764                ComponentSelection::None
765            }
766        }
767    }
768}
769
770fn explicit_component_selection(
771    since_flag_name: &str,
772    all: bool,
773    since: Option<u64>,
774    distance: Option<u64>,
775    snapshot_block: u64,
776) -> Result<Option<ComponentSelection>> {
777    if all {
778        return Ok(Some(ComponentSelection::All));
779    }
780
781    if let Some(since) = since {
782        if since > snapshot_block {
783            eyre::bail!("{since_flag_name} {since} is beyond the snapshot block {snapshot_block}");
784        }
785        return Ok(Some(ComponentSelection::Since(since)));
786    }
787
788    Ok(distance.map(ComponentSelection::Distance))
789}
790
791/// If all data components (txs, receipts, changesets) are `All`, automatically
792/// include hidden archive-only components when available in the manifest.
793fn inject_archive_only_components(
794    selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
795    manifest: &SnapshotManifest,
796    include_rocksdb: bool,
797) {
798    let is_all =
799        |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
800
801    let is_archive = is_all(SnapshotComponentType::Transactions) &&
802        is_all(SnapshotComponentType::Receipts) &&
803        is_all(SnapshotComponentType::AccountChangesets) &&
804        is_all(SnapshotComponentType::StorageChangesets);
805
806    if !is_archive {
807        return;
808    }
809
810    for component in
811        [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
812    {
813        if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
814            continue;
815        }
816
817        if manifest.component(component).is_some() {
818            selections.insert(component, ComponentSelection::All);
819        }
820    }
821}
822
823fn should_reset_index_stage_checkpoints(
824    selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
825) -> bool {
826    !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
827}
828
829fn startup_node_command<C>(chain_spec: &C::ChainSpec) -> String
830where
831    C: ChainSpecParser,
832    C::ChainSpec: EthChainSpec,
833{
834    startup_node_command_for_binary::<C>(&current_binary_name(), chain_spec)
835}
836
837fn startup_node_command_for_binary<C>(binary_name: &str, chain_spec: &C::ChainSpec) -> String
838where
839    C: ChainSpecParser,
840    C::ChainSpec: EthChainSpec,
841{
842    let mut command = format!("{binary_name} node");
843
844    if let Some(chain_arg) = startup_chain_arg::<C>(chain_spec) {
845        command.push_str(" --chain ");
846        command.push_str(&chain_arg);
847    }
848
849    command
850}
851
/// Returns the file stem of the currently running binary (argv[0]), falling
/// back to `"reth"` when it cannot be determined.
fn current_binary_name() -> String {
    let fallback = || "reth".to_string();
    match std::env::args_os().next() {
        Some(arg0) => PathBuf::from(arg0)
            .file_stem()
            .and_then(|stem| stem.to_str())
            .filter(|stem| !stem.is_empty())
            .map(str::to_string)
            .unwrap_or_else(fallback),
        None => fallback(),
    }
}
861
862fn startup_chain_arg<C>(chain_spec: &C::ChainSpec) -> Option<String>
863where
864    C: ChainSpecParser,
865    C::ChainSpec: EthChainSpec,
866{
867    let current_chain = chain_spec.chain();
868    let current_genesis_hash = chain_spec.genesis_hash();
869    let default_chain = C::default_value().and_then(|chain_name| C::parse(chain_name).ok());
870
871    if default_chain.as_ref().is_some_and(|default_chain| {
872        default_chain.chain() == current_chain &&
873            default_chain.genesis_hash() == current_genesis_hash
874    }) {
875        return None;
876    }
877
878    C::SUPPORTED_CHAINS
879        .iter()
880        .find_map(|chain_name| {
881            let parsed_chain = C::parse(chain_name).ok()?;
882            (parsed_chain.chain() == current_chain &&
883                parsed_chain.genesis_hash() == current_genesis_hash)
884                .then(|| (*chain_name).to_string())
885        })
886        .or_else(|| Some("<chain-or-chainspec>".to_string()))
887}
888
impl<C: ChainSpecParser> DownloadCommand<C> {
    /// Returns the underlying chain being used to run this command
    ///
    /// Always returns `Some`; the `Option` presumably matches a shared
    /// accessor signature used by other commands — confirm at call sites.
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        Some(&self.env.chain)
    }
}
895
/// Tracks download progress and throttles display updates to every 100ms.
pub(crate) struct DownloadProgress {
    /// Bytes downloaded so far.
    downloaded: u64,
    /// Expected total size in bytes (callers may pass 0 when unknown).
    total_size: u64,
    /// When the progress line was last printed; drives the 100ms throttle.
    last_displayed: Instant,
    /// When the download started; used for speed/ETA computation.
    started_at: Instant,
}
903
/// One archive scheduled for download, paired with the component it belongs to.
#[derive(Debug, Clone)]
struct PlannedArchive {
    /// Snapshot component type this archive provides data for.
    ty: SnapshotComponentType,
    // NOTE(review): presumably the manifest's component key/name — confirm at call sites.
    component: String,
    /// Manifest descriptor for the archive (includes its expected output files).
    archive: ArchiveDescriptor,
}
910
911const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
912    match ty {
913        SnapshotComponentType::State => 0,
914        SnapshotComponentType::RocksdbIndices => 1,
915        _ => 2,
916    }
917}
918
/// Startup tally of planned archives: how many already verify on disk versus
/// how many still have to be fetched.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    /// Archives whose output files already exist and pass verification.
    reusable: usize,
    /// Archives that must still be downloaded.
    needs_download: usize,
}
924
925fn summarize_download_startup(
926    all_downloads: &[PlannedArchive],
927    target_dir: &Path,
928    static_files_dir: Option<&Path>,
929) -> Result<DownloadStartupSummary> {
930    let mut summary = DownloadStartupSummary::default();
931
932    for planned in all_downloads {
933        if verify_output_files(target_dir, static_files_dir, &planned.archive.output_files)? {
934            summary.reusable += 1;
935        } else {
936            summary.needs_download += 1;
937        }
938    }
939
940    Ok(summary)
941}
942
943impl DownloadProgress {
944    /// Creates new progress tracker with given total size
945    fn new(total_size: u64) -> Self {
946        let now = Instant::now();
947        Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
948    }
949
950    /// Converts bytes to human readable format (B, KB, MB, GB)
951    pub(crate) fn format_size(size: u64) -> String {
952        let mut size = size as f64;
953        let mut unit_index = 0;
954
955        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
956            size /= 1024.0;
957            unit_index += 1;
958        }
959
960        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
961    }
962
963    /// Format duration as human readable string
964    fn format_duration(duration: Duration) -> String {
965        let secs = duration.as_secs();
966        if secs < 60 {
967            format!("{secs}s")
968        } else if secs < 3600 {
969            format!("{}m {}s", secs / 60, secs % 60)
970        } else {
971            format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
972        }
973    }
974
975    /// Updates progress bar (for single-archive legacy downloads)
976    fn update(&mut self, chunk_size: u64) -> Result<()> {
977        self.downloaded += chunk_size;
978
979        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
980            let formatted_downloaded = Self::format_size(self.downloaded);
981            let formatted_total = Self::format_size(self.total_size);
982            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
983
984            let elapsed = self.started_at.elapsed();
985            let eta = if self.downloaded > 0 {
986                let remaining = self.total_size.saturating_sub(self.downloaded);
987                let speed = self.downloaded as f64 / elapsed.as_secs_f64();
988                if speed > 0.0 {
989                    Duration::from_secs_f64(remaining as f64 / speed)
990                } else {
991                    Duration::ZERO
992                }
993            } else {
994                Duration::ZERO
995            };
996            let eta_str = Self::format_duration(eta);
997
998            print!(
999                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str}     ",
1000            );
1001            io::stdout().flush()?;
1002            self.last_displayed = Instant::now();
1003        }
1004
1005        Ok(())
1006    }
1007}
1008
/// Shared progress counter for parallel downloads.
///
/// Each download thread atomically increments `downloaded`. A single display
/// task on the main thread reads the counter periodically and prints one
/// aggregated progress line.
struct SharedProgress {
    /// Bytes downloaded so far, summed across all archives.
    downloaded: AtomicU64,
    /// Expected total bytes across all planned archives.
    total_size: u64,
    /// Number of archives scheduled for download.
    total_archives: u64,
    /// Archives fully completed so far.
    archives_done: AtomicU64,
    /// Flipped when all work completes; tells the display task to stop.
    done: AtomicBool,
    /// Cooperative cancellation signal polled by the I/O wrappers.
    cancel_token: CancellationToken,
}
1022
1023impl SharedProgress {
1024    fn new(total_size: u64, total_archives: u64, cancel_token: CancellationToken) -> Arc<Self> {
1025        Arc::new(Self {
1026            downloaded: AtomicU64::new(0),
1027            total_size,
1028            total_archives,
1029            archives_done: AtomicU64::new(0),
1030            done: AtomicBool::new(false),
1031            cancel_token,
1032        })
1033    }
1034
1035    fn is_cancelled(&self) -> bool {
1036        self.cancel_token.is_cancelled()
1037    }
1038
1039    fn add(&self, bytes: u64) {
1040        self.downloaded.fetch_add(bytes, Ordering::Relaxed);
1041    }
1042
1043    fn archive_done(&self) {
1044        self.archives_done.fetch_add(1, Ordering::Relaxed);
1045    }
1046}
1047
/// Spawns a background task that prints aggregated download progress.
/// Returns a handle; drop it (or call `.abort()`) to stop.
fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let started_at = Instant::now();
        // Last observed counter value/time, used to compute a windowed speed.
        let mut prev_downloaded = 0u64;
        let mut prev_time = started_at;
        let mut interval = tokio::time::interval(Duration::from_secs(3));
        interval.tick().await; // first tick is immediate, skip it
        loop {
            interval.tick().await;

            // `done` is flipped externally (presumably once all downloads
            // complete) — stop refreshing and print the final line below.
            if progress.done.load(Ordering::Relaxed) {
                break;
            }

            let downloaded = progress.downloaded.load(Ordering::Relaxed);
            let total = progress.total_size;
            // No known total: percentages/ETA would be meaningless, skip.
            if total == 0 {
                continue;
            }

            let done = progress.archives_done.load(Ordering::Relaxed);
            let all = progress.total_archives;
            let pct = (downloaded as f64 / total as f64) * 100.0;
            let dl = DownloadProgress::format_size(downloaded);
            let tot = DownloadProgress::format_size(total);

            let remaining = total.saturating_sub(downloaded);

            if remaining == 0 {
                // Downloads done, waiting for extraction
                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    downloaded = %dl,
                    "Extracting remaining archives"
                );
            } else {
                // Speed over the last display window (not a lifetime average)
                // so the ETA tracks current throughput.
                let now = Instant::now();
                let dt = now.duration_since(prev_time).as_secs_f64();
                let speed = if dt > 0.0 {
                    (downloaded.saturating_sub(prev_downloaded)) as f64 / dt
                } else {
                    0.0
                };
                prev_downloaded = downloaded;
                prev_time = now;

                let eta = if speed > 0.0 {
                    DownloadProgress::format_duration(Duration::from_secs_f64(
                        remaining as f64 / speed,
                    ))
                } else {
                    "??".to_string()
                };

                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    progress = format_args!("{pct:.1}%"),
                    downloaded = %dl,
                    total = %tot,
                    eta = %eta,
                    "Downloading"
                );
            }
        }

        // Final line
        let downloaded = progress.downloaded.load(Ordering::Relaxed);
        let dl = DownloadProgress::format_size(downloaded);
        let tot = DownloadProgress::format_size(progress.total_size);
        let elapsed = DownloadProgress::format_duration(started_at.elapsed());
        info!(target: "reth::cli",
            downloaded = %dl,
            total = %tot,
            elapsed = %elapsed,
            "Downloads complete"
        );
    })
}
1128
/// Adapter to track progress while reading (used for extraction in legacy path)
struct ProgressReader<R> {
    /// Underlying source being read (e.g. the HTTP response or local file).
    reader: R,
    /// Local, single-download progress display.
    progress: DownloadProgress,
    /// Checked on every read so extraction stops promptly on cancellation.
    cancel_token: CancellationToken,
}
1135
1136impl<R: Read> ProgressReader<R> {
1137    fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
1138        Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
1139    }
1140}
1141
1142impl<R: Read> Read for ProgressReader<R> {
1143    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1144        if self.cancel_token.is_cancelled() {
1145            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1146        }
1147        let bytes = self.reader.read(buf)?;
1148        if bytes > 0 &&
1149            let Err(e) = self.progress.update(bytes as u64)
1150        {
1151            return Err(io::Error::other(e));
1152        }
1153        Ok(bytes)
1154    }
1155}
1156
/// Supported compression formats for snapshots
#[derive(Debug, Clone, Copy)]
enum CompressionFormat {
    /// LZ4-compressed tarball (`.tar.lz4`).
    Lz4,
    /// Zstandard-compressed tarball (`.tar.zst`).
    Zstd,
}
1163
1164impl CompressionFormat {
1165    /// Detect compression format from file extension
1166    fn from_url(url: &str) -> Result<Self> {
1167        let path =
1168            Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
1169
1170        if path.ends_with(EXTENSION_TAR_LZ4) {
1171            Ok(Self::Lz4)
1172        } else if path.ends_with(EXTENSION_TAR_ZSTD) {
1173            Ok(Self::Zstd)
1174        } else {
1175            Err(eyre::eyre!(
1176                "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
1177                path
1178            ))
1179        }
1180    }
1181}
1182
1183/// Resolves the filesystem path for an output file, remapping `static_files/`
1184/// entries to the custom static files directory when one is configured.
1185fn resolve_output_path(
1186    target_dir: &Path,
1187    relative_path: &str,
1188    static_files_dir: Option<&Path>,
1189) -> PathBuf {
1190    if let Some(sf_dir) = static_files_dir &&
1191        let Some(rest) = relative_path.strip_prefix(STATIC_FILES_PREFIX)
1192    {
1193        return sf_dir.join(rest);
1194    }
1195    target_dir.join(relative_path)
1196}
1197
/// Unpacks a tar archive entry-by-entry, remapping `static_files/` paths to
/// the custom static files directory when one is configured.
fn unpack_archive_with_remap<R: Read>(
    archive: &mut Archive<R>,
    target_dir: &Path,
    static_files_dir: Option<&Path>,
) -> Result<()> {
    // Fast path: no remapping needed, use standard unpack.
    let Some(sf_dir) = static_files_dir else {
        archive.unpack(target_dir)?;
        return Ok(());
    };

    fs::create_dir_all(sf_dir)?;

    for entry in archive.entries()? {
        let mut entry = entry?;
        // Own the path so the borrow on `entry` ends before `entry.unpack`.
        let path = entry.path()?.into_owned();
        let path_str = path.to_string_lossy();

        if let Some(rest) = path_str.strip_prefix(STATIC_FILES_PREFIX) {
            if rest.is_empty() {
                // Directory entry for `static_files/` itself — skip, we
                // already created the custom directory above.
                continue;
            }
            // Redirect this entry into the custom static-files directory.
            let dest = sf_dir.join(rest);
            if let Some(parent) = dest.parent() {
                fs::create_dir_all(parent)?;
            }
            entry.unpack(&dest)?;
        } else if path_str == "static_files" {
            // Bare directory entry without trailing slash.
            continue;
        } else {
            // Regular entry: extract under the normal target directory.
            let dest = target_dir.join(&path);
            if let Some(parent) = dest.parent() {
                fs::create_dir_all(parent)?;
            }
            entry.unpack(&dest)?;
        }
    }
    Ok(())
}
1242
1243/// Extracts a compressed tar archive to the target directory with progress tracking.
1244fn extract_archive<R: Read>(
1245    reader: R,
1246    total_size: u64,
1247    format: CompressionFormat,
1248    target_dir: &Path,
1249    static_files_dir: Option<&Path>,
1250    cancel_token: CancellationToken,
1251) -> Result<()> {
1252    let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
1253
1254    match format {
1255        CompressionFormat::Lz4 => {
1256            let decoder = Decoder::new(progress_reader)?;
1257            unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
1258        }
1259        CompressionFormat::Zstd => {
1260            let decoder = ZstdDecoder::new(progress_reader)?;
1261            unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
1262        }
1263    }
1264
1265    println!();
1266    Ok(())
1267}
1268
1269/// Extracts a compressed tar archive without progress tracking.
1270fn extract_archive_raw<R: Read>(
1271    reader: R,
1272    format: CompressionFormat,
1273    target_dir: &Path,
1274    static_files_dir: Option<&Path>,
1275) -> Result<()> {
1276    match format {
1277        CompressionFormat::Lz4 => {
1278            unpack_archive_with_remap(
1279                &mut Archive::new(Decoder::new(reader)?),
1280                target_dir,
1281                static_files_dir,
1282            )?;
1283        }
1284        CompressionFormat::Zstd => {
1285            unpack_archive_with_remap(
1286                &mut Archive::new(ZstdDecoder::new(reader)?),
1287                target_dir,
1288                static_files_dir,
1289            )?;
1290        }
1291    }
1292    Ok(())
1293}
1294
1295/// Extracts a snapshot from a local file.
1296fn extract_from_file(
1297    path: &Path,
1298    format: CompressionFormat,
1299    target_dir: &Path,
1300    static_files_dir: Option<&Path>,
1301) -> Result<()> {
1302    let file = std::fs::File::open(path)?;
1303    let total_size = file.metadata()?.len();
1304    info!(target: "reth::cli",
1305        file = %path.display(),
1306        size = %DownloadProgress::format_size(total_size),
1307        "Extracting local archive"
1308    );
1309    let start = Instant::now();
1310    extract_archive(
1311        file,
1312        total_size,
1313        format,
1314        target_dir,
1315        static_files_dir,
1316        CancellationToken::new(),
1317    )?;
1318    info!(target: "reth::cli",
1319        file = %path.display(),
1320        elapsed = %DownloadProgress::format_duration(start.elapsed()),
1321        "Local extraction complete"
1322    );
1323    Ok(())
1324}
1325
/// Maximum number of consecutive failed download attempts before giving up.
const MAX_DOWNLOAD_RETRIES: u32 = 10;
/// Seconds to sleep between download retry attempts.
const RETRY_BACKOFF_SECS: u64 = 5;
1328
/// Wrapper that tracks download progress while writing data.
/// Used with [`io::copy`] to display progress during downloads.
struct ProgressWriter<W> {
    /// Destination the downloaded bytes are written to.
    inner: W,
    /// Local, single-download progress display.
    progress: DownloadProgress,
    /// Checked on every write so downloads stop promptly on cancellation.
    cancel_token: CancellationToken,
}
1336
1337impl<W: Write> Write for ProgressWriter<W> {
1338    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1339        if self.cancel_token.is_cancelled() {
1340            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1341        }
1342        let n = self.inner.write(buf)?;
1343        let _ = self.progress.update(n as u64);
1344        Ok(n)
1345    }
1346
1347    fn flush(&mut self) -> io::Result<()> {
1348        self.inner.flush()
1349    }
1350}
1351
/// Wrapper that bumps a shared atomic counter while writing data.
/// Used for parallel downloads where a single display task shows aggregated progress.
struct SharedProgressWriter<W> {
    /// Destination the downloaded bytes are written to.
    inner: W,
    /// Aggregated counter shared with the display task.
    progress: Arc<SharedProgress>,
}
1358
1359impl<W: Write> Write for SharedProgressWriter<W> {
1360    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1361        if self.progress.is_cancelled() {
1362            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1363        }
1364        let n = self.inner.write(buf)?;
1365        self.progress.add(n as u64);
1366        Ok(n)
1367    }
1368
1369    fn flush(&mut self) -> io::Result<()> {
1370        self.inner.flush()
1371    }
1372}
1373
/// Wrapper that bumps a shared atomic counter while reading data.
/// Used for streaming downloads where a single display task shows aggregated progress.
struct SharedProgressReader<R> {
    /// Underlying source being read (e.g. the HTTP response body).
    inner: R,
    /// Aggregated counter shared with the display task.
    progress: Arc<SharedProgress>,
}
1380
1381impl<R: Read> Read for SharedProgressReader<R> {
1382    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1383        if self.progress.is_cancelled() {
1384            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1385        }
1386        let n = self.inner.read(buf)?;
1387        self.progress.add(n as u64);
1388        Ok(n)
1389    }
1390}
1391
/// Downloads a file with resume support using HTTP Range requests.
/// Automatically retries on failure, resuming from where it left off.
/// Returns the path to the downloaded file and its total size.
///
/// When `shared` is provided, progress is reported to the shared counter
/// (for parallel downloads). Otherwise uses a local progress bar.
fn resumable_download(
    url: &str,
    target_dir: &Path,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<(PathBuf, u64)> {
    // On-disk name comes from the last URL path segment.
    let file_name = Url::parse(url)
        .ok()
        .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
        .unwrap_or_else(|| "snapshot.tar".to_string());

    // Bytes are written to a `.part` file and only renamed to the final name
    // once complete, so a partial file is never mistaken for a finished one.
    let final_path = target_dir.join(&file_name);
    let part_path = target_dir.join(format!("{file_name}.part"));

    // Parallel mode (shared counter present) suppresses per-file logging.
    let quiet = shared.is_some();

    if !quiet {
        info!(target: "reth::cli", file = %file_name, "Connecting to download server");
    }
    let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;

    // Learned from the first successful response (Content-Length/Content-Range).
    let mut total_size: Option<u64> = None;
    // Counts *consecutive* unproductive failures; reset when progress is made.
    let mut retries: u32 = 0;

    // Promote the `.part` file to its final name and report completion.
    let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
        fs::rename(&part_path, &final_path)?;
        if !quiet {
            info!(target: "reth::cli", file = %file_name, "Download complete");
        }
        Ok((final_path.clone(), size))
    };

    loop {
        let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);

        // All bytes already on disk (e.g. a retry after the final chunk landed).
        if let Some(total) = total_size &&
            existing_size >= total
        {
            return finalize_download(total);
        }

        if retries > 0 {
            info!(target: "reth::cli",
                file = %file_name,
                retries,
                "Resuming download from {} bytes", existing_size
            );
        }

        // Request only the missing suffix when partial data already exists.
        let mut request = client.get(url);
        if existing_size > 0 {
            request = request.header(RANGE, format!("bytes={existing_size}-"));
            if !quiet && retries == 0 {
                info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
            }
        }

        let response = match request.send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                retries += 1;
                if retries >= MAX_DOWNLOAD_RETRIES {
                    return Err(e.into());
                }
                warn!(target: "reth::cli",
                    file = %file_name,
                    %e,
                    "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                continue;
            }
        };

        // 206 means the server honored the Range header; a plain 200 means it
        // ignored it and is resending the file from the beginning.
        let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;

        // For a partial response the full size is the denominator of
        // `Content-Range: bytes start-end/total`; otherwise use Content-Length.
        let size = if is_partial {
            response
                .headers()
                .get("Content-Range")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.split('/').next_back())
                .and_then(|v| v.parse().ok())
        } else {
            response.content_length()
        };

        if total_size.is_none() {
            total_size = size;
            if !quiet && let Some(s) = size {
                info!(target: "reth::cli",
                    file = %file_name,
                    size = %DownloadProgress::format_size(s),
                    "Downloading"
                );
            }
        }

        let current_total = total_size.ok_or_else(|| {
            eyre::eyre!("Server did not provide Content-Length or Content-Range header")
        })?;

        // Append when resuming a honored range request; otherwise start fresh
        // (this also covers servers that ignored the Range header).
        let file = if is_partial && existing_size > 0 {
            OpenOptions::new()
                .append(true)
                .open(&part_path)
                .map_err(|e| fs::FsPathError::open(e, &part_path))?
        } else {
            fs::create_file(&part_path)?
        };

        let start_offset = if is_partial { existing_size } else { 0 };
        let mut reader = response;

        let copy_result;
        let flush_result;

        if let Some(sp) = shared {
            // Parallel path: bump shared atomic counter
            if start_offset > 0 {
                sp.add(start_offset);
            }
            let mut writer =
                SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
        } else {
            // Legacy single-download path: local progress bar
            let mut progress = DownloadProgress::new(current_total);
            progress.downloaded = start_offset;
            let mut writer = ProgressWriter {
                inner: BufWriter::new(file),
                progress,
                cancel_token: cancel_token.clone(),
            };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
            println!();
        }

        match copy_result.and(flush_result) {
            Err(e) => {
                // Check if any new data was written since we started this attempt.
                // If so, the connection was productive — reset the consecutive failure
                // counter so transient mid-stream errors don't exhaust retries.
                let new_size = std::fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
                if new_size > existing_size {
                    retries = 0;
                } else {
                    retries += 1;
                }

                if retries >= MAX_DOWNLOAD_RETRIES {
                    return Err(e.into());
                }

                warn!(target: "reth::cli",
                    file = %file_name,
                    %e,
                    "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                continue;
            }
            Ok(_) => return finalize_download(current_total),
        }
    }
}
1566
/// Streams a remote archive directly into the extractor without writing to disk.
///
/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
fn streaming_download_and_extract(
    url: &str,
    format: CompressionFormat,
    target_dir: &Path,
    static_files_dir: Option<&Path>,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<()> {
    // Parallel mode (shared counter present) suppresses per-archive logging.
    let quiet = shared.is_some();
    // Kept so the final error reflects the most recent failure cause.
    let mut last_error: Option<eyre::Error> = None;

    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        if attempt > 1 {
            info!(target: "reth::cli",
                url = %url,
                attempt,
                max = MAX_DOWNLOAD_RETRIES,
                "Retrying streaming download from scratch"
            );
        }

        // Only bound connection establishment; the body transfer itself may
        // legitimately run far longer than any fixed timeout.
        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;

        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                let err = eyre::Error::from(e);
                // Don't warn on the final attempt — the error is returned below.
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %err,
                        "Streaming request failed, retrying"
                    );
                }
                last_error = Some(err);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        };

        if !quiet && let Some(size) = response.content_length() {
            info!(target: "reth::cli",
                url = %url,
                size = %DownloadProgress::format_size(size),
                "Streaming archive"
            );
        }

        // Feed the HTTP body straight into the tar decoder — no temp file.
        let result = if let Some(sp) = shared {
            let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
            extract_archive_raw(reader, format, target_dir, static_files_dir)
        } else {
            // NOTE(review): total falls back to 0 when the server omits
            // Content-Length; the percentage math in `DownloadProgress::update`
            // then divides by zero.
            let total_size = response.content_length().unwrap_or(0);
            extract_archive(
                response,
                total_size,
                format,
                target_dir,
                static_files_dir,
                cancel_token.clone(),
            )
        };

        match result {
            Ok(()) => return Ok(()),
            Err(e) => {
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %e,
                        "Streaming extraction failed, retrying"
                    );
                }
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
            }
        }
    }

    Err(last_error.unwrap_or_else(|| {
        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
    }))
}
1661
1662/// Fetches the snapshot from a remote URL with resume support, then extracts it.
1663fn download_and_extract(
1664    url: &str,
1665    format: CompressionFormat,
1666    target_dir: &Path,
1667    static_files_dir: Option<&Path>,
1668    shared: Option<&Arc<SharedProgress>>,
1669    cancel_token: CancellationToken,
1670) -> Result<()> {
1671    let quiet = shared.is_some();
1672    let (downloaded_path, total_size) =
1673        resumable_download(url, target_dir, shared, cancel_token.clone())?;
1674
1675    let file_name =
1676        downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1677
1678    if !quiet {
1679        info!(target: "reth::cli",
1680            file = %file_name,
1681            size = %DownloadProgress::format_size(total_size),
1682            "Extracting archive"
1683        );
1684    }
1685    let file = fs::open(&downloaded_path)?;
1686
1687    if quiet {
1688        // Skip progress tracking for extraction in parallel mode
1689        extract_archive_raw(file, format, target_dir, static_files_dir)?;
1690    } else {
1691        extract_archive(file, total_size, format, target_dir, static_files_dir, cancel_token)?;
1692        info!(target: "reth::cli",
1693            file = %file_name,
1694            "Extraction complete"
1695        );
1696    }
1697
1698    fs::remove_file(&downloaded_path)?;
1699
1700    if let Some(sp) = shared {
1701        sp.archive_done();
1702    }
1703
1704    Ok(())
1705}
1706
1707/// Downloads and extracts a snapshot, blocking until finished.
1708///
1709/// Supports `file://` URLs for local files and HTTP(S) URLs for remote downloads.
1710/// When `resumable` is true, downloads to a `.part` file first with HTTP Range resume
1711/// support. Otherwise streams directly into the extractor.
1712fn blocking_download_and_extract(
1713    url: &str,
1714    target_dir: &Path,
1715    static_files_dir: Option<&Path>,
1716    shared: Option<Arc<SharedProgress>>,
1717    resumable: bool,
1718    cancel_token: CancellationToken,
1719) -> Result<()> {
1720    let format = CompressionFormat::from_url(url)?;
1721
1722    if let Ok(parsed_url) = Url::parse(url) &&
1723        parsed_url.scheme() == "file"
1724    {
1725        let file_path = parsed_url
1726            .to_file_path()
1727            .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1728        let result = extract_from_file(&file_path, format, target_dir, static_files_dir);
1729        if result.is_ok() &&
1730            let Some(sp) = shared
1731        {
1732            sp.archive_done();
1733        }
1734        result
1735    } else if resumable {
1736        download_and_extract(
1737            url,
1738            format,
1739            target_dir,
1740            static_files_dir,
1741            shared.as_ref(),
1742            cancel_token,
1743        )
1744    } else {
1745        let result = streaming_download_and_extract(
1746            url,
1747            format,
1748            target_dir,
1749            static_files_dir,
1750            shared.as_ref(),
1751            cancel_token,
1752        );
1753        if result.is_ok() &&
1754            let Some(sp) = shared
1755        {
1756            sp.archive_done();
1757        }
1758        result
1759    }
1760}
1761
1762/// Downloads and extracts a snapshot archive asynchronously.
1763///
1764/// When `shared` is provided, download progress is reported to the shared
1765/// counter for aggregated display. Otherwise uses a local progress bar.
1766/// When `resumable` is true, uses two-phase download with `.part` files.
1767async fn stream_and_extract(
1768    url: &str,
1769    target_dir: &Path,
1770    static_files_dir: Option<PathBuf>,
1771    shared: Option<Arc<SharedProgress>>,
1772    resumable: bool,
1773    cancel_token: CancellationToken,
1774) -> Result<()> {
1775    let target_dir = target_dir.to_path_buf();
1776    let url = url.to_string();
1777    task::spawn_blocking(move || {
1778        blocking_download_and_extract(
1779            &url,
1780            &target_dir,
1781            static_files_dir.as_deref(),
1782            shared,
1783            resumable,
1784            cancel_token,
1785        )
1786    })
1787    .await??;
1788
1789    Ok(())
1790}
1791
1792async fn process_modular_archive(
1793    planned: PlannedArchive,
1794    target_dir: &Path,
1795    static_files_dir: Option<&Path>,
1796    cache_dir: Option<&Path>,
1797    shared: Option<Arc<SharedProgress>>,
1798    resumable: bool,
1799    cancel_token: CancellationToken,
1800) -> Result<()> {
1801    let target_dir = target_dir.to_path_buf();
1802    let static_files_dir = static_files_dir.map(Path::to_path_buf);
1803    let cache_dir = cache_dir.map(Path::to_path_buf);
1804
1805    task::spawn_blocking(move || {
1806        blocking_process_modular_archive(
1807            &planned,
1808            &target_dir,
1809            static_files_dir.as_deref(),
1810            cache_dir.as_deref(),
1811            shared,
1812            resumable,
1813            cancel_token,
1814        )
1815    })
1816    .await??;
1817
1818    Ok(())
1819}
1820
/// Downloads, extracts, and integrity-checks a single planned archive, blocking
/// until finished.
///
/// If every expected output file already exists with a matching size and BLAKE3
/// hash, the archive is skipped entirely. Otherwise the archive is fetched
/// (two-phase resumable download or direct streaming) and the extracted files
/// are re-verified, retrying up to [`MAX_DOWNLOAD_RETRIES`] times.
fn blocking_process_modular_archive(
    planned: &PlannedArchive,
    target_dir: &Path,
    static_files_dir: Option<&Path>,
    cache_dir: Option<&Path>,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let archive = &planned.archive;
    // Fast path: all output files already present and verified — count the
    // archive's full size as downloaded and mark it done.
    if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
        if let Some(sp) = &shared {
            sp.add(archive.size);
            sp.archive_done();
        }
        info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
        return Ok(());
    }

    let format = CompressionFormat::from_url(&archive.file_name)?;
    let mut last_error: Option<eyre::Error> = None;
    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        // Remove any partially extracted files from a previous attempt so the
        // verification below cannot be satisfied by stale data.
        cleanup_output_files(target_dir, static_files_dir, &archive.output_files);

        if resumable {
            let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
            let archive_path = cache_dir.join(&archive.file_name);
            let part_path = cache_dir.join(format!("{}.part", archive.file_name));
            let result =
                resumable_download(&archive.url, cache_dir, shared.as_ref(), cancel_token.clone())
                    .and_then(|(downloaded_path, _)| {
                        let file = fs::open(&downloaded_path)?;
                        extract_archive_raw(file, format, target_dir, static_files_dir)
                    });
            // Best-effort cleanup of the cached archive and its `.part` file,
            // regardless of whether extraction succeeded.
            let _ = fs::remove_file(&archive_path);
            let _ = fs::remove_file(&part_path);

            if let Err(e) = result {
                warn!(target: "reth::cli",
                    file = %archive.file_name,
                    component = %planned.component,
                    attempt,
                    err = %e,
                    "Download or extraction failed, retrying"
                );
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        } else {
            // streaming_download_and_extract already has its own internal retry loop
            streaming_download_and_extract(
                &archive.url,
                format,
                target_dir,
                static_files_dir,
                shared.as_ref(),
                cancel_token.clone(),
            )?;
        }

        // Confirm the extracted files match the manifest checksums before
        // declaring this archive complete.
        if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
            if let Some(sp) = &shared {
                sp.archive_done();
            }
            return Ok(());
        }

        warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
    }

    // Prefer surfacing the last concrete download/extraction error over the
    // generic integrity-failure message below.
    if let Some(e) = last_error {
        return Err(e.wrap_err(format!(
            "Failed after {} attempts for {}",
            MAX_DOWNLOAD_RETRIES, archive.file_name
        )));
    }

    eyre::bail!(
        "Failed integrity validation after {} attempts for {}",
        MAX_DOWNLOAD_RETRIES,
        archive.file_name
    )
}
1907
1908fn verify_output_files(
1909    target_dir: &Path,
1910    static_files_dir: Option<&Path>,
1911    output_files: &[OutputFileChecksum],
1912) -> Result<bool> {
1913    if output_files.is_empty() {
1914        return Ok(false);
1915    }
1916
1917    for expected in output_files {
1918        let output_path = resolve_output_path(target_dir, &expected.path, static_files_dir);
1919        let meta = match fs::metadata(&output_path) {
1920            Ok(meta) => meta,
1921            Err(_) => return Ok(false),
1922        };
1923        if meta.len() != expected.size {
1924            return Ok(false);
1925        }
1926
1927        let actual = file_blake3_hex(&output_path)?;
1928        if !actual.eq_ignore_ascii_case(&expected.blake3) {
1929            return Ok(false);
1930        }
1931    }
1932
1933    Ok(true)
1934}
1935
1936fn cleanup_output_files(
1937    target_dir: &Path,
1938    static_files_dir: Option<&Path>,
1939    output_files: &[OutputFileChecksum],
1940) {
1941    for output in output_files {
1942        let _ = fs::remove_file(resolve_output_path(target_dir, &output.path, static_files_dir));
1943    }
1944}
1945
1946fn file_blake3_hex(path: &Path) -> Result<String> {
1947    let mut file = fs::open(path)?;
1948    let mut hasher = Hasher::new();
1949    let mut buf = [0_u8; 64 * 1024];
1950
1951    loop {
1952        let n = file.read(&mut buf)?;
1953        if n == 0 {
1954            break;
1955        }
1956        hasher.update(&buf[..n]);
1957    }
1958
1959    Ok(hasher.finalize().to_hex().to_string())
1960}
1961
/// Discovers the latest snapshot manifest URL for the given chain from the snapshots API.
///
/// Queries the configured snapshot API and returns the manifest URL for the most
/// recent modular snapshot matching the requested chain.
async fn discover_manifest_url(chain_id: u64) -> Result<String> {
    let defaults = DownloadDefaults::get_global();
    let api_url = &*defaults.snapshot_api_url;

    info!(target: "reth::cli", %api_url, %chain_id, "Discovering latest snapshot manifest");

    // Listing is already filtered down to this chain by the fetch helper.
    let entries = fetch_snapshot_api_entries(chain_id).await?;

    // Pick the modular snapshot at the highest block; when none exists, fail
    // with actionable alternatives (direct manifest URL or direct snapshot URL).
    let entry =
        entries.iter().filter(|s| s.is_modular()).max_by_key(|s| s.block).ok_or_else(|| {
            eyre::eyre!(
                "No modular snapshot manifest found for chain \
                 {chain_id} at {api_url}\n\n\
                 You can provide a manifest URL directly with --manifest-url, or\n\
                 use a direct snapshot URL with -u from:\n\
                 \t- {}\n\n\
                 Use --list to see all available snapshots.",
                api_url.trim_end_matches("/api/snapshots"),
            )
        })?;

    info!(target: "reth::cli",
        block = entry.block,
        url = %entry.metadata_url,
        "Found latest snapshot manifest"
    );

    Ok(entry.metadata_url.clone())
}
1995
1996/// Deserializes a JSON value that may be either a number or a string-encoded number.
1997fn deserialize_string_or_u64<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
1998where
1999    D: serde::Deserializer<'de>,
2000{
2001    use serde::Deserialize;
2002    let value = serde_json::Value::deserialize(deserializer)?;
2003    match &value {
2004        serde_json::Value::Number(n) => {
2005            n.as_u64().ok_or_else(|| serde::de::Error::custom("expected u64"))
2006        }
2007        serde_json::Value::String(s) => {
2008            s.parse::<u64>().map_err(|_| serde::de::Error::custom("expected numeric string"))
2009        }
2010        _ => Err(serde::de::Error::custom("expected number or string")),
2011    }
2012}
2013
/// An entry from the snapshot discovery API listing.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SnapshotApiEntry {
    /// Chain the snapshot belongs to; the API may encode this as a number or a string.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    chain_id: u64,
    /// Block height the snapshot was taken at; number or string in the API.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    block: u64,
    /// Optional human-readable snapshot date.
    #[serde(default)]
    date: Option<String>,
    /// Optional profile label — semantics defined by the API response.
    #[serde(default)]
    profile: Option<String>,
    /// URL of the snapshot metadata; a modular snapshot's URL ends in `manifest.json`.
    metadata_url: String,
    /// Snapshot size; defaults to 0 when the API does not report it.
    #[serde(default)]
    size: u64,
}
2030
impl SnapshotApiEntry {
    /// A modular snapshot is identified by a metadata URL pointing at a
    /// `manifest.json` file.
    fn is_modular(&self) -> bool {
        self.metadata_url.ends_with("manifest.json")
    }
}
2036
2037/// Fetches the full snapshot listing from the snapshots API, filtered by chain ID.
2038async fn fetch_snapshot_api_entries(chain_id: u64) -> Result<Vec<SnapshotApiEntry>> {
2039    let api_url = &*DownloadDefaults::get_global().snapshot_api_url;
2040
2041    let entries: Vec<SnapshotApiEntry> = Client::new()
2042        .get(api_url)
2043        .send()
2044        .await
2045        .and_then(|r| r.error_for_status())
2046        .wrap_err_with(|| format!("Failed to fetch snapshot listing from {api_url}"))?
2047        .json()
2048        .await?;
2049
2050    Ok(entries.into_iter().filter(|e| e.chain_id == chain_id).collect())
2051}
2052
2053/// Prints a formatted table of available modular snapshots.
2054fn print_snapshot_listing(entries: &[SnapshotApiEntry], chain_id: u64) {
2055    let modular: Vec<_> = entries.iter().filter(|e| e.is_modular()).collect();
2056
2057    let api_url = &*DownloadDefaults::get_global().snapshot_api_url;
2058    println!(
2059        "Available snapshots for chain {chain_id} ({}):\n",
2060        api_url.trim_end_matches("/api/snapshots"),
2061    );
2062    println!("{:<12}  {:>10}  {:<10}  {:>10}  MANIFEST URL", "DATE", "BLOCK", "PROFILE", "SIZE");
2063    println!("{}", "-".repeat(100));
2064
2065    for entry in &modular {
2066        let date = entry.date.as_deref().unwrap_or("-");
2067        let profile = entry.profile.as_deref().unwrap_or("-");
2068        let size = if entry.size > 0 {
2069            DownloadProgress::format_size(entry.size)
2070        } else {
2071            "-".to_string()
2072        };
2073
2074        println!(
2075            "{date:<12}  {:>10}  {profile:<10}  {size:>10}  {}",
2076            entry.block, entry.metadata_url
2077        );
2078    }
2079
2080    if modular.is_empty() {
2081        println!("  (no modular snapshots found)");
2082    }
2083
2084    println!(
2085        "\nTo download a specific snapshot, copy its manifest URL and run:\n  \
2086         reth download --manifest-url <URL>"
2087    );
2088}
2089
/// Loads a [`SnapshotManifest`] from `source`, which may be an `http(s)://` URL,
/// a `file://` URL, or a plain filesystem path.
async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
    if let Ok(parsed) = Url::parse(source) {
        return match parsed.scheme() {
            "http" | "https" => {
                let response = Client::new()
                    .get(source)
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .wrap_err_with(|| {
                        // Build a helpful error listing known snapshot sources
                        // as alternatives to the failing manifest endpoint.
                        let sources = DownloadDefaults::get_global()
                            .available_snapshots
                            .iter()
                            .map(|s| format!("\t- {s}"))
                            .collect::<Vec<_>>()
                            .join("\n");
                        format!(
                            "Failed to fetch snapshot manifest from {source}\n\n\
                             The manifest endpoint may not be available for this snapshot source.\n\
                             You can use a direct snapshot URL instead:\n\n\
                             \treth download -u <snapshot-url>\n\n\
                             Available snapshot sources:\n{sources}"
                        )
                    })?;
                Ok(response.json().await?)
            }
            "file" => {
                let path = parsed
                    .to_file_path()
                    .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
                let content = fs::read_to_string(path)?;
                Ok(serde_json::from_str(&content)?)
            }
            _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
        };
    }

    // Not parseable as a URL: treat `source` as a local filesystem path.
    let content = fs::read_to_string(source)?;
    Ok(serde_json::from_str(&content)?)
}
2130
/// Resolves the base URL against which the manifest's archive paths are joined.
///
/// Precedence: an explicit non-empty `base_url` from the manifest wins;
/// otherwise the base is derived from the manifest source itself (its parent
/// directory for `file://` URLs and plain paths, or the URL with its final path
/// segment removed for remote URLs). The returned string has no trailing `/`.
fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
    // 1) Manifest-provided base URL takes precedence when present and non-empty.
    if let Some(base_url) = manifest.base_url.as_deref() &&
        !base_url.is_empty()
    {
        return Ok(base_url.trim_end_matches('/').to_string());
    }

    if let Ok(mut url) = Url::parse(source) {
        // 2a) file:// manifest: base is the directory containing the manifest.
        if url.scheme() == "file" {
            let mut path = url
                .to_file_path()
                .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
            path.pop();
            let mut base = Url::from_directory_path(path)
                .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
                .to_string();
            if base.ends_with('/') {
                base.pop();
            }
            return Ok(base);
        }

        // 2b) Remote manifest URL: drop the final path segment (the manifest
        // file name) to obtain the parent URL. Scoped so the mutable borrow of
        // `url` ends before it is read back below.
        {
            let mut segments = url
                .path_segments_mut()
                .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
            segments.pop_if_empty();
            segments.pop();
        }
        return Ok(url.as_str().trim_end_matches('/').to_string());
    }

    // 3) Plain filesystem path: resolve relative paths against the current
    // directory, then use the manifest's parent directory as a file:// base.
    let path = Path::new(source);
    let manifest_dir = if path.is_absolute() {
        path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    } else {
        let joined = std::env::current_dir()?.join(path);
        joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    };
    let mut base = Url::from_directory_path(&manifest_dir)
        .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
        .to_string();
    if base.ends_with('/') {
        base.pop();
    }
    Ok(base)
}
2178
2179#[cfg(test)]
2180mod tests {
2181    use super::*;
2182    use clap::{Args, Parser};
2183    use manifest::{ChunkedArchive, ComponentManifest, SingleArchive};
2184    use reth_chainspec::{HOLESKY, MAINNET};
2185    use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
2186    use tempfile::tempdir;
2187
    /// Minimal top-level parser wrapper so `Args`-deriving types can be parsed
    /// standalone in tests.
    #[derive(Parser)]
    struct CommandParser<T: Args> {
        #[command(flatten)]
        args: T,
    }
2193
2194    fn manifest_with_archive_only_components() -> SnapshotManifest {
2195        let mut components = BTreeMap::new();
2196        components.insert(
2197            SnapshotComponentType::TransactionSenders.key().to_string(),
2198            ComponentManifest::Single(SingleArchive {
2199                file: "transaction_senders.tar.zst".to_string(),
2200                size: 1,
2201                blake3: None,
2202                output_files: vec![],
2203            }),
2204        );
2205        components.insert(
2206            SnapshotComponentType::RocksdbIndices.key().to_string(),
2207            ComponentManifest::Single(SingleArchive {
2208                file: "rocksdb_indices.tar.zst".to_string(),
2209                size: 1,
2210                blake3: None,
2211                output_files: vec![],
2212            }),
2213        );
2214        SnapshotManifest {
2215            block: 0,
2216            chain_id: 1,
2217            storage_version: 2,
2218            timestamp: 0,
2219            base_url: Some("https://example.com".to_string()),
2220            reth_version: None,
2221            components,
2222        }
2223    }
2224
2225    fn manifest_with_modular_components(block: u64) -> SnapshotManifest {
2226        let mut components = BTreeMap::new();
2227        for ty in [
2228            SnapshotComponentType::State,
2229            SnapshotComponentType::Headers,
2230            SnapshotComponentType::Transactions,
2231            SnapshotComponentType::Receipts,
2232            SnapshotComponentType::AccountChangesets,
2233            SnapshotComponentType::StorageChangesets,
2234        ] {
2235            let component = if ty.is_chunked() {
2236                ComponentManifest::Chunked(ChunkedArchive {
2237                    blocks_per_file: 1_000_000,
2238                    total_blocks: block + 1,
2239                    chunk_sizes: vec![1],
2240                    chunk_output_files: vec![vec![]],
2241                })
2242            } else {
2243                ComponentManifest::Single(SingleArchive {
2244                    file: format!("{}.tar.zst", ty.key()),
2245                    size: 1,
2246                    blake3: None,
2247                    output_files: vec![],
2248                })
2249            };
2250
2251            components.insert(ty.key().to_string(), component);
2252        }
2253
2254        SnapshotManifest {
2255            block,
2256            chain_id: 1,
2257            storage_version: 2,
2258            timestamp: 0,
2259            base_url: Some("https://example.com".to_string()),
2260            reth_version: None,
2261            components,
2262        }
2263    }
2264
2265    #[test]
2266    fn test_download_defaults_builder() {
2267        let defaults = DownloadDefaults::default()
2268            .with_snapshot("https://example.com/snapshots (example)")
2269            .with_base_url("https://example.com");
2270
2271        assert_eq!(defaults.default_base_url, "https://example.com");
2272        assert_eq!(defaults.available_snapshots.len(), 3); // 2 defaults + 1 added
2273    }
2274
2275    #[test]
2276    fn test_download_defaults_replace_snapshots() {
2277        let defaults = DownloadDefaults::default().with_snapshots(vec![
2278            Cow::Borrowed("https://custom1.com"),
2279            Cow::Borrowed("https://custom2.com"),
2280        ]);
2281
2282        assert_eq!(defaults.available_snapshots.len(), 2);
2283        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
2284    }
2285
2286    #[test]
2287    fn test_long_help_generation() {
2288        let defaults = DownloadDefaults::default();
2289        let help = defaults.long_help();
2290
2291        assert!(help.contains("Available snapshot sources:"));
2292        assert!(help.contains("snapshots.reth.rs"));
2293        assert!(help.contains("publicnode.com"));
2294        assert!(help.contains("file://"));
2295    }
2296
2297    #[test]
2298    fn test_long_help_override() {
2299        let custom_help = "This is custom help text for downloading snapshots.";
2300        let defaults = DownloadDefaults::default().with_long_help(custom_help);
2301
2302        let help = defaults.long_help();
2303        assert_eq!(help, custom_help);
2304        assert!(!help.contains("Available snapshot sources:"));
2305    }
2306
2307    #[test]
2308    fn test_builder_chaining() {
2309        let defaults = DownloadDefaults::default()
2310            .with_base_url("https://custom.example.com")
2311            .with_snapshot("https://snapshot1.com")
2312            .with_snapshot("https://snapshot2.com")
2313            .with_long_help("Custom help for snapshots");
2314
2315        assert_eq!(defaults.default_base_url, "https://custom.example.com");
2316        assert_eq!(defaults.available_snapshots.len(), 4); // 2 defaults + 2 added
2317        assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
2318    }
2319
2320    #[test]
2321    fn test_download_resumable_defaults_to_true() {
2322        let args =
2323            CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from(["reth"]).args;
2324
2325        assert!(args.resumable);
2326    }
2327
2328    #[test]
2329    fn test_download_resumable_implicit_true() {
2330        let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2331            "reth",
2332            "--resumable",
2333        ])
2334        .args;
2335
2336        assert!(args.resumable);
2337    }
2338
2339    #[test]
2340    fn test_download_resumable_explicit_false() {
2341        let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2342            "reth",
2343            "--resumable=false",
2344        ])
2345        .args;
2346
2347        assert!(!args.resumable);
2348    }
2349
2350    #[test]
2351    fn resolve_components_supports_since_and_distance_flags() {
2352        let manifest = manifest_with_modular_components(20_000_000);
2353        let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2354            "reth",
2355            "--with-txs-since",
2356            "15537394",
2357            "--with-receipts-distance",
2358            "10064",
2359            "--with-state-history-since",
2360            "15537394",
2361        ])
2362        .args;
2363
2364        let resolved = args.resolve_components(&manifest).unwrap();
2365
2366        assert_eq!(resolved.preset, None);
2367        assert_eq!(
2368            resolved.selections.get(&SnapshotComponentType::Transactions),
2369            Some(&ComponentSelection::Since(15_537_394))
2370        );
2371        assert_eq!(
2372            resolved.selections.get(&SnapshotComponentType::Receipts),
2373            Some(&ComponentSelection::Distance(10_064))
2374        );
2375        assert_eq!(
2376            resolved.selections.get(&SnapshotComponentType::AccountChangesets),
2377            Some(&ComponentSelection::Since(15_537_394))
2378        );
2379        assert_eq!(
2380            resolved.selections.get(&SnapshotComponentType::StorageChangesets),
2381            Some(&ComponentSelection::Since(15_537_394))
2382        );
2383    }
2384
2385    #[test]
2386    fn resolve_components_rejects_since_after_snapshot() {
2387        let manifest = manifest_with_modular_components(20_000_000);
2388        let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2389            "reth",
2390            "--with-txs-since",
2391            "20000001",
2392        ])
2393        .args;
2394
2395        let err = args.resolve_components(&manifest).unwrap_err();
2396        assert!(err
2397            .to_string()
2398            .contains("--with-txs-since 20000001 is beyond the snapshot block 20000000"));
2399    }
2400
2401    #[test]
2402    fn test_compression_format_detection() {
2403        assert!(matches!(
2404            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
2405            Ok(CompressionFormat::Lz4)
2406        ));
2407        assert!(matches!(
2408            CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
2409            Ok(CompressionFormat::Zstd)
2410        ));
2411        assert!(matches!(
2412            CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
2413            Ok(CompressionFormat::Lz4)
2414        ));
2415        assert!(matches!(
2416            CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
2417            Ok(CompressionFormat::Zstd)
2418        ));
2419        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
2420    }
2421
2422    #[test]
2423    fn inject_archive_only_components_for_archive_selection() {
2424        let manifest = manifest_with_archive_only_components();
2425        let mut selections = BTreeMap::new();
2426        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2427        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
2428        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
2429        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
2430
2431        inject_archive_only_components(&mut selections, &manifest, true);
2432
2433        assert_eq!(
2434            selections.get(&SnapshotComponentType::TransactionSenders),
2435            Some(&ComponentSelection::All)
2436        );
2437        assert_eq!(
2438            selections.get(&SnapshotComponentType::RocksdbIndices),
2439            Some(&ComponentSelection::All)
2440        );
2441    }
2442
2443    #[test]
2444    fn inject_archive_only_components_without_rocksdb() {
2445        let manifest = manifest_with_archive_only_components();
2446        let mut selections = BTreeMap::new();
2447        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2448        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
2449        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
2450        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
2451
2452        inject_archive_only_components(&mut selections, &manifest, false);
2453
2454        assert_eq!(
2455            selections.get(&SnapshotComponentType::TransactionSenders),
2456            Some(&ComponentSelection::All)
2457        );
2458        assert_eq!(selections.get(&SnapshotComponentType::RocksdbIndices), None);
2459    }
2460
2461    #[test]
2462    fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
2463        let mut selections = BTreeMap::new();
2464        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2465        assert!(should_reset_index_stage_checkpoints(&selections));
2466
2467        selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
2468        assert!(!should_reset_index_stage_checkpoints(&selections));
2469    }
2470
2471    #[test]
2472    fn summarize_download_startup_counts_reusable_and_needs_download() {
2473        let dir = tempdir().unwrap();
2474        let target_dir = dir.path();
2475        let ok_file = target_dir.join("ok.bin");
2476        std::fs::write(&ok_file, vec![1_u8; 4]).unwrap();
2477        let ok_hash = file_blake3_hex(&ok_file).unwrap();
2478
2479        let planned = vec![
2480            PlannedArchive {
2481                ty: SnapshotComponentType::State,
2482                component: "State".to_string(),
2483                archive: ArchiveDescriptor {
2484                    url: "https://example.com/ok.tar.zst".to_string(),
2485                    file_name: "ok.tar.zst".to_string(),
2486                    size: 10,
2487                    blake3: None,
2488                    output_files: vec![OutputFileChecksum {
2489                        path: "ok.bin".to_string(),
2490                        size: 4,
2491                        blake3: ok_hash,
2492                    }],
2493                },
2494            },
2495            PlannedArchive {
2496                ty: SnapshotComponentType::Headers,
2497                component: "Headers".to_string(),
2498                archive: ArchiveDescriptor {
2499                    url: "https://example.com/missing.tar.zst".to_string(),
2500                    file_name: "missing.tar.zst".to_string(),
2501                    size: 10,
2502                    blake3: None,
2503                    output_files: vec![OutputFileChecksum {
2504                        path: "missing.bin".to_string(),
2505                        size: 1,
2506                        blake3: "deadbeef".to_string(),
2507                    }],
2508                },
2509            },
2510            PlannedArchive {
2511                ty: SnapshotComponentType::Transactions,
2512                component: "Transactions".to_string(),
2513                archive: ArchiveDescriptor {
2514                    url: "https://example.com/bad-size.tar.zst".to_string(),
2515                    file_name: "bad-size.tar.zst".to_string(),
2516                    size: 10,
2517                    blake3: None,
2518                    output_files: vec![],
2519                },
2520            },
2521        ];
2522
2523        let summary = summarize_download_startup(&planned, target_dir, None).unwrap();
2524        assert_eq!(summary.reusable, 1);
2525        assert_eq!(summary.needs_download, 2);
2526    }
2527
2528    #[test]
2529    fn archive_priority_prefers_state_then_rocksdb() {
2530        let mut planned = [
2531            PlannedArchive {
2532                ty: SnapshotComponentType::Transactions,
2533                component: "Transactions".to_string(),
2534                archive: ArchiveDescriptor {
2535                    url: "u3".to_string(),
2536                    file_name: "t.tar.zst".to_string(),
2537                    size: 1,
2538                    blake3: None,
2539                    output_files: vec![OutputFileChecksum {
2540                        path: "a".to_string(),
2541                        size: 1,
2542                        blake3: "x".to_string(),
2543                    }],
2544                },
2545            },
2546            PlannedArchive {
2547                ty: SnapshotComponentType::RocksdbIndices,
2548                component: "RocksDB Indices".to_string(),
2549                archive: ArchiveDescriptor {
2550                    url: "u2".to_string(),
2551                    file_name: "rocksdb_indices.tar.zst".to_string(),
2552                    size: 1,
2553                    blake3: None,
2554                    output_files: vec![OutputFileChecksum {
2555                        path: "b".to_string(),
2556                        size: 1,
2557                        blake3: "y".to_string(),
2558                    }],
2559                },
2560            },
2561            PlannedArchive {
2562                ty: SnapshotComponentType::State,
2563                component: "State (mdbx)".to_string(),
2564                archive: ArchiveDescriptor {
2565                    url: "u1".to_string(),
2566                    file_name: "state.tar.zst".to_string(),
2567                    size: 1,
2568                    blake3: None,
2569                    output_files: vec![OutputFileChecksum {
2570                        path: "c".to_string(),
2571                        size: 1,
2572                        blake3: "z".to_string(),
2573                    }],
2574                },
2575            },
2576        ];
2577
2578        planned.sort_by(|a, b| {
2579            archive_priority_rank(a.ty)
2580                .cmp(&archive_priority_rank(b.ty))
2581                .then_with(|| a.component.cmp(&b.component))
2582                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
2583        });
2584
2585        assert_eq!(planned[0].ty, SnapshotComponentType::State);
2586        assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
2587        assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
2588    }
2589
2590    #[test]
2591    fn startup_node_command_omits_default_chain_arg() {
2592        let command =
2593            startup_node_command_for_binary::<EthereumChainSpecParser>("reth", MAINNET.as_ref());
2594
2595        assert_eq!(command, "reth node");
2596    }
2597
2598    #[test]
2599    fn startup_node_command_includes_non_default_chain_arg() {
2600        let command =
2601            startup_node_command_for_binary::<EthereumChainSpecParser>("reth", HOLESKY.as_ref());
2602
2603        assert_eq!(command, "reth node --chain holesky");
2604    }
2605
2606    #[test]
2607    fn startup_node_command_uses_running_binary_name() {
2608        let command =
2609            startup_node_command_for_binary::<EthereumChainSpecParser>("tempo", HOLESKY.as_ref());
2610
2611        assert_eq!(command, "tempo node --chain holesky");
2612    }
2613}