Skip to main content

reth_cli_commands/download/
manifest.rs

1use blake3::Hasher;
2use eyre::Result;
3use rayon::prelude::*;
4use reqwest::Client;
5use serde::{Deserialize, Serialize};
6use std::{
7    collections::BTreeMap,
8    io::Read,
9    path::{Path, PathBuf},
10};
11use tracing::info;
12
/// Predicate for serde's `skip_serializing_if`: reports whether `value` is zero.
///
/// The argument is taken by reference because that is the shape serde calls it with.
fn is_zero(value: &u64) -> bool {
    matches!(*value, 0)
}
16
/// A snapshot manifest describes available components for a snapshot at a given block height.
///
/// Each component is either a single archive (state) or a set of chunked archives (static file
/// segments like transactions, receipts, etc). Chunked components use `blocks_per_file` to
/// define the block range per archive, matching reth's static file segment boundaries.
///
/// Archive naming convention for chunked components:
///   `{component}-{start_block}-{end_block}.tar.zst`
///
/// For example with `blocks_per_file: 500000` and `total_blocks: 1500000`:
///   `transactions-0-499999.tar.zst`
///   `transactions-500000-999999.tar.zst`
///   `transactions-1000000-1499999.tar.zst`
///
/// Note that `end_block` is always the nominal end of the chunk's range
/// (`(index + 1) * blocks_per_file - 1`), even when the last chunk is only partially filled.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotManifest {
    /// Block number this snapshot was taken at.
    pub block: u64,
    /// Chain ID.
    pub chain_id: u64,
    /// Storage version (1 = legacy, 2 = current).
    pub storage_version: u64,
    /// Timestamp when the snapshot was created (unix seconds).
    pub timestamp: u64,
    /// Base URL for archive downloads. Component archive URLs are relative to this.
    ///
    /// When omitted, downloaders should derive the base URL from the manifest URL.
    /// The URL helpers on this type use an empty base when this is `None`.
    ///
    /// Not written to the serialized manifest when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub base_url: Option<String>,
    /// Reth version that produced this snapshot.
    ///
    /// Not written to the serialized manifest when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reth_version: Option<String>,
    /// Available snapshot components, keyed by the component's manifest key
    /// (the value returned by [`SnapshotComponentType::key`], e.g. `"state"`).
    pub components: BTreeMap<String, ComponentManifest>,
}
51
/// Manifest entry for a single snapshot component.
///
/// The enum is `#[serde(untagged)]`: the variant is not named in the serialized form and is
/// instead inferred from the fields present. Serde tries the variants in declaration order, so
/// an entry is read as [`SingleArchive`] if it has that struct's required fields and as
/// [`ChunkedArchive`] otherwise.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum ComponentManifest {
    /// A single archive file (used for state).
    Single(SingleArchive),
    /// A set of chunked archives split by block range (used for static file segments).
    Chunked(ChunkedArchive),
}
61
/// A single, non-chunked archive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SingleArchive {
    /// Archive file name (relative to base_url).
    pub file: String,
    /// Compressed archive size in bytes.
    pub size: u64,
    /// Total extracted plain-output size in bytes.
    ///
    /// Older manifests may omit this, in which case downloaders should derive it from
    /// `output_files`.
    ///
    /// A value of zero means "not recorded": it is the deserialization default and is not
    /// written back out when serializing.
    #[serde(default, skip_serializing_if = "is_zero")]
    pub decompressed_size: u64,
    /// Optional BLAKE3 checksum of the compressed archive.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub blake3: Option<String>,
    /// Expected extracted plain files for this archive.
    ///
    /// This is the authoritative integrity source for the modular download path.
    /// Defaults to an empty list when the manifest omits it.
    #[serde(default)]
    pub output_files: Vec<OutputFileChecksum>,
}
84
/// A chunked archive set where each chunk covers a fixed block range.
///
/// The per-chunk vectors below are indexed by chunk number (chunk `i` covers blocks starting
/// at `i * blocks_per_file`). Each of them may be empty when the manifest omits it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkedArchive {
    /// Number of blocks per archive file. Matches reth's `blocks_per_file` config.
    pub blocks_per_file: u64,
    /// Total number of blocks covered by this component.
    pub total_blocks: u64,
    /// Compressed size of each chunk in bytes, ordered from first to last.
    /// Computed during manifest generation. Older manifests may omit this.
    #[serde(default)]
    pub chunk_sizes: Vec<u64>,
    /// Extracted plain-output size of each chunk in bytes, ordered from first to last.
    ///
    /// Older manifests may omit this, in which case downloaders should derive it from
    /// `chunk_output_files`.
    ///
    /// Not written to the serialized manifest when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub chunk_decompressed_sizes: Vec<u64>,
    /// Expected extracted plain files per chunk, ordered from first to last.
    ///
    /// This is the authoritative integrity source for the modular download path.
    #[serde(default)]
    pub chunk_output_files: Vec<Vec<OutputFileChecksum>>,
}
108
/// Expected metadata for one extracted plain file.
///
/// All three fields are required in the serialized manifest (none has a serde default).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OutputFileChecksum {
    /// Relative path under the target datadir where this file is extracted.
    pub path: String,
    /// Plain file size in bytes.
    pub size: u64,
    /// BLAKE3 checksum of the plain file contents.
    // NOTE(review): the textual encoding (presumably hex) is not visible here — confirm
    // against the code that produces and verifies this value.
    pub blake3: String,
}
119
/// A concrete snapshot archive with its download and verification metadata.
///
/// Unlike the manifest entries, this type is not (de)serialized; it is built from a
/// [`SnapshotManifest`] for one specific archive file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotArchive {
    /// Download URL: the manifest base URL and `file_name` joined with `/`.
    pub url: String,
    /// Archive file name, e.g. `state.tar.zst` or `transactions-0-499999.tar.zst`.
    pub file_name: String,
    /// Compressed archive size in bytes. Zero for a chunk whose size the manifest does not list.
    pub size: u64,
    /// Optional BLAKE3 checksum of the compressed archive. Always `None` for chunked archives,
    /// since the manifest carries no per-chunk archive checksum.
    pub blake3: Option<String>,
    /// Expected extracted plain files for this archive (empty if the manifest lists none).
    pub output_files: Vec<OutputFileChecksum>,
}
129
130impl SnapshotArchive {
131    /// Returns the total extracted plain-output size for this archive.
132    pub fn output_size(&self) -> u64 {
133        self.output_files.iter().map(|file| file.size).sum()
134    }
135}
136
/// How much of a component to download.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ComponentSelection {
    /// Download every chunk (full archive).
    All,
    /// Download only the most recent chunks covering at least `distance` blocks.
    /// Maps to `PruneMode::Distance(distance)` in the generated config.
    Distance(u64),
    /// Download the chunks from the given block number onwards.
    /// Maps to `PruneMode::Before(block)` in the generated config.
    Since(u64),
    /// Skip this component entirely.
    /// Maps to `PruneMode::Full` for tx-based segments, or a minimal distance for others.
    None,
}

/// Short human-readable label for a selection, e.g. `Last 64 blocks`.
impl std::fmt::Display for ComponentSelection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            // Fixed labels need no formatting machinery.
            Self::All => f.write_str("All"),
            Self::None => f.write_str("None"),
            Self::Distance(blocks) => write!(f, "Last {blocks} blocks"),
            Self::Since(first_block) => write!(f, "Since block {first_block}"),
        }
    }
}
163
/// The types of snapshot components that can be downloaded.
///
/// The derived ordering follows declaration order, which is also the order used by
/// [`SnapshotComponentType::ALL`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SnapshotComponentType {
    /// State database (mdbx). Always required. Single archive.
    State,
    /// Block headers static files. Chunked. Always required.
    Headers,
    /// Transaction static files. Chunked.
    Transactions,
    /// Transaction sender static files. Chunked. Only downloaded for archive nodes.
    TransactionSenders,
    /// Receipt static files. Chunked.
    Receipts,
    /// Account changeset static files. Chunked.
    AccountChangesets,
    /// Storage changeset static files. Chunked.
    StorageChangesets,
    /// RocksDB index files. Single archive. Optional and archive-only.
    RocksdbIndices,
}
184
185impl SnapshotComponentType {
186    /// All component types in display order.
187    pub const ALL: [Self; 8] = [
188        Self::State,
189        Self::Headers,
190        Self::Transactions,
191        Self::TransactionSenders,
192        Self::Receipts,
193        Self::AccountChangesets,
194        Self::StorageChangesets,
195        Self::RocksdbIndices,
196    ];
197
198    /// The string key used in the manifest JSON.
199    pub const fn key(&self) -> &'static str {
200        match self {
201            Self::State => "state",
202            Self::Headers => "headers",
203            Self::Transactions => "transactions",
204            Self::TransactionSenders => "transaction_senders",
205            Self::Receipts => "receipts",
206            Self::AccountChangesets => "account_changesets",
207            Self::StorageChangesets => "storage_changesets",
208            Self::RocksdbIndices => "rocksdb_indices",
209        }
210    }
211
212    /// Human-readable display name.
213    pub const fn display_name(&self) -> &'static str {
214        match self {
215            Self::State => "State (mdbx)",
216            Self::Headers => "Headers",
217            Self::Transactions => "Transactions",
218            Self::TransactionSenders => "Transaction Senders",
219            Self::Receipts => "Receipts",
220            Self::AccountChangesets => "Account Changesets",
221            Self::StorageChangesets => "Storage Changesets",
222            Self::RocksdbIndices => "RocksDB Indices",
223        }
224    }
225
226    /// Whether this component is always required for a functional node.
227    ///
228    /// State and headers are always needed — a node cannot operate without block headers.
229    pub const fn is_required(&self) -> bool {
230        matches!(self, Self::State | Self::Headers)
231    }
232
233    /// Returns the default selection for this component in the minimal download preset.
234    ///
235    /// Matches the `--minimal` prune configuration:
236    /// - State/Headers: always All (required)
237    /// - Transactions/Changesets: Distance(10_064) (`MINIMUM_UNWIND_SAFE_DISTANCE`)
238    /// - Receipts: Distance(64) (`MINIMUM_DISTANCE`)
239    /// - TransactionSenders: None (only downloaded for archive nodes)
240    /// - RocksdbIndices: None (only downloaded for archive nodes)
241    ///
242    /// `tx_lookup` and `sender_recovery` are always pruned full regardless.
243    pub const fn minimal_selection(&self) -> ComponentSelection {
244        match self {
245            Self::State | Self::Headers => ComponentSelection::All,
246            Self::Transactions | Self::AccountChangesets | Self::StorageChangesets => {
247                ComponentSelection::Distance(10_064)
248            }
249            Self::Receipts => ComponentSelection::Distance(64),
250            Self::TransactionSenders => ComponentSelection::None,
251            Self::RocksdbIndices => ComponentSelection::None,
252        }
253    }
254
255    /// Whether this component type uses chunked archives.
256    pub const fn is_chunked(&self) -> bool {
257        !matches!(self, Self::State | Self::RocksdbIndices)
258    }
259}
260
261impl SnapshotManifest {
262    fn base_url_or_empty(&self) -> &str {
263        self.base_url.as_deref().unwrap_or("")
264    }
265
266    /// Look up a component by type.
267    pub fn component(&self, ty: SnapshotComponentType) -> Option<&ComponentManifest> {
268        self.components.get(ty.key())
269    }
270
271    /// Returns the total download size for the given set of component types.
272    pub fn total_size(&self, types: &[SnapshotComponentType]) -> u64 {
273        types.iter().filter_map(|ty| self.component(*ty).map(|c| c.total_size())).sum()
274    }
275
276    /// Returns all archive URLs for a given component type.
277    pub fn archive_urls(&self, ty: SnapshotComponentType) -> Vec<String> {
278        let Some(component) = self.component(ty) else {
279            return vec![];
280        };
281
282        match component {
283            ComponentManifest::Single(single) => {
284                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
285            }
286            ComponentManifest::Chunked(chunked) => {
287                let key = ty.key();
288                let num_chunks = chunked.num_chunks();
289                (0..num_chunks)
290                    .map(|i| {
291                        let start = i * chunked.blocks_per_file;
292                        let end = (i + 1) * chunked.blocks_per_file - 1;
293                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
294                    })
295                    .collect()
296            }
297        }
298    }
299
300    /// Returns archive URLs for a component, limited to chunks covering at least `distance`
301    /// blocks from the tip. Returns all URLs if distance is `None` (All mode).
302    pub fn archive_urls_for_distance(
303        &self,
304        ty: SnapshotComponentType,
305        distance: Option<u64>,
306    ) -> Vec<String> {
307        let Some(component) = self.component(ty) else {
308            return vec![];
309        };
310
311        match component {
312            ComponentManifest::Single(single) => {
313                vec![format!("{}/{}", self.base_url_or_empty(), single.file)]
314            }
315            ComponentManifest::Chunked(chunked) => {
316                let key = ty.key();
317                let num_chunks = chunked.num_chunks();
318
319                // Calculate which chunks to include
320                let start_chunk = match distance {
321                    Some(dist) => {
322                        // We need chunks covering the last `dist` blocks
323                        let needed_blocks = dist.min(chunked.total_blocks);
324                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
325                        num_chunks.saturating_sub(needed_chunks)
326                    }
327                    None => 0, // All chunks
328                };
329
330                (start_chunk..num_chunks)
331                    .map(|i| {
332                        let start = i * chunked.blocks_per_file;
333                        let end = (i + 1) * chunked.blocks_per_file - 1;
334                        format!("{}/{key}-{start}-{end}.tar.zst", self.base_url_or_empty())
335                    })
336                    .collect()
337            }
338        }
339    }
340
341    /// Returns concrete snapshot archives for a component, optionally limited to distance.
342    pub fn snapshot_archives_for_distance(
343        &self,
344        ty: SnapshotComponentType,
345        distance: Option<u64>,
346    ) -> Vec<SnapshotArchive> {
347        let Some(component) = self.component(ty) else {
348            return vec![];
349        };
350
351        match component {
352            ComponentManifest::Single(single) => {
353                vec![SnapshotArchive {
354                    url: format!("{}/{}", self.base_url_or_empty(), single.file),
355                    file_name: single.file.clone(),
356                    size: single.size,
357                    blake3: single.blake3.clone(),
358                    output_files: single.output_files.clone(),
359                }]
360            }
361            ComponentManifest::Chunked(chunked) => {
362                let key = ty.key();
363                let num_chunks = chunked.num_chunks();
364
365                let start_chunk = match distance {
366                    Some(dist) => {
367                        let needed_blocks = dist.min(chunked.total_blocks);
368                        let needed_chunks = needed_blocks.div_ceil(chunked.blocks_per_file);
369                        num_chunks.saturating_sub(needed_chunks)
370                    }
371                    None => 0,
372                };
373
374                (start_chunk..num_chunks)
375                    .map(|i| {
376                        let start = i * chunked.blocks_per_file;
377                        let end = (i + 1) * chunked.blocks_per_file - 1;
378                        let file_name = format!("{key}-{start}-{end}.tar.zst");
379                        let size = chunked.chunk_sizes.get(i as usize).copied().unwrap_or_default();
380                        let output_files =
381                            chunked.chunk_output_files.get(i as usize).cloned().unwrap_or_default();
382
383                        SnapshotArchive {
384                            url: format!("{}/{}", self.base_url_or_empty(), file_name),
385                            file_name,
386                            size,
387                            blake3: None,
388                            output_files,
389                        }
390                    })
391                    .collect()
392            }
393        }
394    }
395
396    /// Returns the exact download size for a component given a distance selection.
397    ///
398    /// For single archives, returns the full size. For chunked archives, sums the
399    /// sizes of the selected tail chunks from [`ChunkedArchive::chunk_sizes`].
400    pub fn size_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
401        let Some(component) = self.component(ty) else {
402            return 0;
403        };
404        match component {
405            ComponentManifest::Single(s) => s.size,
406            ComponentManifest::Chunked(chunked) => {
407                if chunked.chunk_sizes.is_empty() {
408                    return 0;
409                }
410                let num_chunks = chunked.chunk_sizes.len() as u64;
411                let start_chunk = match distance {
412                    Some(dist) => {
413                        let needed = dist.min(chunked.total_blocks);
414                        let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
415                        num_chunks.saturating_sub(needed_chunks)
416                    }
417                    None => 0,
418                };
419                chunked.chunk_sizes[start_chunk as usize..].iter().sum()
420            }
421        }
422    }
423
424    /// Returns the exact extracted plain-output size for a component given a distance selection.
425    pub fn output_size_for_distance(
426        &self,
427        ty: SnapshotComponentType,
428        distance: Option<u64>,
429    ) -> u64 {
430        let Some(component) = self.component(ty) else {
431            return 0;
432        };
433
434        match component {
435            ComponentManifest::Single(single) => single.output_size(),
436            ComponentManifest::Chunked(chunked) => {
437                let num_chunks = chunked.num_chunks();
438                let start_chunk = match distance {
439                    Some(dist) => {
440                        let needed = dist.min(chunked.total_blocks);
441                        let needed_chunks = needed.div_ceil(chunked.blocks_per_file);
442                        num_chunks.saturating_sub(needed_chunks)
443                    }
444                    None => 0,
445                };
446
447                (start_chunk..num_chunks)
448                    .map(|index| chunked.chunk_output_size(index as usize))
449                    .sum()
450            }
451        }
452    }
453
454    /// Returns the number of chunks that would be downloaded for a given distance.
455    pub fn chunks_for_distance(&self, ty: SnapshotComponentType, distance: Option<u64>) -> u64 {
456        let Some(ComponentManifest::Chunked(chunked)) = self.component(ty) else {
457            return if self.component(ty).is_some() { 1 } else { 0 };
458        };
459        match distance {
460            Some(dist) => {
461                let needed = dist.min(chunked.total_blocks);
462                needed.div_ceil(chunked.blocks_per_file)
463            }
464            None => chunked.num_chunks(),
465        }
466    }
467}
468
469impl ComponentManifest {
470    /// Returns the total download size for this component.
471    pub fn total_size(&self) -> u64 {
472        match self {
473            Self::Single(s) => s.size,
474            Self::Chunked(c) => c.chunk_sizes.iter().sum(),
475        }
476    }
477
478    /// Returns the total extracted plain-output size for this component.
479    pub fn total_output_size(&self) -> u64 {
480        match self {
481            Self::Single(single) => single.output_size(),
482            Self::Chunked(chunked) => chunked.total_output_size(),
483        }
484    }
485}
486
487impl ChunkedArchive {
488    /// Returns the number of chunks.
489    pub fn num_chunks(&self) -> u64 {
490        self.total_blocks.div_ceil(self.blocks_per_file)
491    }
492
493    /// Returns the extracted plain-output size for one chunk.
494    pub fn chunk_output_size(&self, index: usize) -> u64 {
495        self.chunk_decompressed_sizes.get(index).copied().unwrap_or_else(|| {
496            self.chunk_output_files
497                .get(index)
498                .map(|files| files.iter().map(|file| file.size).sum())
499                .unwrap_or(0)
500        })
501    }
502
503    /// Returns the total extracted plain-output size across all chunks.
504    pub fn total_output_size(&self) -> u64 {
505        if !self.chunk_decompressed_sizes.is_empty() {
506            self.chunk_decompressed_sizes.iter().sum()
507        } else {
508            self.chunk_output_files
509                .iter()
510                .map(|files| files.iter().map(|file| file.size).sum::<u64>())
511                .sum()
512        }
513    }
514}
515
516impl SingleArchive {
517    /// Returns the total extracted plain-output size for this archive.
518    pub fn output_size(&self) -> u64 {
519        if self.decompressed_size != 0 {
520            self.decompressed_size
521        } else {
522            self.output_files.iter().map(|file| file.size).sum()
523        }
524    }
525}
526
527/// Fetch a snapshot manifest from a URL.
528pub async fn fetch_manifest(manifest_url: &str) -> Result<SnapshotManifest> {
529    let client = Client::new();
530    let manifest: SnapshotManifest =
531        client.get(manifest_url).send().await?.error_for_status()?.json().await?;
532    Ok(manifest)
533}
534
535/// Package chunk archives from a source datadir and generate a manifest.
536pub fn generate_manifest(
537    source_datadir: &Path,
538    output_dir: &Path,
539    base_url: Option<&str>,
540    block: u64,
541    chain_id: u64,
542    blocks_per_file: u64,
543) -> Result<SnapshotManifest> {
544    std::fs::create_dir_all(output_dir)?;
545
546    let mut components = BTreeMap::new();
547
548    // Package chunked static-file components.
549    for ty in &[
550        SnapshotComponentType::Headers,
551        SnapshotComponentType::Transactions,
552        SnapshotComponentType::TransactionSenders,
553        SnapshotComponentType::Receipts,
554        SnapshotComponentType::AccountChangesets,
555        SnapshotComponentType::StorageChangesets,
556    ] {
557        let key = ty.key();
558        let num_chunks = block.div_ceil(blocks_per_file);
559        let mut planned_chunks = Vec::with_capacity(num_chunks as usize);
560        let mut found_any = false;
561
562        for i in 0..num_chunks {
563            let start = i * blocks_per_file;
564            let end = (i + 1) * blocks_per_file - 1;
565            let source_files = source_files_for_chunk(source_datadir, *ty, start, end)?;
566
567            if source_files.is_empty() {
568                if found_any {
569                    eyre::bail!("Missing source files for {} chunk {}-{}", key, start, end);
570                }
571                continue;
572            }
573
574            found_any = true;
575            planned_chunks.push(PlannedChunk {
576                chunk_idx: i,
577                archive_path: output_dir.join(chunk_filename(key, start, end)),
578                source_files,
579            });
580        }
581
582        if found_any {
583            let mut packaged_chunks = planned_chunks
584                .into_par_iter()
585                .map(|planned| -> Result<PackagedChunk> {
586                    let output_files =
587                        write_chunk_archive(&planned.archive_path, &planned.source_files)?;
588                    let size = std::fs::metadata(&planned.archive_path)?.len();
589                    Ok(PackagedChunk { chunk_idx: planned.chunk_idx, size, output_files })
590                })
591                .collect::<Vec<_>>()
592                .into_iter()
593                .collect::<Result<Vec<_>>>()?;
594
595            packaged_chunks.sort_unstable_by_key(|chunk| chunk.chunk_idx);
596            let chunk_sizes = packaged_chunks.iter().map(|chunk| chunk.size).collect::<Vec<_>>();
597            let chunk_output_files =
598                packaged_chunks.into_iter().map(|chunk| chunk.output_files).collect::<Vec<_>>();
599            let total_size: u64 = chunk_sizes.iter().sum();
600            info!(target: "reth::cli",
601                component = ty.display_name(),
602                chunks = chunk_sizes.len(),
603                total_blocks = block,
604                size = %super::DownloadProgress::format_size(total_size),
605                "Found chunked component"
606            );
607            components.insert(
608                key.to_string(),
609                ComponentManifest::Chunked(ChunkedArchive {
610                    blocks_per_file,
611                    total_blocks: block,
612                    chunk_sizes,
613                    chunk_decompressed_sizes: chunk_output_files
614                        .iter()
615                        .map(|files| files.iter().map(|file| file.size).sum())
616                        .collect(),
617                    chunk_output_files,
618                }),
619            );
620        }
621    }
622
623    let (state_size, state_output_files) = package_single_component(
624        output_dir,
625        "state.tar.zst",
626        &state_source_files(source_datadir)?,
627    )?;
628    components.insert(
629        SnapshotComponentType::State.key().to_string(),
630        ComponentManifest::Single(SingleArchive {
631            file: "state.tar.zst".to_string(),
632            size: state_size,
633            decompressed_size: state_output_files.iter().map(|file| file.size).sum(),
634            blake3: None,
635            output_files: state_output_files,
636        }),
637    );
638
639    let rocksdb_files = rocksdb_source_files(source_datadir)?;
640    if !rocksdb_files.is_empty() {
641        let (rocksdb_size, rocksdb_output_files) =
642            package_single_component(output_dir, "rocksdb_indices.tar.zst", &rocksdb_files)?;
643        components.insert(
644            SnapshotComponentType::RocksdbIndices.key().to_string(),
645            ComponentManifest::Single(SingleArchive {
646                file: "rocksdb_indices.tar.zst".to_string(),
647                size: rocksdb_size,
648                decompressed_size: rocksdb_output_files.iter().map(|file| file.size).sum(),
649                blake3: None,
650                output_files: rocksdb_output_files,
651            }),
652        );
653    }
654
655    let timestamp = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?.as_secs();
656
657    Ok(SnapshotManifest {
658        block,
659        chain_id,
660        storage_version: 2,
661        timestamp,
662        base_url: base_url.map(str::to_owned),
663        reth_version: Some(reth_node_core::version::version_metadata().short_version.to_string()),
664        components,
665    })
666}
667
/// Builds the archive file name of one chunk of a chunked component.
///
/// The result has the form `{component_key}-{start}-{end}.tar.zst`, e.g.
/// `transactions-0-499999.tar.zst`.
pub fn chunk_filename(component_key: &str, start: u64, end: u64) -> String {
    let block_range = format!("{start}-{end}");
    [component_key, "-", block_range.as_str(), ".tar.zst"].concat()
}
672
/// A chunk archive that still has to be written during manifest generation.
#[derive(Debug)]
struct PlannedChunk {
    /// Zero-based index of the chunk within its component.
    chunk_idx: u64,
    /// Destination path of the archive to create.
    archive_path: PathBuf,
    /// Source files that go into the archive.
    source_files: Vec<PathBuf>,
}
679
/// Result of writing one chunk archive during manifest generation.
#[derive(Debug)]
struct PackagedChunk {
    /// Zero-based index of the chunk within its component; used to restore chunk order.
    chunk_idx: u64,
    /// Size of the written archive file in bytes.
    size: u64,
    /// Metadata of the plain files contained in the archive.
    output_files: Vec<OutputFileChecksum>,
}
686
/// One source file scheduled for inclusion in an archive.
#[derive(Debug)]
struct PlannedFile {
    /// Location of the file on disk.
    source_path: PathBuf,
    /// Path under which the file is stored inside the archive.
    relative_path: PathBuf,
}
692
693fn source_files_for_chunk(
694    source_datadir: &Path,
695    component: SnapshotComponentType,
696    start: u64,
697    end: u64,
698) -> Result<Vec<PathBuf>> {
699    let Some(segment_name) = static_segment_name(component) else {
700        return Ok(Vec::new());
701    };
702
703    let static_files_dir = source_datadir.join("static_files");
704    let static_files_dir =
705        if static_files_dir.exists() { static_files_dir } else { source_datadir.to_path_buf() };
706    let prefix = format!("static_file_{segment_name}_{start}_{end}");
707
708    let mut files = Vec::new();
709    for entry in std::fs::read_dir(&static_files_dir)? {
710        let entry = entry?;
711        if !entry.file_type()?.is_file() {
712            continue;
713        }
714        if entry.file_name().to_string_lossy().starts_with(&prefix) {
715            files.push(entry.path());
716        }
717    }
718
719    files.sort_unstable();
720    Ok(files)
721}
722
723fn static_segment_name(component: SnapshotComponentType) -> Option<&'static str> {
724    match component {
725        SnapshotComponentType::Headers => Some("headers"),
726        SnapshotComponentType::Transactions => Some("transactions"),
727        SnapshotComponentType::TransactionSenders => Some("transaction-senders"),
728        SnapshotComponentType::Receipts => Some("receipts"),
729        SnapshotComponentType::AccountChangesets => Some("account-change-sets"),
730        SnapshotComponentType::StorageChangesets => Some("storage-change-sets"),
731        SnapshotComponentType::State | SnapshotComponentType::RocksdbIndices => None,
732    }
733}
734
735fn state_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
736    let db_dir = source_datadir.join("db");
737    if db_dir.exists() {
738        return collect_files_recursive(&db_dir, Path::new("db"));
739    }
740
741    if looks_like_db_dir(source_datadir)? {
742        return collect_files_recursive(source_datadir, Path::new("db"));
743    }
744
745    eyre::bail!("Could not find source state DB directory under {}", source_datadir.display())
746}
747
748fn rocksdb_source_files(source_datadir: &Path) -> Result<Vec<PlannedFile>> {
749    let rocksdb_dir = source_datadir.join("rocksdb");
750    if !rocksdb_dir.exists() {
751        return Ok(Vec::new());
752    }
753
754    collect_files_recursive(&rocksdb_dir, Path::new("rocksdb"))
755}
756
757fn looks_like_db_dir(path: &Path) -> Result<bool> {
758    let entries = match std::fs::read_dir(path) {
759        Ok(entries) => entries,
760        Err(_) => return Ok(false),
761    };
762
763    for entry in entries {
764        let entry = entry?;
765        if !entry.file_type()?.is_file() {
766            continue;
767        }
768        let name = entry.file_name();
769        let name = name.to_string_lossy();
770        if name == "mdbx.dat" || name == "lock.mdb" || name == "data.mdb" {
771            return Ok(true);
772        }
773    }
774
775    Ok(false)
776}
777
778fn collect_files_recursive(root: &Path, output_prefix: &Path) -> Result<Vec<PlannedFile>> {
779    let mut files = Vec::new();
780    collect_files_recursive_inner(root, root, output_prefix, &mut files)?;
781    files.sort_unstable_by(|a, b| a.relative_path.cmp(&b.relative_path));
782    Ok(files)
783}
784
785fn collect_files_recursive_inner(
786    root: &Path,
787    dir: &Path,
788    output_prefix: &Path,
789    files: &mut Vec<PlannedFile>,
790) -> Result<()> {
791    for entry in std::fs::read_dir(dir)? {
792        let entry = entry?;
793        let path = entry.path();
794        let file_type = entry.file_type()?;
795        if file_type.is_dir() {
796            collect_files_recursive_inner(root, &path, output_prefix, files)?;
797            continue;
798        }
799        if !file_type.is_file() {
800            continue;
801        }
802
803        let relative = path.strip_prefix(root)?.to_path_buf();
804        files.push(PlannedFile { source_path: path, relative_path: output_prefix.join(relative) });
805    }
806
807    Ok(())
808}
809
810fn package_single_component(
811    output_dir: &Path,
812    archive_file_name: &str,
813    files: &[PlannedFile],
814) -> Result<(u64, Vec<OutputFileChecksum>)> {
815    if files.is_empty() {
816        eyre::bail!("Cannot package empty single archive: {}", archive_file_name);
817    }
818
819    let archive_path = output_dir.join(archive_file_name);
820    let output_files = write_archive_from_planned_files(&archive_path, files)?;
821    let size = std::fs::metadata(&archive_path)?.len();
822    Ok((size, output_files))
823}
824
825fn write_chunk_archive(path: &Path, source_files: &[PathBuf]) -> Result<Vec<OutputFileChecksum>> {
826    let planned_files = source_files
827        .iter()
828        .map(|source_path| {
829            let file_name = source_path.file_name().ok_or_else(|| {
830                eyre::eyre!("Invalid source file path: {}", source_path.display())
831            })?;
832            Ok::<_, eyre::Error>(PlannedFile {
833                source_path: source_path.clone(),
834                relative_path: PathBuf::from("static_files").join(file_name),
835            })
836        })
837        .collect::<Result<Vec<_>>>()?;
838
839    write_archive_from_planned_files(path, &planned_files)
840}
841
842fn write_archive_from_planned_files(
843    path: &Path,
844    files: &[PlannedFile],
845) -> Result<Vec<OutputFileChecksum>> {
846    let file = std::fs::File::create(path)?;
847    let mut encoder = zstd::Encoder::new(file, 0)?;
848    // Emit standard zstd frames with checksums for compatibility with external
849    // tools such as `pzstd -d`.
850    encoder.include_checksum(true)?;
851    let mut builder = tar::Builder::new(encoder);
852
853    let mut output_files = Vec::with_capacity(files.len());
854    for planned in files {
855        let mut header = tar::Header::new_gnu();
856        header.set_size(std::fs::metadata(&planned.source_path)?.len());
857        header.set_mode(0o644);
858        header.set_cksum();
859
860        let source_file = std::fs::File::open(&planned.source_path)?;
861        let mut reader = HashingReader::new(source_file);
862        builder.append_data(&mut header, &planned.relative_path, &mut reader)?;
863
864        output_files.push(OutputFileChecksum {
865            path: planned.relative_path.to_string_lossy().to_string(),
866            size: reader.bytes_read,
867            blake3: reader.finalize(),
868        });
869    }
870
871    builder.finish()?;
872    let encoder = builder.into_inner()?;
873    encoder.finish()?;
874
875    Ok(output_files)
876}
877
878struct HashingReader<R> {
879    inner: R,
880    hasher: Hasher,
881    bytes_read: u64,
882}
883
884impl<R: Read> HashingReader<R> {
885    fn new(inner: R) -> Self {
886        Self { inner, hasher: Hasher::new(), bytes_read: 0 }
887    }
888
889    fn finalize(self) -> String {
890        self.hasher.finalize().to_hex().to_string()
891    }
892}
893
894impl<R: Read> Read for HashingReader<R> {
895    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
896        let n = self.inner.read(buf)?;
897        if n > 0 {
898            self.bytes_read += n as u64;
899            self.hasher.update(&buf[..n]);
900        }
901        Ok(n)
902    }
903}
904
#[cfg(test)]
mod tests {
    use super::{SnapshotComponentType as Component, *};
    use tempfile::tempdir;

    /// Builds a chain-1, storage-version-2 manifest at `block` served from `https://example.com`.
    fn manifest_at(
        block: u64,
        components: BTreeMap<String, ComponentManifest>,
    ) -> SnapshotManifest {
        SnapshotManifest {
            block,
            chain_id: 1,
            storage_version: 2,
            timestamp: 0,
            base_url: Some("https://example.com".to_string()),
            reth_version: None,
            components,
        }
    }

    /// Shorthand for an [`OutputFileChecksum`] fixture.
    fn output_file(path: &str, size: u64, blake3: &str) -> OutputFileChecksum {
        OutputFileChecksum { path: path.to_string(), size, blake3: blake3.to_string() }
    }

    /// A `state` single archive carrying one output file (`db/mdbx.dat`, 1000 bytes).
    fn state_with_output(archive_hash: Option<&str>, file_hash: &str) -> ComponentManifest {
        ComponentManifest::Single(SingleArchive {
            file: "state.tar.zst".to_string(),
            size: 100,
            decompressed_size: 1_000,
            blake3: archive_hash.map(String::from),
            output_files: vec![output_file("db/mdbx.dat", 1_000, file_hash)],
        })
    }

    /// A two-chunk `transactions` component with decompressed sizes and output files recorded.
    fn two_chunk_transactions() -> ComponentManifest {
        ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_000_000,
            chunk_sizes: vec![80_000, 120_000],
            chunk_decompressed_sizes: vec![111, 222],
            chunk_output_files: vec![
                vec![output_file("static_files/static_file_transactions_0_499999.bin", 111, "h0")],
                vec![output_file(
                    "static_files/static_file_transactions_500000_999999.bin",
                    222,
                    "h1",
                )],
            ],
        })
    }

    /// Creates `<datadir>/db/mdbx.dat` with placeholder contents.
    fn seed_state_db(datadir: &Path) {
        let db_dir = datadir.join("db");
        std::fs::create_dir_all(&db_dir).unwrap();
        std::fs::write(db_dir.join("mdbx.dat"), b"state-data").unwrap();
    }

    /// Default fixture: a single `state` archive plus three-chunk `transactions` and `headers`.
    fn test_manifest() -> SnapshotManifest {
        let state = ComponentManifest::Single(SingleArchive {
            file: "state.tar.zst".to_string(),
            size: 100,
            decompressed_size: 0,
            blake3: None,
            output_files: vec![],
        });
        let transactions = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_500_000,
            chunk_sizes: vec![80_000, 100_000, 120_000],
            chunk_decompressed_sizes: vec![],
            chunk_output_files: vec![vec![]; 3],
        });
        let headers = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 1_500_000,
            chunk_sizes: vec![40_000, 50_000, 60_000],
            chunk_decompressed_sizes: vec![],
            chunk_output_files: vec![vec![]; 3],
        });

        manifest_at(
            1_500_000,
            BTreeMap::from([
                ("state".to_string(), state),
                ("transactions".to_string(), transactions),
                ("headers".to_string(), headers),
            ]),
        )
    }

    #[test]
    fn archive_urls_for_distance_all() {
        let manifest = test_manifest();
        let urls = manifest.archive_urls_for_distance(Component::Transactions, None);
        assert_eq!(urls.len(), 3);
        assert_eq!(urls[0], "https://example.com/transactions-0-499999.tar.zst");
        assert_eq!(urls[2], "https://example.com/transactions-1000000-1499999.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_partial() {
        let manifest = test_manifest();
        // A 600k-block window spans two chunks of 500k blocks each.
        let urls = manifest.archive_urls_for_distance(Component::Transactions, Some(600_000));
        assert_eq!(urls.len(), 2);
        assert_eq!(urls[0], "https://example.com/transactions-500000-999999.tar.zst");
        assert_eq!(urls[1], "https://example.com/transactions-1000000-1499999.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_single_component() {
        let manifest = test_manifest();
        // A single archive yields exactly one URL whatever the distance.
        let urls = manifest.archive_urls_for_distance(Component::State, Some(100));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/state.tar.zst");
    }

    #[test]
    fn archive_urls_for_distance_rocksdb_indices_single_component() {
        let rocksdb = ComponentManifest::Single(SingleArchive {
            file: "rocksdb_indices.tar.zst".to_string(),
            size: 777,
            decompressed_size: 0,
            blake3: None,
            output_files: vec![],
        });
        let manifest = manifest_at(1, BTreeMap::from([("rocksdb_indices".to_string(), rocksdb)]));

        let urls = manifest.archive_urls_for_distance(Component::RocksdbIndices, Some(10));
        assert_eq!(urls.len(), 1);
        assert_eq!(urls[0], "https://example.com/rocksdb_indices.tar.zst");
        assert_eq!(manifest.size_for_distance(Component::RocksdbIndices, Some(10)), 777);
    }

    #[test]
    fn archive_urls_for_distance_missing_component() {
        let manifest = test_manifest();
        let urls = manifest.archive_urls_for_distance(Component::Receipts, None);
        assert!(urls.is_empty());
    }

    #[test]
    fn chunks_for_distance_all() {
        let manifest = test_manifest();
        assert_eq!(manifest.chunks_for_distance(Component::Transactions, None), 3);
    }

    #[test]
    fn chunks_for_distance_partial() {
        let manifest = test_manifest();
        for (distance, expected) in [(Some(600_000), 2), (Some(100_000), 1)] {
            assert_eq!(manifest.chunks_for_distance(Component::Transactions, distance), expected);
        }
    }

    #[test]
    fn chunks_for_distance_single() {
        let manifest = test_manifest();
        for distance in [None, Some(100)] {
            assert_eq!(manifest.chunks_for_distance(Component::State, distance), 1);
        }
    }

    #[test]
    fn chunks_for_distance_missing() {
        let manifest = test_manifest();
        assert_eq!(manifest.chunks_for_distance(Component::Receipts, None), 0);
    }

    #[test]
    fn component_selection_display() {
        let cases = [
            (ComponentSelection::All, "All"),
            (ComponentSelection::Distance(10_064), "Last 10064 blocks"),
            (ComponentSelection::Since(15_537_394), "Since block 15537394"),
            (ComponentSelection::None, "None"),
        ];
        for (selection, rendered) in cases {
            assert_eq!(selection.to_string(), rendered);
        }
    }

    #[test]
    fn archive_urls_aligned_to_blocks_per_file() {
        // Even when total_blocks is not a multiple of blocks_per_file, chunk boundaries
        // must follow blocks_per_file rather than total_blocks.
        let changesets = ComponentManifest::Chunked(ChunkedArchive {
            blocks_per_file: 500_000,
            total_blocks: 24_396_822,
            chunk_sizes: vec![100; 49], // 49 chunks
            chunk_decompressed_sizes: vec![],
            chunk_output_files: vec![vec![]; 49],
        });
        let manifest = manifest_at(
            24_396_822,
            BTreeMap::from([("storage_changesets".to_string(), changesets)]),
        );

        let urls = manifest.archive_urls(Component::StorageChangesets);
        assert_eq!(urls.len(), 49);
        // First chunk: 0-499999 (not 0-396821 or similar)
        assert_eq!(urls[0], "https://example.com/storage_changesets-0-499999.tar.zst");
        // Last chunk: 24000000-24499999 (not 24000000-24396821)
        assert_eq!(urls[48], "https://example.com/storage_changesets-24000000-24499999.tar.zst");
    }

    #[test]
    fn size_for_distance_sums_tail_chunks() {
        let manifest = test_manifest();
        // Transactions has chunk_sizes [80_000, 100_000, 120_000].
        let cases = [
            // All: sum of all 3
            (Component::Transactions, None, 300_000),
            // Last 500K blocks = 1 chunk = last chunk only
            (Component::Transactions, Some(500_000), 120_000),
            // Last 600K blocks = 2 chunks = last two
            (Component::Transactions, Some(600_000), 220_000),
            // Single archive (state) always returns full size
            (Component::State, Some(100), 100),
            // Missing component
            (Component::Receipts, None, 0),
        ];
        for (component, distance, expected) in cases {
            assert_eq!(manifest.size_for_distance(component, distance), expected);
        }
    }

    #[test]
    fn output_size_for_distance_uses_manifest_or_output_files() {
        let bare = test_manifest();
        assert_eq!(bare.output_size_for_distance(Component::Transactions, None), 0);

        let manifest = manifest_at(
            1_000_000,
            BTreeMap::from([
                ("state".to_string(), state_with_output(None, "h0")),
                ("transactions".to_string(), two_chunk_transactions()),
            ]),
        );

        assert_eq!(manifest.output_size_for_distance(Component::State, None), 1_000);
        assert_eq!(manifest.output_size_for_distance(Component::Transactions, None), 333);
        assert_eq!(manifest.output_size_for_distance(Component::Transactions, Some(500_000)), 222);
    }

    #[test]
    fn archive_descriptors_include_checksum_metadata() {
        let manifest = manifest_at(
            1_000_000,
            BTreeMap::from([
                ("state".to_string(), state_with_output(Some("abc123"), "s0")),
                ("transactions".to_string(), two_chunk_transactions()),
            ]),
        );

        let state = manifest.snapshot_archives_for_distance(Component::State, None);
        assert_eq!(state.len(), 1);
        assert_eq!(state[0].file_name, "state.tar.zst");
        assert_eq!(state[0].blake3.as_deref(), Some("abc123"));
        assert_eq!(state[0].output_files.len(), 1);

        let tx = manifest.snapshot_archives_for_distance(Component::Transactions, None);
        assert_eq!(tx.len(), 2);
        assert_eq!(tx[0].blake3, None);
        assert_eq!(tx[1].blake3, None);
        assert_eq!(tx[0].output_files[0].size, 111);
    }

    #[test]
    fn generate_manifest_includes_state_single_archive() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        seed_state_db(source.path());

        let manifest =
            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();

        let state = manifest.component(Component::State).unwrap();
        let ComponentManifest::Single(state) = state else {
            panic!("state should be a single archive")
        };
        assert_eq!(state.file, "state.tar.zst");
        assert!(state.decompressed_size > 0);
        assert!(!state.output_files.is_empty());
        assert_eq!(state.output_files[0].path, "db/mdbx.dat");
        assert!(output.path().join("state.tar.zst").exists());
    }

    #[test]
    fn generate_manifest_includes_rocksdb_single_archive_when_present() {
        let source = tempdir().unwrap();
        let output = tempdir().unwrap();
        seed_state_db(source.path());
        let rocksdb_dir = source.path().join("rocksdb");
        std::fs::create_dir_all(&rocksdb_dir).unwrap();
        std::fs::write(rocksdb_dir.join("CURRENT"), b"MANIFEST-000001").unwrap();

        let manifest =
            generate_manifest(source.path(), output.path(), None, 0, 1, 500_000).unwrap();

        let rocksdb = manifest.component(Component::RocksdbIndices).unwrap();
        let ComponentManifest::Single(rocksdb) = rocksdb else {
            panic!("rocksdb indices should be a single archive")
        };
        assert_eq!(rocksdb.file, "rocksdb_indices.tar.zst");
        assert!(rocksdb.decompressed_size > 0);
        assert!(!rocksdb.output_files.is_empty());
        assert_eq!(rocksdb.output_files[0].path, "rocksdb/CURRENT");
        assert!(output.path().join("rocksdb_indices.tar.zst").exists());
    }
}