1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::Parser;
9use config_gen::{config_for_selections, write_config};
10use eyre::{Result, WrapErr};
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14 ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15 SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_cli_util::cancellation::CancellationToken;
21use reth_db::{init_db, Database};
22use reth_db_api::transaction::DbTx;
23use reth_fs_util as fs;
24use reth_node_core::args::DefaultPruningValues;
25use reth_prune_types::PruneMode;
26use std::{
27 borrow::Cow,
28 collections::BTreeMap,
29 fs::OpenOptions,
30 io::{self, BufWriter, Read, Write},
31 path::{Path, PathBuf},
32 sync::{
33 atomic::{AtomicBool, AtomicU64, Ordering},
34 Arc, OnceLock,
35 },
36 time::{Duration, Instant},
37};
38use tar::Archive;
39use tokio::task;
40use tracing::{info, warn};
41use tui::{run_selector, SelectorOutput};
42use url::Url;
43use zstd::stream::read::Decoder as ZstdDecoder;
44
// Units for `DownloadProgress::format_size` (1024-based steps).
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
// Default host serving snapshot archives.
const RETH_SNAPSHOTS_BASE_URL: &str = "https://snapshots-r2.reth.rs";
// JSON API listing available snapshots (used by `--list`).
const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
// Archive extensions recognized by `CompressionFormat::from_url`.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
// Subdirectory of the datadir that holds partial downloads when `--resumable` is set.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";

// Default for `--download-concurrency` (parallel archive downloads).
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
54
/// Named component presets selectable via `--minimal`, `--full`, or `--archive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Smallest useful component set (see `minimal_preset_selections`).
    Minimal,
    /// Component set mirroring the default full-node prune configuration.
    Full,
    /// Everything available in the manifest.
    Archive,
}
61
/// Result of component resolution: what to download for each component plus
/// the preset that produced it (if any), so config generation can mirror it.
struct ResolvedComponents {
    // Per-component selection; components absent from the map are skipped.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    // `None` when components were chosen individually (flags or TUI).
    preset: Option<SelectionPreset>,
}
66
/// Process-wide download defaults; set once via [`DownloadDefaults::try_init`],
/// read (with lazy fallback) via [`DownloadDefaults::get_global`].
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
69
/// Customizable defaults for the download command (snapshot sources, base
/// URLs, help text), intended to be overridden by downstream CLIs before
/// argument parsing.
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// Human-readable snapshot sources listed in the generated `--url` help.
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Base URL used when no chain-aware base URL is configured.
    pub default_base_url: Cow<'static, str>,
    /// Optional chain-aware base URL; takes precedence over `default_base_url`
    /// in the generated help text.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// Fully custom long help text; when set it replaces the generated text.
    pub long_help: Option<String>,
}
90
impl DownloadDefaults {
    /// Installs these defaults as the process-wide globals.
    ///
    /// Returns `Err(self)` if the globals were already initialized.
    pub fn try_init(self) -> Result<(), Self> {
        DOWNLOAD_DEFAULTS.set(self)
    }

    /// Returns the process-wide defaults, lazily falling back to the built-in
    /// ones if [`Self::try_init`] was never called.
    pub fn get_global() -> &'static DownloadDefaults {
        DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
    }

    /// Built-in defaults pointing at the public reth snapshot hosts.
    pub fn default_download_defaults() -> Self {
        Self {
            available_snapshots: vec![
                Cow::Borrowed("https://snapshots.reth.rs (default)"),
                Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
            ],
            default_base_url: Cow::Borrowed(RETH_SNAPSHOTS_BASE_URL),
            default_chain_aware_base_url: None,
            long_help: None,
        }
    }

    /// Long help text for the `--url` argument.
    ///
    /// Returns the custom text verbatim when one was configured, otherwise
    /// generates help from the advertised sources and base URL.
    pub fn long_help(&self) -> String {
        if let Some(ref custom_help) = self.long_help {
            return custom_help.clone();
        }

        let mut help = String::from(
            "Specify a snapshot URL or let the command propose a default one.\n\n\
             Browse available snapshots at https://snapshots.reth.rs\n\
             or use --list-snapshots to see them from the CLI.\n\nAvailable snapshot sources:\n",
        );

        for source in &self.available_snapshots {
            help.push_str("- ");
            help.push_str(source);
            help.push('\n');
        }

        help.push_str(
            "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
        );
        // The chain-aware URL wins over the generic base URL when present.
        help.push_str(
            self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
        );
        help.push_str(
            ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
        );
        help
    }

    /// Appends one snapshot source to the advertised list.
    pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
        self.available_snapshots.push(source.into());
        self
    }

    /// Replaces the advertised snapshot source list.
    pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
        self.available_snapshots = sources;
        self
    }

    /// Sets the default base URL.
    pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
        self.default_base_url = url.into();
        self
    }

    /// Sets the chain-aware base URL (preferred over the plain base URL).
    pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
        self.default_chain_aware_base_url = Some(url.into());
        self
    }

    /// Overrides the generated `--url` long help text entirely.
    pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
        self.long_help = Some(help.into());
        self
    }
}
178
179impl Default for DownloadDefaults {
180 fn default() -> Self {
181 Self::default_download_defaults()
182 }
183}
184
// CLI arguments for the snapshot download command. The component flags are
// mutually exclusive with the presets (`--minimal`, `--full`, `--archive`);
// clap enforces this via `conflicts_with_all`. NOTE: `//` comments are used
// deliberately — `///` doc comments on clap-derived fields would become
// user-visible help text and change CLI behavior.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    // Common environment arguments (chain spec, datadir, db settings).
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // Direct archive URL; bypasses manifest-based component selection.
    // Long help is generated at runtime from the global `DownloadDefaults`.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    // Explicit manifest URL (mutually exclusive with a direct `--url`).
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    // Local manifest file path (mutually exclusive with both URL options).
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    // Include transaction bodies.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_txs: bool,

    // Include receipts.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_receipts: bool,

    // Include account and storage changesets.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
    with_state_history: bool,

    // Include recovered senders; only valid together with `--with-txs`.
    #[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
    with_senders: bool,

    // Include RocksDB index tables.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive", "without_rocksdb"])]
    with_rocksdb: bool,

    // Preset: everything in the manifest.
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "minimal", "full"])]
    archive: bool,

    // Preset: minimal component set.
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "full"])]
    minimal: bool,

    // Preset: full-node component set derived from default prune settings.
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "with_senders", "with_rocksdb", "archive", "minimal"])]
    full: bool,

    // Exclude RocksDB indices even when a preset would include them.
    #[arg(long, conflicts_with_all = ["url", "with_rocksdb"])]
    without_rocksdb: bool,

    // Skip the interactive selector; defaults to the minimal preset.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    // Keep partial downloads on disk so interrupted runs can resume.
    #[arg(long)]
    resumable: bool,

    // Number of archives downloaded in parallel (clamped to at least 1).
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,

    // Print available snapshots for the selected chain and exit.
    #[arg(long, alias = "list-snapshots", conflicts_with_all = ["url", "manifest_url", "manifest_path"])]
    list: bool,
}
271
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Entry point: downloads the selected snapshot components into the
    /// datadir, writes a matching prune config, and updates DB checkpoints.
    ///
    /// `N` is not used in the body; presumably it keeps the signature uniform
    /// with other node-generic CLI commands — confirm against the callers.
    pub async fn execute<N>(self) -> Result<()> {
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // The guard cancels all in-flight downloads when this fn unwinds.
        let cancel_token = CancellationToken::new();
        let _cancel_guard = cancel_token.drop_guard();

        // `--list`: print available snapshots and exit without downloading.
        if self.list {
            let entries = fetch_snapshot_api_entries(chain_id).await?;
            print_snapshot_listing(&entries, chain_id);
            return Ok(());
        }

        // `--url`: stream a single archive directly; no manifest involved.
        if let Some(ref url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(
                url,
                data_dir.data_dir(),
                None,
                self.resumable,
                cancel_token.clone(),
            )
            .await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Manifest-driven path: locate, fetch, and normalize the manifest.
        let manifest_source = self.resolve_manifest_source(chain_id).await?;

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // The archive preset also pulls in senders and (optionally) RocksDB.
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        // Flatten the selections into the list of archives to fetch.
        let target_dir = data_dir.data_dir();
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Output checksums are mandatory: reuse/verification below
                // depends on them.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // Deterministic order: by priority rank, then component, then file.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // `--resumable` keeps partially downloaded archives in a cache dir.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::None => 0,
            })
            .sum();

        // Report how many archives already verify on disk and can be reused.
        let startup_summary = summarize_download_startup(&all_downloads, target_dir)?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        let shared = SharedProgress::new(total_size, total_archives as u64, cancel_token.clone());
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        // Download/extract all archives with bounded concurrency.
        let target = target_dir.to_path_buf();
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        let download_concurrency = self.download_concurrency.max(1);
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                let dir = target.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                let ct = cancel_token.clone();
                async move {
                    process_modular_archive(
                        planned,
                        &dir,
                        cache.as_deref(),
                        Some(sp),
                        resumable,
                        ct,
                    )
                    .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Stop the progress task before surfacing any task failure.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        // Propagate the first failed download, if any.
        for result in results {
            result?;
        }

        // Persist a prune config that matches what was actually downloaded.
        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        let should_write_prune = config.prune.segments != Default::default();
        // Without complete RocksDB indices the index stages must re-run.
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        info!(target: "reth::cli", "Snapshot download complete. Run `reth node` to start syncing.");

        Ok(())
    }

    /// Resolves which components to download, in precedence order:
    /// `--archive`, `--full`, `--minimal`, explicit component flags,
    /// `-y` (minimal), and finally the interactive selector.
    fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
        // Only ever select components that exist in this manifest.
        let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();

        if self.archive {
            return Ok(ResolvedComponents {
                selections: SnapshotComponentType::ALL
                    .iter()
                    .copied()
                    .filter(|ty| available(*ty))
                    .filter(|ty| {
                        // `--without-rocksdb` trims RocksDB from the preset.
                        !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
                    })
                    .map(|ty| (ty, ComponentSelection::All))
                    .collect(),
                preset: Some(SelectionPreset::Archive),
            });
        }

        if self.full {
            return Ok(ResolvedComponents {
                selections: self.full_preset_selections(manifest),
                preset: Some(SelectionPreset::Full),
            });
        }

        if self.minimal {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        let has_explicit_flags = self.with_txs ||
            self.with_receipts ||
            self.with_state_history ||
            self.with_senders ||
            self.with_rocksdb;

        if has_explicit_flags {
            // State and headers are always included alongside explicit flags.
            let mut selections = BTreeMap::new();
            if available(SnapshotComponentType::State) {
                selections.insert(SnapshotComponentType::State, ComponentSelection::All);
            }
            if available(SnapshotComponentType::Headers) {
                selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
            }
            if self.with_txs && available(SnapshotComponentType::Transactions) {
                selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
            }
            if self.with_receipts && available(SnapshotComponentType::Receipts) {
                selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
            }
            if self.with_state_history {
                if available(SnapshotComponentType::AccountChangesets) {
                    selections
                        .insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
                }
                if available(SnapshotComponentType::StorageChangesets) {
                    selections
                        .insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
                }
            }
            if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
                selections
                    .insert(SnapshotComponentType::TransactionSenders, ComponentSelection::All);
            }
            if self.with_rocksdb && available(SnapshotComponentType::RocksdbIndices) {
                selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
            }
            return Ok(ResolvedComponents { selections, preset: None });
        }

        // `-y` with no flags at all defaults to the minimal preset.
        if self.non_interactive {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Interactive TUI selection, seeded with the full preset.
        let full_preset = self.full_preset_selections(manifest);
        let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
        // Drop components the user explicitly deselected.
        let selected =
            selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();

        Ok(ResolvedComponents { selections: selected, preset })
    }

    /// Minimal preset: every available component at its minimal selection.
    fn minimal_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        SnapshotComponentType::ALL
            .iter()
            .copied()
            .filter(|ty| manifest.component(*ty).is_some())
            .map(|ty| (ty, ty.minimal_selection()))
            .collect()
    }

    /// Full preset: per-component selections mirroring the default full-node
    /// prune configuration; fully pruned components are omitted.
    fn full_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        let mut selections = BTreeMap::new();

        for ty in [
            SnapshotComponentType::State,
            SnapshotComponentType::Headers,
            SnapshotComponentType::Transactions,
            SnapshotComponentType::Receipts,
            SnapshotComponentType::AccountChangesets,
            SnapshotComponentType::StorageChangesets,
            SnapshotComponentType::TransactionSenders,
            SnapshotComponentType::RocksdbIndices,
        ] {
            if manifest.component(ty).is_none() {
                continue;
            }

            let selection = self.full_selection_for_component(ty, manifest.block);
            if selection != ComponentSelection::None {
                selections.insert(ty, selection);
            }
        }

        selections
    }

    /// Maps a single component to its full-preset selection, based on the
    /// default prune modes and — for transaction bodies — optionally on the
    /// Paris (merge) activation block of the configured chain.
    fn full_selection_for_component(
        &self,
        ty: SnapshotComponentType,
        snapshot_block: u64,
    ) -> ComponentSelection {
        let defaults = DefaultPruningValues::get_global();
        match ty {
            SnapshotComponentType::State | SnapshotComponentType::Headers => {
                ComponentSelection::All
            }
            SnapshotComponentType::Transactions => {
                if defaults.full_bodies_history_use_pre_merge {
                    // Keep bodies from the merge block up to the snapshot tip.
                    match self
                        .env
                        .chain
                        .ethereum_fork_activation(EthereumHardfork::Paris)
                        .block_number()
                    {
                        Some(paris) if snapshot_block >= paris => {
                            ComponentSelection::Distance(snapshot_block - paris + 1)
                        }
                        // Snapshot predates the merge: keep nothing.
                        Some(_) => ComponentSelection::None,
                        // No merge block for this chain: keep everything.
                        None => ComponentSelection::All,
                    }
                } else {
                    selection_from_prune_mode(
                        defaults.full_prune_modes.bodies_history,
                        snapshot_block,
                    )
                }
            }
            SnapshotComponentType::Receipts => {
                selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
            }
            SnapshotComponentType::AccountChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
            }
            SnapshotComponentType::StorageChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
            }
            SnapshotComponentType::TransactionSenders => {
                selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
            }
            // Full nodes never download RocksDB indices in this preset.
            SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
        }
    }

    /// Picks the manifest source: local path, then explicit URL, then
    /// discovery against the snapshot API.
    async fn resolve_manifest_source(&self, chain_id: u64) -> Result<String> {
        if let Some(path) = &self.manifest_path {
            return Ok(path.display().to_string());
        }

        match &self.manifest_url {
            Some(url) => Ok(url.clone()),
            None => discover_manifest_url(chain_id).await,
        }
    }
}
672
673fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
674 match mode {
675 None => ComponentSelection::All,
676 Some(PruneMode::Full) => ComponentSelection::None,
677 Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
678 Some(PruneMode::Before(block)) => {
679 if snapshot_block >= block {
680 ComponentSelection::Distance(snapshot_block - block + 1)
681 } else {
682 ComponentSelection::None
683 }
684 }
685 }
686}
687
688fn inject_archive_only_components(
691 selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
692 manifest: &SnapshotManifest,
693 include_rocksdb: bool,
694) {
695 let is_all =
696 |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
697
698 let is_archive = is_all(SnapshotComponentType::Transactions) &&
699 is_all(SnapshotComponentType::Receipts) &&
700 is_all(SnapshotComponentType::AccountChangesets) &&
701 is_all(SnapshotComponentType::StorageChangesets);
702
703 if !is_archive {
704 return;
705 }
706
707 for component in
708 [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
709 {
710 if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
711 continue;
712 }
713
714 if manifest.component(component).is_some() {
715 selections.insert(component, ComponentSelection::All);
716 }
717 }
718}
719
720fn should_reset_index_stage_checkpoints(
721 selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
722) -> bool {
723 !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
724}
725
726impl<C: ChainSpecParser> DownloadCommand<C> {
727 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
729 Some(&self.env.chain)
730 }
731}
732
/// Byte-progress tracker driving a single-line stdout progress display.
pub(crate) struct DownloadProgress {
    // Bytes processed so far.
    downloaded: u64,
    // Expected total size in bytes.
    total_size: u64,
    // Last time the progress line was redrawn; throttles output to ~10 Hz.
    last_displayed: Instant,
    // Start time, used for the ETA estimate.
    started_at: Instant,
}
740
/// One archive queued for download, tagged with its originating component.
#[derive(Debug, Clone)]
struct PlannedArchive {
    // Component kind; drives download prioritization.
    ty: SnapshotComponentType,
    // Display name of the component (logging and sort tiebreaker).
    component: String,
    // Manifest descriptor for the archive file.
    archive: ArchiveDescriptor,
}
747
748const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
749 match ty {
750 SnapshotComponentType::State => 0,
751 SnapshotComponentType::RocksdbIndices => 1,
752 _ => 2,
753 }
754}
755
/// Startup tally of planned archives split by on-disk reusability.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    // Archives whose plain output files already pass verification.
    reusable: usize,
    // Archives that must be (re)downloaded.
    needs_download: usize,
}
761
762fn summarize_download_startup(
763 all_downloads: &[PlannedArchive],
764 target_dir: &Path,
765) -> Result<DownloadStartupSummary> {
766 let mut summary = DownloadStartupSummary::default();
767
768 for planned in all_downloads {
769 if verify_output_files(target_dir, &planned.archive.output_files)? {
770 summary.reusable += 1;
771 } else {
772 summary.needs_download += 1;
773 }
774 }
775
776 Ok(summary)
777}
778
779impl DownloadProgress {
780 fn new(total_size: u64) -> Self {
782 let now = Instant::now();
783 Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
784 }
785
786 pub(crate) fn format_size(size: u64) -> String {
788 let mut size = size as f64;
789 let mut unit_index = 0;
790
791 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
792 size /= 1024.0;
793 unit_index += 1;
794 }
795
796 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
797 }
798
799 fn format_duration(duration: Duration) -> String {
801 let secs = duration.as_secs();
802 if secs < 60 {
803 format!("{secs}s")
804 } else if secs < 3600 {
805 format!("{}m {}s", secs / 60, secs % 60)
806 } else {
807 format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
808 }
809 }
810
811 fn update(&mut self, chunk_size: u64) -> Result<()> {
813 self.downloaded += chunk_size;
814
815 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
816 let formatted_downloaded = Self::format_size(self.downloaded);
817 let formatted_total = Self::format_size(self.total_size);
818 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
819
820 let elapsed = self.started_at.elapsed();
821 let eta = if self.downloaded > 0 {
822 let remaining = self.total_size.saturating_sub(self.downloaded);
823 let speed = self.downloaded as f64 / elapsed.as_secs_f64();
824 if speed > 0.0 {
825 Duration::from_secs_f64(remaining as f64 / speed)
826 } else {
827 Duration::ZERO
828 }
829 } else {
830 Duration::ZERO
831 };
832 let eta_str = Self::format_duration(eta);
833
834 print!(
835 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str} ",
836 );
837 io::stdout().flush()?;
838 self.last_displayed = Instant::now();
839 }
840
841 Ok(())
842 }
843}
844
/// Progress state shared between the concurrent download tasks and the
/// background progress-logging task.
struct SharedProgress {
    // Total bytes downloaded across all tasks.
    downloaded: AtomicU64,
    // Expected total bytes across all planned archives.
    total_size: u64,
    // Number of archives planned overall.
    total_archives: u64,
    // Archives fully processed so far.
    archives_done: AtomicU64,
    // Set once all downloads finish; tells the display task to stop.
    done: AtomicBool,
    // Cooperative cancellation shared with the download streams.
    cancel_token: CancellationToken,
}
858
859impl SharedProgress {
860 fn new(total_size: u64, total_archives: u64, cancel_token: CancellationToken) -> Arc<Self> {
861 Arc::new(Self {
862 downloaded: AtomicU64::new(0),
863 total_size,
864 total_archives,
865 archives_done: AtomicU64::new(0),
866 done: AtomicBool::new(false),
867 cancel_token,
868 })
869 }
870
871 fn is_cancelled(&self) -> bool {
872 self.cancel_token.is_cancelled()
873 }
874
875 fn add(&self, bytes: u64) {
876 self.downloaded.fetch_add(bytes, Ordering::Relaxed);
877 }
878
879 fn archive_done(&self) {
880 self.archives_done.fetch_add(1, Ordering::Relaxed);
881 }
882}
883
/// Spawns a background task that logs aggregate download progress every 3s
/// until `progress.done` is set, then logs a final summary and exits.
fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let started_at = Instant::now();
        let mut interval = tokio::time::interval(Duration::from_secs(3));
        // A tokio interval's first tick completes immediately; consume it so
        // the loop below reports at 3s intervals from now on.
        interval.tick().await;
        loop {
            interval.tick().await;

            if progress.done.load(Ordering::Relaxed) {
                break;
            }

            let downloaded = progress.downloaded.load(Ordering::Relaxed);
            let total = progress.total_size;
            // Nothing meaningful to report without a known total size.
            if total == 0 {
                continue;
            }

            let done = progress.archives_done.load(Ordering::Relaxed);
            let all = progress.total_archives;
            let pct = (downloaded as f64 / total as f64) * 100.0;
            let dl = DownloadProgress::format_size(downloaded);
            let tot = DownloadProgress::format_size(total);

            let elapsed = started_at.elapsed();
            let remaining = total.saturating_sub(downloaded);

            if remaining == 0 {
                // All bytes are in; the remaining work is extraction only.
                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    downloaded = %dl,
                    "Extracting remaining archives"
                );
            } else {
                // ETA from average throughput since the task started.
                let eta = if downloaded > 0 {
                    let speed = downloaded as f64 / elapsed.as_secs_f64();
                    if speed > 0.0 {
                        DownloadProgress::format_duration(Duration::from_secs_f64(
                            remaining as f64 / speed,
                        ))
                    } else {
                        "??".to_string()
                    }
                } else {
                    "??".to_string()
                };

                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    progress = format_args!("{pct:.1}%"),
                    downloaded = %dl,
                    total = %tot,
                    eta = %eta,
                    "Downloading"
                );
            }
        }

        // Final summary once `done` was observed.
        let downloaded = progress.downloaded.load(Ordering::Relaxed);
        let dl = DownloadProgress::format_size(downloaded);
        let tot = DownloadProgress::format_size(progress.total_size);
        let elapsed = DownloadProgress::format_duration(started_at.elapsed());
        info!(target: "reth::cli",
            downloaded = %dl,
            total = %tot,
            elapsed = %elapsed,
            "Downloads complete"
        );
    })
}
958
/// `Read` adapter that feeds byte counts into a [`DownloadProgress`] display
/// and fails with `Interrupted` once cancellation is requested.
struct ProgressReader<R> {
    reader: R,
    progress: DownloadProgress,
    cancel_token: CancellationToken,
}
965
966impl<R: Read> ProgressReader<R> {
967 fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
968 Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
969 }
970}
971
972impl<R: Read> Read for ProgressReader<R> {
973 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
974 if self.cancel_token.is_cancelled() {
975 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
976 }
977 let bytes = self.reader.read(buf)?;
978 if bytes > 0 &&
979 let Err(e) = self.progress.update(bytes as u64)
980 {
981 return Err(io::Error::other(e));
982 }
983 Ok(bytes)
984 }
985}
986
/// Compression wrapper of a snapshot tarball, inferred from its extension.
#[derive(Debug, Clone, Copy)]
enum CompressionFormat {
    // `.tar.lz4`
    Lz4,
    // `.tar.zst`
    Zstd,
}
993
994impl CompressionFormat {
995 fn from_url(url: &str) -> Result<Self> {
997 let path =
998 Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
999
1000 if path.ends_with(EXTENSION_TAR_LZ4) {
1001 Ok(Self::Lz4)
1002 } else if path.ends_with(EXTENSION_TAR_ZSTD) {
1003 Ok(Self::Zstd)
1004 } else {
1005 Err(eyre::eyre!(
1006 "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
1007 path
1008 ))
1009 }
1010 }
1011}
1012
1013fn extract_archive<R: Read>(
1015 reader: R,
1016 total_size: u64,
1017 format: CompressionFormat,
1018 target_dir: &Path,
1019 cancel_token: CancellationToken,
1020) -> Result<()> {
1021 let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
1022
1023 match format {
1024 CompressionFormat::Lz4 => {
1025 let decoder = Decoder::new(progress_reader)?;
1026 Archive::new(decoder).unpack(target_dir)?;
1027 }
1028 CompressionFormat::Zstd => {
1029 let decoder = ZstdDecoder::new(progress_reader)?;
1030 Archive::new(decoder).unpack(target_dir)?;
1031 }
1032 }
1033
1034 println!();
1035 Ok(())
1036}
1037
1038fn extract_archive_raw<R: Read>(
1040 reader: R,
1041 format: CompressionFormat,
1042 target_dir: &Path,
1043) -> Result<()> {
1044 match format {
1045 CompressionFormat::Lz4 => {
1046 Archive::new(Decoder::new(reader)?).unpack(target_dir)?;
1047 }
1048 CompressionFormat::Zstd => {
1049 Archive::new(ZstdDecoder::new(reader)?).unpack(target_dir)?;
1050 }
1051 }
1052 Ok(())
1053}
1054
1055fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
1057 let file = std::fs::File::open(path)?;
1058 let total_size = file.metadata()?.len();
1059 info!(target: "reth::cli",
1060 file = %path.display(),
1061 size = %DownloadProgress::format_size(total_size),
1062 "Extracting local archive"
1063 );
1064 let start = Instant::now();
1065 extract_archive(file, total_size, format, target_dir, CancellationToken::new())?;
1066 info!(target: "reth::cli",
1067 file = %path.display(),
1068 elapsed = %DownloadProgress::format_duration(start.elapsed()),
1069 "Local extraction complete"
1070 );
1071 Ok(())
1072}
1073
// Maximum attempts for a resumable download before giving up.
const MAX_DOWNLOAD_RETRIES: u32 = 10;
// Delay between retry attempts, in seconds.
const RETRY_BACKOFF_SECS: u64 = 5;
1076
/// `Write` adapter that feeds written byte counts into a
/// [`DownloadProgress`] display and fails with `Interrupted` on
/// cancellation.
struct ProgressWriter<W> {
    inner: W,
    progress: DownloadProgress,
    cancel_token: CancellationToken,
}
1084
1085impl<W: Write> Write for ProgressWriter<W> {
1086 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1087 if self.cancel_token.is_cancelled() {
1088 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1089 }
1090 let n = self.inner.write(buf)?;
1091 let _ = self.progress.update(n as u64);
1092 Ok(n)
1093 }
1094
1095 fn flush(&mut self) -> io::Result<()> {
1096 self.inner.flush()
1097 }
1098}
1099
/// `Write` adapter that accounts written bytes into a [`SharedProgress`]
/// and fails with `Interrupted` once cancellation is requested.
struct SharedProgressWriter<W> {
    inner: W,
    progress: Arc<SharedProgress>,
}
1106
1107impl<W: Write> Write for SharedProgressWriter<W> {
1108 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1109 if self.progress.is_cancelled() {
1110 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1111 }
1112 let n = self.inner.write(buf)?;
1113 self.progress.add(n as u64);
1114 Ok(n)
1115 }
1116
1117 fn flush(&mut self) -> io::Result<()> {
1118 self.inner.flush()
1119 }
1120}
1121
/// `Read` adapter that accounts read bytes into a [`SharedProgress`] and
/// fails with `Interrupted` once cancellation is requested.
struct SharedProgressReader<R> {
    inner: R,
    progress: Arc<SharedProgress>,
}
1128
1129impl<R: Read> Read for SharedProgressReader<R> {
1130 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1131 if self.progress.is_cancelled() {
1132 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1133 }
1134 let n = self.inner.read(buf)?;
1135 self.progress.add(n as u64);
1136 Ok(n)
1137 }
1138}
1139
/// Downloads `url` into `target_dir` with resume support, returning the final
/// file path and the total size in bytes.
///
/// The file is written to `<name>.part` and renamed to its final name only
/// once fully downloaded, so an interrupted run can resume with an HTTP
/// `Range` request on the next attempt. Up to `MAX_DOWNLOAD_RETRIES` attempts
/// are made with `RETRY_BACKOFF_SECS` between them.
///
/// When `shared` is `Some`, per-file logging is suppressed and bytes are
/// reported to the shared tracker; otherwise a standalone progress bar is
/// rendered via `ProgressWriter`.
fn resumable_download(
    url: &str,
    target_dir: &Path,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<(PathBuf, u64)> {
    // Derive the local file name from the last URL path segment, falling
    // back to a generic name if the URL cannot be parsed.
    let file_name = Url::parse(url)
        .ok()
        .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
        .unwrap_or_else(|| "snapshot.tar".to_string());

    let final_path = target_dir.join(&file_name);
    // In-progress data lives in a `.part` file until the download completes.
    let part_path = target_dir.join(format!("{file_name}.part"));

    let quiet = shared.is_some();

    if !quiet {
        info!(target: "reth::cli", file = %file_name, "Connecting to download server");
    }
    let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;

    // Learned from the first successful response; also lets a later attempt
    // detect that the `.part` file is already complete.
    let mut total_size: Option<u64> = None;
    let mut last_error: Option<eyre::Error> = None;

    // Promotes the `.part` file to its final name once fully downloaded.
    let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
        fs::rename(&part_path, &final_path)?;
        if !quiet {
            info!(target: "reth::cli", file = %file_name, "Download complete");
        }
        Ok((final_path.clone(), size))
    };

    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);

        // A previous attempt may have finished the body but failed before
        // renaming; if so, just finalize.
        if let Some(total) = total_size &&
            existing_size >= total
        {
            return finalize_download(total);
        }

        if attempt > 1 {
            info!(target: "reth::cli",
                file = %file_name,
                "Retry attempt {}/{} - resuming from {} bytes",
                attempt, MAX_DOWNLOAD_RETRIES, existing_size
            );
        }

        // Ask the server to continue from where the `.part` file left off.
        let mut request = client.get(url);
        if existing_size > 0 {
            request = request.header(RANGE, format!("bytes={existing_size}-"));
            if !quiet && attempt == 1 {
                info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
            }
        }

        let response = match request.send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                last_error = Some(e.into());
                if attempt < MAX_DOWNLOAD_RETRIES {
                    info!(target: "reth::cli",
                        file = %file_name,
                        "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                    );
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        };

        // 206 means the server honored the Range header; otherwise it is
        // sending the full body from offset 0 regardless of what we asked.
        let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;

        // Total size: from `Content-Range: bytes a-b/total` on a partial
        // response, from `Content-Length` on a full one.
        let size = if is_partial {
            response
                .headers()
                .get("Content-Range")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.split('/').next_back())
                .and_then(|v| v.parse().ok())
        } else {
            response.content_length()
        };

        if total_size.is_none() {
            total_size = size;
            if !quiet && let Some(s) = size {
                info!(target: "reth::cli",
                    file = %file_name,
                    size = %DownloadProgress::format_size(s),
                    "Downloading"
                );
            }
        }

        let current_total = total_size.ok_or_else(|| {
            eyre::eyre!("Server did not provide Content-Length or Content-Range header")
        })?;

        // Append to the `.part` file only when the server actually resumed;
        // a full (200) response must overwrite any stale partial data.
        let file = if is_partial && existing_size > 0 {
            OpenOptions::new()
                .append(true)
                .open(&part_path)
                .map_err(|e| fs::FsPathError::open(e, &part_path))?
        } else {
            fs::create_file(&part_path)?
        };

        let start_offset = if is_partial { existing_size } else { 0 };
        let mut reader = response;

        let copy_result;
        let flush_result;

        if let Some(sp) = shared {
            // NOTE(review): resumed bytes are credited to the shared tracker
            // here; presumably bytes written on a failed earlier attempt were
            // not counted or are reconciled elsewhere — TODO confirm.
            if start_offset > 0 {
                sp.add(start_offset);
            }
            let mut writer =
                SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
        } else {
            // Standalone progress bar seeded with the already-present bytes.
            let mut progress = DownloadProgress::new(current_total);
            progress.downloaded = start_offset;
            let mut writer = ProgressWriter {
                inner: BufWriter::new(file),
                progress,
                cancel_token: cancel_token.clone(),
            };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
            // Terminate the in-place progress line before further logging.
            println!();
        }

        if let Err(e) = copy_result.and(flush_result) {
            last_error = Some(e.into());
            if attempt < MAX_DOWNLOAD_RETRIES {
                info!(target: "reth::cli",
                    file = %file_name,
                    "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
            }
            continue;
        }

        return finalize_download(current_total);
    }

    Err(last_error
        .unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
}
1302
1303fn streaming_download_and_extract(
1307 url: &str,
1308 format: CompressionFormat,
1309 target_dir: &Path,
1310 shared: Option<&Arc<SharedProgress>>,
1311 cancel_token: CancellationToken,
1312) -> Result<()> {
1313 let quiet = shared.is_some();
1314 let mut last_error: Option<eyre::Error> = None;
1315
1316 for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1317 if attempt > 1 {
1318 info!(target: "reth::cli",
1319 url = %url,
1320 attempt,
1321 max = MAX_DOWNLOAD_RETRIES,
1322 "Retrying streaming download from scratch"
1323 );
1324 }
1325
1326 let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
1327
1328 let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
1329 Ok(r) => r,
1330 Err(e) => {
1331 last_error = Some(e.into());
1332 if attempt < MAX_DOWNLOAD_RETRIES {
1333 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1334 }
1335 continue;
1336 }
1337 };
1338
1339 if !quiet && let Some(size) = response.content_length() {
1340 info!(target: "reth::cli",
1341 url = %url,
1342 size = %DownloadProgress::format_size(size),
1343 "Streaming archive"
1344 );
1345 }
1346
1347 let result = if let Some(sp) = shared {
1348 let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
1349 extract_archive_raw(reader, format, target_dir)
1350 } else {
1351 let total_size = response.content_length().unwrap_or(0);
1352 extract_archive(response, total_size, format, target_dir, cancel_token.clone())
1353 };
1354
1355 match result {
1356 Ok(()) => return Ok(()),
1357 Err(e) => {
1358 last_error = Some(e);
1359 if attempt < MAX_DOWNLOAD_RETRIES {
1360 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1361 }
1362 }
1363 }
1364 }
1365
1366 Err(last_error.unwrap_or_else(|| {
1367 eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
1368 }))
1369}
1370
1371fn download_and_extract(
1373 url: &str,
1374 format: CompressionFormat,
1375 target_dir: &Path,
1376 shared: Option<&Arc<SharedProgress>>,
1377 cancel_token: CancellationToken,
1378) -> Result<()> {
1379 let quiet = shared.is_some();
1380 let (downloaded_path, total_size) =
1381 resumable_download(url, target_dir, shared, cancel_token.clone())?;
1382
1383 let file_name =
1384 downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1385
1386 if !quiet {
1387 info!(target: "reth::cli",
1388 file = %file_name,
1389 size = %DownloadProgress::format_size(total_size),
1390 "Extracting archive"
1391 );
1392 }
1393 let file = fs::open(&downloaded_path)?;
1394
1395 if quiet {
1396 extract_archive_raw(file, format, target_dir)?;
1398 } else {
1399 extract_archive(file, total_size, format, target_dir, cancel_token)?;
1400 info!(target: "reth::cli",
1401 file = %file_name,
1402 "Extraction complete"
1403 );
1404 }
1405
1406 fs::remove_file(&downloaded_path)?;
1407
1408 if let Some(sp) = shared {
1409 sp.archive_done();
1410 }
1411
1412 Ok(())
1413}
1414
1415fn blocking_download_and_extract(
1421 url: &str,
1422 target_dir: &Path,
1423 shared: Option<Arc<SharedProgress>>,
1424 resumable: bool,
1425 cancel_token: CancellationToken,
1426) -> Result<()> {
1427 let format = CompressionFormat::from_url(url)?;
1428
1429 if let Ok(parsed_url) = Url::parse(url) &&
1430 parsed_url.scheme() == "file"
1431 {
1432 let file_path = parsed_url
1433 .to_file_path()
1434 .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1435 let result = extract_from_file(&file_path, format, target_dir);
1436 if result.is_ok() &&
1437 let Some(sp) = shared
1438 {
1439 sp.archive_done();
1440 }
1441 result
1442 } else if resumable {
1443 download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token)
1444 } else {
1445 let result =
1446 streaming_download_and_extract(url, format, target_dir, shared.as_ref(), cancel_token);
1447 if result.is_ok() &&
1448 let Some(sp) = shared
1449 {
1450 sp.archive_done();
1451 }
1452 result
1453 }
1454}
1455
1456async fn stream_and_extract(
1462 url: &str,
1463 target_dir: &Path,
1464 shared: Option<Arc<SharedProgress>>,
1465 resumable: bool,
1466 cancel_token: CancellationToken,
1467) -> Result<()> {
1468 let target_dir = target_dir.to_path_buf();
1469 let url = url.to_string();
1470 task::spawn_blocking(move || {
1471 blocking_download_and_extract(&url, &target_dir, shared, resumable, cancel_token)
1472 })
1473 .await??;
1474
1475 Ok(())
1476}
1477
1478async fn process_modular_archive(
1479 planned: PlannedArchive,
1480 target_dir: &Path,
1481 cache_dir: Option<&Path>,
1482 shared: Option<Arc<SharedProgress>>,
1483 resumable: bool,
1484 cancel_token: CancellationToken,
1485) -> Result<()> {
1486 let target_dir = target_dir.to_path_buf();
1487 let cache_dir = cache_dir.map(Path::to_path_buf);
1488
1489 task::spawn_blocking(move || {
1490 blocking_process_modular_archive(
1491 &planned,
1492 &target_dir,
1493 cache_dir.as_deref(),
1494 shared,
1495 resumable,
1496 cancel_token,
1497 )
1498 })
1499 .await??;
1500
1501 Ok(())
1502}
1503
/// Downloads, extracts, and integrity-checks one planned archive of a modular
/// snapshot, retrying up to `MAX_DOWNLOAD_RETRIES` times.
///
/// If all expected output files already exist with matching checksums, the
/// archive is skipped entirely (its full size is credited to the shared
/// tracker so aggregate progress stays correct). Otherwise each attempt
/// removes stale output files, downloads + extracts, and re-verifies.
fn blocking_process_modular_archive(
    planned: &PlannedArchive,
    target_dir: &Path,
    cache_dir: Option<&Path>,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let archive = &planned.archive;
    // Fast path: everything this archive produces is already on disk and
    // verified — nothing to download.
    if verify_output_files(target_dir, &archive.output_files)? {
        if let Some(sp) = &shared {
            sp.add(archive.size);
            sp.archive_done();
        }
        info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
        return Ok(());
    }

    let format = CompressionFormat::from_url(&archive.file_name)?;
    let mut last_error: Option<eyre::Error> = None;
    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        // Remove any partially-extracted outputs so verification below
        // cannot be confused by stale files.
        cleanup_output_files(target_dir, &archive.output_files);

        if resumable {
            let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
            let archive_path = cache_dir.join(&archive.file_name);
            let part_path = cache_dir.join(format!("{}.part", archive.file_name));
            let result =
                resumable_download(&archive.url, cache_dir, shared.as_ref(), cancel_token.clone())
                    .and_then(|(downloaded_path, _)| {
                        let file = fs::open(&downloaded_path)?;
                        extract_archive_raw(file, format, target_dir)
                    });
            // Best-effort cleanup of the cached archive regardless of outcome.
            let _ = fs::remove_file(&archive_path);
            let _ = fs::remove_file(&part_path);

            if let Err(e) = result {
                warn!(target: "reth::cli",
                    file = %archive.file_name,
                    component = %planned.component,
                    attempt,
                    err = %e,
                    "Download or extraction failed, retrying"
                );
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        } else {
            // Streaming path has its own internal retry loop; a final failure
            // here aborts immediately rather than consuming outer attempts.
            streaming_download_and_extract(
                &archive.url,
                format,
                target_dir,
                shared.as_ref(),
                cancel_token.clone(),
            )?;
        }

        if verify_output_files(target_dir, &archive.output_files)? {
            if let Some(sp) = &shared {
                sp.archive_done();
            }
            return Ok(());
        }

        warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
    }

    if let Some(e) = last_error {
        return Err(e.wrap_err(format!(
            "Failed after {} attempts for {}",
            MAX_DOWNLOAD_RETRIES, archive.file_name
        )));
    }

    // All attempts extracted successfully but the outputs never verified.
    eyre::bail!(
        "Failed integrity validation after {} attempts for {}",
        MAX_DOWNLOAD_RETRIES,
        archive.file_name
    )
}
1588
1589fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -> Result<bool> {
1590 if output_files.is_empty() {
1591 return Ok(false);
1592 }
1593
1594 for expected in output_files {
1595 let output_path = target_dir.join(&expected.path);
1596 let meta = match fs::metadata(&output_path) {
1597 Ok(meta) => meta,
1598 Err(_) => return Ok(false),
1599 };
1600 if meta.len() != expected.size {
1601 return Ok(false);
1602 }
1603
1604 let actual = file_blake3_hex(&output_path)?;
1605 if !actual.eq_ignore_ascii_case(&expected.blake3) {
1606 return Ok(false);
1607 }
1608 }
1609
1610 Ok(true)
1611}
1612
1613fn cleanup_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) {
1614 for output in output_files {
1615 let _ = fs::remove_file(target_dir.join(&output.path));
1616 }
1617}
1618
1619fn file_blake3_hex(path: &Path) -> Result<String> {
1620 let mut file = fs::open(path)?;
1621 let mut hasher = Hasher::new();
1622 let mut buf = [0_u8; 64 * 1024];
1623
1624 loop {
1625 let n = file.read(&mut buf)?;
1626 if n == 0 {
1627 break;
1628 }
1629 hasher.update(&buf[..n]);
1630 }
1631
1632 Ok(hasher.finalize().to_hex().to_string())
1633}
1634
1635async fn discover_manifest_url(chain_id: u64) -> Result<String> {
1640 let api_url = RETH_SNAPSHOTS_API_URL;
1641
1642 info!(target: "reth::cli", %api_url, %chain_id, "Discovering latest snapshot manifest");
1643
1644 let entries = fetch_snapshot_api_entries(chain_id).await?;
1645
1646 let entry =
1647 entries.iter().filter(|s| s.is_modular()).max_by_key(|s| s.block).ok_or_else(|| {
1648 eyre::eyre!(
1649 "No modular snapshot manifest found for chain \
1650 {chain_id} at {api_url}\n\n\
1651 You can provide a manifest URL directly with --manifest-url, or\n\
1652 use a direct snapshot URL with -u from:\n\
1653 \t- https://snapshots.reth.rs\n\n\
1654 Use --list to see all available snapshots."
1655 )
1656 })?;
1657
1658 info!(target: "reth::cli",
1659 block = entry.block,
1660 url = %entry.metadata_url,
1661 "Found latest snapshot manifest"
1662 );
1663
1664 Ok(entry.metadata_url.clone())
1665}
1666
1667fn deserialize_string_or_u64<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
1669where
1670 D: serde::Deserializer<'de>,
1671{
1672 use serde::Deserialize;
1673 let value = serde_json::Value::deserialize(deserializer)?;
1674 match &value {
1675 serde_json::Value::Number(n) => {
1676 n.as_u64().ok_or_else(|| serde::de::Error::custom("expected u64"))
1677 }
1678 serde_json::Value::String(s) => {
1679 s.parse::<u64>().map_err(|_| serde::de::Error::custom("expected numeric string"))
1680 }
1681 _ => Err(serde::de::Error::custom("expected number or string")),
1682 }
1683}
1684
/// One snapshot entry as returned by the snapshot listing API
/// (`RETH_SNAPSHOTS_API_URL`).
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SnapshotApiEntry {
    /// Chain this snapshot belongs to; the API may serialize it as either a
    /// number or a numeric string, hence the custom deserializer.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    chain_id: u64,
    /// Block height the snapshot was taken at (same number/string leniency).
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    block: u64,
    /// Snapshot date label, if the API provides one.
    #[serde(default)]
    date: Option<String>,
    /// Snapshot profile name, if the API provides one.
    #[serde(default)]
    profile: Option<String>,
    /// URL of the snapshot's metadata (manifest) document.
    metadata_url: String,
    /// Total snapshot size in bytes; `0` when the API omits it.
    #[serde(default)]
    size: u64,
}
1701
impl SnapshotApiEntry {
    /// Whether this entry is a component-based ("modular") snapshot,
    /// detected by its metadata URL pointing at a `manifest.json`.
    fn is_modular(&self) -> bool {
        self.metadata_url.ends_with("manifest.json")
    }
}
1707
1708async fn fetch_snapshot_api_entries(chain_id: u64) -> Result<Vec<SnapshotApiEntry>> {
1710 let api_url = RETH_SNAPSHOTS_API_URL;
1711
1712 let entries: Vec<SnapshotApiEntry> = Client::new()
1713 .get(api_url)
1714 .send()
1715 .await
1716 .and_then(|r| r.error_for_status())
1717 .wrap_err_with(|| format!("Failed to fetch snapshot listing from {api_url}"))?
1718 .json()
1719 .await?;
1720
1721 Ok(entries.into_iter().filter(|e| e.chain_id == chain_id).collect())
1722}
1723
1724fn print_snapshot_listing(entries: &[SnapshotApiEntry], chain_id: u64) {
1726 let modular: Vec<_> = entries.iter().filter(|e| e.is_modular()).collect();
1727
1728 println!("Available snapshots for chain {chain_id} (https://snapshots.reth.rs):\n");
1729 println!("{:<12} {:>10} {:<10} {:>10} MANIFEST URL", "DATE", "BLOCK", "PROFILE", "SIZE");
1730 println!("{}", "-".repeat(100));
1731
1732 for entry in &modular {
1733 let date = entry.date.as_deref().unwrap_or("-");
1734 let profile = entry.profile.as_deref().unwrap_or("-");
1735 let size = if entry.size > 0 {
1736 DownloadProgress::format_size(entry.size)
1737 } else {
1738 "-".to_string()
1739 };
1740
1741 println!(
1742 "{date:<12} {:>10} {profile:<10} {size:>10} {}",
1743 entry.block, entry.metadata_url
1744 );
1745 }
1746
1747 if modular.is_empty() {
1748 println!(" (no modular snapshots found)");
1749 }
1750
1751 println!(
1752 "\nTo download a specific snapshot, copy its manifest URL and run:\n \
1753 reth download --manifest-url <URL>"
1754 );
1755}
1756
/// Loads a [`SnapshotManifest`] from `source`, which may be an `http(s)` URL,
/// a `file://` URL, or a plain filesystem path.
///
/// Plain paths (anything that doesn't parse as a URL) are read and parsed as
/// JSON directly; unknown URL schemes are rejected.
async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
    if let Ok(parsed) = Url::parse(source) {
        return match parsed.scheme() {
            "http" | "https" => {
                let response = Client::new()
                    .get(source)
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .wrap_err_with(|| {
                        format!(
                            "Failed to fetch snapshot manifest from {source}\n\n\
                            The manifest endpoint may not be available for this snapshot source.\n\
                            You can use a direct snapshot URL instead:\n\n\
                            \treth download -u <snapshot-url>\n\n\
                            Available snapshot sources:\n\
                            \t- https://snapshots.reth.rs\n\
                            \t- https://publicnode.com/snapshots"
                        )
                    })?;
                Ok(response.json().await?)
            }
            "file" => {
                let path = parsed
                    .to_file_path()
                    .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
                let content = fs::read_to_string(path)?;
                Ok(serde_json::from_str(&content)?)
            }
            _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
        };
    }

    // Not a URL: treat `source` as a local filesystem path.
    let content = fs::read_to_string(source)?;
    Ok(serde_json::from_str(&content)?)
}
1793
/// Resolves the base URL that archive file names in `manifest` are relative
/// to, without a trailing slash.
///
/// Resolution order:
/// 1. The manifest's own non-empty `base_url`, if set.
/// 2. For a URL `source` (`file://` or remote), the directory containing the
///    manifest document.
/// 3. For a plain path `source`, a `file://` URL for its (absolutized)
///    parent directory.
fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
    if let Some(base_url) = manifest.base_url.as_deref() &&
        !base_url.is_empty()
    {
        return Ok(base_url.trim_end_matches('/').to_string());
    }

    if let Ok(mut url) = Url::parse(source) {
        if url.scheme() == "file" {
            // file:// manifest: base is the directory the manifest lives in.
            let mut path = url
                .to_file_path()
                .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
            path.pop();
            let mut base = Url::from_directory_path(path)
                .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
                .to_string();
            if base.ends_with('/') {
                base.pop();
            }
            return Ok(base);
        }

        // Remote manifest: drop the final path segment (the manifest file
        // name) to obtain its parent directory URL.
        {
            let mut segments = url
                .path_segments_mut()
                .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
            segments.pop_if_empty();
            segments.pop();
        }
        return Ok(url.as_str().trim_end_matches('/').to_string());
    }

    // Plain filesystem path: absolutize relative paths against the current
    // working directory, then use the parent directory as a file:// base.
    let path = Path::new(source);
    let manifest_dir = if path.is_absolute() {
        path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    } else {
        let joined = std::env::current_dir()?.join(path);
        joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    };
    let mut base = Url::from_directory_path(&manifest_dir)
        .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
        .to_string();
    if base.ends_with('/') {
        base.pop();
    }
    Ok(base)
}
1841
#[cfg(test)]
mod tests {
    use super::*;
    use manifest::{ComponentManifest, SingleArchive};
    use tempfile::tempdir;

    /// Manifest fixture containing only the two archive-only components
    /// (transaction senders and rocksdb indices) exercised by the injection
    /// tests below.
    fn manifest_with_archive_only_components() -> SnapshotManifest {
        let mut components = BTreeMap::new();
        components.insert(
            SnapshotComponentType::TransactionSenders.key().to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "transaction_senders.tar.zst".to_string(),
                size: 1,
                blake3: None,
                output_files: vec![],
            }),
        );
        components.insert(
            SnapshotComponentType::RocksdbIndices.key().to_string(),
            ComponentManifest::Single(SingleArchive {
                file: "rocksdb_indices.tar.zst".to_string(),
                size: 1,
                blake3: None,
                output_files: vec![],
            }),
        );
        SnapshotManifest {
            block: 0,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        }
    }

    #[test]
    fn test_download_defaults_builder() {
        let defaults = DownloadDefaults::default()
            .with_snapshot("https://example.com/snapshots (example)")
            .with_base_url("https://example.com");

        assert_eq!(defaults.default_base_url, "https://example.com");
        // Two built-in defaults plus the one appended above.
        assert_eq!(defaults.available_snapshots.len(), 3);
    }

    #[test]
    fn test_download_defaults_replace_snapshots() {
        let defaults = DownloadDefaults::default().with_snapshots(vec![
            Cow::Borrowed("https://custom1.com"),
            Cow::Borrowed("https://custom2.com"),
        ]);

        // `with_snapshots` replaces (rather than appends to) the defaults.
        assert_eq!(defaults.available_snapshots.len(), 2);
        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
    }

    #[test]
    fn test_long_help_generation() {
        let defaults = DownloadDefaults::default();
        let help = defaults.long_help();

        assert!(help.contains("Available snapshot sources:"));
        assert!(help.contains("snapshots.reth.rs"));
        assert!(help.contains("publicnode.com"));
        assert!(help.contains("file://"));
    }

    #[test]
    fn test_long_help_override() {
        let custom_help = "This is custom help text for downloading snapshots.";
        let defaults = DownloadDefaults::default().with_long_help(custom_help);

        // An explicit override fully replaces the generated help text.
        let help = defaults.long_help();
        assert_eq!(help, custom_help);
        assert!(!help.contains("Available snapshot sources:"));
    }

    #[test]
    fn test_builder_chaining() {
        let defaults = DownloadDefaults::default()
            .with_base_url("https://custom.example.com")
            .with_snapshot("https://snapshot1.com")
            .with_snapshot("https://snapshot2.com")
            .with_long_help("Custom help for snapshots");

        assert_eq!(defaults.default_base_url, "https://custom.example.com");
        // Two built-in defaults plus the two appended above.
        assert_eq!(defaults.available_snapshots.len(), 4);
        assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
    }

    #[test]
    fn test_compression_format_detection() {
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        // Unsupported extensions must be rejected, not silently defaulted.
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
    }

    #[test]
    fn inject_archive_only_components_for_archive_selection() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);

        inject_archive_only_components(&mut selections, &manifest, true);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        assert_eq!(
            selections.get(&SnapshotComponentType::RocksdbIndices),
            Some(&ComponentSelection::All)
        );
    }

    #[test]
    fn inject_archive_only_components_without_rocksdb() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
        selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
        selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);

        inject_archive_only_components(&mut selections, &manifest, false);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        // With the rocksdb flag off, the indices component must not appear.
        assert_eq!(selections.get(&SnapshotComponentType::RocksdbIndices), None);
    }

    #[test]
    fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        assert!(should_reset_index_stage_checkpoints(&selections));

        selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
        assert!(!should_reset_index_stage_checkpoints(&selections));
    }

    #[test]
    fn summarize_download_startup_counts_reusable_and_needs_download() {
        let dir = tempdir().unwrap();
        let target_dir = dir.path();
        // One file on disk whose size and hash match its descriptor: reusable.
        let ok_file = target_dir.join("ok.bin");
        std::fs::write(&ok_file, vec![1_u8; 4]).unwrap();
        let ok_hash = file_blake3_hex(&ok_file).unwrap();

        let planned = vec![
            PlannedArchive {
                ty: SnapshotComponentType::State,
                component: "State".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/ok.tar.zst".to_string(),
                    file_name: "ok.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "ok.bin".to_string(),
                        size: 4,
                        blake3: ok_hash,
                    }],
                },
            },
            // Output file missing on disk: needs download.
            PlannedArchive {
                ty: SnapshotComponentType::Headers,
                component: "Headers".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/missing.tar.zst".to_string(),
                    file_name: "missing.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "missing.bin".to_string(),
                        size: 1,
                        blake3: "deadbeef".to_string(),
                    }],
                },
            },
            // No checksum data at all: can never be verified as reusable.
            PlannedArchive {
                ty: SnapshotComponentType::Transactions,
                component: "Transactions".to_string(),
                archive: ArchiveDescriptor {
                    url: "https://example.com/bad-size.tar.zst".to_string(),
                    file_name: "bad-size.tar.zst".to_string(),
                    size: 10,
                    blake3: None,
                    output_files: vec![],
                },
            },
        ];

        let summary = summarize_download_startup(&planned, target_dir).unwrap();
        assert_eq!(summary.reusable, 1);
        assert_eq!(summary.needs_download, 2);
    }

    #[test]
    fn archive_priority_prefers_state_then_rocksdb() {
        let mut planned = [
            PlannedArchive {
                ty: SnapshotComponentType::Transactions,
                component: "Transactions".to_string(),
                archive: ArchiveDescriptor {
                    url: "u3".to_string(),
                    file_name: "t.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "a".to_string(),
                        size: 1,
                        blake3: "x".to_string(),
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::RocksdbIndices,
                component: "RocksDB Indices".to_string(),
                archive: ArchiveDescriptor {
                    url: "u2".to_string(),
                    file_name: "rocksdb_indices.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "b".to_string(),
                        size: 1,
                        blake3: "y".to_string(),
                    }],
                },
            },
            PlannedArchive {
                ty: SnapshotComponentType::State,
                component: "State (mdbx)".to_string(),
                archive: ArchiveDescriptor {
                    url: "u1".to_string(),
                    file_name: "state.tar.zst".to_string(),
                    size: 1,
                    blake3: None,
                    output_files: vec![OutputFileChecksum {
                        path: "c".to_string(),
                        size: 1,
                        blake3: "z".to_string(),
                    }],
                },
            },
        ];

        planned.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        assert_eq!(planned[0].ty, SnapshotComponentType::State);
        assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
        assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
    }
}