1pub mod config_gen;
2pub mod manifest;
3pub mod manifest_cmd;
4mod tui;
5
6use crate::common::EnvironmentArgs;
7use blake3::Hasher;
8use clap::{builder::RangedU64ValueParser, Parser};
9use config_gen::{config_for_selections, write_config};
10use eyre::{Result, WrapErr};
11use futures::stream::{self, StreamExt};
12use lz4::Decoder;
13use manifest::{
14 ArchiveDescriptor, ComponentSelection, OutputFileChecksum, SnapshotComponentType,
15 SnapshotManifest,
16};
17use reqwest::{blocking::Client as BlockingClient, header::RANGE, Client, StatusCode};
18use reth_chainspec::{EthChainSpec, EthereumHardfork, EthereumHardforks};
19use reth_cli::chainspec::ChainSpecParser;
20use reth_cli_util::cancellation::CancellationToken;
21use reth_db::{init_db, Database};
22use reth_db_api::transaction::DbTx;
23use reth_fs_util as fs;
24use reth_node_core::args::DefaultPruningValues;
25use reth_prune_types::PruneMode;
26use std::{
27 borrow::Cow,
28 collections::BTreeMap,
29 fs::OpenOptions,
30 io::{self, BufWriter, Read, Write},
31 path::{Path, PathBuf},
32 sync::{
33 atomic::{AtomicBool, AtomicU64, Ordering},
34 Arc, OnceLock,
35 },
36 time::{Duration, Instant},
37};
38use tar::Archive;
39use tokio::task;
40use tracing::{info, warn};
41use tui::{run_selector, SelectorOutput};
42use url::Url;
43use zstd::stream::read::Decoder as ZstdDecoder;
44
/// Unit suffixes for [`DownloadProgress::format_size`]; values never scale past
/// the last entry, so very large sizes are still rendered in GB.
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
/// Default host that serves the snapshot archive files themselves.
const RETH_SNAPSHOTS_BASE_URL: &str = "https://snapshots-r2.reth.rs";
/// Default API endpoint for listing available snapshots (`--list`).
const RETH_SNAPSHOTS_API_URL: &str = "https://snapshots.reth.rs/api/snapshots";
/// Supported archive extensions, matched by [`CompressionFormat::from_url`].
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Directory (under the datadir) where partially downloaded archives are kept
/// so interrupted downloads can be resumed.
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
/// Archive-entry path prefix that gets redirected into a custom static-files
/// directory when one is configured (see `resolve_output_path`).
const STATIC_FILES_PREFIX: &str = "static_files/";

/// Default number of archives fetched concurrently (`--download-concurrency`).
const MAX_CONCURRENT_DOWNLOADS: usize = 8;
55
/// Named component-selection presets a user can pick instead of individual
/// `--with-*` flags.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SelectionPreset {
    /// Smallest viable node: each component's minimal selection.
    Minimal,
    /// Full-node defaults derived from the default prune modes.
    Full,
    /// Everything the manifest offers, unpruned.
    Archive,
}
62
/// Result of resolving CLI flags / TUI choices into concrete per-component
/// selections, plus the preset that produced them (if any).
#[derive(Debug)]
struct ResolvedComponents {
    // What to download for each component; `None`-selected components are
    // already filtered out.
    selections: BTreeMap<SnapshotComponentType, ComponentSelection>,
    // Preset used, if the selections came from one rather than explicit flags.
    preset: Option<SelectionPreset>,
}
68
/// Process-wide [`DownloadDefaults`] slot; set once via
/// [`DownloadDefaults::try_init`] or lazily filled with the built-in defaults
/// by [`DownloadDefaults::get_global`].
static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
71
/// Customizable defaults for the `download` command (snapshot hosts, help
/// text), allowing downstream node distributions to rebrand the sources.
#[derive(Debug, Clone)]
pub struct DownloadDefaults {
    /// Human-readable snapshot sources advertised in the `--url` long help.
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Base URL archives are downloaded from when no URL is given.
    pub default_base_url: Cow<'static, str>,
    /// Optional chain-aware base URL preferred over `default_base_url` when
    /// proposing a default snapshot.
    pub default_chain_aware_base_url: Option<Cow<'static, str>>,
    /// API endpoint used to list snapshots.
    pub snapshot_api_url: Cow<'static, str>,
    /// Full override of the generated `--url` long help text, if set.
    pub long_help: Option<String>,
}
96
97impl DownloadDefaults {
98 pub fn try_init(self) -> Result<(), Self> {
100 DOWNLOAD_DEFAULTS.set(self)
101 }
102
103 pub fn get_global() -> &'static DownloadDefaults {
105 DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
106 }
107
108 pub fn default_download_defaults() -> Self {
110 Self {
111 available_snapshots: vec![
112 Cow::Borrowed("https://snapshots.reth.rs (default)"),
113 Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
114 ],
115 default_base_url: Cow::Borrowed(RETH_SNAPSHOTS_BASE_URL),
116 default_chain_aware_base_url: None,
117 snapshot_api_url: Cow::Borrowed(RETH_SNAPSHOTS_API_URL),
118 long_help: None,
119 }
120 }
121
122 pub fn long_help(&self) -> String {
127 if let Some(ref custom_help) = self.long_help {
128 return custom_help.clone();
129 }
130
131 let mut help = format!(
132 "Specify a snapshot URL or let the command propose a default one.\n\n\
133 Browse available snapshots at {}\n\
134 or use --list-snapshots to see them from the CLI.\n\nAvailable snapshot sources:\n",
135 self.snapshot_api_url.trim_end_matches("/api/snapshots"),
136 );
137
138 for source in &self.available_snapshots {
139 help.push_str("- ");
140 help.push_str(source);
141 help.push('\n');
142 }
143
144 help.push_str(
145 "\nIf no URL is provided, the latest archive snapshot for the selected chain\nwill be proposed for download from ",
146 );
147 help.push_str(
148 self.default_chain_aware_base_url.as_deref().unwrap_or(&self.default_base_url),
149 );
150 help.push_str(
151 ".\n\nLocal file:// URLs are also supported for extracting snapshots from disk.",
152 );
153 help
154 }
155
156 pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
158 self.available_snapshots.push(source.into());
159 self
160 }
161
162 pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
164 self.available_snapshots = sources;
165 self
166 }
167
168 pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
170 self.default_base_url = url.into();
171 self
172 }
173
174 pub fn with_chain_aware_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
176 self.default_chain_aware_base_url = Some(url.into());
177 self
178 }
179
180 pub fn with_snapshot_api_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
182 self.snapshot_api_url = url.into();
183 self
184 }
185
186 pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
188 self.long_help = Some(help.into());
189 self
190 }
191}
192
impl Default for DownloadDefaults {
    /// Same as [`DownloadDefaults::default_download_defaults`].
    fn default() -> Self {
        Self::default_download_defaults()
    }
}
198
// Command that downloads and extracts a node snapshot, either from a direct
// archive URL or component-by-component via a snapshot manifest.
//
// NOTE(review): field comments here are deliberately `//` rather than `///` —
// clap turns doc comments into CLI help/about text, so `///` would change the
// command's runtime output.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    // Shared environment (datadir, chain spec, db args).
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // Direct archive URL; bypasses the manifest flow entirely.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,

    // Explicit manifest URL (mutually exclusive with --url).
    #[arg(long, value_name = "URL", conflicts_with = "url")]
    manifest_url: Option<String>,

    // Local manifest file (mutually exclusive with both URL flags).
    #[arg(long, value_name = "PATH", conflicts_with_all = ["url", "manifest_url"])]
    manifest_path: Option<PathBuf>,

    // Transactions: all / since a block / within a distance. The three forms
    // are mutually exclusive with each other and with the presets.
    #[arg(long, conflicts_with_all = ["with_txs_since", "with_txs_distance", "minimal", "full", "archive"])]
    with_txs: bool,

    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_txs", "with_txs_distance", "minimal", "full", "archive"])]
    with_txs_since: Option<u64>,

    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_txs", "with_txs_since", "minimal", "full", "archive"])]
    with_txs_distance: Option<u64>,

    // Receipts: same three mutually exclusive forms as transactions.
    #[arg(long, conflicts_with_all = ["with_receipts_since", "with_receipts_distance", "minimal", "full", "archive"])]
    with_receipts: bool,

    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_receipts", "with_receipts_distance", "minimal", "full", "archive"])]
    with_receipts_since: Option<u64>,

    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_receipts", "with_receipts_since", "minimal", "full", "archive"])]
    with_receipts_distance: Option<u64>,

    // State history (account + storage changesets): same three forms.
    #[arg(long, alias = "with-changesets", conflicts_with_all = ["with_state_history_since", "with_state_history_distance", "minimal", "full", "archive"])]
    with_state_history: bool,

    #[arg(long, value_name = "BLOCK_NUMBER", conflicts_with_all = ["with_state_history", "with_state_history_distance", "minimal", "full", "archive"])]
    with_state_history_since: Option<u64>,

    #[arg(long, value_name = "BLOCKS", value_parser = RangedU64ValueParser::<u64>::new().range(1..), conflicts_with_all = ["with_state_history", "with_state_history_since", "minimal", "full", "archive"])]
    with_state_history_distance: Option<u64>,

    // Transaction senders; only meaningful together with --with-txs.
    #[arg(long, requires = "with_txs", conflicts_with_all = ["minimal", "full", "archive"])]
    with_senders: bool,

    // RocksDB index component.
    #[arg(long, conflicts_with_all = ["minimal", "full", "archive", "without_rocksdb"])]
    with_rocksdb: bool,

    // Presets: each conflicts with every explicit flag and the other presets.
    #[arg(long, alias = "all", conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "minimal", "full"])]
    archive: bool,

    #[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "full"])]
    minimal: bool,

    #[arg(long, conflicts_with_all = ["with_txs", "with_txs_since", "with_txs_distance", "with_receipts", "with_receipts_since", "with_receipts_distance", "with_state_history", "with_state_history_since", "with_state_history_distance", "with_senders", "with_rocksdb", "archive", "minimal"])]
    full: bool,

    // Excludes the RocksDB component from the archive preset.
    #[arg(long, conflicts_with_all = ["url", "with_rocksdb"])]
    without_rocksdb: bool,

    // Skips the interactive selector; implies the minimal preset when no
    // explicit flags are given.
    #[arg(long, short = 'y')]
    non_interactive: bool,

    // Keep partial downloads in a cache dir so they can be resumed (default on).
    #[arg(long, default_value_t = true, num_args = 0..=1, default_missing_value = "true")]
    resumable: bool,

    // How many archives to fetch concurrently (clamped to >= 1 at runtime).
    #[arg(long, default_value_t = MAX_CONCURRENT_DOWNLOADS)]
    download_concurrency: usize,

    // List available snapshots from the API instead of downloading.
    #[arg(long, alias = "list-snapshots", conflicts_with_all = ["url", "manifest_url", "manifest_path"])]
    list: bool,
}
310
impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
    /// Runs the download command end to end: lists snapshots, or downloads a
    /// direct archive URL, or resolves a manifest, lets the user select
    /// components, downloads/extracts them concurrently, then writes the
    /// matching prune config and checkpoints.
    ///
    /// NOTE(review): the type parameter `N` is unused in this body —
    /// presumably kept for signature compatibility with the CLI runner; TODO
    /// confirm against the caller.
    pub async fn execute<N>(self) -> Result<()> {
        let chain = self.env.chain.chain();
        let chain_id = chain.id();
        let data_dir = self.env.datadir.clone().resolve_datadir(chain);
        fs::create_dir_all(&data_dir)?;

        // Guard cancels in-flight work when the command is dropped (e.g. on
        // early return or error).
        let cancel_token = CancellationToken::new();
        let _cancel_guard = cancel_token.drop_guard();

        // `--list`: print available snapshots and exit.
        if self.list {
            let entries = fetch_snapshot_api_entries(chain_id).await?;
            print_snapshot_listing(&entries, chain_id);
            return Ok(());
        }

        // Only pass a static-files dir downstream when it differs from the
        // default location inside the datadir.
        let default_sf = data_dir.data_dir().join("static_files");
        let custom_sf = data_dir.static_files();
        let static_files_dir = if custom_sf != default_sf { Some(custom_sf) } else { None };

        // `--url`: single-archive fast path, no manifest involved.
        if let Some(ref url) = self.url {
            info!(target: "reth::cli",
                dir = ?data_dir.data_dir(),
                url = %url,
                "Starting snapshot download and extraction"
            );

            stream_and_extract(
                url,
                data_dir.data_dir(),
                static_files_dir,
                None,
                self.resumable,
                cancel_token.clone(),
            )
            .await?;
            info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

            return Ok(());
        }

        // Manifest flow: locate, fetch, and pin the base URL for archives.
        let manifest_source = self.resolve_manifest_source(chain_id).await?;

        info!(target: "reth::cli", source = %manifest_source, "Fetching snapshot manifest");
        let mut manifest = fetch_manifest_from_source(&manifest_source).await?;
        manifest.base_url = Some(resolve_manifest_base_url(&manifest, &manifest_source)?);

        info!(target: "reth::cli",
            block = manifest.block,
            chain_id = manifest.chain_id,
            storage_version = %manifest.storage_version,
            components = manifest.components.len(),
            "Loaded snapshot manifest"
        );

        let ResolvedComponents { mut selections, preset } = self.resolve_components(&manifest)?;

        // The archive preset implicitly pulls in senders (and RocksDB unless
        // excluded) when all history components are fully selected.
        if matches!(preset, Some(SelectionPreset::Archive)) {
            inject_archive_only_components(&mut selections, &manifest, !self.without_rocksdb);
        }

        // Expand each selection into the concrete archives to fetch.
        let target_dir = data_dir.data_dir();
        let mut all_downloads: Vec<PlannedArchive> = Vec::new();
        for (ty, sel) in &selections {
            let distance = match sel {
                ComponentSelection::All => None,
                ComponentSelection::Distance(d) => Some(*d),
                // Convert an absolute "since block" into a distance from the
                // snapshot tip (inclusive of the boundary block).
                ComponentSelection::Since(block) => Some(manifest.block - block + 1),
                ComponentSelection::None => continue,
            };
            let descriptors = manifest.archive_descriptors_for_distance(*ty, distance);
            let name = ty.display_name().to_string();

            if !descriptors.is_empty() {
                info!(target: "reth::cli",
                    component = %name,
                    archives = descriptors.len(),
                    selection = %sel,
                    "Queued component for download"
                );
            }

            for descriptor in descriptors {
                // Checksums are required to verify extraction; a manifest
                // without them is malformed.
                if descriptor.output_files.is_empty() {
                    eyre::bail!(
                        "Invalid modular manifest: {} is missing plain output checksum metadata",
                        descriptor.file_name
                    );
                }
                all_downloads.push(PlannedArchive {
                    ty: *ty,
                    component: name.clone(),
                    archive: descriptor,
                });
            }
        }

        // State and RocksDB archives first (see `archive_priority_rank`),
        // then deterministic ordering by component and file name.
        all_downloads.sort_by(|a, b| {
            archive_priority_rank(a.ty)
                .cmp(&archive_priority_rank(b.ty))
                .then_with(|| a.component.cmp(&b.component))
                .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
        });

        // Resumable downloads keep partial files in a dedicated cache dir.
        let download_cache_dir = if self.resumable {
            let dir = target_dir.join(DOWNLOAD_CACHE_DIR);
            fs::create_dir_all(&dir)?;
            Some(dir)
        } else {
            None
        };

        let total_archives = all_downloads.len();
        let total_size: u64 = selections
            .iter()
            .map(|(ty, sel)| match sel {
                ComponentSelection::All => manifest.size_for_distance(*ty, None),
                ComponentSelection::Distance(d) => manifest.size_for_distance(*ty, Some(*d)),
                ComponentSelection::Since(block) => {
                    manifest.size_for_distance(*ty, Some(manifest.block - block + 1))
                }
                ComponentSelection::None => 0,
            })
            .sum();

        // Report how many archives can be skipped because their extracted
        // output files already verify on disk.
        let startup_summary =
            summarize_download_startup(&all_downloads, target_dir, static_files_dir.as_deref())?;
        info!(target: "reth::cli",
            reusable = startup_summary.reusable,
            needs_download = startup_summary.needs_download,
            "Startup integrity summary (plain output files)"
        );

        info!(target: "reth::cli",
            archives = total_archives,
            total = %DownloadProgress::format_size(total_size),
            "Downloading all archives"
        );

        let shared = SharedProgress::new(total_size, total_archives as u64, cancel_token.clone());
        let progress_handle = spawn_progress_display(Arc::clone(&shared));

        // Clone everything the per-archive futures capture by value.
        let target = target_dir.to_path_buf();
        let sf_dir = static_files_dir;
        let cache_dir = download_cache_dir;
        let resumable = self.resumable;
        let download_concurrency = self.download_concurrency.max(1);
        let results: Vec<Result<()>> = stream::iter(all_downloads)
            .map(|planned| {
                let dir = target.clone();
                let sf = sf_dir.clone();
                let cache = cache_dir.clone();
                let sp = Arc::clone(&shared);
                let ct = cancel_token.clone();
                async move {
                    process_modular_archive(
                        planned,
                        &dir,
                        sf.as_deref(),
                        cache.as_deref(),
                        Some(sp),
                        resumable,
                        ct,
                    )
                    .await?;
                    Ok(())
                }
            })
            .buffer_unordered(download_concurrency)
            .collect()
            .await;

        // Stop the progress display before surfacing any download error.
        shared.done.store(true, Ordering::Relaxed);
        let _ = progress_handle.await;

        for result in results {
            result?;
        }

        // Generate and persist the prune config matching what was downloaded.
        let config =
            config_for_selections(&selections, &manifest, preset, Some(self.env.chain.as_ref()));
        if write_config(&config, target_dir)? {
            let desc = config_gen::describe_prune_config(&config);
            info!(target: "reth::cli", "{}", desc.join(", "));
        }

        let db_path = data_dir.db();
        let db = init_db(&db_path, self.env.db.database_args())?;

        // Write prune checkpoints and/or reset index stage checkpoints in a
        // single transaction, but only when there is something to change.
        let should_write_prune = config.prune.segments != Default::default();
        let should_reset_indices = should_reset_index_stage_checkpoints(&selections);
        if should_write_prune || should_reset_indices {
            let tx = db.tx_mut()?;

            if should_write_prune {
                config_gen::write_prune_checkpoints_tx(&tx, &config, manifest.block)?;
            }

            if should_reset_indices {
                config_gen::reset_index_stage_checkpoints_tx(&tx)?;
            }

            tx.commit()?;
        }

        let start_command = startup_node_command::<C>(self.env.chain.as_ref());
        info!(target: "reth::cli", "Snapshot download complete. Run `{}` to start syncing.", start_command);

        Ok(())
    }

    /// Turns CLI flags (presets, explicit `--with-*` flags, or the interactive
    /// selector) into concrete per-component selections.
    fn resolve_components(&self, manifest: &SnapshotManifest) -> Result<ResolvedComponents> {
        // A component can only be selected if the manifest actually ships it.
        let available = |ty: SnapshotComponentType| manifest.component(ty).is_some();

        if self.archive {
            return Ok(ResolvedComponents {
                selections: SnapshotComponentType::ALL
                    .iter()
                    .copied()
                    .filter(|ty| available(*ty))
                    .filter(|ty| {
                        !self.without_rocksdb || *ty != SnapshotComponentType::RocksdbIndices
                    })
                    .map(|ty| (ty, ComponentSelection::All))
                    .collect(),
                preset: Some(SelectionPreset::Archive),
            });
        }

        if self.full {
            return Ok(ResolvedComponents {
                selections: self.full_preset_selections(manifest),
                preset: Some(SelectionPreset::Full),
            });
        }

        if self.minimal {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Any single explicit flag switches to fully manual selection.
        let has_explicit_flags = self.with_txs ||
            self.with_txs_since.is_some() ||
            self.with_txs_distance.is_some() ||
            self.with_receipts ||
            self.with_receipts_since.is_some() ||
            self.with_receipts_distance.is_some() ||
            self.with_state_history ||
            self.with_state_history_since.is_some() ||
            self.with_state_history_distance.is_some() ||
            self.with_senders ||
            self.with_rocksdb;

        if has_explicit_flags {
            let mut selections = BTreeMap::new();
            let tx_selection = explicit_component_selection(
                "--with-txs-since",
                self.with_txs,
                self.with_txs_since,
                self.with_txs_distance,
                manifest.block,
            )?;
            let receipt_selection = explicit_component_selection(
                "--with-receipts-since",
                self.with_receipts,
                self.with_receipts_since,
                self.with_receipts_distance,
                manifest.block,
            )?;
            let state_history_selection = explicit_component_selection(
                "--with-state-history-since",
                self.with_state_history,
                self.with_state_history_since,
                self.with_state_history_distance,
                manifest.block,
            )?;

            // State and headers are always included when available.
            if available(SnapshotComponentType::State) {
                selections.insert(SnapshotComponentType::State, ComponentSelection::All);
            }
            if available(SnapshotComponentType::Headers) {
                selections.insert(SnapshotComponentType::Headers, ComponentSelection::All);
            }
            if let Some(selection) = tx_selection &&
                available(SnapshotComponentType::Transactions)
            {
                selections.insert(SnapshotComponentType::Transactions, selection);
            }
            if let Some(selection) = receipt_selection &&
                available(SnapshotComponentType::Receipts)
            {
                selections.insert(SnapshotComponentType::Receipts, selection);
            }
            // State history covers both changeset components.
            if let Some(selection) = state_history_selection {
                if available(SnapshotComponentType::AccountChangesets) {
                    selections.insert(SnapshotComponentType::AccountChangesets, selection);
                }
                if available(SnapshotComponentType::StorageChangesets) {
                    selections.insert(SnapshotComponentType::StorageChangesets, selection);
                }
            }
            if self.with_senders && available(SnapshotComponentType::TransactionSenders) {
                selections
                    .insert(SnapshotComponentType::TransactionSenders, ComponentSelection::All);
            }
            if self.with_rocksdb && available(SnapshotComponentType::RocksdbIndices) {
                selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
            }
            return Ok(ResolvedComponents { selections, preset: None });
        }

        // No flags + non-interactive: fall back to the minimal preset.
        if self.non_interactive {
            return Ok(ResolvedComponents {
                selections: self.minimal_preset_selections(manifest),
                preset: Some(SelectionPreset::Minimal),
            });
        }

        // Otherwise run the interactive TUI selector, seeded with the full
        // preset, and drop anything the user deselected.
        let full_preset = self.full_preset_selections(manifest);
        let SelectorOutput { selections, preset } = run_selector(manifest.clone(), &full_preset)?;
        let selected =
            selections.into_iter().filter(|(_, sel)| *sel != ComponentSelection::None).collect();

        Ok(ResolvedComponents { selections: selected, preset })
    }

    /// Minimal preset: each available component at its minimal selection.
    fn minimal_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        SnapshotComponentType::ALL
            .iter()
            .copied()
            .filter(|ty| manifest.component(*ty).is_some())
            .map(|ty| (ty, ty.minimal_selection()))
            .collect()
    }

    /// Full preset: per-component selections derived from the default full
    /// node prune modes (see [`Self::full_selection_for_component`]).
    fn full_preset_selections(
        &self,
        manifest: &SnapshotManifest,
    ) -> BTreeMap<SnapshotComponentType, ComponentSelection> {
        let mut selections = BTreeMap::new();

        for ty in [
            SnapshotComponentType::State,
            SnapshotComponentType::Headers,
            SnapshotComponentType::Transactions,
            SnapshotComponentType::Receipts,
            SnapshotComponentType::AccountChangesets,
            SnapshotComponentType::StorageChangesets,
            SnapshotComponentType::TransactionSenders,
            SnapshotComponentType::RocksdbIndices,
        ] {
            if manifest.component(ty).is_none() {
                continue;
            }

            let selection = self.full_selection_for_component(ty, manifest.block);
            if selection != ComponentSelection::None {
                selections.insert(ty, selection);
            }
        }

        selections
    }

    /// Maps one component to its full-preset selection using the global
    /// default pruning values.
    fn full_selection_for_component(
        &self,
        ty: SnapshotComponentType,
        snapshot_block: u64,
    ) -> ComponentSelection {
        let defaults = DefaultPruningValues::get_global();
        match ty {
            SnapshotComponentType::State | SnapshotComponentType::Headers => {
                ComponentSelection::All
            }
            SnapshotComponentType::Transactions => {
                // Optionally keep bodies only back to the merge (Paris) block.
                if defaults.full_bodies_history_use_pre_merge {
                    match self
                        .env
                        .chain
                        .ethereum_fork_activation(EthereumHardfork::Paris)
                        .block_number()
                    {
                        Some(paris) if snapshot_block >= paris => ComponentSelection::Since(paris),
                        // Paris activates after the snapshot block: nothing to keep.
                        Some(_) => ComponentSelection::None,
                        // No Paris fork configured: keep everything.
                        None => ComponentSelection::All,
                    }
                } else {
                    selection_from_prune_mode(
                        defaults.full_prune_modes.bodies_history,
                        snapshot_block,
                    )
                }
            }
            SnapshotComponentType::Receipts => {
                selection_from_prune_mode(defaults.full_prune_modes.receipts, snapshot_block)
            }
            SnapshotComponentType::AccountChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.account_history, snapshot_block)
            }
            SnapshotComponentType::StorageChangesets => {
                selection_from_prune_mode(defaults.full_prune_modes.storage_history, snapshot_block)
            }
            SnapshotComponentType::TransactionSenders => {
                selection_from_prune_mode(defaults.full_prune_modes.sender_recovery, snapshot_block)
            }
            SnapshotComponentType::RocksdbIndices => ComponentSelection::None,
        }
    }

    /// Picks the manifest source: explicit path, explicit URL, or discovery
    /// via the snapshot API (in that order of precedence).
    async fn resolve_manifest_source(&self, chain_id: u64) -> Result<String> {
        if let Some(path) = &self.manifest_path {
            return Ok(path.display().to_string());
        }

        match &self.manifest_url {
            Some(url) => Ok(url.clone()),
            None => discover_manifest_url(chain_id).await,
        }
    }
}
754
755fn selection_from_prune_mode(mode: Option<PruneMode>, snapshot_block: u64) -> ComponentSelection {
756 match mode {
757 None => ComponentSelection::All,
758 Some(PruneMode::Full) => ComponentSelection::None,
759 Some(PruneMode::Distance(d)) => ComponentSelection::Distance(d),
760 Some(PruneMode::Before(block)) => {
761 if snapshot_block >= block {
762 ComponentSelection::Since(block)
763 } else {
764 ComponentSelection::None
765 }
766 }
767 }
768}
769
770fn explicit_component_selection(
771 since_flag_name: &str,
772 all: bool,
773 since: Option<u64>,
774 distance: Option<u64>,
775 snapshot_block: u64,
776) -> Result<Option<ComponentSelection>> {
777 if all {
778 return Ok(Some(ComponentSelection::All));
779 }
780
781 if let Some(since) = since {
782 if since > snapshot_block {
783 eyre::bail!("{since_flag_name} {since} is beyond the snapshot block {snapshot_block}");
784 }
785 return Ok(Some(ComponentSelection::Since(since)));
786 }
787
788 Ok(distance.map(ComponentSelection::Distance))
789}
790
791fn inject_archive_only_components(
794 selections: &mut BTreeMap<SnapshotComponentType, ComponentSelection>,
795 manifest: &SnapshotManifest,
796 include_rocksdb: bool,
797) {
798 let is_all =
799 |ty: SnapshotComponentType| selections.get(&ty).copied() == Some(ComponentSelection::All);
800
801 let is_archive = is_all(SnapshotComponentType::Transactions) &&
802 is_all(SnapshotComponentType::Receipts) &&
803 is_all(SnapshotComponentType::AccountChangesets) &&
804 is_all(SnapshotComponentType::StorageChangesets);
805
806 if !is_archive {
807 return;
808 }
809
810 for component in
811 [SnapshotComponentType::TransactionSenders, SnapshotComponentType::RocksdbIndices]
812 {
813 if component == SnapshotComponentType::RocksdbIndices && !include_rocksdb {
814 continue;
815 }
816
817 if manifest.component(component).is_some() {
818 selections.insert(component, ComponentSelection::All);
819 }
820 }
821}
822
823fn should_reset_index_stage_checkpoints(
824 selections: &BTreeMap<SnapshotComponentType, ComponentSelection>,
825) -> bool {
826 !matches!(selections.get(&SnapshotComponentType::RocksdbIndices), Some(ComponentSelection::All))
827}
828
829fn startup_node_command<C>(chain_spec: &C::ChainSpec) -> String
830where
831 C: ChainSpecParser,
832 C::ChainSpec: EthChainSpec,
833{
834 startup_node_command_for_binary::<C>(¤t_binary_name(), chain_spec)
835}
836
837fn startup_node_command_for_binary<C>(binary_name: &str, chain_spec: &C::ChainSpec) -> String
838where
839 C: ChainSpecParser,
840 C::ChainSpec: EthChainSpec,
841{
842 let mut command = format!("{binary_name} node");
843
844 if let Some(chain_arg) = startup_chain_arg::<C>(chain_spec) {
845 command.push_str(" --chain ");
846 command.push_str(&chain_arg);
847 }
848
849 command
850}
851
/// Name of the currently running executable (file stem of `argv[0]`),
/// falling back to `"reth"` when it cannot be determined.
fn current_binary_name() -> String {
    let fallback = || "reth".to_string();
    let Some(argv0) = std::env::args_os().next() else { return fallback() };
    let path = PathBuf::from(argv0);
    match path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) if !stem.is_empty() => stem.to_string(),
        // Missing, empty, or non-UTF-8 stem: use the fallback name.
        _ => fallback(),
    }
}
861
862fn startup_chain_arg<C>(chain_spec: &C::ChainSpec) -> Option<String>
863where
864 C: ChainSpecParser,
865 C::ChainSpec: EthChainSpec,
866{
867 let current_chain = chain_spec.chain();
868 let current_genesis_hash = chain_spec.genesis_hash();
869 let default_chain = C::default_value().and_then(|chain_name| C::parse(chain_name).ok());
870
871 if default_chain.as_ref().is_some_and(|default_chain| {
872 default_chain.chain() == current_chain &&
873 default_chain.genesis_hash() == current_genesis_hash
874 }) {
875 return None;
876 }
877
878 C::SUPPORTED_CHAINS
879 .iter()
880 .find_map(|chain_name| {
881 let parsed_chain = C::parse(chain_name).ok()?;
882 (parsed_chain.chain() == current_chain &&
883 parsed_chain.genesis_hash() == current_genesis_hash)
884 .then(|| (*chain_name).to_string())
885 })
886 .or_else(|| Some("<chain-or-chainspec>".to_string()))
887}
888
impl<C: ChainSpecParser> DownloadCommand<C> {
    /// Returns the chain spec this command was parsed with (always `Some`).
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        Some(&self.env.chain)
    }
}
895
/// Byte-level progress tracker for a single streamed download; terminal
/// updates are rate-limited (see [`DownloadProgress::update`]).
pub(crate) struct DownloadProgress {
    // Bytes observed so far.
    downloaded: u64,
    // Expected total size of the download in bytes.
    total_size: u64,
    // Last time a progress line was printed (rate limiting).
    last_displayed: Instant,
    // Download start time, used for the ETA estimate.
    started_at: Instant,
}
903
/// A single archive queued for download, paired with its component metadata.
#[derive(Debug, Clone)]
struct PlannedArchive {
    // Component this archive belongs to; drives priority ordering.
    ty: SnapshotComponentType,
    // Human-readable component name, used for logging and sorting.
    component: String,
    // Manifest descriptor (file name, expected output files with checksums).
    archive: ArchiveDescriptor,
}
910
911const fn archive_priority_rank(ty: SnapshotComponentType) -> u8 {
912 match ty {
913 SnapshotComponentType::State => 0,
914 SnapshotComponentType::RocksdbIndices => 1,
915 _ => 2,
916 }
917}
918
/// Counts of planned archives whose plain output files already verify on disk
/// versus those that still need to be downloaded.
#[derive(Debug, Default, Clone, Copy)]
struct DownloadStartupSummary {
    // Archives whose extracted outputs already pass checksum verification.
    reusable: usize,
    // Archives that must still be fetched.
    needs_download: usize,
}
924
925fn summarize_download_startup(
926 all_downloads: &[PlannedArchive],
927 target_dir: &Path,
928 static_files_dir: Option<&Path>,
929) -> Result<DownloadStartupSummary> {
930 let mut summary = DownloadStartupSummary::default();
931
932 for planned in all_downloads {
933 if verify_output_files(target_dir, static_files_dir, &planned.archive.output_files)? {
934 summary.reusable += 1;
935 } else {
936 summary.needs_download += 1;
937 }
938 }
939
940 Ok(summary)
941}
942
943impl DownloadProgress {
944 fn new(total_size: u64) -> Self {
946 let now = Instant::now();
947 Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
948 }
949
950 pub(crate) fn format_size(size: u64) -> String {
952 let mut size = size as f64;
953 let mut unit_index = 0;
954
955 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
956 size /= 1024.0;
957 unit_index += 1;
958 }
959
960 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
961 }
962
963 fn format_duration(duration: Duration) -> String {
965 let secs = duration.as_secs();
966 if secs < 60 {
967 format!("{secs}s")
968 } else if secs < 3600 {
969 format!("{}m {}s", secs / 60, secs % 60)
970 } else {
971 format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
972 }
973 }
974
975 fn update(&mut self, chunk_size: u64) -> Result<()> {
977 self.downloaded += chunk_size;
978
979 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
980 let formatted_downloaded = Self::format_size(self.downloaded);
981 let formatted_total = Self::format_size(self.total_size);
982 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
983
984 let elapsed = self.started_at.elapsed();
985 let eta = if self.downloaded > 0 {
986 let remaining = self.total_size.saturating_sub(self.downloaded);
987 let speed = self.downloaded as f64 / elapsed.as_secs_f64();
988 if speed > 0.0 {
989 Duration::from_secs_f64(remaining as f64 / speed)
990 } else {
991 Duration::ZERO
992 }
993 } else {
994 Duration::ZERO
995 };
996 let eta_str = Self::format_duration(eta);
997
998 print!(
999 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str} ",
1000 );
1001 io::stdout().flush()?;
1002 self.last_displayed = Instant::now();
1003 }
1004
1005 Ok(())
1006 }
1007}
1008
/// Progress state shared between the concurrent archive download tasks and
/// the background display task (see [`spawn_progress_display`]).
struct SharedProgress {
    // Total bytes downloaded across all archives.
    downloaded: AtomicU64,
    // Expected total bytes across all selected archives.
    total_size: u64,
    // Number of archives planned for download.
    total_archives: u64,
    // Archives fully processed so far.
    archives_done: AtomicU64,
    // Set once all downloads finish; stops the display task.
    done: AtomicBool,
    // Cooperative cancellation signal shared with the download workers.
    cancel_token: CancellationToken,
}
1022
1023impl SharedProgress {
1024 fn new(total_size: u64, total_archives: u64, cancel_token: CancellationToken) -> Arc<Self> {
1025 Arc::new(Self {
1026 downloaded: AtomicU64::new(0),
1027 total_size,
1028 total_archives,
1029 archives_done: AtomicU64::new(0),
1030 done: AtomicBool::new(false),
1031 cancel_token,
1032 })
1033 }
1034
1035 fn is_cancelled(&self) -> bool {
1036 self.cancel_token.is_cancelled()
1037 }
1038
1039 fn add(&self, bytes: u64) {
1040 self.downloaded.fetch_add(bytes, Ordering::Relaxed);
1041 }
1042
1043 fn archive_done(&self) {
1044 self.archives_done.fetch_add(1, Ordering::Relaxed);
1045 }
1046}
1047
/// Spawns a background task that logs aggregate download progress every few
/// seconds until [`SharedProgress::done`] is set, then logs a final summary.
fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let started_at = Instant::now();
        let mut prev_downloaded = 0u64;
        let mut prev_time = started_at;
        let mut interval = tokio::time::interval(Duration::from_secs(3));
        // A tokio interval's first tick completes immediately; consume it so
        // the loop waits a full period before the first report.
        interval.tick().await; loop {
            interval.tick().await;

            if progress.done.load(Ordering::Relaxed) {
                break;
            }

            let downloaded = progress.downloaded.load(Ordering::Relaxed);
            let total = progress.total_size;
            if total == 0 {
                // Nothing to measure against; also avoids dividing by zero.
                continue;
            }

            let done = progress.archives_done.load(Ordering::Relaxed);
            let all = progress.total_archives;
            let pct = (downloaded as f64 / total as f64) * 100.0;
            let dl = DownloadProgress::format_size(downloaded);
            let tot = DownloadProgress::format_size(total);

            let remaining = total.saturating_sub(downloaded);

            if remaining == 0 {
                // All bytes are in; remaining work is extraction only.
                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    downloaded = %dl,
                    "Extracting remaining archives"
                );
            } else {
                // Estimate speed from the delta since the previous report.
                let now = Instant::now();
                let dt = now.duration_since(prev_time).as_secs_f64();
                let speed = if dt > 0.0 {
                    (downloaded.saturating_sub(prev_downloaded)) as f64 / dt
                } else {
                    0.0
                };
                prev_downloaded = downloaded;
                prev_time = now;

                let eta = if speed > 0.0 {
                    DownloadProgress::format_duration(Duration::from_secs_f64(
                        remaining as f64 / speed,
                    ))
                } else {
                    "??".to_string()
                };

                info!(target: "reth::cli",
                    archives = format_args!("{done}/{all}"),
                    progress = format_args!("{pct:.1}%"),
                    downloaded = %dl,
                    total = %tot,
                    eta = %eta,
                    "Downloading"
                );
            }
        }

        // Final summary once all downloads have completed.
        let downloaded = progress.downloaded.load(Ordering::Relaxed);
        let dl = DownloadProgress::format_size(downloaded);
        let tot = DownloadProgress::format_size(progress.total_size);
        let elapsed = DownloadProgress::format_duration(started_at.elapsed());
        info!(target: "reth::cli",
            downloaded = %dl,
            total = %tot,
            elapsed = %elapsed,
            "Downloads complete"
        );
    })
}
1128
1129struct ProgressReader<R> {
1131 reader: R,
1132 progress: DownloadProgress,
1133 cancel_token: CancellationToken,
1134}
1135
1136impl<R: Read> ProgressReader<R> {
1137 fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
1138 Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
1139 }
1140}
1141
1142impl<R: Read> Read for ProgressReader<R> {
1143 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1144 if self.cancel_token.is_cancelled() {
1145 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1146 }
1147 let bytes = self.reader.read(buf)?;
1148 if bytes > 0 &&
1149 let Err(e) = self.progress.update(bytes as u64)
1150 {
1151 return Err(io::Error::other(e));
1152 }
1153 Ok(bytes)
1154 }
1155}
1156
1157#[derive(Debug, Clone, Copy)]
1159enum CompressionFormat {
1160 Lz4,
1161 Zstd,
1162}
1163
1164impl CompressionFormat {
1165 fn from_url(url: &str) -> Result<Self> {
1167 let path =
1168 Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
1169
1170 if path.ends_with(EXTENSION_TAR_LZ4) {
1171 Ok(Self::Lz4)
1172 } else if path.ends_with(EXTENSION_TAR_ZSTD) {
1173 Ok(Self::Zstd)
1174 } else {
1175 Err(eyre::eyre!(
1176 "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
1177 path
1178 ))
1179 }
1180 }
1181}
1182
1183fn resolve_output_path(
1186 target_dir: &Path,
1187 relative_path: &str,
1188 static_files_dir: Option<&Path>,
1189) -> PathBuf {
1190 if let Some(sf_dir) = static_files_dir &&
1191 let Some(rest) = relative_path.strip_prefix(STATIC_FILES_PREFIX)
1192 {
1193 return sf_dir.join(rest);
1194 }
1195 target_dir.join(relative_path)
1196}
1197
1198fn unpack_archive_with_remap<R: Read>(
1201 archive: &mut Archive<R>,
1202 target_dir: &Path,
1203 static_files_dir: Option<&Path>,
1204) -> Result<()> {
1205 let Some(sf_dir) = static_files_dir else {
1207 archive.unpack(target_dir)?;
1208 return Ok(());
1209 };
1210
1211 fs::create_dir_all(sf_dir)?;
1212
1213 for entry in archive.entries()? {
1214 let mut entry = entry?;
1215 let path = entry.path()?.into_owned();
1216 let path_str = path.to_string_lossy();
1217
1218 if let Some(rest) = path_str.strip_prefix(STATIC_FILES_PREFIX) {
1219 if rest.is_empty() {
1220 continue;
1223 }
1224 let dest = sf_dir.join(rest);
1225 if let Some(parent) = dest.parent() {
1226 fs::create_dir_all(parent)?;
1227 }
1228 entry.unpack(&dest)?;
1229 } else if path_str == "static_files" {
1230 continue;
1232 } else {
1233 let dest = target_dir.join(&path);
1234 if let Some(parent) = dest.parent() {
1235 fs::create_dir_all(parent)?;
1236 }
1237 entry.unpack(&dest)?;
1238 }
1239 }
1240 Ok(())
1241}
1242
1243fn extract_archive<R: Read>(
1245 reader: R,
1246 total_size: u64,
1247 format: CompressionFormat,
1248 target_dir: &Path,
1249 static_files_dir: Option<&Path>,
1250 cancel_token: CancellationToken,
1251) -> Result<()> {
1252 let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
1253
1254 match format {
1255 CompressionFormat::Lz4 => {
1256 let decoder = Decoder::new(progress_reader)?;
1257 unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
1258 }
1259 CompressionFormat::Zstd => {
1260 let decoder = ZstdDecoder::new(progress_reader)?;
1261 unpack_archive_with_remap(&mut Archive::new(decoder), target_dir, static_files_dir)?;
1262 }
1263 }
1264
1265 println!();
1266 Ok(())
1267}
1268
1269fn extract_archive_raw<R: Read>(
1271 reader: R,
1272 format: CompressionFormat,
1273 target_dir: &Path,
1274 static_files_dir: Option<&Path>,
1275) -> Result<()> {
1276 match format {
1277 CompressionFormat::Lz4 => {
1278 unpack_archive_with_remap(
1279 &mut Archive::new(Decoder::new(reader)?),
1280 target_dir,
1281 static_files_dir,
1282 )?;
1283 }
1284 CompressionFormat::Zstd => {
1285 unpack_archive_with_remap(
1286 &mut Archive::new(ZstdDecoder::new(reader)?),
1287 target_dir,
1288 static_files_dir,
1289 )?;
1290 }
1291 }
1292 Ok(())
1293}
1294
1295fn extract_from_file(
1297 path: &Path,
1298 format: CompressionFormat,
1299 target_dir: &Path,
1300 static_files_dir: Option<&Path>,
1301) -> Result<()> {
1302 let file = std::fs::File::open(path)?;
1303 let total_size = file.metadata()?.len();
1304 info!(target: "reth::cli",
1305 file = %path.display(),
1306 size = %DownloadProgress::format_size(total_size),
1307 "Extracting local archive"
1308 );
1309 let start = Instant::now();
1310 extract_archive(
1311 file,
1312 total_size,
1313 format,
1314 target_dir,
1315 static_files_dir,
1316 CancellationToken::new(),
1317 )?;
1318 info!(target: "reth::cli",
1319 file = %path.display(),
1320 elapsed = %DownloadProgress::format_duration(start.elapsed()),
1321 "Local extraction complete"
1322 );
1323 Ok(())
1324}
1325
/// Maximum number of attempts for a single download before giving up.
const MAX_DOWNLOAD_RETRIES: u32 = 10;
/// Delay in seconds between download retry attempts.
const RETRY_BACKOFF_SECS: u64 = 5;
1328
1329struct ProgressWriter<W> {
1332 inner: W,
1333 progress: DownloadProgress,
1334 cancel_token: CancellationToken,
1335}
1336
1337impl<W: Write> Write for ProgressWriter<W> {
1338 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1339 if self.cancel_token.is_cancelled() {
1340 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1341 }
1342 let n = self.inner.write(buf)?;
1343 let _ = self.progress.update(n as u64);
1344 Ok(n)
1345 }
1346
1347 fn flush(&mut self) -> io::Result<()> {
1348 self.inner.flush()
1349 }
1350}
1351
1352struct SharedProgressWriter<W> {
1355 inner: W,
1356 progress: Arc<SharedProgress>,
1357}
1358
1359impl<W: Write> Write for SharedProgressWriter<W> {
1360 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
1361 if self.progress.is_cancelled() {
1362 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1363 }
1364 let n = self.inner.write(buf)?;
1365 self.progress.add(n as u64);
1366 Ok(n)
1367 }
1368
1369 fn flush(&mut self) -> io::Result<()> {
1370 self.inner.flush()
1371 }
1372}
1373
1374struct SharedProgressReader<R> {
1377 inner: R,
1378 progress: Arc<SharedProgress>,
1379}
1380
1381impl<R: Read> Read for SharedProgressReader<R> {
1382 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
1383 if self.progress.is_cancelled() {
1384 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
1385 }
1386 let n = self.inner.read(buf)?;
1387 self.progress.add(n as u64);
1388 Ok(n)
1389 }
1390}
1391
/// Downloads `url` into `target_dir` with resume support.
///
/// Partial data is written to a `<name>.part` file; on interruption the
/// next attempt resumes via an HTTP `Range` request. Retries up to
/// [`MAX_DOWNLOAD_RETRIES`] times, resetting the retry counter whenever an
/// attempt made forward progress. When `shared` is provided, bytes are
/// accumulated there and per-file logging is suppressed.
///
/// Returns the path of the completed file and its total size in bytes.
fn resumable_download(
    url: &str,
    target_dir: &Path,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<(PathBuf, u64)> {
    // Local file name comes from the last URL path segment.
    let file_name = Url::parse(url)
        .ok()
        .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
        .unwrap_or_else(|| "snapshot.tar".to_string());

    let final_path = target_dir.join(&file_name);
    let part_path = target_dir.join(format!("{file_name}.part"));

    // With a shared tracker, a central display task does the logging.
    let quiet = shared.is_some();

    if !quiet {
        info!(target: "reth::cli", file = %file_name, "Connecting to download server");
    }
    let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;

    // `total_size` is learned from the first successful response.
    let mut total_size: Option<u64> = None;
    let mut retries: u32 = 0;

    // Promotes the completed `.part` file to its final name.
    let finalize_download = |size: u64| -> Result<(PathBuf, u64)> {
        fs::rename(&part_path, &final_path)?;
        if !quiet {
            info!(target: "reth::cli", file = %file_name, "Download complete");
        }
        Ok((final_path.clone(), size))
    };

    loop {
        let existing_size = fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);

        // A previous attempt may already have fetched everything.
        if let Some(total) = total_size &&
            existing_size >= total
        {
            return finalize_download(total);
        }

        // NOTE(review): this resume log is not gated by `quiet`, unlike the
        // other logs in this function — confirm whether shared-progress
        // downloads should also emit it.
        if retries > 0 {
            info!(target: "reth::cli",
                file = %file_name,
                retries,
                "Resuming download from {} bytes", existing_size
            );
        }

        // Request only the missing suffix when partial data exists.
        let mut request = client.get(url);
        if existing_size > 0 {
            request = request.header(RANGE, format!("bytes={existing_size}-"));
            if !quiet && retries == 0 {
                info!(target: "reth::cli", file = %file_name, "Resuming from {} bytes", existing_size);
            }
        }

        let response = match request.send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                retries += 1;
                if retries >= MAX_DOWNLOAD_RETRIES {
                    return Err(e.into());
                }
                warn!(target: "reth::cli",
                    file = %file_name,
                    %e,
                    "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                continue;
            }
        };

        let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;

        // For 206 responses the full size is the denominator of
        // `Content-Range: bytes a-b/total`; otherwise use Content-Length.
        let size = if is_partial {
            response
                .headers()
                .get("Content-Range")
                .and_then(|v| v.to_str().ok())
                .and_then(|v| v.split('/').next_back())
                .and_then(|v| v.parse().ok())
        } else {
            response.content_length()
        };

        if total_size.is_none() {
            total_size = size;
            if !quiet && let Some(s) = size {
                info!(target: "reth::cli",
                    file = %file_name,
                    size = %DownloadProgress::format_size(s),
                    "Downloading"
                );
            }
        }

        let current_total = total_size.ok_or_else(|| {
            eyre::eyre!("Server did not provide Content-Length or Content-Range header")
        })?;

        // Append to the partial file only if the server honored the Range
        // request; otherwise restart the file from scratch.
        let file = if is_partial && existing_size > 0 {
            OpenOptions::new()
                .append(true)
                .open(&part_path)
                .map_err(|e| fs::FsPathError::open(e, &part_path))?
        } else {
            fs::create_file(&part_path)?
        };

        let start_offset = if is_partial { existing_size } else { 0 };
        let mut reader = response;

        let copy_result;
        let flush_result;

        if let Some(sp) = shared {
            // Credit already-downloaded bytes to the shared tracker so the
            // aggregate display stays accurate across resumes.
            if start_offset > 0 {
                sp.add(start_offset);
            }
            let mut writer =
                SharedProgressWriter { inner: BufWriter::new(file), progress: Arc::clone(sp) };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
        } else {
            let mut progress = DownloadProgress::new(current_total);
            progress.downloaded = start_offset;
            let mut writer = ProgressWriter {
                inner: BufWriter::new(file),
                progress,
                cancel_token: cancel_token.clone(),
            };
            copy_result = io::copy(&mut reader, &mut writer);
            flush_result = writer.inner.flush();
            println!();
        }

        match copy_result.and(flush_result) {
            Err(e) => {
                // Forward progress resets the retry budget; a stalled
                // attempt consumes one retry.
                let new_size = std::fs::metadata(&part_path).map(|m| m.len()).unwrap_or(0);
                if new_size > existing_size {
                    retries = 0;
                } else {
                    retries += 1;
                }

                if retries >= MAX_DOWNLOAD_RETRIES {
                    return Err(e.into());
                }

                warn!(target: "reth::cli",
                    file = %file_name,
                    %e,
                    "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                );
                std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                continue;
            }
            Ok(_) => return finalize_download(current_total),
        }
    }
}
1566
/// Streams `url` directly into the tar extractor without writing the
/// archive to disk first.
///
/// Because a streamed download cannot resume mid-archive, every failure
/// restarts the transfer from scratch, up to [`MAX_DOWNLOAD_RETRIES`]
/// attempts with [`RETRY_BACKOFF_SECS`] between them.
fn streaming_download_and_extract(
    url: &str,
    format: CompressionFormat,
    target_dir: &Path,
    static_files_dir: Option<&Path>,
    shared: Option<&Arc<SharedProgress>>,
    cancel_token: CancellationToken,
) -> Result<()> {
    let quiet = shared.is_some();
    let mut last_error: Option<eyre::Error> = None;

    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        if attempt > 1 {
            info!(target: "reth::cli",
                url = %url,
                attempt,
                max = MAX_DOWNLOAD_RETRIES,
                "Retrying streaming download from scratch"
            );
        }

        // Only the connection is bounded by a timeout; the body transfer
        // itself may legitimately take much longer.
        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;

        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(e) => {
                let err = eyre::Error::from(e);
                // Warn and back off only while retries remain; the final
                // failure is reported via `last_error` below.
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %err,
                        "Streaming request failed, retrying"
                    );
                }
                last_error = Some(err);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        };

        if !quiet && let Some(size) = response.content_length() {
            info!(target: "reth::cli",
                url = %url,
                size = %DownloadProgress::format_size(size),
                "Streaming archive"
            );
        }

        // Shared-progress mode tracks bytes at the reader; otherwise a
        // per-download progress display (with cancellation) is used.
        let result = if let Some(sp) = shared {
            let reader = SharedProgressReader { inner: response, progress: Arc::clone(sp) };
            extract_archive_raw(reader, format, target_dir, static_files_dir)
        } else {
            let total_size = response.content_length().unwrap_or(0);
            extract_archive(
                response,
                total_size,
                format,
                target_dir,
                static_files_dir,
                cancel_token.clone(),
            )
        };

        match result {
            Ok(()) => return Ok(()),
            Err(e) => {
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %e,
                        "Streaming extraction failed, retrying"
                    );
                }
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
            }
        }
    }

    Err(last_error.unwrap_or_else(|| {
        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
    }))
}
1661
1662fn download_and_extract(
1664 url: &str,
1665 format: CompressionFormat,
1666 target_dir: &Path,
1667 static_files_dir: Option<&Path>,
1668 shared: Option<&Arc<SharedProgress>>,
1669 cancel_token: CancellationToken,
1670) -> Result<()> {
1671 let quiet = shared.is_some();
1672 let (downloaded_path, total_size) =
1673 resumable_download(url, target_dir, shared, cancel_token.clone())?;
1674
1675 let file_name =
1676 downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
1677
1678 if !quiet {
1679 info!(target: "reth::cli",
1680 file = %file_name,
1681 size = %DownloadProgress::format_size(total_size),
1682 "Extracting archive"
1683 );
1684 }
1685 let file = fs::open(&downloaded_path)?;
1686
1687 if quiet {
1688 extract_archive_raw(file, format, target_dir, static_files_dir)?;
1690 } else {
1691 extract_archive(file, total_size, format, target_dir, static_files_dir, cancel_token)?;
1692 info!(target: "reth::cli",
1693 file = %file_name,
1694 "Extraction complete"
1695 );
1696 }
1697
1698 fs::remove_file(&downloaded_path)?;
1699
1700 if let Some(sp) = shared {
1701 sp.archive_done();
1702 }
1703
1704 Ok(())
1705}
1706
1707fn blocking_download_and_extract(
1713 url: &str,
1714 target_dir: &Path,
1715 static_files_dir: Option<&Path>,
1716 shared: Option<Arc<SharedProgress>>,
1717 resumable: bool,
1718 cancel_token: CancellationToken,
1719) -> Result<()> {
1720 let format = CompressionFormat::from_url(url)?;
1721
1722 if let Ok(parsed_url) = Url::parse(url) &&
1723 parsed_url.scheme() == "file"
1724 {
1725 let file_path = parsed_url
1726 .to_file_path()
1727 .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
1728 let result = extract_from_file(&file_path, format, target_dir, static_files_dir);
1729 if result.is_ok() &&
1730 let Some(sp) = shared
1731 {
1732 sp.archive_done();
1733 }
1734 result
1735 } else if resumable {
1736 download_and_extract(
1737 url,
1738 format,
1739 target_dir,
1740 static_files_dir,
1741 shared.as_ref(),
1742 cancel_token,
1743 )
1744 } else {
1745 let result = streaming_download_and_extract(
1746 url,
1747 format,
1748 target_dir,
1749 static_files_dir,
1750 shared.as_ref(),
1751 cancel_token,
1752 );
1753 if result.is_ok() &&
1754 let Some(sp) = shared
1755 {
1756 sp.archive_done();
1757 }
1758 result
1759 }
1760}
1761
1762async fn stream_and_extract(
1768 url: &str,
1769 target_dir: &Path,
1770 static_files_dir: Option<PathBuf>,
1771 shared: Option<Arc<SharedProgress>>,
1772 resumable: bool,
1773 cancel_token: CancellationToken,
1774) -> Result<()> {
1775 let target_dir = target_dir.to_path_buf();
1776 let url = url.to_string();
1777 task::spawn_blocking(move || {
1778 blocking_download_and_extract(
1779 &url,
1780 &target_dir,
1781 static_files_dir.as_deref(),
1782 shared,
1783 resumable,
1784 cancel_token,
1785 )
1786 })
1787 .await??;
1788
1789 Ok(())
1790}
1791
1792async fn process_modular_archive(
1793 planned: PlannedArchive,
1794 target_dir: &Path,
1795 static_files_dir: Option<&Path>,
1796 cache_dir: Option<&Path>,
1797 shared: Option<Arc<SharedProgress>>,
1798 resumable: bool,
1799 cancel_token: CancellationToken,
1800) -> Result<()> {
1801 let target_dir = target_dir.to_path_buf();
1802 let static_files_dir = static_files_dir.map(Path::to_path_buf);
1803 let cache_dir = cache_dir.map(Path::to_path_buf);
1804
1805 task::spawn_blocking(move || {
1806 blocking_process_modular_archive(
1807 &planned,
1808 &target_dir,
1809 static_files_dir.as_deref(),
1810 cache_dir.as_deref(),
1811 shared,
1812 resumable,
1813 cancel_token,
1814 )
1815 })
1816 .await??;
1817
1818 Ok(())
1819}
1820
/// Downloads, extracts, and integrity-verifies one planned modular archive.
///
/// If the archive's expected output files already verify (size + BLAKE3),
/// the download is skipped entirely. Otherwise each attempt first removes
/// any stale output files, downloads (resumable via the cache dir, or
/// streaming), then re-verifies; up to [`MAX_DOWNLOAD_RETRIES`] attempts.
fn blocking_process_modular_archive(
    planned: &PlannedArchive,
    target_dir: &Path,
    static_files_dir: Option<&Path>,
    cache_dir: Option<&Path>,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    cancel_token: CancellationToken,
) -> Result<()> {
    let archive = &planned.archive;
    if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
        // Credit the skipped archive's size so the aggregate display and
        // archive counter stay accurate.
        if let Some(sp) = &shared {
            sp.add(archive.size);
            sp.archive_done();
        }
        info!(target: "reth::cli", file = %archive.file_name, component = %planned.component, "Skipping already verified plain files");
        return Ok(());
    }

    let format = CompressionFormat::from_url(&archive.file_name)?;
    let mut last_error: Option<eyre::Error> = None;
    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        // Remove partially-extracted outputs before every attempt so a
        // failed extraction cannot pass verification by accident.
        cleanup_output_files(target_dir, static_files_dir, &archive.output_files);

        if resumable {
            let cache_dir = cache_dir.ok_or_else(|| eyre::eyre!("Missing cache directory"))?;
            let archive_path = cache_dir.join(&archive.file_name);
            let part_path = cache_dir.join(format!("{}.part", archive.file_name));
            let result =
                resumable_download(&archive.url, cache_dir, shared.as_ref(), cancel_token.clone())
                    .and_then(|(downloaded_path, _)| {
                        let file = fs::open(&downloaded_path)?;
                        extract_archive_raw(file, format, target_dir, static_files_dir)
                    });
            // Best-effort cleanup of the cached archive and any leftover
            // partial file, regardless of outcome.
            let _ = fs::remove_file(&archive_path);
            let _ = fs::remove_file(&part_path);

            if let Err(e) = result {
                warn!(target: "reth::cli",
                    file = %archive.file_name,
                    component = %planned.component,
                    attempt,
                    err = %e,
                    "Download or extraction failed, retrying"
                );
                last_error = Some(e);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        } else {
            // NOTE(review): `?` here propagates immediately instead of using
            // this loop's retry budget — streaming_download_and_extract does
            // its own internal retries, so this looks intentional; confirm.
            streaming_download_and_extract(
                &archive.url,
                format,
                target_dir,
                static_files_dir,
                shared.as_ref(),
                cancel_token.clone(),
            )?;
        }

        if verify_output_files(target_dir, static_files_dir, &archive.output_files)? {
            if let Some(sp) = &shared {
                sp.archive_done();
            }
            return Ok(());
        }

        warn!(target: "reth::cli", file = %archive.file_name, component = %planned.component, attempt, "Extracted files failed integrity checks, retrying");
    }

    if let Some(e) = last_error {
        return Err(e.wrap_err(format!(
            "Failed after {} attempts for {}",
            MAX_DOWNLOAD_RETRIES, archive.file_name
        )));
    }

    eyre::bail!(
        "Failed integrity validation after {} attempts for {}",
        MAX_DOWNLOAD_RETRIES,
        archive.file_name
    )
}
1907
1908fn verify_output_files(
1909 target_dir: &Path,
1910 static_files_dir: Option<&Path>,
1911 output_files: &[OutputFileChecksum],
1912) -> Result<bool> {
1913 if output_files.is_empty() {
1914 return Ok(false);
1915 }
1916
1917 for expected in output_files {
1918 let output_path = resolve_output_path(target_dir, &expected.path, static_files_dir);
1919 let meta = match fs::metadata(&output_path) {
1920 Ok(meta) => meta,
1921 Err(_) => return Ok(false),
1922 };
1923 if meta.len() != expected.size {
1924 return Ok(false);
1925 }
1926
1927 let actual = file_blake3_hex(&output_path)?;
1928 if !actual.eq_ignore_ascii_case(&expected.blake3) {
1929 return Ok(false);
1930 }
1931 }
1932
1933 Ok(true)
1934}
1935
1936fn cleanup_output_files(
1937 target_dir: &Path,
1938 static_files_dir: Option<&Path>,
1939 output_files: &[OutputFileChecksum],
1940) {
1941 for output in output_files {
1942 let _ = fs::remove_file(resolve_output_path(target_dir, &output.path, static_files_dir));
1943 }
1944}
1945
1946fn file_blake3_hex(path: &Path) -> Result<String> {
1947 let mut file = fs::open(path)?;
1948 let mut hasher = Hasher::new();
1949 let mut buf = [0_u8; 64 * 1024];
1950
1951 loop {
1952 let n = file.read(&mut buf)?;
1953 if n == 0 {
1954 break;
1955 }
1956 hasher.update(&buf[..n]);
1957 }
1958
1959 Ok(hasher.finalize().to_hex().to_string())
1960}
1961
/// Queries the snapshot listing API for `chain_id` and returns the manifest
/// URL of the most recent (highest block) modular snapshot.
///
/// Errors with actionable CLI hints when no modular snapshot exists.
async fn discover_manifest_url(chain_id: u64) -> Result<String> {
    let defaults = DownloadDefaults::get_global();
    let api_url = &*defaults.snapshot_api_url;

    info!(target: "reth::cli", %api_url, %chain_id, "Discovering latest snapshot manifest");

    let entries = fetch_snapshot_api_entries(chain_id).await?;

    // Pick the modular snapshot with the highest block number.
    let entry =
        entries.iter().filter(|s| s.is_modular()).max_by_key(|s| s.block).ok_or_else(|| {
            eyre::eyre!(
                "No modular snapshot manifest found for chain \
                {chain_id} at {api_url}\n\n\
                You can provide a manifest URL directly with --manifest-url, or\n\
                use a direct snapshot URL with -u from:\n\
                \t- {}\n\n\
                Use --list to see all available snapshots.",
                api_url.trim_end_matches("/api/snapshots"),
            )
        })?;

    info!(target: "reth::cli",
        block = entry.block,
        url = %entry.metadata_url,
        "Found latest snapshot manifest"
    );

    Ok(entry.metadata_url.clone())
}
1995
1996fn deserialize_string_or_u64<'de, D>(deserializer: D) -> std::result::Result<u64, D::Error>
1998where
1999 D: serde::Deserializer<'de>,
2000{
2001 use serde::Deserialize;
2002 let value = serde_json::Value::deserialize(deserializer)?;
2003 match &value {
2004 serde_json::Value::Number(n) => {
2005 n.as_u64().ok_or_else(|| serde::de::Error::custom("expected u64"))
2006 }
2007 serde_json::Value::String(s) => {
2008 s.parse::<u64>().map_err(|_| serde::de::Error::custom("expected numeric string"))
2009 }
2010 _ => Err(serde::de::Error::custom("expected number or string")),
2011 }
2012}
2013
/// One snapshot entry as returned by the snapshots listing API.
#[derive(serde::Deserialize)]
#[serde(rename_all = "camelCase")]
struct SnapshotApiEntry {
    /// Chain the snapshot belongs to. The API may encode numbers as strings,
    /// hence the custom deserializer.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    chain_id: u64,
    /// Block height of the snapshot.
    #[serde(deserialize_with = "deserialize_string_or_u64")]
    block: u64,
    /// Snapshot date, when reported.
    #[serde(default)]
    date: Option<String>,
    /// Snapshot profile label, when reported.
    #[serde(default)]
    profile: Option<String>,
    /// URL of the snapshot's metadata document.
    metadata_url: String,
    /// Snapshot size in bytes; zero when not reported.
    #[serde(default)]
    size: u64,
}

impl SnapshotApiEntry {
    /// Whether this entry is a modular (manifest-based) snapshot, detected
    /// by its metadata URL ending in `manifest.json`.
    fn is_modular(&self) -> bool {
        self.metadata_url.ends_with("manifest.json")
    }
}
2036
2037async fn fetch_snapshot_api_entries(chain_id: u64) -> Result<Vec<SnapshotApiEntry>> {
2039 let api_url = &*DownloadDefaults::get_global().snapshot_api_url;
2040
2041 let entries: Vec<SnapshotApiEntry> = Client::new()
2042 .get(api_url)
2043 .send()
2044 .await
2045 .and_then(|r| r.error_for_status())
2046 .wrap_err_with(|| format!("Failed to fetch snapshot listing from {api_url}"))?
2047 .json()
2048 .await?;
2049
2050 Ok(entries.into_iter().filter(|e| e.chain_id == chain_id).collect())
2051}
2052
/// Prints a human-readable table of the modular snapshots available for
/// `chain_id`, followed by a usage hint.
fn print_snapshot_listing(entries: &[SnapshotApiEntry], chain_id: u64) {
    // Only modular (manifest-based) snapshots are listed.
    let modular: Vec<_> = entries.iter().filter(|e| e.is_modular()).collect();

    let api_url = &*DownloadDefaults::get_global().snapshot_api_url;
    println!(
        "Available snapshots for chain {chain_id} ({}):\n",
        api_url.trim_end_matches("/api/snapshots"),
    );
    println!("{:<12} {:>10} {:<10} {:>10} MANIFEST URL", "DATE", "BLOCK", "PROFILE", "SIZE");
    println!("{}", "-".repeat(100));

    for entry in &modular {
        // Missing optional fields render as "-".
        let date = entry.date.as_deref().unwrap_or("-");
        let profile = entry.profile.as_deref().unwrap_or("-");
        let size = if entry.size > 0 {
            DownloadProgress::format_size(entry.size)
        } else {
            "-".to_string()
        };

        println!(
            "{date:<12} {:>10} {profile:<10} {size:>10} {}",
            entry.block, entry.metadata_url
        );
    }

    if modular.is_empty() {
        println!("  (no modular snapshots found)");
    }

    println!(
        "\nTo download a specific snapshot, copy its manifest URL and run:\n  \
         reth download --manifest-url <URL>"
    );
}
2089
/// Loads a [`SnapshotManifest`] from `source`, which may be an
/// `http(s)://` URL, a `file://` URL, or a plain filesystem path.
async fn fetch_manifest_from_source(source: &str) -> Result<SnapshotManifest> {
    if let Ok(parsed) = Url::parse(source) {
        return match parsed.scheme() {
            "http" | "https" => {
                let response = Client::new()
                    .get(source)
                    .send()
                    .await
                    .and_then(|r| r.error_for_status())
                    .wrap_err_with(|| {
                        // Build an actionable error listing the known
                        // snapshot sources as fallbacks.
                        let sources = DownloadDefaults::get_global()
                            .available_snapshots
                            .iter()
                            .map(|s| format!("\t- {s}"))
                            .collect::<Vec<_>>()
                            .join("\n");
                        format!(
                            "Failed to fetch snapshot manifest from {source}\n\n\
                            The manifest endpoint may not be available for this snapshot source.\n\
                            You can use a direct snapshot URL instead:\n\n\
                            \treth download -u <snapshot-url>\n\n\
                            Available snapshot sources:\n{sources}"
                        )
                    })?;
                Ok(response.json().await?)
            }
            "file" => {
                let path = parsed
                    .to_file_path()
                    .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
                let content = fs::read_to_string(path)?;
                Ok(serde_json::from_str(&content)?)
            }
            _ => Err(eyre::eyre!("Unsupported manifest URL scheme: {}", parsed.scheme())),
        };
    }

    // Not a URL: treat `source` as a local filesystem path.
    let content = fs::read_to_string(source)?;
    Ok(serde_json::from_str(&content)?)
}
2130
/// Determines the base URL against which the manifest's relative archive
/// paths are resolved.
///
/// Preference order: an explicit non-empty `base_url` in the manifest, then
/// the directory of the manifest's own URL (for `file://` and HTTP
/// sources), and finally the directory of a plain filesystem path converted
/// to a `file://` URL. The returned string never has a trailing slash.
fn resolve_manifest_base_url(manifest: &SnapshotManifest, source: &str) -> Result<String> {
    if let Some(base_url) = manifest.base_url.as_deref() &&
        !base_url.is_empty()
    {
        return Ok(base_url.trim_end_matches('/').to_string());
    }

    if let Ok(mut url) = Url::parse(source) {
        if url.scheme() == "file" {
            // Use the manifest file's parent directory as a file:// base.
            let mut path = url
                .to_file_path()
                .map_err(|_| eyre::eyre!("Invalid file:// manifest path: {source}"))?;
            path.pop();
            let mut base = Url::from_directory_path(path)
                .map_err(|_| eyre::eyre!("Invalid manifest directory for source: {source}"))?
                .to_string();
            if base.ends_with('/') {
                base.pop();
            }
            return Ok(base);
        }

        // Drop the final path segment (the manifest file name) from the
        // URL; the inner scope ends the mutable borrow before `url` is read.
        {
            let mut segments = url
                .path_segments_mut()
                .map_err(|_| eyre::eyre!("manifest_url must have a hierarchical path"))?;
            segments.pop_if_empty();
            segments.pop();
        }
        return Ok(url.as_str().trim_end_matches('/').to_string());
    }

    // Plain filesystem path: resolve to an absolute directory, then convert
    // to a file:// URL.
    let path = Path::new(source);
    let manifest_dir = if path.is_absolute() {
        path.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    } else {
        let joined = std::env::current_dir()?.join(path);
        joined.parent().map(Path::to_path_buf).unwrap_or_else(|| PathBuf::from("."))
    };
    let mut base = Url::from_directory_path(&manifest_dir)
        .map_err(|_| eyre::eyre!("Invalid manifest directory: {}", manifest_dir.display()))?
        .to_string();
    if base.ends_with('/') {
        base.pop();
    }
    Ok(base)
}
2178
2179#[cfg(test)]
2180mod tests {
2181 use super::*;
2182 use clap::{Args, Parser};
2183 use manifest::{ChunkedArchive, ComponentManifest, SingleArchive};
2184 use reth_chainspec::{HOLESKY, MAINNET};
2185 use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
2186 use tempfile::tempdir;
2187
    /// Minimal wrapper so `Args`-deriving structs can be parsed standalone
    /// in tests.
    #[derive(Parser)]
    struct CommandParser<T: Args> {
        #[command(flatten)]
        args: T,
    }
2193
2194 fn manifest_with_archive_only_components() -> SnapshotManifest {
2195 let mut components = BTreeMap::new();
2196 components.insert(
2197 SnapshotComponentType::TransactionSenders.key().to_string(),
2198 ComponentManifest::Single(SingleArchive {
2199 file: "transaction_senders.tar.zst".to_string(),
2200 size: 1,
2201 blake3: None,
2202 output_files: vec![],
2203 }),
2204 );
2205 components.insert(
2206 SnapshotComponentType::RocksdbIndices.key().to_string(),
2207 ComponentManifest::Single(SingleArchive {
2208 file: "rocksdb_indices.tar.zst".to_string(),
2209 size: 1,
2210 blake3: None,
2211 output_files: vec![],
2212 }),
2213 );
2214 SnapshotManifest {
2215 block: 0,
2216 chain_id: 1,
2217 storage_version: 2,
2218 timestamp: 0,
2219 base_url: Some("https://example.com".to_string()),
2220 reth_version: None,
2221 components,
2222 }
2223 }
2224
2225 fn manifest_with_modular_components(block: u64) -> SnapshotManifest {
2226 let mut components = BTreeMap::new();
2227 for ty in [
2228 SnapshotComponentType::State,
2229 SnapshotComponentType::Headers,
2230 SnapshotComponentType::Transactions,
2231 SnapshotComponentType::Receipts,
2232 SnapshotComponentType::AccountChangesets,
2233 SnapshotComponentType::StorageChangesets,
2234 ] {
2235 let component = if ty.is_chunked() {
2236 ComponentManifest::Chunked(ChunkedArchive {
2237 blocks_per_file: 1_000_000,
2238 total_blocks: block + 1,
2239 chunk_sizes: vec![1],
2240 chunk_output_files: vec![vec![]],
2241 })
2242 } else {
2243 ComponentManifest::Single(SingleArchive {
2244 file: format!("{}.tar.zst", ty.key()),
2245 size: 1,
2246 blake3: None,
2247 output_files: vec![],
2248 })
2249 };
2250
2251 components.insert(ty.key().to_string(), component);
2252 }
2253
2254 SnapshotManifest {
2255 block,
2256 chain_id: 1,
2257 storage_version: 2,
2258 timestamp: 0,
2259 base_url: Some("https://example.com".to_string()),
2260 reth_version: None,
2261 components,
2262 }
2263 }
2264
2265 #[test]
2266 fn test_download_defaults_builder() {
2267 let defaults = DownloadDefaults::default()
2268 .with_snapshot("https://example.com/snapshots (example)")
2269 .with_base_url("https://example.com");
2270
2271 assert_eq!(defaults.default_base_url, "https://example.com");
2272 assert_eq!(defaults.available_snapshots.len(), 3); }
2274
    /// `with_snapshots` replaces the default snapshot list entirely.
    #[test]
    fn test_download_defaults_replace_snapshots() {
        let defaults = DownloadDefaults::default().with_snapshots(vec![
            Cow::Borrowed("https://custom1.com"),
            Cow::Borrowed("https://custom2.com"),
        ]);

        assert_eq!(defaults.available_snapshots.len(), 2);
        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
    }
2285
    /// The generated long help mentions the built-in snapshot sources.
    #[test]
    fn test_long_help_generation() {
        let defaults = DownloadDefaults::default();
        let help = defaults.long_help();

        assert!(help.contains("Available snapshot sources:"));
        assert!(help.contains("snapshots.reth.rs"));
        assert!(help.contains("publicnode.com"));
        assert!(help.contains("file://"));
    }
2296
    /// `with_long_help` replaces the generated help text completely.
    #[test]
    fn test_long_help_override() {
        let custom_help = "This is custom help text for downloading snapshots.";
        let defaults = DownloadDefaults::default().with_long_help(custom_help);

        let help = defaults.long_help();
        assert_eq!(help, custom_help);
        assert!(!help.contains("Available snapshot sources:"));
    }
2306
2307 #[test]
2308 fn test_builder_chaining() {
2309 let defaults = DownloadDefaults::default()
2310 .with_base_url("https://custom.example.com")
2311 .with_snapshot("https://snapshot1.com")
2312 .with_snapshot("https://snapshot2.com")
2313 .with_long_help("Custom help for snapshots");
2314
2315 assert_eq!(defaults.default_base_url, "https://custom.example.com");
2316 assert_eq!(defaults.available_snapshots.len(), 4); assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
2318 }
2319
2320 #[test]
2321 fn test_download_resumable_defaults_to_true() {
2322 let args =
2323 CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from(["reth"]).args;
2324
2325 assert!(args.resumable);
2326 }
2327
2328 #[test]
2329 fn test_download_resumable_implicit_true() {
2330 let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2331 "reth",
2332 "--resumable",
2333 ])
2334 .args;
2335
2336 assert!(args.resumable);
2337 }
2338
2339 #[test]
2340 fn test_download_resumable_explicit_false() {
2341 let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2342 "reth",
2343 "--resumable=false",
2344 ])
2345 .args;
2346
2347 assert!(!args.resumable);
2348 }
2349
2350 #[test]
2351 fn resolve_components_supports_since_and_distance_flags() {
2352 let manifest = manifest_with_modular_components(20_000_000);
2353 let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2354 "reth",
2355 "--with-txs-since",
2356 "15537394",
2357 "--with-receipts-distance",
2358 "10064",
2359 "--with-state-history-since",
2360 "15537394",
2361 ])
2362 .args;
2363
2364 let resolved = args.resolve_components(&manifest).unwrap();
2365
2366 assert_eq!(resolved.preset, None);
2367 assert_eq!(
2368 resolved.selections.get(&SnapshotComponentType::Transactions),
2369 Some(&ComponentSelection::Since(15_537_394))
2370 );
2371 assert_eq!(
2372 resolved.selections.get(&SnapshotComponentType::Receipts),
2373 Some(&ComponentSelection::Distance(10_064))
2374 );
2375 assert_eq!(
2376 resolved.selections.get(&SnapshotComponentType::AccountChangesets),
2377 Some(&ComponentSelection::Since(15_537_394))
2378 );
2379 assert_eq!(
2380 resolved.selections.get(&SnapshotComponentType::StorageChangesets),
2381 Some(&ComponentSelection::Since(15_537_394))
2382 );
2383 }
2384
2385 #[test]
2386 fn resolve_components_rejects_since_after_snapshot() {
2387 let manifest = manifest_with_modular_components(20_000_000);
2388 let args = CommandParser::<DownloadCommand<EthereumChainSpecParser>>::parse_from([
2389 "reth",
2390 "--with-txs-since",
2391 "20000001",
2392 ])
2393 .args;
2394
2395 let err = args.resolve_components(&manifest).unwrap_err();
2396 assert!(err
2397 .to_string()
2398 .contains("--with-txs-since 20000001 is beyond the snapshot block 20000000"));
2399 }
2400
2401 #[test]
2402 fn test_compression_format_detection() {
2403 assert!(matches!(
2404 CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
2405 Ok(CompressionFormat::Lz4)
2406 ));
2407 assert!(matches!(
2408 CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
2409 Ok(CompressionFormat::Zstd)
2410 ));
2411 assert!(matches!(
2412 CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
2413 Ok(CompressionFormat::Lz4)
2414 ));
2415 assert!(matches!(
2416 CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
2417 Ok(CompressionFormat::Zstd)
2418 ));
2419 assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
2420 }
2421
2422 #[test]
2423 fn inject_archive_only_components_for_archive_selection() {
2424 let manifest = manifest_with_archive_only_components();
2425 let mut selections = BTreeMap::new();
2426 selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2427 selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
2428 selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
2429 selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
2430
2431 inject_archive_only_components(&mut selections, &manifest, true);
2432
2433 assert_eq!(
2434 selections.get(&SnapshotComponentType::TransactionSenders),
2435 Some(&ComponentSelection::All)
2436 );
2437 assert_eq!(
2438 selections.get(&SnapshotComponentType::RocksdbIndices),
2439 Some(&ComponentSelection::All)
2440 );
2441 }
2442
2443 #[test]
2444 fn inject_archive_only_components_without_rocksdb() {
2445 let manifest = manifest_with_archive_only_components();
2446 let mut selections = BTreeMap::new();
2447 selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2448 selections.insert(SnapshotComponentType::Receipts, ComponentSelection::All);
2449 selections.insert(SnapshotComponentType::AccountChangesets, ComponentSelection::All);
2450 selections.insert(SnapshotComponentType::StorageChangesets, ComponentSelection::All);
2451
2452 inject_archive_only_components(&mut selections, &manifest, false);
2453
2454 assert_eq!(
2455 selections.get(&SnapshotComponentType::TransactionSenders),
2456 Some(&ComponentSelection::All)
2457 );
2458 assert_eq!(selections.get(&SnapshotComponentType::RocksdbIndices), None);
2459 }
2460
2461 #[test]
2462 fn should_reset_index_stage_checkpoints_without_rocksdb_indices() {
2463 let mut selections = BTreeMap::new();
2464 selections.insert(SnapshotComponentType::Transactions, ComponentSelection::All);
2465 assert!(should_reset_index_stage_checkpoints(&selections));
2466
2467 selections.insert(SnapshotComponentType::RocksdbIndices, ComponentSelection::All);
2468 assert!(!should_reset_index_stage_checkpoints(&selections));
2469 }
2470
2471 #[test]
2472 fn summarize_download_startup_counts_reusable_and_needs_download() {
2473 let dir = tempdir().unwrap();
2474 let target_dir = dir.path();
2475 let ok_file = target_dir.join("ok.bin");
2476 std::fs::write(&ok_file, vec![1_u8; 4]).unwrap();
2477 let ok_hash = file_blake3_hex(&ok_file).unwrap();
2478
2479 let planned = vec![
2480 PlannedArchive {
2481 ty: SnapshotComponentType::State,
2482 component: "State".to_string(),
2483 archive: ArchiveDescriptor {
2484 url: "https://example.com/ok.tar.zst".to_string(),
2485 file_name: "ok.tar.zst".to_string(),
2486 size: 10,
2487 blake3: None,
2488 output_files: vec![OutputFileChecksum {
2489 path: "ok.bin".to_string(),
2490 size: 4,
2491 blake3: ok_hash,
2492 }],
2493 },
2494 },
2495 PlannedArchive {
2496 ty: SnapshotComponentType::Headers,
2497 component: "Headers".to_string(),
2498 archive: ArchiveDescriptor {
2499 url: "https://example.com/missing.tar.zst".to_string(),
2500 file_name: "missing.tar.zst".to_string(),
2501 size: 10,
2502 blake3: None,
2503 output_files: vec![OutputFileChecksum {
2504 path: "missing.bin".to_string(),
2505 size: 1,
2506 blake3: "deadbeef".to_string(),
2507 }],
2508 },
2509 },
2510 PlannedArchive {
2511 ty: SnapshotComponentType::Transactions,
2512 component: "Transactions".to_string(),
2513 archive: ArchiveDescriptor {
2514 url: "https://example.com/bad-size.tar.zst".to_string(),
2515 file_name: "bad-size.tar.zst".to_string(),
2516 size: 10,
2517 blake3: None,
2518 output_files: vec![],
2519 },
2520 },
2521 ];
2522
2523 let summary = summarize_download_startup(&planned, target_dir, None).unwrap();
2524 assert_eq!(summary.reusable, 1);
2525 assert_eq!(summary.needs_download, 2);
2526 }
2527
2528 #[test]
2529 fn archive_priority_prefers_state_then_rocksdb() {
2530 let mut planned = [
2531 PlannedArchive {
2532 ty: SnapshotComponentType::Transactions,
2533 component: "Transactions".to_string(),
2534 archive: ArchiveDescriptor {
2535 url: "u3".to_string(),
2536 file_name: "t.tar.zst".to_string(),
2537 size: 1,
2538 blake3: None,
2539 output_files: vec![OutputFileChecksum {
2540 path: "a".to_string(),
2541 size: 1,
2542 blake3: "x".to_string(),
2543 }],
2544 },
2545 },
2546 PlannedArchive {
2547 ty: SnapshotComponentType::RocksdbIndices,
2548 component: "RocksDB Indices".to_string(),
2549 archive: ArchiveDescriptor {
2550 url: "u2".to_string(),
2551 file_name: "rocksdb_indices.tar.zst".to_string(),
2552 size: 1,
2553 blake3: None,
2554 output_files: vec![OutputFileChecksum {
2555 path: "b".to_string(),
2556 size: 1,
2557 blake3: "y".to_string(),
2558 }],
2559 },
2560 },
2561 PlannedArchive {
2562 ty: SnapshotComponentType::State,
2563 component: "State (mdbx)".to_string(),
2564 archive: ArchiveDescriptor {
2565 url: "u1".to_string(),
2566 file_name: "state.tar.zst".to_string(),
2567 size: 1,
2568 blake3: None,
2569 output_files: vec![OutputFileChecksum {
2570 path: "c".to_string(),
2571 size: 1,
2572 blake3: "z".to_string(),
2573 }],
2574 },
2575 },
2576 ];
2577
2578 planned.sort_by(|a, b| {
2579 archive_priority_rank(a.ty)
2580 .cmp(&archive_priority_rank(b.ty))
2581 .then_with(|| a.component.cmp(&b.component))
2582 .then_with(|| a.archive.file_name.cmp(&b.archive.file_name))
2583 });
2584
2585 assert_eq!(planned[0].ty, SnapshotComponentType::State);
2586 assert_eq!(planned[1].ty, SnapshotComponentType::RocksdbIndices);
2587 assert_eq!(planned[2].ty, SnapshotComponentType::Transactions);
2588 }
2589
2590 #[test]
2591 fn startup_node_command_omits_default_chain_arg() {
2592 let command =
2593 startup_node_command_for_binary::<EthereumChainSpecParser>("reth", MAINNET.as_ref());
2594
2595 assert_eq!(command, "reth node");
2596 }
2597
2598 #[test]
2599 fn startup_node_command_includes_non_default_chain_arg() {
2600 let command =
2601 startup_node_command_for_binary::<EthereumChainSpecParser>("reth", HOLESKY.as_ref());
2602
2603 assert_eq!(command, "reth node --chain holesky");
2604 }
2605
2606 #[test]
2607 fn startup_node_command_uses_running_binary_name() {
2608 let command =
2609 startup_node_command_for_binary::<EthereumChainSpecParser>("tempo", HOLESKY.as_ref());
2610
2611 assert_eq!(command, "tempo node --chain holesky");
2612 }
2613}