Skip to main content

reth_cli_commands/download/
manifest.rs

1use blake3::Hasher;
2use eyre::Result;
3use rayon::prelude::*;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::{
7    collections::BTreeMap,
8    io::Read,
9    path::{Path, PathBuf},
10};
11use tracing::info;
12
/// A snapshot manifest describes available components for a snapshot at a given block height.
///
/// Each component is either a single archive (state) or a set of chunked archives (static file
/// segments like transactions, receipts, etc). Chunked components use `blocks_per_file` to
/// define the block range per archive, matching reth's static file segment boundaries.
///
/// Archive naming convention for chunked components:
///   `{component}-{start_block}-{end_block}.tar.zst`
///
/// For example with `blocks_per_file: 500000` and `total_blocks: 1500000`:
///   `transactions-0-499999.tar.zst`
///   `transactions-500000-999999.tar.zst`
///   `transactions-1000000-1499999.tar.zst`
///
/// The manifest is (de)serialized with serde; the field names below are the serialized keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifest {
    /// Block number this snapshot was taken at.
    pub block: u64,
    /// Chain ID.
    pub chain_id: u64,
    /// Storage version (1 = legacy, 2 = current).
    pub storage_version: u64,
    /// Timestamp when the snapshot was created (unix seconds).
    pub timestamp: u64,
    /// Base URL for archive downloads. Component archive URLs are relative to this.
    ///
    /// When omitted, downloaders should derive the base URL from the manifest URL.
    /// The URL builders on this type do not do that themselves: they substitute an empty
    /// string when this is `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_url: Option<String>,
    /// Reth version that produced this snapshot.
    ///
    /// Optional: missing on input deserializes to `None`, and `None` is not serialized.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reth_version: Option<String>,
    /// Available snapshot components, keyed by the component's manifest key
    /// (see [`SnapshotComponentType::key`]).
    pub components: BTreeMap<String, ComponentManifest>,
}
47
/// Manifest entry for a single snapshot component.
///
/// The enum is serialized untagged, so the serialized form carries no variant name. When
/// deserializing, serde tries the variants in declaration order and keeps the first one
/// whose fields match, which is why `Single` is listed before `Chunked`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ComponentManifest {
    /// A single archive file (used for state).
    Single(SingleArchive),
    /// A set of chunked archives split by block range (used for static file segments).
    Chunked(ChunkedArchive),
}
57
/// A single, non-chunked archive.
///
/// Used for components that are shipped as one file, such as `state.tar.zst`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleArchive {
    /// Archive file name (relative to base_url).
    pub file: String,
    /// Compressed archive size in bytes.
    pub size: u64,
    /// Optional BLAKE3 checksum of the compressed archive.
    ///
    /// Missing on input deserializes to `None`; `None` is not serialized.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blake3: Option<String>,
    /// Expected extracted plain files for this archive.
    ///
    /// This is the authoritative integrity source for the modular download path.
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub output_files: Vec<OutputFileChecksum>,
}
74
/// A chunked archive set where each chunk covers a fixed block range.
///
/// Chunk `i` covers blocks `i * blocks_per_file ..= (i + 1) * blocks_per_file - 1`; archive
/// names are derived from that range rather than stored in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedArchive {
    /// Number of blocks per archive file. Matches reth's `blocks_per_file` config.
    pub blocks_per_file: u64,
    /// Total number of blocks covered by this component.
    pub total_blocks: u64,
    /// Compressed size of each chunk in bytes, ordered from first to last.
    /// Computed during manifest generation. Older manifests may omit this.
    ///
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub chunk_sizes: Vec<u64>,
    /// Expected extracted plain files per chunk, ordered from first to last.
    ///
    /// This is the authoritative integrity source for the modular download path.
    /// Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub chunk_output_files: Vec<Vec<OutputFileChecksum>>,
}
92
/// Expected metadata for one extracted plain file.
///
/// One entry is recorded per file written into an archive during manifest generation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OutputFileChecksum {
    /// Relative path under the target datadir where this file is extracted.
    pub path: String,
    /// Plain file size in bytes.
    pub size: u64,
    /// BLAKE3 checksum of the plain file contents, as a hex string.
    pub blake3: String,
}
103
/// A single archive with concrete URL and optional integrity metadata.
///
/// Produced by [`SnapshotManifest::archive_descriptors_for_distance`], one per archive to
/// download.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArchiveDescriptor {
    /// Download URL, built as `{base_url}/{file_name}`.
    pub url: String,
    /// Archive file name without the base URL.
    pub file_name: String,
    /// Compressed archive size in bytes; `0` for a chunk whose size is not listed in the
    /// manifest.
    pub size: u64,
    /// Optional BLAKE3 checksum of the compressed archive. Always `None` for chunk archives,
    /// which have no per-archive checksum in the manifest.
    pub blake3: Option<String>,
    /// Expected extracted plain files for this archive; empty when the manifest lists none.
    pub output_files: Vec<OutputFileChecksum>,
}
113
/// How much of a component to download.
///
/// The `Display` implementation renders these as `All`, `Last {n} blocks` and `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentSelection {
    /// Download all chunks (full archive).
    All,
    /// Download only the most recent chunks covering at least `distance` blocks.
    /// Maps to `PruneMode::Distance(distance)` in the generated config.
    Distance(u64),
    /// Don't download this component at all.
    /// Maps to `PruneMode::Full` for tx-based segments, or a minimal distance for others.
    None,
}
126
127impl std::fmt::Display for ComponentSelection {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        match self {
130            Self::All => write!(f, "All"),
131            Self::Distance(d) => write!(f, "Last {d} blocks"),
132            Self::None => write!(f, "None"),
133        }
134    }
135}
136
/// The types of snapshot components that can be downloaded.
///
/// Each variant maps to a manifest key via [`SnapshotComponentType::key`]. The derived
/// ordering follows the declaration order below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SnapshotComponentType {
    /// State database (mdbx). Always required. Single archive.
    State,
    /// Block headers static files. Chunked.
    Headers,
    /// Transaction static files. Chunked.
    Transactions,
    /// Transaction sender static files. Chunked. Only downloaded for archive nodes.
    TransactionSenders,
    /// Receipt static files. Chunked.
    Receipts,
    /// Account changeset static files. Chunked.
    AccountChangesets,
    /// Storage changeset static files. Chunked.
    StorageChangesets,
    /// RocksDB index files. Single archive. Optional and archive-only.
    RocksdbIndices,
}
157
158impl SnapshotComponentType {
159    /// All component types in display order.
160    pub const ALL: [Self; 8] = [
161        Self::State,
162        Self::Headers,
163        Self::Transactions,
164        Self::TransactionSenders,
165        Self::Receipts,
166        Self::AccountChangesets,
167        Self::StorageChangesets,
168        Self::RocksdbIndices,
169    ];
170
171    /// The string key used in the manifest JSON.
172    pub const fn key(&self) -> &'static str {
173        match self {
174            Self::State => "state",
175            Self::Headers => "headers",
176            Self::Transactions => "transactions",
177            Self::TransactionSenders => "transaction_senders",
178            Self::Receipts => "receipts",
179            Self::AccountChangesets => "account_changesets",
180            Self::StorageChangesets => "storage_changesets",
181            Self::RocksdbIndices => "rocksdb_indices",
182        }
183    }
184
185    /// Human-readable display name.
186    pub const fn display_name(&self) -> &'static str {
187        match self {
188            Self::State => "State (mdbx)",
189            Self::Headers => "Headers",
190            Self::Transactions => "Transactions",
191            Self::TransactionSenders => "Transaction Senders",
192            Self::Receipts => "Receipts",
193            Self::AccountChangesets => "Account Changesets",
194            Self::StorageChangesets => "Storage Changesets",
195            Self::RocksdbIndices => "RocksDB Indices",
196        }
197    }
198
199    /// Whether this component is always required for a functional node.
200    ///
201    /// State and headers are always needed — a node cannot operate without block headers.
202    pub const fn is_required(&self) -> bool {
203        matches!(self, Self::State | Self::Headers)
204    }
205
206    /// Returns the default selection for this component in the minimal download preset.
207    ///
208    /// Matches the `--minimal` prune configuration:
209    /// - State/Headers: always All (required)
210    /// - Transactions/Changesets: Distance(10_064) (`MINIMUM_UNWIND_SAFE_DISTANCE`)
211    /// - Receipts: Distance(64) (`MINIMUM_DISTANCE`)
212    /// - TransactionSenders: None (only downloaded for archive nodes)
213    /// - RocksdbIndices: None (only downloaded for archive nodes)
214    ///
215    /// `tx_lookup` and `sender_recovery` are always pruned full regardless.
216    pub const fn minimal_selection(&self) -> ComponentSelection {
217        match self {
218            Self::State | Self::Headers => ComponentSelection::All,
219            Self::Transactions | Self::AccountChangesets | Self::StorageChangesets => {
220                ComponentSelection::Distance(10_064)
221            }
222            Self::Receipts => ComponentSelection::Distance(64),
223            Self::TransactionSenders => ComponentSelection::None,
224            Self::RocksdbIndices => ComponentSelection::None,
225        }
226    }
227
228    /// Whether this component type uses chunked archives.
229    pub const fn is_chunked(&self) -> bool {
230        !matches!(self, Self::State | Self::RocksdbIndices)
231    }
232}
233
234impl SnapshotManifest {
235    fn base_url_or_empty(&self) -> &str {
236        self.base_url.as_deref().unwrap_or("")
237    }
238
239    /// Look up a component by type.
240    pub fn component(&self, ty: SnapshotComponentType) -> Option<&ComponentManifest> {
241        self.components.get(ty.key())
242    }
243
244    /// Returns the total download size for the given set of component types.
245    pub fn total_size(&self, types: &[SnapshotComponentType]) -> u64 {
246        types.iter().filter_map(|ty| self.component(*ty).map(|c| c.total_size())).sum()
247    }
248
249    /// Returns all archive URLs for a given component type.
250    pub fn archive_urls(&self, ty: SnapshotComponentType) -> Vec<String> {
251        let Some(component) = self.component(ty) else {
252            return vec![];
253        };
254
255        match component {
256            ComponentManifest::Single(single) => {
257                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
258            }
259            ComponentManifest::Chunked(chunked) => {
260                let key = ty.key();
261                let num_chunks = chunked.num_chunks();
262                (0..num_chunks)
263                    .map(|i| {
264                        let start = i * chunked.blocks_per_file;
265                        let end = (i + 1) * chunked.blocks_per_file - 1;
266                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
267                    })
268                    .collect()
269            }
270        }
271    }
272
273    /// Returns archive URLs for a component, limited to chunks covering at least `distance`
274    /// blocks from the tip. Returns all URLs if distance is `None` (All mode).
275    pub fn archive_urls_for_distance(
276        &self,
277        ty: SnapshotComponentType,
278        distance: Option<u64>,
279    ) -> Vec<String> {
280        let Some(component) = self.component(ty) else {
281            return vec![];
282        };
283
284        match component {
285            ComponentManifest::Single(single) => {
286                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
287            }
288            ComponentManifest::Chunked(chunked) => {
289                let key = ty.key();
290                let num_chunks = chunked.num_chunks();
291
292                // Calculate which chunks to include
293                let start_chunk = match distance {
294                    Some(dist) => {
295                        // We need chunks covering the last `dist` blocks
296                        let needed_blocks = dist.min(chunked.total_blocks);
297                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
298                        num_chunks.saturating_sub(needed_chunks)
299                    }
300                    None => 0, // All chunks
301                };
302
303                (start_chunk..num_chunks)
304                    .map(|i| {
305                        let start = i * chunked.blocks_per_file;
306                        let end = (i + 1) * chunked.blocks_per_file - 1;
307                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
308                    })
309                    .collect()
310            }
311        }
312    }
313
314    /// Returns concrete archive descriptors for a component, optionally limited to distance.
315    pub fn archive_descriptors_for_distance(
316        &self,
317        ty: SnapshotComponentType,
318        distance: Option<u64>,
319    ) -> Vec<ArchiveDescriptor> {
320        let Some(component) = self.component(ty) else {
321            return vec![];
322        };
323
324        match component {
325            ComponentManifest::Single(single) => {
326                vec![ArchiveDescriptor {
327                    url: format!("{}/{}", self.base_url_or_empty(), single.file),
328                    file_name: single.file.clone(),
329                    size: single.size,
330                    blake3: single.blake3.clone(),
331                    output_files: single.output_files.clone(),
332                }]
333            }
334            ComponentManifest::Chunked(chunked) => {
335                let key = ty.key();
336                let num_chunks = chunked.num_chunks();
337
338                let start_chunk = match distance {
339                    Some(dist) => {
340                        let needed_blocks = dist.min(chunked.total_blocks);
341                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
342                        num_chunks.saturating_sub(needed_chunks)
343                    }
344                    None => 0,
345                };
346
347                (start_chunk..num_chunks)
348                    .map(|i| {
349                        let start = i * chunked.blocks_per_file;
350                        let end = (i + 1) * chunked.blocks_per_file - 1;
351                        let file_name = format!("{key}-{start}-{end}.tar.zst");
352                        let size = chunked.chunk_sizes.get(i as usize).copied().unwrap_or_default();
353                        let output_files =
354                            chunked.chunk_output_files.get(i as usize).cloned().unwrap_or_default();
355
356                        ArchiveDescriptor {
357                            url: format!("{}/{}", self.base_url_or_empty(), file_name),
358                            file_name,
359                            size,
360                            blake3: None,
361                            output_files,
362                        }
363                    })
364                    .collect()
365            }
366        }
367    }
368
369    /// Returns the exact download size for a component given a distance selection.
370    ///
371    /// For single archives, returns the full size. For chunked archives, sums the
372    /// sizes of the selected tail chunks from [`ChunkedArchive::chunk_sizes`].
373    pub fn size_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
374        let Some(component) = self.component(ty) else {
375            return 0;
376        };
377        match component {
378            ComponentManifest::Single(s) => s.size,
379            ComponentManifest::Chunked(chunked) => {
380                if chunked.chunk_sizes.is_empty() {
381                    return 0;
382                }
383                let num_chunks = chunked.chunk_sizes.len() as u64;
384                let start_chunk = match distance {
385                    Some(dist) => {
386                        let needed = dist.min(chunked.total_blocks);
387                        let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
388                        num_chunks.saturating_sub(needed_chunks)
389                    }
390                    None => 0,
391                };
392                chunked.chunk_sizes[start_chunk as usize..].iter().sum()
393            }
394        }
395    }
396
397    /// Returns the number of chunks that would be downloaded for a given distance.
398    pub fn chunks_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
399        let Some(ComponentManifest::Chunked(chunked)) = self.component(ty) else {
400            return if self.component(ty).is_some() { 1 } else { 0 };
401        };
402        match distance {
403            Some(dist) => {
404                let needed = dist.min(chunked.total_blocks);
405                needed.div_ceil(chunked.blocks_per_file)
406            }
407            None => chunked.num_chunks(),
408        }
409    }
410}
411
412impl ComponentManifest {
413    /// Returns the total download size for this component.
414    pub fn total_size(&self) -> u64 {
415        match self {
416            Self::Single(s) => s.size,
417            Self::Chunked(c) => c.chunk_sizes.iter().sum(),
418        }
419    }
420}
421
422impl ChunkedArchive {
423    /// Returns the number of chunks.
424    pub fn num_chunks(&self) -> u64 {
425        self.total_blocks.div_ceil(self.blocks_per_file)
426    }
427}
428
429/// Fetch a snapshot manifest from a URL.
430pub async fn fetch_manifest(manifest_url: &str) -> Result<SnapshotManifest> {
431    let client = Client::new();
432    let manifest: SnapshotManifest =
433        client.get(manifest_url).send().await?.error_for_status()?.json().await?;
434    Ok(manifest)
435}
436
437/// Package chunk archives from a source datadir and generate a manifest.
438pub fn generate_manifest(
439    source_datadir: &Path,
440    output_dir: &Path,
441    base_url: Option<&str>,
442    block: u64,
443    chain_id: u64,
444    blocks_per_file: u64,
445) -> Result<SnapshotManifest> {
446    std::fs::create_dir_all(output_dir)?;
447
448    let mut components = BTreeMap::new();
449
450    // Package chunked static-file components.
451    for ty in &[
452        SnapshotComponentType::Headers,
453        SnapshotComponentType::Transactions,
454        SnapshotComponentType::TransactionSenders,
455        SnapshotComponentType::Receipts,
456        SnapshotComponentType::AccountChangesets,
457        SnapshotComponentType::StorageChangesets,
458    ] {
459        let key = ty.key();
460        let num_chunks = block.div_ceil(blocks_per_file);
461        let mut planned_chunks = Vec::with_capacity(num_chunks as usize);
462        let mut found_any = false;
463
464        for i in 0..num_chunks {
465            let start = i * blocks_per_file;
466            let end = (i + 1) * blocks_per_file - 1;
467            let source_files = source_files_for_chunk(source_datadir, *ty, start, end)?;
468
469            if source_files.is_empty() {
470                if found_any {
471                    eyre::bail!("Missing source files for {} chunk {}-{}", key, start, end);
472                }
473                continue;
474            }
475
476            found_any = true;
477            planned_chunks.push(PlannedChunk {
478                chunk_idx: i,
479                archive_path: output_dir.join(chunk_filename(key, start, end)),
480                source_files,
481            });
482        }
483
484        if found_any {
485            let mut packaged_chunks = planned_chunks
486                .into_par_iter()
487                .map(|planned| -> Result<PackagedChunk> {
488                    let output_files =
489                        write_chunk_archive(&planned.archive_path, &planned.source_files)?;
490                    let size = std::fs::metadata(&planned.archive_path)?.len();
491                    Ok(PackagedChunk { chunk_idx: planned.chunk_idx, size, output_files })
492                })
493                .collect::<Vec<_>>()
494                .into_iter()
495                .collect::<Result<Vec<_>>>()?;
496
497            packaged_chunks.sort_unstable_by_key(|chunk| chunk.chunk_idx);
498            let chunk_sizes = packaged_chunks.iter().map(|chunk| chunk.size).collect::<Vec<_>>();
499            let chunk_output_files =
500                packaged_chunks.into_iter().map(|chunk| chunk.output_files).collect::<Vec<_>>();
501            let total_size: u64 = chunk_sizes.iter().sum();
502            info!(target: "reth::cli",
503                component = ty.display_name(),
504                chunks = chunk_sizes.len(),
505                total_blocks = block,
506                size = %super::DownloadProgress::format_size(total_size),
507                "Found chunked component"
508            );
509            components.insert(
510                key.to_string(),
511                ComponentManifest::Chunked(ChunkedArchive {
512                    blocks_per_file,
513                    total_blocks: block,
514                    chunk_sizes,
515                    chunk_output_files,
516                }),
517            );
518        }
519    }
520
521    let (state_size, state_output_files) = package_single_component(
522        output_dir,
523        "state.tar.zst",
524        &state_source_files(source_datadir)?,
525    )?;
526    components.insert(
527        SnapshotComponentType::State.key().to_string(),
528        ComponentManifest::Single(SingleArchive {
529            file: "state.tar.zst".to_string(),
530            size: state_size,
531            blake3: None,
532            output_files: state_output_files,
533        }),
534    );
535
536    let rocksdb_files = rocksdb_source_files(source_datadir)?;
537    if !rocksdb_files.is_empty() {
538        let (rocksdb_size, rocksdb_output_files) =
539            package_single_component(output_dir, "rocksdb_indices.tar.zst", &rocksdb_files)?;
540        components.insert(
541            SnapshotComponentType::RocksdbIndices.key().to_string(),
542            ComponentManifest::Single(SingleArchive {
543                file: "rocksdb_indices.tar.zst".to_string(),
544                size: rocksdb_size,
545                blake3: None,
546                output_files: rocksdb_output_files,
547            }),
548        );
549    }
550
551    let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?.as_secs();
552
553    Ok(SnapshotManifest {
554        block,
555        chain_id,
556        storage_version: 2,
557        timestamp,
558        base_url: base_url.map(str::to_owned),
559        reth_version: Some(reth_node_core::version::version_metadata().short_version.to_string()),
560        components,
561    })
562}
563
/// Builds the archive file name for one chunk of a chunked component.
///
/// The name follows the `{component_key}-{start}-{end}.tar.zst` convention, where `start`
/// and `end` are the first and last block of the chunk.
pub fn chunk_filename(component_key: &str, start: u64, end: u64) -> String {
    let block_range = format!("{start}-{end}");
    [component_key, "-", block_range.as_str(), ".tar.zst"].concat()
}
568
/// A chunk archive that has been planned but not yet written.
#[derive(Debug)]
struct PlannedChunk {
    /// Zero-based index of the chunk within its component.
    chunk_idx: u64,
    /// Path the chunk archive will be written to.
    archive_path: PathBuf,
    /// Source files to pack into the archive.
    source_files: Vec<PathBuf>,
}
575
/// Result of writing one chunk archive.
#[derive(Debug)]
struct PackagedChunk {
    /// Zero-based index of the chunk within its component; used to restore chunk order after
    /// parallel packaging.
    chunk_idx: u64,
    /// Size of the written archive file in bytes.
    size: u64,
    /// Size and checksum of every file packed into the archive.
    output_files: Vec<OutputFileChecksum>,
}
582
/// A file to be added to an archive.
#[derive(Debug)]
struct PlannedFile {
    /// Location of the file on disk.
    source_path: PathBuf,
    /// Path the file is stored under inside the archive; also recorded as the
    /// [`OutputFileChecksum::path`].
    relative_path: PathBuf,
}
588
589fn source_files_for_chunk(
590    source_datadir: &Path,
591    component: SnapshotComponentType,
592    start: u64,
593    end: u64,
594) -> Result<Vec<PathBuf>> {
595    let Some(segment_name) = static_segment_name(component) else {
596        return Ok(Vec::new());
597    };
598
599    let static_files_dir = source_datadir.join("static_files");
600    let static_files_dir =
601        if static_files_dir.exists() { static_files_dir } else { source_datadir.to_path_buf() };
602    let prefix = format!("static_file_{segment_name}_{start}_{end}");
603
604    let mut files = Vec::new();
605    for entry in std::fs::read_dir(&static_files_dir)? {
606        let entry = entry?;
607        if !entry.file_type()?.is_file() {
608            continue;
609        }
610        if entry.file_name().to_string_lossy().starts_with(&prefix) {
611            files.push(entry.path());
612        }
613    }
614
615    files.sort_unstable();
616    Ok(files)
617}
618
619fn static_segment_name(component: SnapshotComponentType) -> Option<&'static str> {
620    match component {
621        SnapshotComponentType::Headers => Some("headers"),
622        SnapshotComponentType::Transactions => Some("transactions"),
623        SnapshotComponentType::TransactionSenders => Some("transaction-senders"),
624        SnapshotComponentType::Receipts => Some("receipts"),
625        SnapshotComponentType::AccountChangesets => Some("account-change-sets"),
626        SnapshotComponentType::StorageChangesets => Some("storage-change-sets"),
627        SnapshotComponentType::State | SnapshotComponentType::RocksdbIndices => None,
628    }
629}
630
631fn state_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
632    let db_dir = source_datadir.join("db");
633    if db_dir.exists() {
634        return collect_files_recursive(&db_dir, Path::new("db"));
635    }
636
637    if looks_like_db_dir(source_datadir)? {
638        return collect_files_recursive(source_datadir, Path::new("db"));
639    }
640
641    eyre::bail!("Could not find source state DB directory under {}", source_datadir.display())
642}
643
644fn rocksdb_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
645    let rocksdb_dir = source_datadir.join("rocksdb");
646    if !rocksdb_dir.exists() {
647        return Ok(Vec::new());
648    }
649
650    collect_files_recursive(&rocksdb_dir, Path::new("rocksdb"))
651}
652
653fn looks_like_db_dir(path: &Path) -> Result<bool> {
654    let entries = match std::fs::read_dir(path) {
655        Ok(entries) => entries,
656        Err(_) => return Ok(false),
657    };
658
659    for entry in entries {
660        let entry = entry?;
661        if !entry.file_type()?.is_file() {
662            continue;
663        }
664        let name = entry.file_name();
665        let name = name.to_string_lossy();
666        if name == "mdbx.dat" || name == "lock.mdb" || name == "data.mdb" {
667            return Ok(true);
668        }
669    }
670
671    Ok(false)
672}
673
674fn collect_files_recursive(root: &Path, output_prefix: &Path) -> Result<Vec<PlannedFile>> {
675    let mut files = Vec::new();
676    collect_files_recursive_inner(root, root, output_prefix, &mut files)?;
677    files.sort_unstable_by(|a, b| a.relative_path.cmp(&b.relative_path));
678    Ok(files)
679}
680
681fn collect_files_recursive_inner(
682    root: &Path,
683    dir: &Path,
684    output_prefix: &Path,
685    files: &mut Vec<PlannedFile>,
686) -> Result<()> {
687    for entry in std::fs::read_dir(dir)? {
688        let entry = entry?;
689        let path = entry.path();
690        let file_type = entry.file_type()?;
691        if file_type.is_dir() {
692            collect_files_recursive_inner(root, &path, output_prefix, files)?;
693            continue;
694        }
695        if !file_type.is_file() {
696            continue;
697        }
698
699        let relative = path.strip_prefix(root)?.to_path_buf();
700        files.push(PlannedFile { source_path: path, relative_path: output_prefix.join(relative) });
701    }
702
703    Ok(())
704}
705
706fn package_single_component(
707    output_dir: &Path,
708    archive_file_name: &str,
709    files: &[PlannedFile],
710) -> Result<(u64, Vec<OutputFileChecksum>)> {
711    if files.is_empty() {
712        eyre::bail!("Cannot package empty single archive: {}", archive_file_name);
713    }
714
715    let archive_path = output_dir.join(archive_file_name);
716    let output_files = write_archive_from_planned_files(&archive_path, files)?;
717    let size = std::fs::metadata(&archive_path)?.len();
718    Ok((size, output_files))
719}
720
721fn write_chunk_archive(path: &Path, source_files: &[PathBuf]) -> Result<Vec<OutputFileChecksum>> {
722    let planned_files = source_files
723        .iter()
724        .map(|source_path| {
725            let file_name = source_path.file_name().ok_or_else(|| {
726                eyre::eyre!("Invalid source file path: {}", source_path.display())
727            })?;
728            Ok::<_, eyre::Error>(PlannedFile {
729                source_path: source_path.clone(),
730                relative_path: PathBuf::from("static_files").join(file_name),
731            })
732        })
733        .collect::<Result<Vec<_>>>()?;
734
735    write_archive_from_planned_files(path, &planned_files)
736}
737
738fn write_archive_from_planned_files(
739    path: &Path,
740    files: &[PlannedFile],
741) -> Result<Vec<OutputFileChecksum>> {
742    let file = std::fs::File::create(path)?;
743    let mut encoder = zstd::Encoder::new(file, 0)?;
744    // Emit standard zstd frames with checksums for compatibility with external
745    // tools such as `pzstd -d`.
746    encoder.include_checksum(true)?;
747    let mut builder = tar::Builder::new(encoder);
748
749    let mut output_files = Vec::with_capacity(files.len());
750    for planned in files {
751        let mut header = tar::Header::new_gnu();
752        header.set_size(std::fs::metadata(&planned.source_path)?.len());
753        header.set_mode(0o644);
754        header.set_cksum();
755
756        let source_file = std::fs::File::open(&planned.source_path)?;
757        let mut reader = HashingReader::new(source_file);
758        builder.append_data(&mut header, &planned.relative_path, &mut reader)?;
759
760        output_files.push(OutputFileChecksum {
761            path: planned.relative_path.to_string_lossy().to_string(),
762            size: reader.bytes_read,
763            blake3: reader.finalize(),
764        });
765    }
766
767    builder.finish()?;
768    let encoder = builder.into_inner()?;
769    encoder.finish()?;
770
771    Ok(output_files)
772}
773
/// A reader wrapper that hashes and counts every byte read through it.
struct HashingReader<R> {
    /// The wrapped reader.
    inner: R,
    /// Running BLAKE3 hash of all bytes read so far.
    hasher: Hasher,
    /// Number of bytes read so far.
    bytes_read: u64,
}
779
780impl<R: Read> HashingReader<R> {
781    fn new(inner: R) -> Self {
782        Self { inner, hasher: Hasher::new(), bytes_read: 0 }
783    }
784
785    fn finalize(self) -> String {
786        self.hasher.finalize().to_hex().to_string()
787    }
788}
789
790impl<R: Read> Read for HashingReader<R> {
791    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
792        let n = self.inner.read(buf)?;
793        if n > 0 {
794            self.bytes_read += n as u64;
795            self.hasher.update(&buf[..n]);
796        }
797        Ok(n)
798    }
799}
800
801#[cfg(test)]
802mod tests {
803    use super::*;
804    use tempfile::tempdir;
805
806    fn test_manifest() -> SnapshotManifest {
807        let mut components = BTreeMap::new();
808        components.insert(
809            "state".to_string(),
810            ComponentManifest::Single(SingleArchive {
811                file: "state.tar.zst".to_string(),
812                size: 100,
813                blake3: None,
814                output_files: vec![],
815            }),
816        );
817        components.insert(
818            "transactions".to_string(),
819            ComponentManifest::Chunked(ChunkedArchive {
820                blocks_per_file: 500_000,
821                total_blocks: 1_500_000,
822                chunk_sizes: vec![80_000, 100_000, 120_000],
823                chunk_output_files: vec![vec![], vec![], vec![]],
824            }),
825        );
826        components.insert(
827            "headers".to_string(),
828            ComponentManifest::Chunked(ChunkedArchive {
829                blocks_per_file: 500_000,
830                total_blocks: 1_500_000,
831                chunk_sizes: vec![40_000, 50_000, 60_000],
832                chunk_output_files: vec![vec![], vec![], vec![]],
833            }),
834        );
835        SnapshotManifest {
836            block: 1_500_000,
837            chain_id: 1,
838            storage_version: 2,
839            timestamp: 0,
840            base_url: Some("https://example.com".to_string()),
841            reth_version: None,
842            components,
843        }
844    }
845
846    #[test]
847    fn archive_urls_for_distance_all() {
848        let m = test_manifest();
849        let urls = m.archive_urls_for_distance(SnapshotComponentType::Transactions, None);
850        assert_eq!(urls.len(), 3);
851        assert_eq!(urls[0], "https://example.com/transactions-0-499999.tar.zst");
852        assert_eq!(urls[2], "https://example.com/transactions-1000000-1499999.tar.zst");
853    }
854
855    #[test]
856    fn archive_urls_for_distance_partial() {
857        let m = test_manifest();
858        // 600k blocks → needs 2 chunks (each 500k)
859        let urls = m.archive_urls_for_distance(SnapshotComponentType::Transactions, Some(600_000));
860        assert_eq!(urls.len(), 2);
861        assert_eq!(urls[0], "https://example.com/transactions-500000-999999.tar.zst");
862        assert_eq!(urls[1], "https://example.com/transactions-1000000-1499999.tar.zst");
863    }
864
865    #[test]
866    fn archive_urls_for_distance_single_component() {
867        let m = test_manifest();
868        // Single archives always return one URL regardless of distance
869        let urls = m.archive_urls_for_distance(SnapshotComponentType::State, Some(100));
870        assert_eq!(urls.len(), 1);
871        assert_eq!(urls[0], "https://example.com/state.tar.zst");
872    }
873
874    #[test]
875    fn archive_urls_for_distance_rocksdb_indices_single_component() {
876        let mut components = BTreeMap::new();
877        components.insert(
878            "rocksdb_indices".to_string(),
879            ComponentManifest::Single(SingleArchive {
880                file: "rocksdb_indices.tar.zst".to_string(),
881                size: 777,
882                blake3: None,
883                output_files: vec![],
884            }),
885        );
886        let m = SnapshotManifest {
887            block: 1,
888            chain_id: 1,
889            storage_version: 2,
890            timestamp: 0,
891            base_url: Some("https://example.com".to_string()),
892            reth_version: None,
893            components,
894        };
895
896        let urls = m.archive_urls_for_distance(SnapshotComponentType::RocksdbIndices, Some(10));
897        assert_eq!(urls.len(), 1);
898        assert_eq!(urls[0], "https://example.com/rocksdb_indices.tar.zst");
899        assert_eq!(m.size_for_distance(SnapshotComponentType::RocksdbIndices, Some(10)), 777);
900    }
901
902    #[test]
903    fn archive_urls_for_distance_missing_component() {
904        let m = test_manifest();
905        let urls = m.archive_urls_for_distance(SnapshotComponentType::Receipts, None);
906        assert!(urls.is_empty());
907    }
908
909    #[test]
910    fn chunks_for_distance_all() {
911        let m = test_manifest();
912        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, None), 3);
913    }
914
915    #[test]
916    fn chunks_for_distance_partial() {
917        let m = test_manifest();
918        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, Some(600_000)), 2);
919        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Transactions, Some(100_000)), 1);
920    }
921
922    #[test]
923    fn chunks_for_distance_single() {
924        let m = test_manifest();
925        assert_eq!(m.chunks_for_distance(SnapshotComponentType::State, None), 1);
926        assert_eq!(m.chunks_for_distance(SnapshotComponentType::State, Some(100)), 1);
927    }
928
929    #[test]
930    fn chunks_for_distance_missing() {
931        let m = test_manifest();
932        assert_eq!(m.chunks_for_distance(SnapshotComponentType::Receipts, None), 0);
933    }
934
935    #[test]
936    fn component_selection_display() {
937        assert_eq!(ComponentSelection::All.to_string(), "All");
938        assert_eq!(ComponentSelection::Distance(10_064).to_string(), "Last 10064 blocks");
939        assert_eq!(ComponentSelection::None.to_string(), "None");
940    }
941
942    #[test]
943    fn archive_urls_aligned_to_blocks_per_file() {
944        // When total_blocks is not aligned to blocks_per_file, chunk boundaries
945        // must still align to blocks_per_file (not total_blocks).
946        let mut components = BTreeMap::new();
947        components.insert(
948            "storage_changesets".to_string(),
949            ComponentManifest::Chunked(ChunkedArchive {
950                blocks_per_file: 500_000,
951                total_blocks: 24_396_822,
952                chunk_sizes: vec![100; 49], // 49 chunks
953                chunk_output_files: vec![vec![]; 49],
954            }),
955        );
956        let m = SnapshotManifest {
957            block: 24_396_822,
958            chain_id: 1,
959            storage_version: 2,
960            timestamp: 0,
961            base_url: Some("https://example.com".to_string()),
962            reth_version: None,
963            components,
964        };
965        let urls = m.archive_urls(SnapshotComponentType::StorageChangesets);
966        assert_eq!(urls.len(), 49);
967        // First chunk: 0-499999 (not 0-396821 or similar)
968        assert_eq!(urls[0], "https://example.com/storage_changesets-0-499999.tar.zst");
969        // Last chunk: 24000000-24499999 (not 24000000-24396821)
970        assert_eq!(urls[48], "https://example.com/storage_changesets-24000000-24499999.tar.zst");
971    }
972
973    #[test]
974    fn size_for_distance_sums_tail_chunks() {
975        let m = test_manifest();
976        // Transactions has chunk_sizes [80_000, 100_000, 120_000]
977        // All: sum of all 3
978        assert_eq!(m.size_for_distance(SnapshotComponentType::Transactions, None), 300_000);
979        // Last 500K blocks = 1 chunk = last chunk only
980        assert_eq!(
981            m.size_for_distance(SnapshotComponentType::Transactions, Some(500_000)),
982            120_000
983        );
984        // Last 600K blocks = 2 chunks = last two
985        assert_eq!(
986            m.size_for_distance(SnapshotComponentType::Transactions, Some(600_000)),
987            220_000
988        );
989        // Single archive (state) always returns full size
990        assert_eq!(m.size_for_distance(SnapshotComponentType::State, Some(100)), 100);
991        // Missing component
992        assert_eq!(m.size_for_distance(SnapshotComponentType::Receipts, None), 0);
993    }
994
995    #[test]
996    fn archive_descriptors_include_checksum_metadata() {
997        let mut components = BTreeMap::new();
998        components.insert(
999            "state".to_string(),
1000            ComponentManifest::Single(SingleArchive {
1001                file: "state.tar.zst".to_string(),
1002                size: 100,
1003                blake3: Some("abc123".to_string()),
1004                output_files: vec![OutputFileChecksum {
1005                    path: "db/mdbx.dat".to_string(),
1006                    size: 1000,
1007                    blake3: "s0".to_string(),
1008                }],
1009            }),
1010        );
1011        components.insert(
1012            "transactions".to_string(),
1013            ComponentManifest::Chunked(ChunkedArchive {
1014                blocks_per_file: 500_000,
1015                total_blocks: 1_000_000,
1016                chunk_sizes: vec![80_000, 120_000],
1017                chunk_output_files: vec![
1018                    vec![OutputFileChecksum {
1019                        path: "static_files/static_file_transactions_0_499999.bin".to_string(),
1020                        size: 111,
1021                        blake3: "h0".to_string(),
1022                    }],
1023                    vec![OutputFileChecksum {
1024                        path: "static_files/static_file_transactions_500000_999999.bin".to_string(),
1025                        size: 222,
1026                        blake3: "h1".to_string(),
1027                    }],
1028                ],
1029            }),
1030        );
1031
1032        let m = SnapshotManifest {
1033            block: 1_000_000,
1034            chain_id: 1,
1035            storage_version: 2,
1036            timestamp: 0,
1037            base_url: Some("https://example.com".to_string()),
1038            reth_version: None,
1039            components,
1040        };
1041
1042        let state = m.archive_descriptors_for_distance(SnapshotComponentType::State, None);
1043        assert_eq!(state.len(), 1);
1044        assert_eq!(state[0].file_name, "state.tar.zst");
1045        assert_eq!(state[0].blake3.as_deref(), Some("abc123"));
1046        assert_eq!(state[0].output_files.len(), 1);
1047
1048        let tx = m.archive_descriptors_for_distance(SnapshotComponentType::Transactions, None);
1049        assert_eq!(tx.len(), 2);
1050        assert_eq!(tx[0].blake3, None);
1051        assert_eq!(tx[1].blake3, None);
1052        assert_eq!(tx[0].output_files[0].size, 111);
1053    }
1054
1055    #[test]
1056    fn generate_manifest_includes_state_single_archive() {
1057        let source = tempdir().unwrap();
1058        let output = tempdir().unwrap();
1059        let db_dir = source.path().join("db");
1060        std::fs::create_dir_all(&db_dir).unwrap();
1061        std::fs::write(db_dir.join("mdbx.dat"), b"state-data").unwrap();
1062
1063        let manifest =
1064            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();
1065
1066        let state = manifest.component(SnapshotComponentType::State).unwrap();
1067        let ComponentManifest::Single(state) = state else {
1068            panic!("state should be a single archive")
1069        };
1070        assert_eq!(state.file, "state.tar.zst");
1071        assert!(!state.output_files.is_empty());
1072        assert_eq!(state.output_files[0].path, "db/mdbx.dat");
1073        assert!(output.path().join("state.tar.zst").exists());
1074    }
1075
1076    #[test]
1077    fn generate_manifest_includes_rocksdb_single_archive_when_present() {
1078        let source = tempdir().unwrap();
1079        let output = tempdir().unwrap();
1080        let db_dir = source.path().join("db");
1081        std::fs::create_dir_all(&db_dir).unwrap();
1082        std::fs::write(db_dir.join("mdbx.dat"), b"state-data").unwrap();
1083        let rocksdb_dir = source.path().join("rocksdb");
1084        std::fs::create_dir_all(&rocksdb_dir).unwrap();
1085        std::fs::write(rocksdb_dir.join("CURRENT"), b"MANIFEST-000001").unwrap();
1086
1087        let manifest =
1088            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();
1089
1090        let rocksdb = manifest.component(SnapshotComponentType::RocksdbIndices).unwrap();
1091        let ComponentManifest::Single(rocksdb) = rocksdb else {
1092            panic!("rocksdb indices should be a single archive")
1093        };
1094        assert_eq!(rocksdb.file, "rocksdb_indices.tar.zst");
1095        assert!(!rocksdb.output_files.is_empty());
1096        assert_eq!(rocksdb.output_files[0].path, "rocksdb/CURRENT");
1097        assert!(output.path().join("rocksdb_indices.tar.zst").exists());
1098    }
1099}