reth_cli_commands/download.rs

use crate::common::EnvironmentArgs;
use clap::Parser;
use eyre::Result;
use lz4::Decoder;
use reqwest::Client;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_cli::chainspec::ChainSpecParser;
use reth_fs_util as fs;
use std::{
    io::{self, Read, Write},
    path::Path,
    sync::Arc,
    time::{Duration, Instant},
};
use tar::Archive;
use tokio::task;
use tracing::info;
use zstd::stream::read::Decoder as ZstdDecoder;

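// Units used when rendering human-readable byte counts, the base URL of the
// merkle.io snapshot host, and the archive extensions the command accepts.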
const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";

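/// Downloads a node snapshot over HTTP and unpacks it into the resolved datadir,
/// streaming the archive through the decompressor so no intermediate file is written.
/// If no URL is provided, the latest mainnet archive snapshot from merkle.io is used.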
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    #[arg(
        long,
        short,
        help = "Custom URL to download the snapshot from",
        long_help = "Specify a snapshot URL or let the command propose a default one.\n\
                     \n\
                     Available snapshot sources:\n\
                     - https://www.merkle.io/snapshots (default, mainnet archive)\n\
                     - https://publicnode.com/snapshots (full nodes & testnets)\n\
                     \n\
                     If no URL is provided, the latest mainnet archive snapshot\n\
                     will be proposed for download from merkle.io"
    )]
    url: Option<String>,
}

impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
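    /// Resolves the datadir, picks the snapshot URL (the caller's `--url` or the
    /// latest merkle.io snapshot), and streams the archive into the datadir.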
    pub async fn execute<N>(self) -> Result<()> {
        let data_dir = self.env.datadir.resolve_datadir(self.env.chain.chain());
        fs::create_dir_all(&data_dir)?;

        let url = match self.url {
            Some(url) => url,
            None => {
                let url = get_latest_snapshot_url().await?;
                info!(target: "reth::cli", "Using default snapshot URL: {}", url);
                url
            }
        };

        info!(target: "reth::cli",
            chain = %self.env.chain.chain(),
            dir = ?data_dir.data_dir(),
            url = %url,
            "Starting snapshot download and extraction"
        );

        stream_and_extract(&url, data_dir.data_dir()).await?;
        info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");

        Ok(())
    }
}

impl<C: ChainSpecParser> DownloadCommand<C> {
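    /// Returns the chain spec this command runs against.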
    pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
        Some(&self.env.chain)
    }
}

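/// Tracks how many bytes have been downloaded so far so progress can be reported
/// against the expected total size.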
struct DownloadProgress {
    downloaded: u64,
    total_size: u64,
    last_displayed: Instant,
}

impl DownloadProgress {
    fn new(total_size: u64) -> Self {
        Self { downloaded: 0, total_size, last_displayed: Instant::now() }
    }

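    /// Renders a byte count as a human-readable string, e.g. `1536` -> `"1.50 KB"`.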
    fn format_size(size: u64) -> String {
        let mut size = size as f64;
        let mut unit_index = 0;

        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
            size /= 1024.0;
            unit_index += 1;
        }

        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
    }

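    /// Adds `chunk_size` to the running total and, at most every 100ms, redraws the
    /// single-line progress indicator on stdout.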
    fn update(&mut self, chunk_size: u64) -> Result<()> {
        self.downloaded += chunk_size;

        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
            let formatted_downloaded = Self::format_size(self.downloaded);
            let formatted_total = Self::format_size(self.total_size);
            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;

            print!(
                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
            );
            io::stdout().flush()?;
            self.last_displayed = Instant::now();
        }

        Ok(())
    }
}

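/// A `Read` adapter that forwards to the inner reader while feeding every chunk it
/// sees into `DownloadProgress`, so progress is reported as the decompressor pulls data.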
struct ProgressReader<R> {
    reader: R,
    progress: DownloadProgress,
}

impl<R: Read> ProgressReader<R> {
    fn new(reader: R, total_size: u64) -> Self {
        Self { reader, progress: DownloadProgress::new(total_size) }
    }
}

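// Delegates reads to the wrapped reader and records the number of bytes returned.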
impl<R: Read> Read for ProgressReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let bytes = self.reader.read(buf)?;
        if bytes > 0 &&
            let Err(e) = self.progress.update(bytes as u64)
        {
            return Err(io::Error::other(e));
        }
        Ok(bytes)
    }
}

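/// The archive compression schemes the command can decode.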
#[derive(Debug, Clone, Copy)]
enum CompressionFormat {
    Lz4,
    Zstd,
}

impl CompressionFormat {
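    /// Infers the compression format from the URL's file extension
    /// (`.tar.lz4` or `.tar.zst`); any other extension is an error.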
    fn from_url(url: &str) -> Result<Self> {
        if url.ends_with(EXTENSION_TAR_LZ4) {
            Ok(Self::Lz4)
        } else if url.ends_with(EXTENSION_TAR_ZSTD) {
            Ok(Self::Zstd)
        } else {
            Err(eyre::eyre!("Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}", url))
        }
    }
}

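/// Downloads the snapshot with a blocking HTTP client and unpacks it into `target_dir`,
/// piping the response body through the matching decompressor straight into `tar`.
/// Requires the server to report `Content-Length` so progress can be computed.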
fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
    let client = reqwest::blocking::Client::builder().build()?;
    let response = client.get(url).send()?.error_for_status()?;

    let total_size = response.content_length().ok_or_else(|| {
        eyre::eyre!(
            "Server did not provide Content-Length header. This is required for snapshot downloads"
        )
    })?;

    let progress_reader = ProgressReader::new(response, total_size);
    let format = CompressionFormat::from_url(url)?;

    match format {
        CompressionFormat::Lz4 => {
            let decoder = Decoder::new(progress_reader)?;
            Archive::new(decoder).unpack(target_dir)?;
        }
        CompressionFormat::Zstd => {
            let decoder = ZstdDecoder::new(progress_reader)?;
            Archive::new(decoder).unpack(target_dir)?;
        }
    }

    info!(target: "reth::cli", "Extraction complete.");
    Ok(())
}

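/// Runs the blocking download/extract on Tokio's blocking thread pool so the
/// async runtime stays responsive.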
async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
    let target_dir = target_dir.to_path_buf();
    let url = url.to_string();
    task::spawn_blocking(move || blocking_download_and_extract(&url, &target_dir)).await??;

    Ok(())
}

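/// Fetches `latest.txt` from the merkle.io downloads host and turns the file name it
/// contains into a full snapshot URL.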
async fn get_latest_snapshot_url() -> Result<String> {
    let latest_url = format!("{MERKLE_BASE_URL}/latest.txt");
    let filename = Client::new()
        .get(latest_url)
        .send()
        .await?
        .error_for_status()?
        .text()
        .await?
        .trim()
        .to_string();

    Ok(format!("{MERKLE_BASE_URL}/{filename}"))
}