reth_cli_commands/
download.rs1use crate::common::EnvironmentArgs;
2use clap::Parser;
3use eyre::Result;
4use lz4::Decoder;
5use reqwest::Client;
6use reth_chainspec::{EthChainSpec, EthereumHardforks};
7use reth_cli::chainspec::ChainSpecParser;
8use reth_fs_util as fs;
9use std::{
10 borrow::Cow,
11 io::{self, Read, Write},
12 path::Path,
13 sync::{Arc, OnceLock},
14 time::{Duration, Instant},
15};
16use tar::Archive;
17use tokio::task;
18use tracing::info;
19use zstd::stream::read::Decoder as ZstdDecoder;
20
/// Units used when rendering byte counts, in increasing powers of 1024.
///
/// Includes terabytes so that snapshots larger than 1024 GB render compactly
/// (e.g. "2.50 TB" instead of "2560.00 GB").
const BYTE_UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
/// Default host serving snapshot archives and the `latest.txt` pointer file.
const MERKLE_BASE_URL: &str = "https://downloads.merkle.io";
/// URL suffix of LZ4-compressed tar archives.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// URL suffix of Zstandard-compressed tar archives.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
25
26static DOWNLOAD_DEFAULTS: OnceLock<DownloadDefaults> = OnceLock::new();
28
/// Configurable defaults for the snapshot `download` command.
///
/// Downstream binaries can customize these (see the `with_*` builders) and install them
/// globally before the CLI is parsed.
#[derive(Clone, Debug)]
pub struct DownloadDefaults {
    /// Human-readable snapshot sources, each listed as a bullet in the generated long help.
    pub available_snapshots: Vec<Cow<'static, str>>,
    /// Base URL queried for `latest.txt` when no `--url` is given; also shown in the help text.
    pub default_base_url: Cow<'static, str>,
    /// Help text that, when set, replaces the generated long help verbatim.
    pub long_help: Option<String>,
}
41
42impl DownloadDefaults {
43 pub fn try_init(self) -> Result<(), Self> {
45 DOWNLOAD_DEFAULTS.set(self)
46 }
47
48 pub fn get_global() -> &'static DownloadDefaults {
50 DOWNLOAD_DEFAULTS.get_or_init(DownloadDefaults::default_download_defaults)
51 }
52
53 pub fn default_download_defaults() -> Self {
55 Self {
56 available_snapshots: vec![
57 Cow::Borrowed("https://www.merkle.io/snapshots (default, mainnet archive)"),
58 Cow::Borrowed("https://publicnode.com/snapshots (full nodes & testnets)"),
59 ],
60 default_base_url: Cow::Borrowed(MERKLE_BASE_URL),
61 long_help: None,
62 }
63 }
64
65 pub fn long_help(&self) -> String {
70 if let Some(ref custom_help) = self.long_help {
71 return custom_help.clone();
72 }
73
74 let mut help = String::from(
75 "Specify a snapshot URL or let the command propose a default one.\n\nAvailable snapshot sources:\n",
76 );
77
78 for source in &self.available_snapshots {
79 help.push_str("- ");
80 help.push_str(source);
81 help.push('\n');
82 }
83
84 help.push_str(
85 "\nIf no URL is provided, the latest mainnet archive snapshot\nwill be proposed for download from ",
86 );
87 help.push_str(self.default_base_url.as_ref());
88 help
89 }
90
91 pub fn with_snapshot(mut self, source: impl Into<Cow<'static, str>>) -> Self {
93 self.available_snapshots.push(source.into());
94 self
95 }
96
97 pub fn with_snapshots(mut self, sources: Vec<Cow<'static, str>>) -> Self {
99 self.available_snapshots = sources;
100 self
101 }
102
103 pub fn with_base_url(mut self, url: impl Into<Cow<'static, str>>) -> Self {
105 self.default_base_url = url.into();
106 self
107 }
108
109 pub fn with_long_help(mut self, help: impl Into<String>) -> Self {
111 self.long_help = Some(help.into());
112 self
113 }
114}
115
116impl Default for DownloadDefaults {
117 fn default() -> Self {
118 Self::default_download_defaults()
119 }
120}
121
// CLI command that downloads a snapshot archive and unpacks it into the chain's data directory.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive turns `///` doc comments
// into user-visible `about`/`help` text, which would change the CLI output.
#[derive(Debug, Parser)]
pub struct DownloadCommand<C: ChainSpecParser> {
    // Shared environment arguments, flattened into this command's flags. `execute` reads the
    // data directory (`datadir`) and the chain selection (`chain`) from here.
    #[command(flatten)]
    env: EnvironmentArgs<C>,

    // Snapshot URL to download. When omitted, `execute` resolves the latest snapshot from the
    // configured default base URL. The long help text comes from the global `DownloadDefaults`.
    #[arg(long, short, long_help = DownloadDefaults::get_global().long_help())]
    url: Option<String>,
}
131
132impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> DownloadCommand<C> {
133 pub async fn execute<N>(self) -> Result<()> {
134 let data_dir = self.env.datadir.resolve_datadir(self.env.chain.chain());
135 fs::create_dir_all(&data_dir)?;
136
137 let url = match self.url {
138 Some(url) => url,
139 None => {
140 let url = get_latest_snapshot_url().await?;
141 info!(target: "reth::cli", "Using default snapshot URL: {}", url);
142 url
143 }
144 };
145
146 info!(target: "reth::cli",
147 chain = %self.env.chain.chain(),
148 dir = ?data_dir.data_dir(),
149 url = %url,
150 "Starting snapshot download and extraction"
151 );
152
153 stream_and_extract(&url, data_dir.data_dir()).await?;
154 info!(target: "reth::cli", "Snapshot downloaded and extracted successfully");
155
156 Ok(())
157 }
158}
159
160impl<C: ChainSpecParser> DownloadCommand<C> {
161 pub fn chain_spec(&self) -> Option<&Arc<C::ChainSpec>> {
163 Some(&self.env.chain)
164 }
165}
166
167struct DownloadProgress {
170 downloaded: u64,
171 total_size: u64,
172 last_displayed: Instant,
173}
174
175impl DownloadProgress {
176 fn new(total_size: u64) -> Self {
178 Self { downloaded: 0, total_size, last_displayed: Instant::now() }
179 }
180
181 fn format_size(size: u64) -> String {
183 let mut size = size as f64;
184 let mut unit_index = 0;
185
186 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
187 size /= 1024.0;
188 unit_index += 1;
189 }
190
191 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
192 }
193
194 fn update(&mut self, chunk_size: u64) -> Result<()> {
196 self.downloaded += chunk_size;
197
198 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
200 let formatted_downloaded = Self::format_size(self.downloaded);
201 let formatted_total = Self::format_size(self.total_size);
202 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
203
204 print!(
205 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total})",
206 );
207 io::stdout().flush()?;
208 self.last_displayed = Instant::now();
209 }
210
211 Ok(())
212 }
213}
214
215struct ProgressReader<R> {
217 reader: R,
218 progress: DownloadProgress,
219}
220
221impl<R: Read> ProgressReader<R> {
222 fn new(reader: R, total_size: u64) -> Self {
223 Self { reader, progress: DownloadProgress::new(total_size) }
224 }
225}
226
227impl<R: Read> Read for ProgressReader<R> {
228 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
229 let bytes = self.reader.read(buf)?;
230 if bytes > 0 &&
231 let Err(e) = self.progress.update(bytes as u64)
232 {
233 return Err(io::Error::other(e));
234 }
235 Ok(bytes)
236 }
237}
238
239#[derive(Debug, Clone, Copy)]
241enum CompressionFormat {
242 Lz4,
243 Zstd,
244}
245
246impl CompressionFormat {
247 fn from_url(url: &str) -> Result<Self> {
249 if url.ends_with(EXTENSION_TAR_LZ4) {
250 Ok(Self::Lz4)
251 } else if url.ends_with(EXTENSION_TAR_ZSTD) {
252 Ok(Self::Zstd)
253 } else {
254 Err(eyre::eyre!("Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}", url))
255 }
256 }
257}
258
259fn blocking_download_and_extract(url: &str, target_dir: &Path) -> Result<()> {
261 let client = reqwest::blocking::Client::builder().build()?;
262 let response = client.get(url).send()?.error_for_status()?;
263
264 let total_size = response.content_length().ok_or_else(|| {
265 eyre::eyre!(
266 "Server did not provide Content-Length header. This is required for snapshot downloads"
267 )
268 })?;
269
270 let progress_reader = ProgressReader::new(response, total_size);
271 let format = CompressionFormat::from_url(url)?;
272
273 match format {
274 CompressionFormat::Lz4 => {
275 let decoder = Decoder::new(progress_reader)?;
276 Archive::new(decoder).unpack(target_dir)?;
277 }
278 CompressionFormat::Zstd => {
279 let decoder = ZstdDecoder::new(progress_reader)?;
280 Archive::new(decoder).unpack(target_dir)?;
281 }
282 }
283
284 info!(target: "reth::cli", "Extraction complete.");
285 Ok(())
286}
287
288async fn stream_and_extract(url: &str, target_dir: &Path) -> Result<()> {
289 let target_dir = target_dir.to_path_buf();
290 let url = url.to_string();
291 task::spawn_blocking(move || blocking_download_and_extract(&url, &target_dir)).await??;
292
293 Ok(())
294}
295
296async fn get_latest_snapshot_url() -> Result<String> {
298 let base_url = &DownloadDefaults::get_global().default_base_url;
299 let latest_url = format!("{base_url}/latest.txt");
300 let filename = Client::new()
301 .get(latest_url)
302 .send()
303 .await?
304 .error_for_status()?
305 .text()
306 .await?
307 .trim()
308 .to_string();
309
310 Ok(format!("{base_url}/{filename}"))
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_download_defaults_builder() {
        let defaults = DownloadDefaults::default()
            .with_snapshot("https://example.com/snapshots (example)")
            .with_base_url("https://example.com");

        assert_eq!(defaults.default_base_url, "https://example.com");
        // Two built-in sources plus the one added above.
        assert_eq!(defaults.available_snapshots.len(), 3);
    }

    #[test]
    fn test_download_defaults_replace_snapshots() {
        let defaults = DownloadDefaults::default().with_snapshots(vec![
            Cow::Borrowed("https://custom1.com"),
            Cow::Borrowed("https://custom2.com"),
        ]);

        assert_eq!(defaults.available_snapshots.len(), 2);
        assert_eq!(defaults.available_snapshots[0], "https://custom1.com");
    }

    #[test]
    fn test_long_help_generation() {
        let defaults = DownloadDefaults::default();
        let help = defaults.long_help();

        assert!(help.contains("Available snapshot sources:"));
        assert!(help.contains("merkle.io"));
        assert!(help.contains("publicnode.com"));
    }

    #[test]
    fn test_long_help_override() {
        let custom_help = "This is custom help text for downloading snapshots.";
        let defaults = DownloadDefaults::default().with_long_help(custom_help);

        let help = defaults.long_help();
        assert_eq!(help, custom_help);
        assert!(!help.contains("Available snapshot sources:"));
    }

    #[test]
    fn test_builder_chaining() {
        let defaults = DownloadDefaults::default()
            .with_base_url("https://custom.example.com")
            .with_snapshot("https://snapshot1.com")
            .with_snapshot("https://snapshot2.com")
            .with_long_help("Custom help for snapshots");

        assert_eq!(defaults.default_base_url, "https://custom.example.com");
        // Two built-in sources plus the two added above.
        assert_eq!(defaults.available_snapshots.len(), 4);
        assert_eq!(defaults.long_help, Some("Custom help for snapshots".to_string()));
    }

    #[test]
    fn test_format_size() {
        assert_eq!(DownloadProgress::format_size(0), "0.00 B");
        assert_eq!(DownloadProgress::format_size(1023), "1023.00 B");
        assert_eq!(DownloadProgress::format_size(1024), "1.00 KB");
        assert_eq!(DownloadProgress::format_size(1536), "1.50 KB");
        assert_eq!(DownloadProgress::format_size(1024 * 1024 * 1024), "1.00 GB");
    }

    #[test]
    fn test_compression_format_from_url() {
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
        assert!(CompressionFormat::from_url("https://example.com/snapshot").is_err());
    }
}