reth_cli_commands/
download.rs

1use crate::common::EnvironmentArgs;
2use clap::Parser;
3use eyre::Result;
4use lz4::Decoder;
5use reqwest::Client;
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_fs_util as fs;
9use std::{
10    io::{self, Read, Write},
11    path::Path,
12    sync::Arc,
13    time::{Duration, Instant},
14};
15use tar::Archive;
16use tokio::task;
17use tracing::info;
18use zstd::stream::read::Decoder as ZstdDecoder;
19
/// Units used to render byte counts, in ascending powers of 1024.
///
/// `TB` is included because snapshot archives can exceed 1024 GB, which would otherwise be
/// shown as thousands of gigabytes.
const BYTE_UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
/// Base URL of the merkle.io snapshot host; it also serves the `latest.txt` index.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
/// File extension of LZ4-compressed tar snapshots.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// File extension of Zstandard-compressed tar snapshots.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
24
25#[derive(Debug, Parser)]
26pub struct DownloadCommand<C: ChainSpecParser> {
27    #[command(flatten)]
28    env: EnvironmentArgs<C>,
29
30    #[arg(
31        long,
32        short,
33        help = "Custom URL to download the snapshot from",
34        long_help = "Specify a snapshot URL or let the command propose a default one.\n\
35        \n\
36        Available snapshot sources:\n\
37        - https://www.merkle.io/snapshots (default, mainnet archive)\n\
38        - https://publicnode.com/snapshots (full nodes & testnets)\n\
39        \n\
40        If no URL is provided, the latest mainnet archive snapshot\n\
41        will be proposed for download from merkle.io"
42    )]
43    url: Option<String>,
44}
45
46impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
47    pub async fn execute<N>(self) -> Result<()> {
48        let data_dir = self.env.datadir.resolve_datadir(self.env.chain.chain());
49        fs::create_dir_all(&data_dir)?;
50
51        let url = match self.url {
52            Some(url) => url,
53            None => {
54                let url = get_latest_snapshot_url().await?;
55                info!(target: "reth::cli", "Using default snapshot URL: {}", url);
56                url
57            }
58        };
59
60        info!(target: "reth::cli",
61            chain = %self.env.chain.chain(),
62            dir = ?data_dir.data_dir(),
63            url = %url,
64            "Starting snapshot download and extraction"
65        );
66
67        stream_and_extract(&url, data_dir.data_dir()).await?;
68        info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
69
70        Ok(())
71    }
72}
73
74impl<C: ChainSpecParser> DownloadCommand<C> {
75    /// Returns the underlying chain being used to run this command
76    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
77        Some(&self.env.chain)
78    }
79}
80
81// Monitor process status and display progress every 100ms
82// to avoid overwhelming stdout
83struct DownloadProgress {
84    downloaded: u64,
85    total_size: u64,
86    last_displayed: Instant,
87}
88
89impl DownloadProgress {
90    /// Creates new progress tracker with given total size
91    fn new(total_size: u64) -> Self {
92        Self { downloaded: 0, total_size, last_displayed: Instant::now() }
93    }
94
95    /// Converts bytes to human readable format (B, KB, MB, GB)
96    fn format_size(size: u64) -> String {
97        let mut size = size as f64;
98        let mut unit_index = 0;
99
100        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
101            size /= 1024.0;
102            unit_index += 1;
103        }
104
105        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
106    }
107
108    /// Updates progress bar
109    fn update(&mut self, chunk_size: u64) -> Result<()> {
110        self.downloaded += chunk_size;
111
112        // Only update display at most 10 times per second for efficiency
113        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
114            let formatted_downloaded = Self::format_size(self.downloaded);
115            let formatted_total = Self::format_size(self.total_size);
116            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
117
118            print!(
119                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
120            );
121            io::stdout().flush()?;
122            self.last_displayed = Instant::now();
123        }
124
125        Ok(())
126    }
127}
128
129/// Adapter to track progress while reading
130struct ProgressReader<R> {
131    reader: R,
132    progress: DownloadProgress,
133}
134
135impl<R: Read> ProgressReader<R> {
136    fn new(reader: R, total_size: u64) -> Self {
137        Self { reader, progress: DownloadProgress::new(total_size) }
138    }
139}
140
141impl<R: Read> Read for ProgressReader<R> {
142    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
143        let bytes = self.reader.read(buf)?;
144        if bytes > 0 &&
145            let Err(e) = self.progress.update(bytes as u64)
146        {
147            return Err(io::Error::other(e));
148        }
149        Ok(bytes)
150    }
151}
152
153/// Supported compression formats for snapshots
154#[derive(Debug, Clone, Copy)]
155enum CompressionFormat {
156    Lz4,
157    Zstd,
158}
159
160impl CompressionFormat {
161    /// Detect compression format from file extension
162    fn from_url(url: &str) -> Result<Self> {
163        if url.ends_with(EXTENSION_TAR_LZ4) {
164            Ok(Self::Lz4)
165        } else if url.ends_with(EXTENSION_TAR_ZSTD) {
166            Ok(Self::Zstd)
167        } else {
168            Err(eyre::eyre!("Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}", url))
169        }
170    }
171}
172
173/// Downloads and extracts a snapshot, blocking until finished.
174fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
175    let client = reqwest::blocking::Client::builder().build()?;
176    let response = client.get(url).send()?.error_for_status()?;
177
178    let total_size = response.content_length().ok_or_else(|| {
179        eyre::eyre!(
180            "Server did not provide Content-Length header. This is required for snapshot downloads"
181        )
182    })?;
183
184    let progress_reader = ProgressReader::new(response, total_size);
185    let format = CompressionFormat::from_url(url)?;
186
187    match format {
188        CompressionFormat::Lz4 => {
189            let decoder = Decoder::new(progress_reader)?;
190            Archive::new(decoder).unpack(target_dir)?;
191        }
192        CompressionFormat::Zstd => {
193            let decoder = ZstdDecoder::new(progress_reader)?;
194            Archive::new(decoder).unpack(target_dir)?;
195        }
196    }
197
198    info!(target: "reth::cli", "Extraction complete.");
199    Ok(())
200}
201
202async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
203    let target_dir = target_dir.to_path_buf();
204    let url = url.to_string();
205    task::spawn_blocking(move || blocking_download_and_extract(&url, &target_dir)).await??;
206
207    Ok(())
208}
209
210// Builds default URL for latest mainnet archive  snapshot
211async fn get_latest_snapshot_url() -> Result<String> {
212    let latest_url = format!("{MERKLE_BASE_URL}/latest.txt");
213    let filename = Client::new()
214        .get(latest_url)
215        .send()
216        .await?
217        .error_for_status()?
218        .text()
219        .await?
220        .trim()
221        .to_string();
222
223    Ok(format!("{MERKLE_BASE_URL}/{filename}"))
224}