Skip to main content

reth_cli_commands/download/
manifest_cmd.rs

1use crate::download::manifest::generate_manifest;
2use clap::Parser;
3use eyre::{Result, WrapErr};
4use reth_db::{mdbx::DatabaseArguments, open_db_read_only, tables, Database};
5use reth_db_api::transaction::DbTx;
6use reth_primitives_traits::FastInstant as Instant;
7use reth_stages_types::StageId;
8use reth_static_file_types::DEFAULT_BLOCKS_PER_STATIC_FILE;
9use std::path::PathBuf;
10use tracing::{info, warn};
11
12/// Generate modular chunk archives and a snapshot manifest from a source datadir.
13///
14/// Archive naming convention:
15///   - Chunked: `{component}-{start}-{end}.tar.zst` (e.g. `transactions-0-499999.tar.zst`)
16#[derive(Debug, Parser)]
17pub struct SnapshotManifestCommand {
18    /// Source datadir containing static files.
19    #[arg(long, short = 'd')]
20    source_datadir: PathBuf,
21
22    /// Optional base URL where archives will be hosted.
23    #[arg(long)]
24    base_url: Option<String>,
25
26    /// Output directory where chunk archives and manifest.json are written.
27    #[arg(long, short = 'o')]
28    output_dir: PathBuf,
29
30    /// Block number this snapshot was taken at.
31    ///
32    /// If omitted, this is inferred from the source datadir's `Finish` stage checkpoint.
33    #[arg(long)]
34    block: Option<u64>,
35
36    /// Chain ID.
37    #[arg(long, default_value = "1")]
38    chain_id: u64,
39
40    /// Blocks per archive file for chunked components.
41    ///
42    /// If omitted, this is inferred from header static file ranges in the source datadir.
43    #[arg(long)]
44    blocks_per_file: Option<u64>,
45}
46
47impl SnapshotManifestCommand {
48    /// Packages snapshot archives and writes the manifest file.
49    pub fn execute(self) -> Result<()> {
50        let block = match self.block {
51            Some(block) => block,
52            None => infer_snapshot_block(&self.source_datadir)?,
53        };
54        let blocks_per_file = match self.blocks_per_file {
55            Some(blocks_per_file) => blocks_per_file,
56            None => infer_blocks_per_file(&self.source_datadir)?,
57        };
58
59        info!(target: "reth::cli",
60            dir = ?self.source_datadir,
61            output = ?self.output_dir,
62            block,
63            blocks_per_file,
64            "Packaging modular snapshot archives"
65        );
66        let start = Instant::now();
67        let manifest = generate_manifest(
68            &self.source_datadir,
69            &self.output_dir,
70            self.base_url.as_deref(),
71            block,
72            self.chain_id,
73            blocks_per_file,
74        )?;
75
76        let num_components = manifest.components.len();
77        let json = serde_json::to_string_pretty(&manifest)?;
78        let output = self.output_dir.join("manifest.json");
79        reth_fs_util::write(&output, &json)?;
80        info!(target: "reth::cli",
81            path = ?output,
82            components = num_components,
83            block = manifest.block,
84            elapsed = ?start.elapsed(),
85            "Manifest written"
86        );
87
88        Ok(())
89    }
90}
91
92/// Infers the snapshot block from the source datadir.
93fn infer_snapshot_block(source_datadir: &std::path::Path) -> Result<u64> {
94    if let Ok(block) = infer_snapshot_block_from_db(source_datadir) {
95        return Ok(block);
96    }
97
98    let block = infer_snapshot_block_from_headers(source_datadir)?;
99    warn!(
100        target: "reth::cli",
101        block,
102        "Could not read Finish stage checkpoint from source DB, using header static-file tip"
103    );
104    Ok(block)
105}
106
107/// Reads the snapshot block from the source database Finish stage checkpoint.
108fn infer_snapshot_block_from_db(source_datadir: &std::path::Path) -> Result<u64> {
109    let candidates = [source_datadir.join("db"), source_datadir.to_path_buf()];
110
111    for db_path in candidates {
112        if !db_path.exists() {
113            continue;
114        }
115
116        let db = match open_db_read_only(&db_path, DatabaseArguments::default()) {
117            Ok(db) => db,
118            Err(_) => continue,
119        };
120
121        let tx = db.tx()?;
122        if let Some(checkpoint) = tx.get::<tables::StageCheckpoints>(StageId::Finish.to_string())? {
123            return Ok(checkpoint.block_number);
124        }
125    }
126
127    eyre::bail!(
128        "Could not infer --block from source DB (Finish checkpoint missing); pass --block manually"
129    )
130}
131
132/// Infers the snapshot block from the highest header static-file range.
133fn infer_snapshot_block_from_headers(source_datadir: &std::path::Path) -> Result<u64> {
134    let max_end = header_ranges(source_datadir)?
135        .into_iter()
136        .map(|(_, end)| end)
137        .max()
138        .ok_or_else(|| eyre::eyre!("No header static files found to infer --block"))?;
139    Ok(max_end)
140}
141
142/// Infers the static-file block span from header file ranges.
143fn infer_blocks_per_file(source_datadir: &std::path::Path) -> Result<u64> {
144    let mut inferred = None;
145    for (start, end) in header_ranges(source_datadir)? {
146        let span = end.saturating_sub(start).saturating_add(1);
147        if span == 0 {
148            continue;
149        }
150
151        if let Some(existing) = inferred {
152            if existing != span {
153                eyre::bail!(
154                    "Inconsistent header static file ranges; pass --blocks-per-file manually"
155                );
156            }
157        } else {
158            inferred = Some(span);
159        }
160    }
161
162    inferred.ok_or_else(|| {
163        eyre::eyre!(
164            "Could not infer --blocks-per-file from header static files; pass it manually (default is {DEFAULT_BLOCKS_PER_STATIC_FILE})"
165        )
166    })
167}
168
169/// Collects header static-file ranges from the source datadir.
170fn header_ranges(source_datadir: &std::path::Path) -> Result<Vec<(u64, u64)>> {
171    let static_files_dir = source_datadir.join("static_files");
172    let static_files_dir =
173        if static_files_dir.exists() { static_files_dir } else { source_datadir.to_path_buf() };
174
175    let entries = std::fs::read_dir(&static_files_dir).wrap_err_with(|| {
176        format!("Failed to read static files directory: {}", static_files_dir.display())
177    })?;
178
179    let mut ranges = Vec::new();
180    for entry in entries {
181        let entry = entry?;
182        let file_name = entry.file_name();
183        let file_name = file_name.to_string_lossy();
184        if let Some(range) = parse_headers_range(&file_name) {
185            ranges.push(range);
186        }
187    }
188
189    Ok(ranges)
190}
191
/// Parses the block range from a header static-file name.
///
/// Accepts `static_file_headers_{start}_{end}`, optionally followed by a suffix that begins with
/// a non-digit, such as a file extension (e.g. `static_file_headers_0_499999.jar`). Both bounds
/// must be plain ASCII decimal digits, and `start <= end` must hold. Anything else yields `None`.
fn parse_headers_range(file_name: &str) -> Option<(u64, u64)> {
    let remainder = file_name.strip_prefix("static_file_headers_")?;
    let (start, end_with_suffix) = remainder.split_once('_')?;

    // The end bound is the leading run of digits; whatever follows (e.g. `.jar`) is ignored.
    // Slicing by the byte count is sound because every counted byte is ASCII.
    let end_len = end_with_suffix.bytes().take_while(u8::is_ascii_digit).count();
    let end = &end_with_suffix[..end_len];

    // `u64::from_str` also accepts a leading `+`, which is not a valid file-name bound.
    if !start.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }

    let start = start.parse::<u64>().ok()?;
    let end = end.parse::<u64>().ok()?;

    // An inverted range cannot describe a real static file.
    (start <= end).then_some((start, end))
}
203
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Creates a temp datadir whose `static_files` subdirectory holds empty files with the given
    /// names.
    fn datadir_with_static_files(names: &[&str]) -> TempDir {
        let dir = tempdir().unwrap();
        let sf = dir.path().join("static_files");
        std::fs::create_dir_all(&sf).unwrap();
        for name in names {
            std::fs::write(sf.join(name), b"").unwrap();
        }
        dir
    }

    #[test]
    fn parse_headers_range_works_with_suffixes() {
        assert_eq!(parse_headers_range("static_file_headers_0_499999"), Some((0, 499_999)));
        assert_eq!(
            parse_headers_range("static_file_headers_500000_999999.jar"),
            Some((500_000, 999_999))
        );
        assert_eq!(parse_headers_range("static_file_transactions_0_499999"), None);
        assert_eq!(parse_headers_range("static_file_headers_0_"), None);
    }

    #[test]
    fn infer_blocks_per_file_from_header_ranges() {
        // Files sharing one range under different suffixes must not confuse inference.
        let dir = datadir_with_static_files(&[
            "static_file_headers_0_499999",
            "static_file_headers_0_499999.conf",
            "static_file_headers_0_499999.off",
            "static_file_headers_500000_999999.jar",
        ]);

        assert_eq!(infer_blocks_per_file(dir.path()).unwrap(), 500_000);
    }

    #[test]
    fn infer_blocks_per_file_rejects_inconsistent_spans() {
        let dir = datadir_with_static_files(&[
            "static_file_headers_0_499999",
            "static_file_headers_500000_599999",
        ]);

        assert!(infer_blocks_per_file(dir.path()).is_err());
    }

    #[test]
    fn infer_snapshot_block_from_headers_uses_max_end() {
        let dir = datadir_with_static_files(&[
            "static_file_headers_0_499999",
            "static_file_headers_500000_999999",
        ]);

        assert_eq!(infer_snapshot_block_from_headers(dir.path()).unwrap(), 999_999);
    }

    #[test]
    fn header_inference_fails_without_header_files() {
        let dir = datadir_with_static_files(&["static_file_transactions_0_499999"]);

        assert!(infer_blocks_per_file(dir.path()).is_err());
        assert!(infer_snapshot_block_from_headers(dir.path()).is_err());
    }

    #[test]
    fn header_ranges_falls_back_to_datadir_root() {
        // Without a `static_files` subdirectory, the datadir itself is scanned.
        let dir = tempdir().unwrap();
        std::fs::write(dir.path().join("static_file_headers_0_499999"), b"").unwrap();

        assert_eq!(header_ranges(dir.path()).unwrap(), vec![(0, 499_999)]);
    }
}