1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::Parser;
9use config_gen::{config_for_selections, write_config};
10use eyre::Result;
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14 ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15 SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_db::{init_db, Database};
21use reth_db_api::transaction::DbTx;
22use reth_fs_util as fs;
23use reth_node_core::args::DefaultPruningValues;
24use reth_prune_types::PruneMode;
25use std::{
26 borrow::Cow,
27 collections::BTreeMap,
28 fs::OpenOptions,
29 io::{self, BufWriter, Read, Write},
30 path::{Path, PathBuf},
31 sync::{
32 atomic::{AtomicBool, AtomicU64, Ordering},
33 Arc, OnceLock,
34 },
35 time::{Duration, Instant},
36};
37use tar::Archive;
38use tokio::task;
39use tracing::{info, warn};
40use tui::{run_selector, SelectorOutput};
41use url::Url;
42use zstd::stream::read::Decoder as ZstdDecoder;
43
/// Unit suffixes used by `DownloadProgress::format_size`, smallest unit first.
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
/// Base URL used as `default_base_url` by `DownloadDefaults::default_download_defaults`.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
/// Path suffix that `CompressionFormat::from_url` maps to `CompressionFormat::Lz4`.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// Path suffix that `CompressionFormat::from_url` maps to `CompressionFormat::Zstd`.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Directory created inside the target data directory when `--resumable` is set.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";

/// Default value of the `--download-concurrency` argument.
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
52
/// Named component-selection presets corresponding to `--minimal`, `--full` and `--archive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Chosen by `--minimal`; also the fallback when `--non-interactive` is set without any
    /// other selection flag.
    Minimal,
    /// Chosen by `--full`.
    Full,
    /// Chosen by `--archive` (alias `--all`).
    Archive,
}
59
/// Outcome of `DownloadCommand::resolve_components`.
struct ResolvedComponents {
    /// Selection chosen for each component type.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    /// Preset the selection came from, if any (e.g. `None` when it was assembled from the
    /// explicit `--with-*` flags).
    preset: Option<SelectionPreset>,
}
64
/// Process-wide [`DownloadDefaults`]; set once through `DownloadDefaults::try_init` or lazily
/// filled by `DownloadDefaults::get_global`.
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
67
/// Customisable defaults for the download command: advertised snapshot sources, base URLs and
/// the help text of the `--url` argument.
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// Snapshot sources listed (one bullet per entry) in the generated long help.
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Base URL mentioned in the long help when no chain-aware base URL is configured.
    pub default_base_url: Cow<'static, str>,
    /// Optional base URL that takes precedence over `default_base_url` in the long help.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// Optional text that replaces the generated long help entirely.
    pub long_help: Option<String>,
}
88
89impl DownloadDefaults {
90 pub fn try_init(self) -> Result<(), Self> {
92 DOWNLOAD_DEFAULTS.set(self)
93 }
94
95 pub fn get_global() -> &'static DownloadDefaults {
97 DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
98 }
99
100 pub fn default_download_defaults() -> Self {
102 Self {
103 available_snapshots: vec![
104 Cow::Borrowed("https://www.merkle.io/snapshots (default, mainnet archive)"),
105 Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
106 ],
107 default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
108 default_chain_aware_base_url: None,
109 long_help: None,
110 }
111 }
112
113 pub fn long_help(&self) -> String {
118 if let Some(ref custom_help) = self.long_help {
119 return custom_help.clone();
120 }
121
122 let mut help = String::from(
123 "Specify a snapshot URL or let the command propose a default one.\n\nAvailable snapshot sources:\n",
124 );
125
126 for source in &self.available_snapshots {
127 help.push_str("- ");
128 help.push_str(source);
129 help.push('\n');
130 }
131
132 help.push_str(
133 "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
134 );
135 help.push_str(
136 self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
137 );
138 help.push_str(
139 ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
140 );
141 help
142 }
143
144 pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
146 self.available_snapshots.push(source.into());
147 self
148 }
149
150 pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
152 self.available_snapshots = sources;
153 self
154 }
155
156 pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
158 self.default_base_url = url.into();
159 self
160 }
161
162 pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
164 self.default_chain_aware_base_url = Some(url.into());
165 self
166 }
167
168 pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
170 self.long_help = Some(help.into());
171 self
172 }
173}
174
impl Default for DownloadDefaults {
    /// Same value as [`DownloadDefaults::default_download_defaults`].
    fn default() -> Self {
        Self::default_download_defaults()
    }
}
180
// CLI arguments of the snapshot download command.
//
// NOTE: plain `//` comments are used on purpose. clap's derive turns `///` doc comments into
// help text, so adding them here would change the `--help` output.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    // Shared environment arguments (chain, data directory, database), flattened into this
    // command.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // URL of a single archive. When set, `execute` streams and extracts just that archive and
    // returns without consulting a manifest.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    // Manifest location used when no `--manifest-path` is given.
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    // Manifest location that takes precedence over `--manifest-url`.
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    // Explicit flag: additionally select all transactions.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_txs: bool,

    // Explicit flag: additionally select all receipts.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive"])]
    with_receipts: bool,

    // Explicit flag: additionally select all account and storage changesets.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["minimal", "full", "archive"])]
    with_state_history: bool,

    // Preset: select every component the manifest offers.
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "minimal", "full"])]
    archive: bool,

    // Preset: per-component minimal selection.
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "full"])]
    minimal: bool,

    // Preset: selection derived from the full-node pruning defaults.
    #[arg(long, conflicts_with_all = ["with_txs", "with_receipts", "with_state_history", "archive", "minimal"])]
    full: bool,

    // Leaves the RocksDB indices component out of the archive preset.
    #[arg(long, conflicts_with = "url")]
    without_rocksdb: bool,

    // Skips the interactive selector; the minimal preset is used when nothing else was chosen.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    // Forwarded to the download routines; also makes `execute` create the download cache
    // directory.
    #[arg(long)]
    resumable: bool,

    // Upper bound on archives processed at once; `execute` raises a value of 0 to 1.
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,
}
251
252impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Runs the download command.
    ///
    /// With `--url` a single archive is streamed and extracted into the data directory and the
    /// function returns. Otherwise the manifest is fetched, the component selection is
    /// resolved, all planned archives are processed concurrently, and finally the generated
    /// config plus the prune / index-stage checkpoints are written.
    ///
    /// # Errors
    ///
    /// Returns the first error from directory creation, manifest handling, component
    /// resolution, any archive task, config writing or the database transaction.
    pub async fn execute<N>(self) -> Result<()> {
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // Single-URL mode: one archive, no manifest, no config/checkpoint generation.
        if let Some(url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(&url, data_dir.data_dir(), None, self.resumable).await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Manifest mode from here on.
        let manifest_source = self.resolve_manifest_source(chain_id);

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        // Store the resolved base URL on the manifest itself.
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // The archive preset may pull in transaction senders and (unless disabled) RocksDB
        // indices.
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        let target_dir = data_dir.data_dir();
        // Flatten the selection into the list of archives to process.
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Every archive must list its output files; `summarize_download_startup`
                // below verifies them.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // Order by priority rank (state, then RocksDB indices, then the rest), then by
        // component name and file name.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // The cache directory only exists in resumable mode.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        // Total size as reported by the manifest for the selected components.
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::None => 0,
            })
            .sum();

        let startup_summary = summarize_download_startup(&all_downloads, target_dir)?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        // One shared counter for all archive tasks, reported by a background task.
        let shared = SharedProgress::new(total_size, total_archives as u64);
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        let target = target_dir.to_path_buf();
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        // A concurrency of 0 is raised to 1.
        let download_concurrency = self.download_concurrency.max(1);
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                // Each future owns its own copies so it can be `async move`.
                let dir = target.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                async move {
                    process_modular_archive(planned, &dir, cache.as_deref(), Some(sp), resumable)
                        .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Tell the progress task to stop and wait for its final log line.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        // All archives have been attempted at this point; surface the first failure.
        for result in results {
            result?;
        }

        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        // Prune checkpoints are only written when the config deviates from the default.
        let should_write_prune = config.prune.segments != Default::default();
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            // Both updates share one transaction.
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        info!(target: "reth::cli", "Snapshot download complete. Run `reth node` to start syncing.");

        Ok(())
    }
435
436 fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
438 let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();
439
440 if self.archive {
442 return Ok(ResolvedComponents {
443 selections: SnapshotComponentType::ALL
444 .iter()
445 .copied()
446 .filter(|ty| available(*ty))
447 .filter(|ty| {
448 !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
449 })
450 .map(|ty| (ty, ComponentSelection::All))
451 .collect(),
452 preset: Some(SelectionPreset::Archive),
453 });
454 }
455
456 if self.full {
457 return Ok(ResolvedComponents {
458 selections: self.full_preset_selections(manifest),
459 preset: Some(SelectionPreset::Full),
460 });
461 }
462
463 if self.minimal {
464 return Ok(ResolvedComponents {
465 selections: self.minimal_preset_selections(manifest),
466 preset: Some(SelectionPreset::Minimal),
467 });
468 }
469
470 let has_explicit_flags = self.with_txs || self.with_receipts || self.with_state_history;
471
472 if has_explicit_flags {
473 let mut selections = BTreeMap::new();
474 if available(SnapshotComponentType::State) {
476 selections.insert(SnapshotComponentType::State, ComponentSelection::All);
477 }
478 if available(SnapshotComponentType::Headers) {
479 selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
480 }
481 if self.with_txs && available(SnapshotComponentType::Transactions) {
482 selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
483 }
484 if self.with_receipts && available(SnapshotComponentType::Receipts) {
485 selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
486 }
487 if self.with_state_history {
488 if available(SnapshotComponentType::AccountChangesets) {
489 selections
490 .insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
491 }
492 if available(SnapshotComponentType::StorageChangesets) {
493 selections
494 .insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
495 }
496 }
497 return Ok(ResolvedComponents { selections, preset: None });
498 }
499
500 if self.non_interactive {
501 return Ok(ResolvedComponents {
502 selections: self.minimal_preset_selections(manifest),
503 preset: Some(SelectionPreset::Minimal),
504 });
505 }
506
507 let full_preset = self.full_preset_selections(manifest);
509 let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
510 let selected =
511 selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();
512
513 Ok(ResolvedComponents { selections: selected, preset })
514 }
515
516 fn minimal_preset_selections(
517 &self,
518 manifest: &SnapshotManifest,
519 ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
520 SnapshotComponentType::ALL
521 .iter()
522 .copied()
523 .filter(|ty| manifest.component(*ty).is_some())
524 .map(|ty| (ty, ty.minimal_selection()))
525 .collect()
526 }
527
528 fn full_preset_selections(
529 &self,
530 manifest: &SnapshotManifest,
531 ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
532 let mut selections = BTreeMap::new();
533
534 for ty in [
535 SnapshotComponentType::State,
536 SnapshotComponentType::Headers,
537 SnapshotComponentType::Transactions,
538 SnapshotComponentType::Receipts,
539 SnapshotComponentType::AccountChangesets,
540 SnapshotComponentType::StorageChangesets,
541 SnapshotComponentType::TransactionSenders,
542 SnapshotComponentType::RocksdbIndices,
543 ] {
544 if manifest.component(ty).is_none() {
545 continue;
546 }
547
548 let selection = self.full_selection_for_component(ty, manifest.block);
549 if selection != ComponentSelection::None {
550 selections.insert(ty, selection);
551 }
552 }
553
554 selections
555 }
556
557 fn full_selection_for_component(
558 &self,
559 ty: SnapshotComponentType,
560 snapshot_block: u64,
561 ) -> ComponentSelection {
562 let defaults = DefaultPruningValues::get_global();
563 match ty {
564 SnapshotComponentType::State | SnapshotComponentType::Headers => {
565 ComponentSelection::All
566 }
567 SnapshotComponentType::Transactions => {
568 if defaults.full_bodies_history_use_pre_merge {
569 match self
570 .env
571 .chain
572 .ethereum_fork_activation(EthereumHardfork::Paris)
573 .block_number()
574 {
575 Some(paris) if snapshot_block >= paris => {
576 ComponentSelection::Distance(snapshot_block - paris + 1)
577 }
578 Some(_) => ComponentSelection::None,
579 None => ComponentSelection::All,
580 }
581 } else {
582 selection_from_prune_mode(
583 defaults.full_prune_modes.bodies_history,
584 snapshot_block,
585 )
586 }
587 }
588 SnapshotComponentType::Receipts => {
589 selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
590 }
591 SnapshotComponentType::AccountChangesets => {
592 selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
593 }
594 SnapshotComponentType::StorageChangesets => {
595 selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
596 }
597 SnapshotComponentType::TransactionSenders => {
598 selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
599 }
600 SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
602 }
603 }
604
605 fn resolve_manifest_source(&self, chain_id: u64) -> String {
606 if let Some(path) = &self.manifest_path {
607 return path.display().to_string();
608 }
609
610 match &self.manifest_url {
611 Some(url) => url.clone(),
612 None => {
613 let base_url = get_base_url(chain_id);
614 format!("{base_url}/manifest.json")
615 }
616 }
617 }
618}
619
620fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
621 match mode {
622 None => ComponentSelection::All,
623 Some(PruneMode::Full) => ComponentSelection::None,
624 Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
625 Some(PruneMode::Before(block)) => {
626 if snapshot_block >= block {
627 ComponentSelection::Distance(snapshot_block - block + 1)
628 } else {
629 ComponentSelection::None
630 }
631 }
632 }
633}
634
635fn inject_archive_only_components(
638 selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
639 manifest: &SnapshotManifest,
640 include_rocksdb: bool,
641) {
642 let is_all =
643 |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
644
645 let is_archive = is_all(SnapshotComponentType::Transactions) &&
646 is_all(SnapshotComponentType::Receipts) &&
647 is_all(SnapshotComponentType::AccountChangesets) &&
648 is_all(SnapshotComponentType::StorageChangesets);
649
650 if !is_archive {
651 return;
652 }
653
654 for component in
655 [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
656 {
657 if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
658 continue;
659 }
660
661 if manifest.component(component).is_some() {
662 selections.insert(component, ComponentSelection::All);
663 }
664 }
665}
666
667fn should_reset_index_stage_checkpoints(
668 selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
669) -> bool {
670 !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
671}
672
impl<C: ChainSpecParser> DownloadCommand<C> {
    /// Returns the chain spec held by the environment arguments; always `Some`.
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        Some(&self.env.chain)
    }
}
679
/// Progress state for a single download/extraction that prints to stdout.
pub(crate) struct DownloadProgress {
    /// Bytes accounted for so far.
    downloaded: u64,
    /// Expected total number of bytes.
    total_size: u64,
    /// When the progress line was last printed (used to throttle output).
    last_displayed: Instant,
    /// When tracking started (used for the ETA estimate).
    started_at: Instant,
}
687
/// One archive queued for download by `DownloadCommand::execute`.
#[derive(Debug, Clone)]
struct PlannedArchive {
    /// Component type the archive belongs to.
    ty: SnapshotComponentType,
    /// Display name of the component (from `display_name()`).
    component: String,
    /// Manifest descriptor of the archive.
    archive: ArchiveDescriptor,
}
694
695const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
696 match ty {
697 SnapshotComponentType::State => 0,
698 SnapshotComponentType::RocksdbIndices => 1,
699 _ => 2,
700 }
701}
702
/// Counts produced by `summarize_download_startup`.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    /// Planned archives whose output files already verify in the target directory.
    reusable: usize,
    /// Planned archives whose output files do not verify.
    needs_download: usize,
}
708
709fn summarize_download_startup(
710 all_downloads: &[PlannedArchive],
711 target_dir: &Path,
712) -> Result<DownloadStartupSummary> {
713 let mut summary = DownloadStartupSummary::default();
714
715 for planned in all_downloads {
716 if verify_output_files(target_dir, &planned.archive.output_files)? {
717 summary.reusable += 1;
718 } else {
719 summary.needs_download += 1;
720 }
721 }
722
723 Ok(summary)
724}
725
726impl DownloadProgress {
727 fn new(total_size: u64) -> Self {
729 let now = Instant::now();
730 Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
731 }
732
733 pub(crate) fn format_size(size: u64) -> String {
735 let mut size = size as f64;
736 let mut unit_index = 0;
737
738 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
739 size /= 1024.0;
740 unit_index += 1;
741 }
742
743 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
744 }
745
746 fn format_duration(duration: Duration) -> String {
748 let secs = duration.as_secs();
749 if secs < 60 {
750 format!("{secs}s")
751 } else if secs < 3600 {
752 format!("{}m {}s", secs / 60, secs % 60)
753 } else {
754 format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
755 }
756 }
757
758 fn update(&mut self, chunk_size: u64) -> Result<()> {
760 self.downloaded += chunk_size;
761
762 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
763 let formatted_downloaded = Self::format_size(self.downloaded);
764 let formatted_total = Self::format_size(self.total_size);
765 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
766
767 let elapsed = self.started_at.elapsed();
768 let eta = if self.downloaded > 0 {
769 let remaining = self.total_size.saturating_sub(self.downloaded);
770 let speed = self.downloaded as f64 / elapsed.as_secs_f64();
771 if speed > 0.0 {
772 Duration::from_secs_f64(remaining as f64 / speed)
773 } else {
774 Duration::ZERO
775 }
776 } else {
777 Duration::ZERO
778 };
779 let eta_str = Self::format_duration(eta);
780
781 print!(
782 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str} ",
783 );
784 io::stdout().flush()?;
785 self.last_displayed = Instant::now();
786 }
787
788 Ok(())
789 }
790}
791
/// Progress counters shared between concurrent archive tasks and the progress display task.
struct SharedProgress {
    /// Bytes accounted for so far across all archives.
    downloaded: AtomicU64,
    /// Expected total number of bytes.
    total_size: u64,
    /// Number of archives planned.
    total_archives: u64,
    /// Number of archives reported finished via `archive_done`.
    archives_done: AtomicU64,
    /// Set by the owner to make the display task stop.
    done: AtomicBool,
}
804
805impl SharedProgress {
806 fn new(total_size: u64, total_archives: u64) -> Arc<Self> {
807 Arc::new(Self {
808 downloaded: AtomicU64::new(0),
809 total_size,
810 total_archives,
811 archives_done: AtomicU64::new(0),
812 done: AtomicBool::new(false),
813 })
814 }
815
816 fn add(&self, bytes: u64) {
817 self.downloaded.fetch_add(bytes, Ordering::Relaxed);
818 }
819
820 fn archive_done(&self) {
821 self.archives_done.fetch_add(1, Ordering::Relaxed);
822 }
823}
824
825fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
828 tokio::spawn(async move {
829 let started_at = Instant::now();
830 let mut interval = tokio::time::interval(Duration::from_secs(3));
831 interval.tick().await; loop {
833 interval.tick().await;
834
835 if progress.done.load(Ordering::Relaxed) {
836 break;
837 }
838
839 let downloaded = progress.downloaded.load(Ordering::Relaxed);
840 let total = progress.total_size;
841 if total == 0 {
842 continue;
843 }
844
845 let done = progress.archives_done.load(Ordering::Relaxed);
846 let all = progress.total_archives;
847 let pct = (downloaded as f64 / total as f64) * 100.0;
848 let dl = DownloadProgress::format_size(downloaded);
849 let tot = DownloadProgress::format_size(total);
850
851 let elapsed = started_at.elapsed();
852 let remaining = total.saturating_sub(downloaded);
853
854 if remaining == 0 {
855 info!(target: "reth::cli",
857 archives = format_args!("{done}/{all}"),
858 downloaded = %dl,
859 "Extracting remaining archives"
860 );
861 } else {
862 let eta = if downloaded > 0 {
863 let speed = downloaded as f64 / elapsed.as_secs_f64();
864 if speed > 0.0 {
865 DownloadProgress::format_duration(Duration::from_secs_f64(
866 remaining as f64 / speed,
867 ))
868 } else {
869 "??".to_string()
870 }
871 } else {
872 "??".to_string()
873 };
874
875 info!(target: "reth::cli",
876 archives = format_args!("{done}/{all}"),
877 progress = format_args!("{pct:.1}%"),
878 downloaded = %dl,
879 total = %tot,
880 eta = %eta,
881 "Downloading"
882 );
883 }
884 }
885
886 let downloaded = progress.downloaded.load(Ordering::Relaxed);
888 let dl = DownloadProgress::format_size(downloaded);
889 let tot = DownloadProgress::format_size(progress.total_size);
890 let elapsed = DownloadProgress::format_duration(started_at.elapsed());
891 info!(target: "reth::cli",
892 downloaded = %dl,
893 total = %tot,
894 elapsed = %elapsed,
895 "Downloads complete"
896 );
897 })
898}
899
/// `Read` adapter that feeds the number of bytes read into a [`DownloadProgress`].
struct ProgressReader<R> {
    /// Wrapped reader.
    reader: R,
    /// Progress state updated on every non-empty read.
    progress: DownloadProgress,
}
905
impl<R: Read> ProgressReader<R> {
    /// Wraps `reader` with a fresh [`DownloadProgress`] expecting `total_size` bytes.
    fn new(reader: R, total_size: u64) -> Self {
        Self { reader, progress: DownloadProgress::new(total_size) }
    }
}
911
912impl<R: Read> Read for ProgressReader<R> {
913 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
914 let bytes = self.reader.read(buf)?;
915 if bytes > 0 &&
916 let Err(e) = self.progress.update(bytes as u64)
917 {
918 return Err(io::Error::other(e));
919 }
920 Ok(bytes)
921 }
922}
923
/// Compression formats of tar archives this module can extract.
#[derive(Debug, Clone, Copy)]
enum CompressionFormat {
    /// `.tar.lz4` archives.
    Lz4,
    /// `.tar.zst` archives.
    Zstd,
}
930
931impl CompressionFormat {
932 fn from_url(url: &str) -> Result<Self> {
934 let path =
935 Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
936
937 if path.ends_with(EXTENSION_TAR_LZ4) {
938 Ok(Self::Lz4)
939 } else if path.ends_with(EXTENSION_TAR_ZSTD) {
940 Ok(Self::Zstd)
941 } else {
942 Err(eyre::eyre!(
943 "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
944 path
945 ))
946 }
947 }
948}
949
950fn extract_archive<R: Read>(
952 reader: R,
953 total_size: u64,
954 format: CompressionFormat,
955 target_dir: &Path,
956) -> Result<()> {
957 let progress_reader = ProgressReader::new(reader, total_size);
958
959 match format {
960 CompressionFormat::Lz4 => {
961 let decoder = Decoder::new(progress_reader)?;
962 Archive::new(decoder).unpack(target_dir)?;
963 }
964 CompressionFormat::Zstd => {
965 let decoder = ZstdDecoder::new(progress_reader)?;
966 Archive::new(decoder).unpack(target_dir)?;
967 }
968 }
969
970 println!();
971 Ok(())
972}
973
974fn extract_archive_raw<R: Read>(
976 reader: R,
977 format: CompressionFormat,
978 target_dir: &Path,
979) -> Result<()> {
980 match format {
981 CompressionFormat::Lz4 => {
982 Archive::new(Decoder::new(reader)?).unpack(target_dir)?;
983 }
984 CompressionFormat::Zstd => {
985 Archive::new(ZstdDecoder::new(reader)?).unpack(target_dir)?;
986 }
987 }
988 Ok(())
989}
990
991fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
993 let file = std::fs::File::open(path)?;
994 let total_size = file.metadata()?.len();
995 info!(target: "reth::cli",
996 file = %path.display(),
997 size = %DownloadProgress::format_size(total_size),
998 "Extracting local archive"
999 );
1000 let start = Instant::now();
1001 extract_archive(file, total_size, format, target_dir)?;
1002 info!(target: "reth::cli",
1003 file = %path.display(),
1004 elapsed = %DownloadProgress::format_duration(start.elapsed()),
1005 "Local extraction complete"
1006 );
1007 Ok(())
1008}
1009
/// Maximum number of attempts made per download before giving up.
const MAX_DOWNLOAD_RETRIES: u32 = 10;
/// Seconds slept between two download attempts.
const RETRY_BACKOFF_SECS: u64 = 5;
1012
/// `Write` adapter that feeds the number of bytes written into a [`DownloadProgress`].
struct ProgressWriter<W> {
    /// Wrapped writer.
    inner: W,
    /// Progress state updated on every write.
    progress: DownloadProgress,
}
1019
1020impl<W: Write> Write for ProgressWriter<W> {
1021 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1022 let n = self.inner.write(buf)?;
1023 let _ = self.progress.update(n as u64);
1024 Ok(n)
1025 }
1026
1027 fn flush(&mut self) -> io::Result<()> {
1028 self.inner.flush()
1029 }
1030}
1031
/// `Write` adapter that adds the number of bytes written to a [`SharedProgress`].
struct SharedProgressWriter<W> {
    /// Wrapped writer.
    inner: W,
    /// Shared counters that receive the written byte counts.
    progress: Arc<SharedProgress>,
}
1038
1039impl<W: Write> Write for SharedProgressWriter<W> {
1040 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1041 let n = self.inner.write(buf)?;
1042 self.progress.add(n as u64);
1043 Ok(n)
1044 }
1045
1046 fn flush(&mut self) -> io::Result<()> {
1047 self.inner.flush()
1048 }
1049}
1050
/// `Read` adapter that adds the number of bytes read to a [`SharedProgress`].
struct SharedProgressReader<R> {
    /// Wrapped reader.
    inner: R,
    /// Shared counters that receive the read byte counts.
    progress: Arc<SharedProgress>,
}
1057
1058impl<R: Read> Read for SharedProgressReader<R> {
1059 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1060 let n = self.inner.read(buf)?;
1061 self.progress.add(n as u64);
1062 Ok(n)
1063 }
1064}
1065
1066fn resumable_download(
1073 url: &str,
1074 target_dir: &Path,
1075 shared: Option<&Arc<SharedProgress>>,
1076) -> Result<(PathBuf, u64)> {
1077 let file_name = Url::parse(url)
1078 .ok()
1079 .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
1080 .unwrap_or_else(|| "snapshot.tar".to_string());
1081
1082 let final_path = target_dir.join(&file_name);
1083 let part_path = target_dir.join(format!("{file_name}.part"));
1084
1085 let quiet = shared.is_some();
1086
1087 if !quiet {
1088 info!(target: "reth::cli", file = %file_name, "Connecting to download server");
1089 }
1090 let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
1091
1092 let mut total_size: Option<u64> = None;
1093 let mut last_error: Option<eyre::Error> = None;
1094
1095 let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
1096 fs::rename(&part_path, &final_path)?;
1097 if !quiet {
1098 info!(target: "reth::cli", file = %file_name, "Download complete");
1099 }
1100 Ok((final_path.clone(), size))
1101 };
1102
1103 for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1104 let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
1105
1106 if let Some(total) = total_size &&
1107 existing_size >= total
1108 {
1109 return finalize_download(total);
1110 }
1111
1112 if attempt > 1 {
1113 info!(target: "reth::cli",
1114 file = %file_name,
1115 "Retry attempt {}/{} - resuming from {} bytes",
1116 attempt, MAX_DOWNLOAD_RETRIES, existing_size
1117 );
1118 }
1119
1120 let mut request = client.get(url);
1121 if existing_size > 0 {
1122 request = request.header(RANGE, format!("bytes={existing_size}-"));
1123 if !quiet && attempt == 1 {
1124 info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
1125 }
1126 }
1127
1128 let response = match request.send().and_then(|r| r.error_for_status()) {
1129 Ok(r) => r,
1130 Err(e) => {
1131 last_error = Some(e.into());
1132 if attempt < MAX_DOWNLOAD_RETRIES {
1133 info!(target: "reth::cli",
1134 file = %file_name,
1135 "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
1136 );
1137 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1138 }
1139 continue;
1140 }
1141 };
1142
1143 let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
1144
1145 let size = if is_partial {
1146 response
1147 .headers()
1148 .get("Content-Range")
1149 .and_then(|v| v.to_str().ok())
1150 .and_then(|v| v.split('/').next_back())
1151 .and_then(|v| v.parse().ok())
1152 } else {
1153 response.content_length()
1154 };
1155
1156 if total_size.is_none() {
1157 total_size = size;
1158 if !quiet && let Some(s) = size {
1159 info!(target: "reth::cli",
1160 file = %file_name,
1161 size = %DownloadProgress::format_size(s),
1162 "Downloading"
1163 );
1164 }
1165 }
1166
1167 let current_total = total_size.ok_or_else(|| {
1168 eyre::eyre!("Server did not provide Content-Length or Content-Range header")
1169 })?;
1170
1171 let file = if is_partial && existing_size > 0 {
1172 OpenOptions::new()
1173 .append(true)
1174 .open(&part_path)
1175 .map_err(|e| fs::FsPathError::open(e, &part_path))?
1176 } else {
1177 fs::create_file(&part_path)?
1178 };
1179
1180 let start_offset = if is_partial { existing_size } else { 0 };
1181 let mut reader = response;
1182
1183 let copy_result;
1184 let flush_result;
1185
1186 if let Some(sp) = shared {
1187 if start_offset > 0 {
1189 sp.add(start_offset);
1190 }
1191 let mut writer =
1192 SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
1193 copy_result = io::copy(&mut reader, &mut writer);
1194 flush_result = writer.inner.flush();
1195 } else {
1196 let mut progress = DownloadProgress::new(current_total);
1198 progress.downloaded = start_offset;
1199 let mut writer = ProgressWriter { inner: BufWriter::new(file), progress };
1200 copy_result = io::copy(&mut reader, &mut writer);
1201 flush_result = writer.inner.flush();
1202 println!();
1203 }
1204
1205 if let Err(e) = copy_result.and(flush_result) {
1206 last_error = Some(e.into());
1207 if attempt < MAX_DOWNLOAD_RETRIES {
1208 info!(target: "reth::cli",
1209 file = %file_name,
1210 "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
1211 );
1212 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1213 }
1214 continue;
1215 }
1216
1217 return finalize_download(current_total);
1218 }
1219
1220 Err(last_error
1221 .unwrap_or_else(|| eyre::eyre!("Download failed after {} attempts", MAX_DOWNLOAD_RETRIES)))
1222}
1223
1224fn streaming_download_and_extract(
1228 url: &str,
1229 format: CompressionFormat,
1230 target_dir: &Path,
1231 shared: Option<&Arc<SharedProgress>>,
1232) -> Result<()> {
1233 let quiet = shared.is_some();
1234 let mut last_error: Option<eyre::Error> = None;
1235
1236 for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1237 if attempt > 1 {
1238 info!(target: "reth::cli",
1239 url = %url,
1240 attempt,
1241 max = MAX_DOWNLOAD_RETRIES,
1242 "Retrying streaming download from scratch"
1243 );
1244 }
1245
1246 let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
1247
1248 let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
1249 Ok(r) => r,
1250 Err(e) => {
1251 last_error = Some(e.into());
1252 if attempt < MAX_DOWNLOAD_RETRIES {
1253 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1254 }
1255 continue;
1256 }
1257 };
1258
1259 if !quiet && let Some(size) = response.content_length() {
1260 info!(target: "reth::cli",
1261 url = %url,
1262 size = %DownloadProgress::format_size(size),
1263 "Streaming archive"
1264 );
1265 }
1266
1267 let result = if let Some(sp) = shared {
1268 let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
1269 extract_archive_raw(reader, format, target_dir)
1270 } else {
1271 extract_archive_raw(response, format, target_dir)
1272 };
1273
1274 match result {
1275 Ok(()) => return Ok(()),
1276 Err(e) => {
1277 last_error = Some(e);
1278 if attempt < MAX_DOWNLOAD_RETRIES {
1279 std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
1280 }
1281 }
1282 }
1283 }
1284
1285 Err(last_error.unwrap_or_else(|| {
1286 eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
1287 }))
1288}
1289
1290fn download_and_extract(
1292 url: &str,
1293 format: CompressionFormat,
1294 target_dir: &Path,
1295 shared: Option<&Arc<SharedProgress>>,
1296) -> Result<()> {
1297 let quiet = shared.is_some();
1298 let (downloaded_path, total_size) = resumable_download(url, target_dir, shared)?;
1299
1300 let file_name =
1301 downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1302
1303 if !quiet {
1304 info!(target: "reth::cli",
1305 file = %file_name,
1306 size = %DownloadProgress::format_size(total_size),
1307 "Extracting archive"
1308 );
1309 }
1310 let file = fs::open(&downloaded_path)?;
1311
1312 if quiet {
1313 extract_archive_raw(file, format, target_dir)?;
1315 } else {
1316 extract_archive(file, total_size, format, target_dir)?;
1317 info!(target: "reth::cli",
1318 file = %file_name,
1319 "Extraction complete"
1320 );
1321 }
1322
1323 fs::remove_file(&downloaded_path)?;
1324
1325 if let Some(sp) = shared {
1326 sp.archive_done();
1327 }
1328
1329 Ok(())
1330}
1331
1332fn blocking_download_and_extract(
1338 url: &str,
1339 target_dir: &Path,
1340 shared: Option<Arc<SharedProgress>>,
1341 resumable: bool,
1342) -> Result<()> {
1343 let format = CompressionFormat::from_url(url)?;
1344
1345 if let Ok(parsed_url) = Url::parse(url) &&
1346 parsed_url.scheme() == "file"
1347 {
1348 let file_path = parsed_url
1349 .to_file_path()
1350 .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1351 let result = extract_from_file(&file_path, format, target_dir);
1352 if result.is_ok() &&
1353 let Some(sp) = shared
1354 {
1355 sp.archive_done();
1356 }
1357 result
1358 } else if resumable {
1359 download_and_extract(url, format, target_dir, shared.as_ref())
1360 } else {
1361 let result = streaming_download_and_extract(url, format, target_dir, shared.as_ref());
1362 if result.is_ok() &&
1363 let Some(sp) = shared
1364 {
1365 sp.archive_done();
1366 }
1367 result
1368 }
1369}
1370
1371async fn stream_and_extract(
1377 url: &str,
1378 target_dir: &Path,
1379 shared: Option<Arc<SharedProgress>>,
1380 resumable: bool,
1381) -> Result<()> {
1382 let target_dir = target_dir.to_path_buf();
1383 let url = url.to_string();
1384 task::spawn_blocking(move || {
1385 blocking_download_and_extract(&url, &target_dir, shared, resumable)
1386 })
1387 .await??;
1388
1389 Ok(())
1390}
1391
1392async fn process_modular_archive(
1393 planned: PlannedArchive,
1394 target_dir: &Path,
1395 cache_dir: Option<&Path>,
1396 shared: Option<Arc<SharedProgress>>,
1397 resumable: bool,
1398) -> Result<()> {
1399 let target_dir = target_dir.to_path_buf();
1400 let cache_dir = cache_dir.map(Path::to_path_buf);
1401
1402 task::spawn_blocking(move || {
1403 blocking_process_modular_archive(
1404 &planned,
1405 &target_dir,
1406 cache_dir.as_deref(),
1407 shared,
1408 resumable,
1409 )
1410 })
1411 .await??;
1412
1413 Ok(())
1414}
1415
1416fn blocking_process_modular_archive(
1417 planned: &PlannedArchive,
1418 target_dir: &Path,
1419 cache_dir: Option<&Path>,
1420 shared: Option<Arc<SharedProgress>>,
1421 resumable: bool,
1422) -> Result<()> {
1423 let archive = &planned.archive;
1424 if verify_output_files(target_dir, &archive.output_files)? {
1425 if let Some(sp) = &shared {
1426 sp.add(archive.size);
1427 sp.archive_done();
1428 }
1429 info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
1430 return Ok(());
1431 }
1432
1433 let format = CompressionFormat::from_url(&archive.file_name)?;
1434 for attempt in 1..=MAX_DOWNLOAD_RETRIES {
1435 cleanup_output_files(target_dir, &archive.output_files);
1436
1437 if resumable {
1438 let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
1439 let archive_path = cache_dir.join(&archive.file_name);
1440 let part_path = cache_dir.join(format!("{}.part", archive.file_name));
1441 let (downloaded_path, _downloaded_size) =
1442 resumable_download(&archive.url, cache_dir, shared.as_ref())?;
1443 let file = fs::open(&downloaded_path)?;
1444 extract_archive_raw(file, format, target_dir)?;
1445 let _ = fs::remove_file(&archive_path);
1446 let _ = fs::remove_file(&part_path);
1447 } else {
1448 streaming_download_and_extract(&archive.url, format, target_dir, shared.as_ref())?;
1449 }
1450
1451 if verify_output_files(target_dir, &archive.output_files)? {
1452 if let Some(sp) = &shared {
1453 sp.archive_done();
1454 }
1455 return Ok(());
1456 }
1457
1458 warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
1459 }
1460
1461 eyre::bail!(
1462 "Failed integrity validation after {} attempts for {}",
1463 MAX_DOWNLOAD_RETRIES,
1464 archive.file_name
1465 )
1466}
1467
1468fn verify_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) -> Result<bool> {
1469 if output_files.is_empty() {
1470 return Ok(false);
1471 }
1472
1473 for expected in output_files {
1474 let output_path = target_dir.join(&expected.path);
1475 let meta = match fs::metadata(&output_path) {
1476 Ok(meta) => meta,
1477 Err(_) => return Ok(false),
1478 };
1479 if meta.len() != expected.size {
1480 return Ok(false);
1481 }
1482
1483 let actual = file_blake3_hex(&output_path)?;
1484 if !actual.eq_ignore_ascii_case(&expected.blake3) {
1485 return Ok(false);
1486 }
1487 }
1488
1489 Ok(true)
1490}
1491
1492fn cleanup_output_files(target_dir: &Path, output_files: &[OutputFileChecksum]) {
1493 for output in output_files {
1494 let _ = fs::remove_file(target_dir.join(&output.path));
1495 }
1496}
1497
1498fn file_blake3_hex(path: &Path) -> Result<String> {
1499 let mut file = fs::open(path)?;
1500 let mut hasher = Hasher::new();
1501 let mut buf = [0_u8; 64 * 1024];
1502
1503 loop {
1504 let n = file.read(&mut buf)?;
1505 if n == 0 {
1506 break;
1507 }
1508 hasher.update(&buf[..n]);
1509 }
1510
1511 Ok(hasher.finalize().to_hex().to_string())
1512}
1513
1514fn get_base_url(chain_id: u64) -> String {
1516 let defaults = DownloadDefaults::get_global();
1517 match &defaults.default_chain_aware_base_url {
1518 Some(url) => format!("{url}/{chain_id}"),
1519 None => defaults.default_base_url.to_string(),
1520 }
1521}
1522
1523async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
1524 if let Ok(parsed) = Url::parse(source) {
1525 return match parsed.scheme() {
1526 "http" | "https" => {
1527 Ok(Client::new().get(source).send().await?.error_for_status()?.json().await?)
1528 }
1529 "file" => {
1530 let path = parsed
1531 .to_file_path()
1532 .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
1533 let content = fs::read_to_string(path)?;
1534 Ok(serde_json::from_str(&content)?)
1535 }
1536 _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
1537 };
1538 }
1539
1540 let content = fs::read_to_string(source)?;
1541 Ok(serde_json::from_str(&content)?)
1542}
1543
1544fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
1545 if let Some(base_url) = manifest.base_url.as_deref() &&
1546 !base_url.is_empty()
1547 {
1548 return Ok(base_url.trim_end_matches('/').to_string());
1549 }
1550
1551 if let Ok(mut url) = Url::parse(source) {
1552 if url.scheme() == "file" {
1553 let mut path = url
1554 .to_file_path()
1555 .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
1556 path.pop();
1557 let mut base = Url::from_directory_path(path)
1558 .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
1559 .to_string();
1560 if base.ends_with('/') {
1561 base.pop();
1562 }
1563 return Ok(base);
1564 }
1565
1566 {
1567 let mut segments = url
1568 .path_segments_mut()
1569 .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
1570 segments.pop_if_empty();
1571 segments.pop();
1572 }
1573 return Ok(url.as_str().trim_end_matches('/').to_string());
1574 }
1575
1576 let path = Path::new(source);
1577 let manifest_dir = if path.is_absolute() {
1578 path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
1579 } else {
1580 let joined = std::env::current_dir()?.join(path);
1581 joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
1582 };
1583 let mut base = Url::from_directory_path(&manifest_dir)
1584 .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
1585 .to_string();
1586 if base.ends_with('/') {
1587 base.pop();
1588 }
1589 Ok(base)
1590}
1591
1592#[allow(dead_code)]
1596async fn get_latest_snapshot_url(chain_id: u64) -> Result<String> {
1597 let base_url = get_base_url(chain_id);
1598 let latest_url = format!("{base_url}/latest.txt");
1599 let filename = Client::new()
1600 .get(latest_url)
1601 .send()
1602 .await?
1603 .error_for_status()?
1604 .text()
1605 .await?
1606 .trim()
1607 .to_string();
1608
1609 Ok(format!("{base_url}/{filename}"))
1610}
1611
#[cfg(test)]
mod tests {
    use super::*;
    use manifest::{ComponentManifest, SingleArchive};
    use tempfile::tempdir;

    /// Single-archive component entry of size 1 with no checksum and no output files.
    fn single_archive(file: &str) -> ComponentManifest {
        ComponentManifest::Single(SingleArchive {
            file: file.to_string(),
            size: 1,
            blake3: None,
            output_files: vec![],
        })
    }

    /// Manifest that only contains the transaction-senders and RocksDB-indices components.
    fn manifest_with_archive_only_components() -> SnapshotManifest {
        let components = [
            (SnapshotComponentType::TransactionSenders, "transaction_senders.tar.zst"),
            (SnapshotComponentType::RocksdbIndices, "rocksdb_indices.tar.zst"),
        ]
        .into_iter()
        .map(|(ty, file)| (ty.key().to_string(), single_archive(file)))
        .collect();

        SnapshotManifest {
            block: 0,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            components,
        }
    }

    /// Selections used by the `inject_archive_only_components` tests: four components, all
    /// set to `ComponentSelection::All`.
    fn base_archive_selections() -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        [
            SnapshotComponentType::Transactions,
            SnapshotComponentType::Receipts,
            SnapshotComponentType::AccountChangesets,
            SnapshotComponentType::StorageChangesets,
        ]
        .into_iter()
        .map(|ty| (ty, ComponentSelection::All))
        .collect()
    }

    /// Expected output file entry.
    fn checksum(path: &str, size: u64, blake3: &str) -> OutputFileChecksum {
        OutputFileChecksum { path: path.to_string(), size, blake3: blake3.to_string() }
    }

    /// Planned archive without an archive-level checksum.
    fn planned_archive(
        ty: SnapshotComponentType,
        component: &str,
        url: &str,
        file_name: &str,
        size: u64,
        output_files: Vec<OutputFileChecksum>,
    ) -> PlannedArchive {
        PlannedArchive {
            ty,
            component: component.to_string(),
            archive: ArchiveDescriptor {
                url: url.to_string(),
                file_name: file_name.to_string(),
                size,
                blake3: None,
                output_files,
            },
        }
    }

    #[test]
    fn test_download_defaults_builder() {
        let defaults = DownloadDefaults::default()
            .with_snapshot("https://example.com/snapshots (example)")
            .with_base_url("https://example.com");

        assert_eq!(defaults.default_base_url, "https://example.com");
        // One source was added on top of the defaults.
        assert_eq!(defaults.available_snapshots.len(), 3);
    }

    #[test]
    fn test_download_defaults_replace_snapshots() {
        let replacements =
            vec![Cow::Borrowed("https://custom1.com"), Cow::Borrowed("https://custom2.com")];
        let defaults = DownloadDefaults::default().with_snapshots(replacements);

        assert_eq!(defaults.available_snapshots.len(), 2);
        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
    }

    #[test]
    fn test_long_help_generation() {
        let help = DownloadDefaults::default().long_help();

        assert!(help.contains("Available snapshot sources:"));
        assert!(help.contains("merkle.io"));
        assert!(help.contains("publicnode.com"));
        assert!(help.contains("file://"));
    }

    #[test]
    fn test_long_help_override() {
        let custom_help = "This is custom help text for downloading snapshots.";
        let help = DownloadDefaults::default().with_long_help(custom_help).long_help();

        assert_eq!(help, custom_help);
        assert!(!help.contains("Available snapshot sources:"));
    }

    #[test]
    fn test_builder_chaining() {
        let defaults = DownloadDefaults::default()
            .with_base_url("https://custom.example.com")
            .with_snapshot("https://snapshot1.com")
            .with_snapshot("https://snapshot2.com")
            .with_long_help("Custom help for snapshots");

        assert_eq!(defaults.default_base_url, "https://custom.example.com");
        // Two sources were added on top of the defaults.
        assert_eq!(defaults.available_snapshots.len(), 4);
        assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
    }

    #[test]
    fn test_compression_format_detection() {
        for url in ["https://example.com/snapshot.tar.lz4", "file:///path/to/snapshot.tar.lz4"] {
            assert!(matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Lz4)));
        }
        for url in ["https://example.com/snapshot.tar.zst", "file:///path/to/snapshot.tar.zst"] {
            assert!(matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Zstd)));
        }
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
    }

    #[test]
    fn inject_archive_only_components_for_archive_selection() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = base_archive_selections();

        inject_archive_only_components(&mut selections, &manifest, true);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        assert_eq!(
            selections.get(&SnapshotComponentType::RocksdbIndices),
            Some(&ComponentSelection::All)
        );
    }

    #[test]
    fn inject_archive_only_components_without_rocksdb() {
        let manifest = manifest_with_archive_only_components();
        let mut selections = base_archive_selections();

        inject_archive_only_components(&mut selections, &manifest, false);

        assert_eq!(
            selections.get(&SnapshotComponentType::TransactionSenders),
            Some(&ComponentSelection::All)
        );
        assert_eq!(selections.get(&SnapshotComponentType::RocksdbIndices), None);
    }

    #[test]
    fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
        let mut selections = BTreeMap::new();
        selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
        assert!(should_reset_index_stage_checkpoints(&selections));

        selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
        assert!(!should_reset_index_stage_checkpoints(&selections));
    }

    #[test]
    fn summarize_download_startup_counts_reusable_and_needs_download() {
        let dir = tempdir().unwrap();
        let target_dir = dir.path();

        // The only file that exists on disk and matches its recorded size and hash.
        let ok_file = target_dir.join("ok.bin");
        std::fs::write(&ok_file, [1_u8; 4]).unwrap();
        let ok_hash = file_blake3_hex(&ok_file).unwrap();

        let planned = vec![
            planned_archive(
                SnapshotComponentType::State,
                "State",
                "https://example.com/ok.tar.zst",
                "ok.tar.zst",
                10,
                vec![checksum("ok.bin", 4, &ok_hash)],
            ),
            planned_archive(
                SnapshotComponentType::Headers,
                "Headers",
                "https://example.com/missing.tar.zst",
                "missing.tar.zst",
                10,
                vec![checksum("missing.bin", 1, "deadbeef")],
            ),
            planned_archive(
                SnapshotComponentType::Transactions,
                "Transactions",
                "https://example.com/bad-size.tar.zst",
                "bad-size.tar.zst",
                10,
                vec![],
            ),
        ];

        let summary = summarize_download_startup(&planned, target_dir).unwrap();
        assert_eq!(summary.reusable, 1);
        assert_eq!(summary.needs_download, 2);
    }

    #[test]
    fn archive_priority_prefers_state_then_rocksdb() {
        let mut planned = [
            planned_archive(
                SnapshotComponentType::Transactions,
                "Transactions",
                "u3",
                "t.tar.zst",
                1,
                vec![checksum("a", 1, "x")],
            ),
            planned_archive(
                SnapshotComponentType::RocksdbIndices,
                "RocksDB Indices",
                "u2",
                "rocksdb_indices.tar.zst",
                1,
                vec![checksum("b", 1, "y")],
            ),
            planned_archive(
                SnapshotComponentType::State,
                "State (mdbx)",
                "u1",
                "state.tar.zst",
                1,
                vec![checksum("c", 1, "z")],
            ),
        ];

        planned.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        assert_eq!(planned[0].ty, SnapshotComponentType::State);
        assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
        assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
    }
}