Skip to main content

reth_cli_commands/download/
manifest.rs

1use blake3::Hasher;
2use eyre::Result;
3use rayon::prelude::*;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::{
7    collections::BTreeMap,
8    io::Read,
9    path::{Path, PathBuf},
10};
11use tracing::info;
12
/// A snapshot manifest describes available components for a snapshot at a given block height.
///
/// Each component is either a single archive (state) or a set of chunked archives (static file
/// segments like transactions, receipts, etc). Chunked components use `blocks_per_file` to
/// define the block range per archive, matching reth's static file segment boundaries.
///
/// Archive naming convention for chunked components:
///   `{component}-{start_block}-{end_block}.tar.zst`
///
/// For example with `blocks_per_file: 500000` and `total_blocks: 1500000`:
///   `transactions-0-499999.tar.zst`
///   `transactions-500000-999999.tar.zst`
///   `transactions-1000000-1499999.tar.zst`
///
/// Both block numbers in an archive name are inclusive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifest {
    /// Block number this snapshot was taken at.
    pub block: u64,
    /// Chain ID.
    pub chain_id: u64,
    /// Storage version (1 = legacy, 2 = current).
    pub storage_version: u64,
    /// Timestamp when the snapshot was created (unix seconds).
    pub timestamp: u64,
    /// Base URL for archive downloads. Component archive URLs are relative to this.
    ///
    /// When omitted, downloaders should derive the base URL from the manifest URL. A `None`
    /// value is left out of the serialized manifest entirely.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_url: Option<String>,
    /// Available snapshot components, keyed by the component's manifest key (see
    /// [`SnapshotComponentType::key`]).
    pub components: BTreeMap<String, ComponentManifest>,
}
44
/// Manifest entry for a single snapshot component.
///
/// The enum is serialized untagged: no discriminator is written, and on deserialization the
/// variant is picked from the fields that are present. Serde tries untagged variants in
/// declaration order, so the order of the variants below is part of the format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ComponentManifest {
    /// A single archive file (used for state).
    Single(SingleArchive),
    /// A set of chunked archives split by block range (used for static file segments).
    Chunked(ChunkedArchive),
}
54
/// A single, non-chunked archive.
///
/// Only `file` and `size` are mandatory in the serialized form; the remaining fields fall back
/// to their defaults when absent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleArchive {
    /// Archive file name (relative to base_url).
    pub file: String,
    /// Compressed archive size in bytes.
    pub size: u64,
    /// Optional BLAKE3 checksum of the compressed archive.
    ///
    /// Omitted from the serialized manifest when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blake3: Option<String>,
    /// Expected extracted plain files for this archive.
    ///
    /// This is the authoritative integrity source for the modular download path.
    /// Deserializes to an empty list when the manifest does not carry it.
    #[serde(default)]
    pub output_files: Vec<OutputFileChecksum>,
}
71
/// A chunked archive set where each chunk covers a fixed block range.
///
/// The number of chunks is derived from `total_blocks` and `blocks_per_file` (rounded up), not
/// from the length of the per-chunk vectors, which may be empty in older manifests.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedArchive {
    /// Number of blocks per archive file. Matches reth's `blocks_per_file` config.
    pub blocks_per_file: u64,
    /// Total number of blocks covered by this component.
    pub total_blocks: u64,
    /// Compressed size of each chunk in bytes, ordered from first to last.
    /// Computed during manifest generation. Older manifests may omit this.
    #[serde(default)]
    pub chunk_sizes: Vec<u64>,
    /// Expected extracted plain files per chunk, ordered from first to last.
    ///
    /// This is the authoritative integrity source for the modular download path.
    /// Deserializes to an empty list when the manifest does not carry it.
    #[serde(default)]
    pub chunk_output_files: Vec<Vec<OutputFileChecksum>>,
}
89
/// Expected metadata for one extracted plain file.
///
/// The manifest generator records one entry per file it places into an archive, using the
/// byte count and hash observed while streaming the file into that archive.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OutputFileChecksum {
    /// Relative path under the target datadir where this file is extracted.
    pub path: String,
    /// Plain file size in bytes.
    pub size: u64,
    /// BLAKE3 checksum of the plain file contents, hex-encoded.
    pub blake3: String,
}
100
/// A single archive with concrete URL and optional integrity metadata.
///
/// Produced by [`SnapshotManifest::archive_descriptors_for_distance`]; unlike the manifest
/// entries, a descriptor always refers to exactly one downloadable file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArchiveDescriptor {
    /// Download URL of the archive: the manifest base URL joined with `file_name`.
    pub url: String,
    /// Archive file name without the base URL.
    pub file_name: String,
    /// Compressed archive size in bytes; `0` when the manifest records no size for it.
    pub size: u64,
    /// BLAKE3 checksum of the compressed archive, when the manifest provides one.
    pub blake3: Option<String>,
    /// Expected extracted files for this archive; empty when the manifest lists none.
    pub output_files: Vec<OutputFileChecksum>,
}
110
/// How much of a component to download.
///
/// The `Display` implementation renders a short human-readable label for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentSelection {
    /// Download all chunks (full archive).
    All,
    /// Download only the most recent chunks covering at least `distance` blocks.
    /// Maps to `PruneMode::Distance(distance)` in the generated config.
    ///
    /// The payload is the distance, counted in blocks.
    Distance(u64),
    /// Don't download this component at all.
    /// Maps to `PruneMode::Full` for tx-based segments, or a minimal distance for others.
    None,
}
123
124impl std::fmt::Display for ComponentSelection {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match self {
127            Self::All => write!(f, "All"),
128            Self::Distance(d) => write!(f, "Last {d} blocks"),
129            Self::None => write!(f, "None"),
130        }
131    }
132}
133
/// The types of snapshot components that can be downloaded.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants below also defines
/// how component types sort.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SnapshotComponentType {
    /// State database (mdbx). Always required. Single archive.
    State,
    /// Block headers static files. Chunked.
    Headers,
    /// Transaction static files. Chunked.
    Transactions,
    /// Transaction sender static files. Chunked. Only downloaded for archive nodes.
    TransactionSenders,
    /// Receipt static files. Chunked.
    Receipts,
    /// Account changeset static files. Chunked.
    AccountChangesets,
    /// Storage changeset static files. Chunked.
    StorageChangesets,
    /// RocksDB index files. Single archive. Optional and archive-only.
    RocksdbIndices,
}
154
155impl SnapshotComponentType {
156    /// All component types in display order.
157    pub const ALL: [Self; 8] = [
158        Self::State,
159        Self::Headers,
160        Self::Transactions,
161        Self::TransactionSenders,
162        Self::Receipts,
163        Self::AccountChangesets,
164        Self::StorageChangesets,
165        Self::RocksdbIndices,
166    ];
167
168    /// The string key used in the manifest JSON.
169    pub const fn key(&self) -> &'static str {
170        match self {
171            Self::State => "state",
172            Self::Headers => "headers",
173            Self::Transactions => "transactions",
174            Self::TransactionSenders => "transaction_senders",
175            Self::Receipts => "receipts",
176            Self::AccountChangesets => "account_changesets",
177            Self::StorageChangesets => "storage_changesets",
178            Self::RocksdbIndices => "rocksdb_indices",
179        }
180    }
181
182    /// Human-readable display name.
183    pub const fn display_name(&self) -> &'static str {
184        match self {
185            Self::State => "State (mdbx)",
186            Self::Headers => "Headers",
187            Self::Transactions => "Transactions",
188            Self::TransactionSenders => "Transaction Senders",
189            Self::Receipts => "Receipts",
190            Self::AccountChangesets => "Account Changesets",
191            Self::StorageChangesets => "Storage Changesets",
192            Self::RocksdbIndices => "RocksDB Indices",
193        }
194    }
195
196    /// Whether this component is always required for a functional node.
197    ///
198    /// State and headers are always needed — a node cannot operate without block headers.
199    pub const fn is_required(&self) -> bool {
200        matches!(self, Self::State | Self::Headers)
201    }
202
203    /// Returns the default selection for this component in the minimal download preset.
204    ///
205    /// Matches the `--minimal` prune configuration:
206    /// - State/Headers: always All (required)
207    /// - Transactions/Changesets: Distance(10_064) (`MINIMUM_UNWIND_SAFE_DISTANCE`)
208    /// - Receipts: Distance(64) (`MINIMUM_DISTANCE`)
209    /// - TransactionSenders: None (only downloaded for archive nodes)
210    /// - RocksdbIndices: None (only downloaded for archive nodes)
211    ///
212    /// `tx_lookup` and `sender_recovery` are always pruned full regardless.
213    pub const fn minimal_selection(&self) -> ComponentSelection {
214        match self {
215            Self::State | Self::Headers => ComponentSelection::All,
216            Self::Transactions | Self::AccountChangesets | Self::StorageChangesets => {
217                ComponentSelection::Distance(10_064)
218            }
219            Self::Receipts => ComponentSelection::Distance(64),
220            Self::TransactionSenders => ComponentSelection::None,
221            Self::RocksdbIndices => ComponentSelection::None,
222        }
223    }
224
225    /// Whether this component type uses chunked archives.
226    pub const fn is_chunked(&self) -> bool {
227        !matches!(self, Self::State | Self::RocksdbIndices)
228    }
229}
230
231impl SnapshotManifest {
232    fn base_url_or_empty(&self) -> &str {
233        self.base_url.as_deref().unwrap_or("")
234    }
235
236    /// Look up a component by type.
237    pub fn component(&self, ty: SnapshotComponentType) -> Option<&ComponentManifest> {
238        self.components.get(ty.key())
239    }
240
241    /// Returns the total download size for the given set of component types.
242    pub fn total_size(&self, types: &[SnapshotComponentType]) -> u64 {
243        types.iter().filter_map(|ty| self.component(*ty).map(|c| c.total_size())).sum()
244    }
245
246    /// Returns all archive URLs for a given component type.
247    pub fn archive_urls(&self, ty: SnapshotComponentType) -> Vec<String> {
248        let Some(component) = self.component(ty) else {
249            return vec![];
250        };
251
252        match component {
253            ComponentManifest::Single(single) => {
254                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
255            }
256            ComponentManifest::Chunked(chunked) => {
257                let key = ty.key();
258                let num_chunks = chunked.num_chunks();
259                (0..num_chunks)
260                    .map(|i| {
261                        let start = i * chunked.blocks_per_file;
262                        let end = (i + 1) * chunked.blocks_per_file - 1;
263                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
264                    })
265                    .collect()
266            }
267        }
268    }
269
270    /// Returns archive URLs for a component, limited to chunks covering at least `distance`
271    /// blocks from the tip. Returns all URLs if distance is `None` (All mode).
272    pub fn archive_urls_for_distance(
273        &self,
274        ty: SnapshotComponentType,
275        distance: Option<u64>,
276    ) -> Vec<String> {
277        let Some(component) = self.component(ty) else {
278            return vec![];
279        };
280
281        match component {
282            ComponentManifest::Single(single) => {
283                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
284            }
285            ComponentManifest::Chunked(chunked) => {
286                let key = ty.key();
287                let num_chunks = chunked.num_chunks();
288
289                // Calculate which chunks to include
290                let start_chunk = match distance {
291                    Some(dist) => {
292                        // We need chunks covering the last `dist` blocks
293                        let needed_blocks = dist.min(chunked.total_blocks);
294                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
295                        num_chunks.saturating_sub(needed_chunks)
296                    }
297                    None => 0, // All chunks
298                };
299
300                (start_chunk..num_chunks)
301                    .map(|i| {
302                        let start = i * chunked.blocks_per_file;
303                        let end = (i + 1) * chunked.blocks_per_file - 1;
304                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
305                    })
306                    .collect()
307            }
308        }
309    }
310
311    /// Returns concrete archive descriptors for a component, optionally limited to distance.
312    pub fn archive_descriptors_for_distance(
313        &self,
314        ty: SnapshotComponentType,
315        distance: Option<u64>,
316    ) -> Vec<ArchiveDescriptor> {
317        let Some(component) = self.component(ty) else {
318            return vec![];
319        };
320
321        match component {
322            ComponentManifest::Single(single) => {
323                vec![ArchiveDescriptor {
324                    url: format!("{}/{}", self.base_url_or_empty(), single.file),
325                    file_name: single.file.clone(),
326                    size: single.size,
327                    blake3: single.blake3.clone(),
328                    output_files: single.output_files.clone(),
329                }]
330            }
331            ComponentManifest::Chunked(chunked) => {
332                let key = ty.key();
333                let num_chunks = chunked.num_chunks();
334
335                let start_chunk = match distance {
336                    Some(dist) => {
337                        let needed_blocks = dist.min(chunked.total_blocks);
338                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
339                        num_chunks.saturating_sub(needed_chunks)
340                    }
341                    None => 0,
342                };
343
344                (start_chunk..num_chunks)
345                    .map(|i| {
346                        let start = i * chunked.blocks_per_file;
347                        let end = (i + 1) * chunked.blocks_per_file - 1;
348                        let file_name = format!("{key}-{start}-{end}.tar.zst");
349                        let size = chunked.chunk_sizes.get(i as usize).copied().unwrap_or_default();
350                        let output_files =
351                            chunked.chunk_output_files.get(i as usize).cloned().unwrap_or_default();
352
353                        ArchiveDescriptor {
354                            url: format!("{}/{}", self.base_url_or_empty(), file_name),
355                            file_name,
356                            size,
357                            blake3: None,
358                            output_files,
359                        }
360                    })
361                    .collect()
362            }
363        }
364    }
365
366    /// Returns the exact download size for a component given a distance selection.
367    ///
368    /// For single archives, returns the full size. For chunked archives, sums the
369    /// sizes of the selected tail chunks from [`ChunkedArchive::chunk_sizes`].
370    pub fn size_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
371        let Some(component) = self.component(ty) else {
372            return 0;
373        };
374        match component {
375            ComponentManifest::Single(s) => s.size,
376            ComponentManifest::Chunked(chunked) => {
377                if chunked.chunk_sizes.is_empty() {
378                    return 0;
379                }
380                let num_chunks = chunked.chunk_sizes.len() as u64;
381                let start_chunk = match distance {
382                    Some(dist) => {
383                        let needed = dist.min(chunked.total_blocks);
384                        let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
385                        num_chunks.saturating_sub(needed_chunks)
386                    }
387                    None => 0,
388                };
389                chunked.chunk_sizes[start_chunk as usize..].iter().sum()
390            }
391        }
392    }
393
394    /// Returns the number of chunks that would be downloaded for a given distance.
395    pub fn chunks_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
396        let Some(ComponentManifest::Chunked(chunked)) = self.component(ty) else {
397            return if self.component(ty).is_some() { 1 } else { 0 };
398        };
399        match distance {
400            Some(dist) => {
401                let needed = dist.min(chunked.total_blocks);
402                needed.div_ceil(chunked.blocks_per_file)
403            }
404            None => chunked.num_chunks(),
405        }
406    }
407}
408
409impl ComponentManifest {
410    /// Returns the total download size for this component.
411    pub fn total_size(&self) -> u64 {
412        match self {
413            Self::Single(s) => s.size,
414            Self::Chunked(c) => c.chunk_sizes.iter().sum(),
415        }
416    }
417}
418
419impl ChunkedArchive {
420    /// Returns the number of chunks.
421    pub fn num_chunks(&self) -> u64 {
422        self.total_blocks.div_ceil(self.blocks_per_file)
423    }
424}
425
426/// Fetch a snapshot manifest from a URL.
427pub async fn fetch_manifest(manifest_url: &str) -> Result<SnapshotManifest> {
428    let client = Client::new();
429    let manifest: SnapshotManifest =
430        client.get(manifest_url).send().await?.error_for_status()?.json().await?;
431    Ok(manifest)
432}
433
434/// Package chunk archives from a source datadir and generate a manifest.
435pub fn generate_manifest(
436    source_datadir: &Path,
437    output_dir: &Path,
438    base_url: Option<&str>,
439    block: u64,
440    chain_id: u64,
441    blocks_per_file: u64,
442) -> Result<SnapshotManifest> {
443    std::fs::create_dir_all(output_dir)?;
444
445    let mut components = BTreeMap::new();
446
447    // Package chunked static-file components.
448    for ty in &[
449        SnapshotComponentType::Headers,
450        SnapshotComponentType::Transactions,
451        SnapshotComponentType::TransactionSenders,
452        SnapshotComponentType::Receipts,
453        SnapshotComponentType::AccountChangesets,
454        SnapshotComponentType::StorageChangesets,
455    ] {
456        let key = ty.key();
457        let num_chunks = block.div_ceil(blocks_per_file);
458        let mut planned_chunks = Vec::with_capacity(num_chunks as usize);
459        let mut found_any = false;
460
461        for i in 0..num_chunks {
462            let start = i * blocks_per_file;
463            let end = (i + 1) * blocks_per_file - 1;
464            let source_files = source_files_for_chunk(source_datadir, *ty, start, end)?;
465
466            if source_files.is_empty() {
467                if found_any {
468                    eyre::bail!("Missing source files for {} chunk {}-{}", key, start, end);
469                }
470                continue;
471            }
472
473            found_any = true;
474            planned_chunks.push(PlannedChunk {
475                chunk_idx: i,
476                archive_path: output_dir.join(chunk_filename(key, start, end)),
477                source_files,
478            });
479        }
480
481        if found_any {
482            let mut packaged_chunks = planned_chunks
483                .into_par_iter()
484                .map(|planned| -> Result<PackagedChunk> {
485                    let output_files =
486                        write_chunk_archive(&planned.archive_path, &planned.source_files)?;
487                    let size = std::fs::metadata(&planned.archive_path)?.len();
488                    Ok(PackagedChunk { chunk_idx: planned.chunk_idx, size, output_files })
489                })
490                .collect::<Vec<_>>()
491                .into_iter()
492                .collect::<Result<Vec<_>>>()?;
493
494            packaged_chunks.sort_unstable_by_key(|chunk| chunk.chunk_idx);
495            let chunk_sizes = packaged_chunks.iter().map(|chunk| chunk.size).collect::<Vec<_>>();
496            let chunk_output_files =
497                packaged_chunks.into_iter().map(|chunk| chunk.output_files).collect::<Vec<_>>();
498            let total_size: u64 = chunk_sizes.iter().sum();
499            info!(target: "reth::cli",
500                component = ty.display_name(),
501                chunks = chunk_sizes.len(),
502                total_blocks = block,
503                size = %super::DownloadProgress::format_size(total_size),
504                "Found chunked component"
505            );
506            components.insert(
507                key.to_string(),
508                ComponentManifest::Chunked(ChunkedArchive {
509                    blocks_per_file,
510                    total_blocks: block,
511                    chunk_sizes,
512                    chunk_output_files,
513                }),
514            );
515        }
516    }
517
518    let (state_size, state_output_files) = package_single_component(
519        output_dir,
520        "state.tar.zst",
521        &state_source_files(source_datadir)?,
522    )?;
523    components.insert(
524        SnapshotComponentType::State.key().to_string(),
525        ComponentManifest::Single(SingleArchive {
526            file: "state.tar.zst".to_string(),
527            size: state_size,
528            blake3: None,
529            output_files: state_output_files,
530        }),
531    );
532
533    let rocksdb_files = rocksdb_source_files(source_datadir)?;
534    if !rocksdb_files.is_empty() {
535        let (rocksdb_size, rocksdb_output_files) =
536            package_single_component(output_dir, "rocksdb_indices.tar.zst", &rocksdb_files)?;
537        components.insert(
538            SnapshotComponentType::RocksdbIndices.key().to_string(),
539            ComponentManifest::Single(SingleArchive {
540                file: "rocksdb_indices.tar.zst".to_string(),
541                size: rocksdb_size,
542                blake3: None,
543                output_files: rocksdb_output_files,
544            }),
545        );
546    }
547
548    let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?.as_secs();
549
550    Ok(SnapshotManifest {
551        block,
552        chain_id,
553        storage_version: 2,
554        timestamp,
555        base_url: base_url.map(str::to_owned),
556        components,
557    })
558}
559
/// Builds the archive file name for one chunk of a chunked component.
///
/// The result follows the `{component_key}-{start}-{end}.tar.zst` naming convention, where
/// `start` and `end` are the first and last block of the chunk.
pub fn chunk_filename(component_key: &str, start: u64, end: u64) -> String {
    format!("{}-{}-{}.tar.zst", component_key, start, end)
}
564
/// A chunk archive that is scheduled to be written during manifest generation.
#[derive(Debug)]
struct PlannedChunk {
    /// Zero-based index of the chunk within its component.
    chunk_idx: u64,
    /// Destination path of the archive to create.
    archive_path: PathBuf,
    /// Source files that go into the archive.
    source_files: Vec<PathBuf>,
}
571
/// Result of writing one chunk archive.
#[derive(Debug)]
struct PackagedChunk {
    /// Zero-based index of the chunk within its component; used to restore chunk order.
    chunk_idx: u64,
    /// Size of the written archive file in bytes.
    size: u64,
    /// Size and checksum of every plain file stored in the archive.
    output_files: Vec<OutputFileChecksum>,
}
578
/// A file to be added to an archive.
#[derive(Debug)]
struct PlannedFile {
    /// Location of the file on disk.
    source_path: PathBuf,
    /// Path under which the file is stored inside the archive.
    relative_path: PathBuf,
}
584
585fn source_files_for_chunk(
586    source_datadir: &Path,
587    component: SnapshotComponentType,
588    start: u64,
589    end: u64,
590) -> Result<Vec<PathBuf>> {
591    let Some(segment_name) = static_segment_name(component) else {
592        return Ok(Vec::new());
593    };
594
595    let static_files_dir = source_datadir.join("static_files");
596    let static_files_dir =
597        if static_files_dir.exists() { static_files_dir } else { source_datadir.to_path_buf() };
598    let prefix = format!("static_file_{segment_name}_{start}_{end}");
599
600    let mut files = Vec::new();
601    for entry in std::fs::read_dir(&static_files_dir)? {
602        let entry = entry?;
603        if !entry.file_type()?.is_file() {
604            continue;
605        }
606        if entry.file_name().to_string_lossy().starts_with(&prefix) {
607            files.push(entry.path());
608        }
609    }
610
611    files.sort_unstable();
612    Ok(files)
613}
614
615fn static_segment_name(component: SnapshotComponentType) -> Option<&'static str> {
616    match component {
617        SnapshotComponentType::Headers => Some("headers"),
618        SnapshotComponentType::Transactions => Some("transactions"),
619        SnapshotComponentType::TransactionSenders => Some("transaction-senders"),
620        SnapshotComponentType::Receipts => Some("receipts"),
621        SnapshotComponentType::AccountChangesets => Some("account-change-sets"),
622        SnapshotComponentType::StorageChangesets => Some("storage-change-sets"),
623        SnapshotComponentType::State | SnapshotComponentType::RocksdbIndices => None,
624    }
625}
626
627fn state_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
628    let db_dir = source_datadir.join("db");
629    if db_dir.exists() {
630        return collect_files_recursive(&db_dir, Path::new("db"));
631    }
632
633    if looks_like_db_dir(source_datadir)? {
634        return collect_files_recursive(source_datadir, Path::new("db"));
635    }
636
637    eyre::bail!("Could not find source state DB directory under {}", source_datadir.display())
638}
639
640fn rocksdb_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
641    let rocksdb_dir = source_datadir.join("rocksdb");
642    if !rocksdb_dir.exists() {
643        return Ok(Vec::new());
644    }
645
646    collect_files_recursive(&rocksdb_dir, Path::new("rocksdb"))
647}
648
649fn looks_like_db_dir(path: &Path) -> Result<bool> {
650    let entries = match std::fs::read_dir(path) {
651        Ok(entries) => entries,
652        Err(_) => return Ok(false),
653    };
654
655    for entry in entries {
656        let entry = entry?;
657        if !entry.file_type()?.is_file() {
658            continue;
659        }
660        let name = entry.file_name();
661        let name = name.to_string_lossy();
662        if name == "mdbx.dat" || name == "lock.mdb" || name == "data.mdb" {
663            return Ok(true);
664        }
665    }
666
667    Ok(false)
668}
669
670fn collect_files_recursive(root: &Path, output_prefix: &Path) -> Result<Vec<PlannedFile>> {
671    let mut files = Vec::new();
672    collect_files_recursive_inner(root, root, output_prefix, &mut files)?;
673    files.sort_unstable_by(|a, b| a.relative_path.cmp(&b.relative_path));
674    Ok(files)
675}
676
677fn collect_files_recursive_inner(
678    root: &Path,
679    dir: &Path,
680    output_prefix: &Path,
681    files: &mut Vec<PlannedFile>,
682) -> Result<()> {
683    for entry in std::fs::read_dir(dir)? {
684        let entry = entry?;
685        let path = entry.path();
686        let file_type = entry.file_type()?;
687        if file_type.is_dir() {
688            collect_files_recursive_inner(root, &path, output_prefix, files)?;
689            continue;
690        }
691        if !file_type.is_file() {
692            continue;
693        }
694
695        let relative = path.strip_prefix(root)?.to_path_buf();
696        files.push(PlannedFile { source_path: path, relative_path: output_prefix.join(relative) });
697    }
698
699    Ok(())
700}
701
702fn package_single_component(
703    output_dir: &Path,
704    archive_file_name: &str,
705    files: &[PlannedFile],
706) -> Result<(u64, Vec<OutputFileChecksum>)> {
707    if files.is_empty() {
708        eyre::bail!("Cannot package empty single archive: {}", archive_file_name);
709    }
710
711    let archive_path = output_dir.join(archive_file_name);
712    let output_files = write_archive_from_planned_files(&archive_path, files)?;
713    let size = std::fs::metadata(&archive_path)?.len();
714    Ok((size, output_files))
715}
716
717fn write_chunk_archive(path: &Path, source_files: &[PathBuf]) -> Result<Vec<OutputFileChecksum>> {
718    let planned_files = source_files
719        .iter()
720        .map(|source_path| {
721            let file_name = source_path.file_name().ok_or_else(|| {
722                eyre::eyre!("Invalid source file path: {}", source_path.display())
723            })?;
724            Ok::<_, eyre::Error>(PlannedFile {
725                source_path: source_path.clone(),
726                relative_path: PathBuf::from("static_files").join(file_name),
727            })
728        })
729        .collect::<Result<Vec<_>>>()?;
730
731    write_archive_from_planned_files(path, &planned_files)
732}
733
734fn write_archive_from_planned_files(
735    path: &Path,
736    files: &[PlannedFile],
737) -> Result<Vec<OutputFileChecksum>> {
738    let file = std::fs::File::create(path)?;
739    let mut encoder = zstd::Encoder::new(file, 0)?;
740    // Emit standard zstd frames with checksums for compatibility with external
741    // tools such as `pzstd -d`.
742    encoder.include_checksum(true)?;
743    let mut builder = tar::Builder::new(encoder);
744
745    let mut output_files = Vec::with_capacity(files.len());
746    for planned in files {
747        let mut header = tar::Header::new_gnu();
748        header.set_size(std::fs::metadata(&planned.source_path)?.len());
749        header.set_mode(0o644);
750        header.set_cksum();
751
752        let source_file = std::fs::File::open(&planned.source_path)?;
753        let mut reader = HashingReader::new(source_file);
754        builder.append_data(&mut header, &planned.relative_path, &mut reader)?;
755
756        output_files.push(OutputFileChecksum {
757            path: planned.relative_path.to_string_lossy().to_string(),
758            size: reader.bytes_read,
759            blake3: reader.finalize(),
760        });
761    }
762
763    builder.finish()?;
764    let encoder = builder.into_inner()?;
765    encoder.finish()?;
766
767    Ok(output_files)
768}
769
770struct HashingReader<R> {
771    inner: R,
772    hasher: Hasher,
773    bytes_read: u64,
774}
775
776impl<R: Read> HashingReader<R> {
777    fn new(inner: R) -> Self {
778        Self { inner, hasher: Hasher::new(), bytes_read: 0 }
779    }
780
781    fn finalize(self) -> String {
782        self.hasher.finalize().to_hex().to_string()
783    }
784}
785
786impl<R: Read> Read for HashingReader<R> {
787    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
788        let n = self.inner.read(buf)?;
789        if n > 0 {
790            self.bytes_read += n as u64;
791            self.hasher.update(&buf[..n]);
792        }
793        Ok(n)
794    }
795}
796
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Base URL used by every hand-built manifest in this module.
    const BASE_URL: &str = "https://example.com";

    /// Wraps `components` in a manifest taken at `block` with chain id 1, storage version 2,
    /// a zero timestamp and [`BASE_URL`] as the base URL.
    fn manifest_at(
        block: u64,
        components: BTreeMap<String, ComponentManifest>,
    ) -> SnapshotManifest {
        SnapshotManifest {
            block,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some(BASE_URL.to_string()),
            components,
        }
    }

    /// Creates `root/dir` (including parents) and writes `contents` to `root/dir/name`.
    fn seed_file(root: &std::path::Path, dir: &str, name: &str, contents: &[u8]) {
        let parent = root.join(dir);
        std::fs::create_dir_all(&parent).unwrap();
        std::fs::write(parent.join(name), contents).unwrap();
    }

    /// Manifest with a single-archive `state` component plus three-chunk `transactions` and
    /// `headers` components (500k blocks per file, 1.5M blocks total).
    fn test_manifest() -> SnapshotManifest {
        let state = ComponentManifest::Single(SingleArchive {
            file: "state.tar.zst".to_string(),
            size: 100,
            blake3: None,
            output_files: Vec::new(),
        });
        let transactions = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_500_000,
            chunk_sizes: vec![80_000, 100_000, 120_000],
            chunk_output_files: vec![Vec::new(); 3],
        });
        let headers = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_500_000,
            chunk_sizes: vec![40_000, 50_000, 60_000],
            chunk_output_files: vec![Vec::new(); 3],
        });

        let components = BTreeMap::from([
            ("state".to_string(), state),
            ("transactions".to_string(), transactions),
            ("headers".to_string(), headers),
        ]);
        manifest_at(1_500_000, components)
    }

    /// With no distance limit every chunk URL is returned, oldest first.
    #[test]
    fn archive_urls_for_distance_all() {
        let urls =
            test_manifest().archive_urls_for_distance(SnapshotComponentType::Transactions, None);
        assert_eq!(urls.len(), 3);
        for (idx, expected) in [
            (0, "https://example.com/transactions-0-499999.tar.zst"),
            (2, "https://example.com/transactions-1000000-1499999.tar.zst"),
        ] {
            assert_eq!(urls[idx], expected);
        }
    }

    /// A distance spanning more than one chunk selects the trailing chunks only.
    #[test]
    fn archive_urls_for_distance_partial() {
        // 600k blocks → needs 2 chunks (each 500k)
        let urls = test_manifest()
            .archive_urls_for_distance(SnapshotComponentType::Transactions, Some(600_000));
        assert_eq!(urls.len(), 2);
        for (idx, expected) in [
            (0, "https://example.com/transactions-500000-999999.tar.zst"),
            (1, "https://example.com/transactions-1000000-1499999.tar.zst"),
        ] {
            assert_eq!(urls[idx], expected);
        }
    }

    /// Single archives always return one URL regardless of distance.
    #[test]
    fn archive_urls_for_distance_single_component() {
        let urls =
            test_manifest().archive_urls_for_distance(SnapshotComponentType::State, Some(100));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/state.tar.zst");
    }

    /// The rocksdb indices component behaves like any other single archive.
    #[test]
    fn archive_urls_for_distance_rocksdb_indices_single_component() {
        let rocksdb = ComponentManifest::Single(SingleArchive {
            file: "rocksdb_indices.tar.zst".to_string(),
            size: 777,
            blake3: None,
            output_files: Vec::new(),
        });
        let m = manifest_at(1, BTreeMap::from([("rocksdb_indices".to_string(), rocksdb)]));

        let urls = m.archive_urls_for_distance(SnapshotComponentType::RocksdbIndices, Some(10));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/rocksdb_indices.tar.zst");
        assert_eq!(m.size_for_distance(SnapshotComponentType::RocksdbIndices, Some(10)), 777);
    }

    /// A component absent from the manifest yields no URLs.
    #[test]
    fn archive_urls_for_distance_missing_component() {
        let urls =
            test_manifest().archive_urls_for_distance(SnapshotComponentType::Receipts, None);
        assert!(urls.is_empty());
    }

    /// With no distance limit the chunk count covers the whole component.
    #[test]
    fn chunks_for_distance_all() {
        let chunks = test_manifest().chunks_for_distance(SnapshotComponentType::Transactions, None);
        assert_eq!(chunks, 3);
    }

    /// The chunk count for a distance is rounded up to whole chunks.
    #[test]
    fn chunks_for_distance_partial() {
        let m = test_manifest();
        for (distance, expected) in [(Some(600_000), 2), (Some(100_000), 1)] {
            let chunks = m.chunks_for_distance(SnapshotComponentType::Transactions, distance);
            assert_eq!(chunks, expected);
        }
    }

    /// A single archive always counts as exactly one chunk.
    #[test]
    fn chunks_for_distance_single() {
        let m = test_manifest();
        for distance in [None, Some(100)] {
            assert_eq!(m.chunks_for_distance(SnapshotComponentType::State, distance), 1);
        }
    }

    /// A component absent from the manifest has zero chunks.
    #[test]
    fn chunks_for_distance_missing() {
        let chunks = test_manifest().chunks_for_distance(SnapshotComponentType::Receipts, None);
        assert_eq!(chunks, 0);
    }

    /// Checks the `Display` rendering of each [`ComponentSelection`] variant.
    #[test]
    fn component_selection_display() {
        let cases = [
            (ComponentSelection::All, "All"),
            (ComponentSelection::Distance(10_064), "Last 10064 blocks"),
            (ComponentSelection::None, "None"),
        ];
        for (selection, rendered) in cases {
            assert_eq!(selection.to_string(), rendered);
        }
    }

    /// When `total_blocks` is not aligned to `blocks_per_file`, chunk boundaries must still
    /// align to `blocks_per_file` (not `total_blocks`).
    #[test]
    fn archive_urls_aligned_to_blocks_per_file() {
        const CHUNKS: usize = 49;

        let changesets = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 24_396_822,
            chunk_sizes: vec![100; CHUNKS],
            chunk_output_files: vec![Vec::new(); CHUNKS],
        });
        let m = manifest_at(
            24_396_822,
            BTreeMap::from([("storage_changesets".to_string(), changesets)]),
        );

        let urls = m.archive_urls(SnapshotComponentType::StorageChangesets);
        assert_eq!(urls.len(), CHUNKS);
        // First chunk: 0-499999 (not 0-396821 or similar)
        assert_eq!(urls[0], "https://example.com/storage_changesets-0-499999.tar.zst");
        // Last chunk: 24000000-24499999 (not 24000000-24396821)
        assert_eq!(
            urls[CHUNKS - 1],
            "https://example.com/storage_changesets-24000000-24499999.tar.zst"
        );
    }

    /// Sizes for a distance are the sum of the trailing chunks that cover it.
    #[test]
    fn size_for_distance_sums_tail_chunks() {
        let m = test_manifest();
        // Transactions has chunk_sizes [80_000, 100_000, 120_000]
        let cases = [
            // All: sum of all 3
            (SnapshotComponentType::Transactions, None, 300_000),
            // Last 500K blocks = 1 chunk = last chunk only
            (SnapshotComponentType::Transactions, Some(500_000), 120_000),
            // Last 600K blocks = 2 chunks = last two
            (SnapshotComponentType::Transactions, Some(600_000), 220_000),
            // Single archive (state) always returns full size
            (SnapshotComponentType::State, Some(100), 100),
            // Missing component
            (SnapshotComponentType::Receipts, None, 0),
        ];
        for (component, distance, expected) in cases {
            assert_eq!(m.size_for_distance(component, distance), expected);
        }
    }

    /// Archive descriptors carry the archive checksum (single archives only) and the
    /// per-output-file checksum entries from the manifest.
    #[test]
    fn archive_descriptors_include_checksum_metadata() {
        let state = ComponentManifest::Single(SingleArchive {
            file: "state.tar.zst".to_string(),
            size: 100,
            blake3: Some("abc123".to_string()),
            output_files: vec![OutputFileChecksum {
                path: "db/mdbx.dat".to_string(),
                size: 1000,
                blake3: "s0".to_string(),
            }],
        });

        let first_chunk_outputs = vec![OutputFileChecksum {
            path: "static_files/static_file_transactions_0_499999.bin".to_string(),
            size: 111,
            blake3: "h0".to_string(),
        }];
        let second_chunk_outputs = vec![OutputFileChecksum {
            path: "static_files/static_file_transactions_500000_999999.bin".to_string(),
            size: 222,
            blake3: "h1".to_string(),
        }];
        let transactions = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_000_000,
            chunk_sizes: vec![80_000, 120_000],
            chunk_output_files: vec![first_chunk_outputs, second_chunk_outputs],
        });

        let m = manifest_at(
            1_000_000,
            BTreeMap::from([
                ("state".to_string(), state),
                ("transactions".to_string(), transactions),
            ]),
        );

        let state = m.archive_descriptors_for_distance(SnapshotComponentType::State, None);
        assert_eq!(state.len(), 1);
        let state_descriptor = &state[0];
        assert_eq!(state_descriptor.file_name, "state.tar.zst");
        assert_eq!(state_descriptor.blake3.as_deref(), Some("abc123"));
        assert_eq!(state_descriptor.output_files.len(), 1);

        let tx = m.archive_descriptors_for_distance(SnapshotComponentType::Transactions, None);
        assert_eq!(tx.len(), 2);
        for descriptor in tx.iter() {
            assert_eq!(descriptor.blake3, None);
        }
        assert_eq!(tx[0].output_files[0].size, 111);
    }

    /// Generating a manifest from a datadir with `db/mdbx.dat` produces a single `state`
    /// archive and writes it to the output directory.
    #[test]
    fn generate_manifest_includes_state_single_archive() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        seed_file(source.path(), "db", "mdbx.dat", b"state-data");

        let (src, out) = (source.path(), output.path());
        let manifest = generate_manifest(src, out, None, 0, 1, 500_000).unwrap();

        let state = match manifest.component(SnapshotComponentType::State).unwrap() {
            ComponentManifest::Single(archive) => archive,
            _ => panic!("state should be a single archive"),
        };
        assert_eq!(state.file, "state.tar.zst");
        assert!(!state.output_files.is_empty());
        assert_eq!(state.output_files[0].path, "db/mdbx.dat");
        assert!(out.join("state.tar.zst").exists());
    }

    /// When the datadir also has a `rocksdb` directory, the manifest gains a single
    /// `rocksdb_indices` archive that is written to the output directory.
    #[test]
    fn generate_manifest_includes_rocksdb_single_archive_when_present() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        seed_file(source.path(), "db", "mdbx.dat", b"state-data");
        seed_file(source.path(), "rocksdb", "CURRENT", b"MANIFEST-000001");

        let (src, out) = (source.path(), output.path());
        let manifest = generate_manifest(src, out, None, 0, 1, 500_000).unwrap();

        let rocksdb = match manifest.component(SnapshotComponentType::RocksdbIndices).unwrap() {
            ComponentManifest::Single(archive) => archive,
            _ => panic!("rocksdb indices should be a single archive"),
        };
        assert_eq!(rocksdb.file, "rocksdb_indices.tar.zst");
        assert!(!rocksdb.output_files.is_empty());
        assert_eq!(rocksdb.output_files[0].path, "rocksdb/CURRENT");
        assert!(out.join("rocksdb_indices.tar.zst").exists());
    }
}