reth_cli_commands/download/extract.rs

use super::{
    fetch::{ArchiveFetcher, DownloadedArchive},
    progress::{
        ArchiveExtractionProgress, ArchiveExtractionProgressHandle, DownloadProgress,
        DownloadRequestLimiter, ProgressReader, SharedProgress, SharedProgressReader,
    },
    session::DownloadSession,
    MAX_DOWNLOAD_RETRIES, RETRY_BACKOFF_SECS,
};
use eyre::{Result, WrapErr};
use lz4::Decoder;
use reqwest::blocking::Client as BlockingClient;
use reth_cli_util::cancellation::CancellationToken;
use reth_fs_util as fs;
use std::{
    io::Read,
    path::{Component, Path, PathBuf},
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread,
    time::{Duration, Instant},
};
use tar::Archive;
use tokio::task;
use tracing::{info, warn};
use url::Url;
use zstd::stream::read::Decoder as ZstdDecoder;

const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Only file entries at least this large (64 MiB) get a dedicated extraction progress monitor.
const STREAMING_EXTRACTION_PROGRESS_MIN_FILE_SIZE: u64 = 64 * 1024 * 1024;
/// How often the monitor thread samples the size of the file being unpacked.
const EXTRACTION_PROGRESS_POLL_INTERVAL: Duration = Duration::from_millis(100);

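/// Compression format of a downloaded snapshot archive, detected from its file extension.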
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompressionFormat {
    /// LZ4-compressed tarball (`.tar.lz4`).
    Lz4,
    /// Zstandard-compressed tarball (`.tar.zst`).
    Zstd,
}

impl CompressionFormat {
    /// Detects the compression format from the file extension of `url`.
    ///
    /// Falls back to treating `url` as a plain path if it does not parse as a URL.
    pub(crate) fn from_url(url: &str) -> Result<Self> {
        let path =
            Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());

        if path.ends_with(EXTENSION_TAR_LZ4) {
            Ok(Self::Lz4)
        } else if path.ends_with(EXTENSION_TAR_ZSTD) {
            Ok(Self::Zstd)
        } else {
            Err(eyre::eyre!(
                "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
                path
            ))
        }
    }
}

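/// Decompresses and unpacks a `.tar.lz4`/`.tar.zst` stream into `target_dir`,
/// wrapping `reader` in a [`ProgressReader`] so progress is reported and
/// cancellation is honored while reading.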
fn extract_archive<R: Read>(
    reader: R,
    total_size: u64,
    format: CompressionFormat,
    target_dir: &Path,
    cancel_token: CancellationToken,
) -> Result<()> {
    let progress_reader = ProgressReader::new(reader, total_size, cancel_token);

    match format {
        CompressionFormat::Lz4 => {
            let decoder = Decoder::new(progress_reader)?;
            Archive::new(decoder).unpack(target_dir)?;
        }
        CompressionFormat::Zstd => {
            let decoder = ZstdDecoder::new(progress_reader)?;
            Archive::new(decoder).unpack(target_dir)?;
        }
    }

    // Terminate the inline progress output with a final newline.
    println!();
    Ok(())
}

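/// Decompresses and unpacks an archive stream without any progress-printing
/// wrapper, optionally recording extracted bytes into `progress`.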
pub(crate) fn extract_archive_raw<R: Read>(
    reader: R,
    format: CompressionFormat,
    target_dir: &Path,
    progress: Option<&mut ArchiveExtractionProgress>,
) -> Result<()> {
    match format {
        CompressionFormat::Lz4 => {
            unpack_archive(Archive::new(Decoder::new(reader)?), target_dir, progress)?;
        }
        CompressionFormat::Zstd => {
            unpack_archive(Archive::new(ZstdDecoder::new(reader)?), target_dir, progress)?;
        }
    }

    Ok(())
}

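/// Iterates over all archive entries and extracts each one into `target_dir`.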
fn unpack_archive<R: Read>(
    mut archive: Archive<R>,
    target_dir: &Path,
    mut progress: Option<&mut ArchiveExtractionProgress>,
) -> Result<()> {
    let entries = archive.entries().wrap_err_with(|| {
        format!("failed to read archive entries for `{}`", target_dir.display())
    })?;

    for entry in entries {
        let mut entry = entry.wrap_err_with(|| {
            format!("failed to read archive entry for `{}`", target_dir.display())
        })?;
        extract_entry_with_progress(&mut entry, target_dir, progress.as_deref_mut())?;
    }

    Ok(())
}

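/// Extracts a single entry, reporting progress for large files.
///
/// Small files and non-file entries are unpacked directly and counted in one
/// shot; files of at least [`STREAMING_EXTRACTION_PROGRESS_MIN_FILE_SIZE`] get
/// a monitor thread that polls the destination file size while `unpack_in` runs.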
fn extract_entry_with_progress<R: Read>(
    entry: &mut tar::Entry<'_, R>,
    target_dir: &Path,
    progress: Option<&mut ArchiveExtractionProgress>,
) -> Result<()> {
    let size = entry.header().entry_size().unwrap_or(0);
    let entry_type = entry.header().entry_type();

    // Directories, links, and empty files carry no payload worth tracking.
    if !entry_type.is_file() || size == 0 {
        entry.unpack_in(target_dir).wrap_err_with(|| {
            format!("failed to extract archive into `{}`", target_dir.display())
        })?;
        return Ok(())
    }

    // Small files: unpack first, then record their size in a single update.
    if size < STREAMING_EXTRACTION_PROGRESS_MIN_FILE_SIZE {
        entry.unpack_in(target_dir).wrap_err_with(|| {
            format!("failed to extract archive into `{}`", target_dir.display())
        })?;
        if let Some(progress) = progress {
            progress.record_extracted(size);
        }
        return Ok(())
    }

    let Some(progress_handle) = progress.as_ref().and_then(|progress| progress.handle()) else {
        entry.unpack_in(target_dir).wrap_err_with(|| {
            format!("failed to extract archive into `{}`", target_dir.display())
        })?;
        return Ok(())
    };

    let Some(entry_path) = entry_destination_path(entry, target_dir)? else {
        entry.unpack_in(target_dir).wrap_err_with(|| {
            format!("failed to extract archive into `{}`", target_dir.display())
        })?;
        return Ok(())
    };

    // Large file: poll the destination size on a monitor thread while unpacking.
    let stop = Arc::new(AtomicBool::new(false));
    let monitor = spawn_extraction_progress_monitor(entry_path, progress_handle, Arc::clone(&stop));
    let unpack_result = entry
        .unpack_in(target_dir)
        .wrap_err_with(|| format!("failed to extract archive into `{}`", target_dir.display()));
    stop.store(true, Ordering::Relaxed);

    let monitor_result = monitor.join();
    unpack_result?;

    monitor_result.map_err(|_| eyre::eyre!("extraction progress monitor panicked"))?;
    Ok(())
}

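/// Computes where `unpack_in` will place this entry under `target_dir`,
/// mirroring its path normalization. Returns `None` if the entry path escapes
/// the target directory (contains `..`) or normalizes to the target itself.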
fn entry_destination_path<R: Read>(
    entry: &tar::Entry<'_, R>,
    target_dir: &Path,
) -> Result<Option<PathBuf>> {
    let mut file_dst = target_dir.to_path_buf();
    let path = entry.path().wrap_err("invalid path in archive entry")?;

    for part in path.components() {
        match part {
            Component::Prefix(..) | Component::RootDir | Component::CurDir => continue,
            Component::ParentDir => return Ok(None),
            Component::Normal(part) => file_dst.push(part),
        }
    }

    if file_dst == target_dir {
        return Ok(None)
    }

    Ok(Some(file_dst))
}

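/// Spawns a thread that periodically samples the size of the file being
/// extracted and feeds the delta into `progress` until `stop` is set.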
fn spawn_extraction_progress_monitor(
    entry_path: PathBuf,
    progress: ArchiveExtractionProgressHandle,
    stop: Arc<AtomicBool>,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut extracted = 0_u64;

        loop {
            // Record before checking `stop` so the final file size is still
            // captured once unpacking completes.
            record_extracted_file_bytes(&entry_path, &progress, &mut extracted);
            if stop.load(Ordering::Relaxed) {
                break;
            }
            thread::sleep(EXTRACTION_PROGRESS_POLL_INTERVAL);
        }
    })
}

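/// Records any growth of `entry_path` since the last sample into `progress`.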
fn record_extracted_file_bytes(
    entry_path: &Path,
    progress: &ArchiveExtractionProgressHandle,
    extracted: &mut u64,
) {
    let Ok(meta) = fs::metadata(entry_path) else { return };
    let len = meta.len();
    if len > *extracted {
        progress.record_extracted(len - *extracted);
        *extracted = len;
    }
}

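/// Extracts an archive that already exists on the local filesystem
/// (a `file://` URL), logging its size and the elapsed time.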
fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
    let file = std::fs::File::open(path)?;
    let total_size = file.metadata()?.len();
    info!(target: "reth::cli",
        file = %path.display(),
        size = %DownloadProgress::format_size(total_size),
        "Extracting local archive"
    );
    let start = Instant::now();
    extract_archive(file, total_size, format, target_dir, CancellationToken::new())?;
    info!(target: "reth::cli",
        file = %path.display(),
        elapsed = %DownloadProgress::format_duration(start.elapsed()),
        "Local extraction complete"
    );
    Ok(())
}

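/// Streams an archive over HTTP and extracts it on the fly, without writing
/// the compressed file to disk. Retries from scratch up to
/// [`MAX_DOWNLOAD_RETRIES`] times, since a streamed download cannot be resumed.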
pub(crate) fn streaming_download_and_extract(
    url: &str,
    format: CompressionFormat,
    target_dir: &Path,
    session: &DownloadSession,
) -> Result<()> {
    let shared = session.progress();
    let quiet = session.progress().is_some();
    let mut last_error: Option<eyre::Error> = None;

    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
        if attempt > 1 {
            info!(target: "reth::cli",
                url = %url,
                attempt,
                max = MAX_DOWNLOAD_RETRIES,
                "Retrying streaming download from scratch"
            );
        }

        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
        let _request_permit = session
            .request_limiter()
            .map(|limiter| limiter.acquire(session.progress(), session.cancel_token()))
            .transpose()?;

        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
            Ok(r) => r,
            Err(error) => {
                let err = eyre::Error::from(error);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %err,
                        "Streaming request failed, retrying"
                    );
                }
                last_error = Some(err);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }
        };

        if !quiet && let Some(size) = response.content_length() {
            info!(target: "reth::cli",
                url = %url,
                size = %DownloadProgress::format_size(size),
                "Streaming archive"
            );
        }

        let result = if let Some(progress) = shared {
            let reader = SharedProgressReader { inner: response, progress: Arc::clone(progress) };
            extract_archive_raw(reader, format, target_dir, None)
        } else {
            let total_size = response.content_length().unwrap_or(0);
            extract_archive(
                response,
                total_size,
                format,
                target_dir,
                session.cancel_token().clone(),
            )
        };

        match result {
            Ok(()) => return Ok(()),
            Err(error) => {
                if attempt < MAX_DOWNLOAD_RETRIES {
                    warn!(target: "reth::cli",
                        url = %url,
                        attempt,
                        max = MAX_DOWNLOAD_RETRIES,
                        err = %error,
                        "Streaming extraction failed, retrying"
                    );
                }
                last_error = Some(error);
                if attempt < MAX_DOWNLOAD_RETRIES {
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
            }
        }
    }

    Err(last_error.unwrap_or_else(|| {
        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
    }))
}

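/// Downloads the archive to disk via [`ArchiveFetcher`] (resumable), extracts
/// it, and cleans up the intermediate file afterwards.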
fn download_and_extract(
    url: &str,
    format: CompressionFormat,
    target_dir: &Path,
    session: DownloadSession,
) -> Result<()> {
    let quiet = session.progress().is_some();
    let fetcher = ArchiveFetcher::new(url.to_string(), target_dir, session.clone());
    let DownloadedArchive { path: downloaded_path, size: total_size } = fetcher.download(None)?;

    let file_name =
        downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();

    if !quiet {
        info!(target: "reth::cli",
            file = %file_name,
            size = %DownloadProgress::format_size(total_size),
            "Extracting archive"
        );
    }
    let file = fs::open(&downloaded_path)?;

    if quiet {
        extract_archive_raw(file, format, target_dir, None)?;
    } else {
        extract_archive(file, total_size, format, target_dir, session.cancel_token().clone())?;
        info!(target: "reth::cli",
            file = %file_name,
            "Extraction complete"
        );
    }

    fetcher.cleanup_downloaded_files();
    session.record_archive_output_complete(total_size);

    Ok(())
}

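/// Dispatches to the right strategy for `url`:
/// - `file://` URLs are extracted directly from the local filesystem,
/// - resumable downloads (or an explicit request limiter) go through
///   [`download_and_extract`],
/// - everything else is streamed and extracted in one pass.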
fn blocking_download_and_extract(
    url: &str,
    target_dir: &Path,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    request_limiter: Option<Arc<DownloadRequestLimiter>>,
    cancel_token: CancellationToken,
) -> Result<()> {
    let format = CompressionFormat::from_url(url)?;

    if let Ok(parsed_url) = Url::parse(url) &&
        parsed_url.scheme() == "file"
    {
        let session = DownloadSession::new(shared, request_limiter, cancel_token);
        let file_path = parsed_url
            .to_file_path()
            .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
        let result = extract_from_file(&file_path, format, target_dir);
        if result.is_ok() {
            session.record_archive_output_complete(file_path.metadata()?.len());
        }
        result
    } else if let Some(request_limiter) = request_limiter {
        download_and_extract(
            url,
            format,
            target_dir,
            DownloadSession::new(shared, Some(request_limiter), cancel_token),
        )
    } else if resumable {
        let session =
            DownloadSession::new(shared, Some(DownloadRequestLimiter::new(1)), cancel_token);
        download_and_extract(url, format, target_dir, session)
    } else {
        let session = DownloadSession::new(shared, None, cancel_token);
        let result = streaming_download_and_extract(url, format, target_dir, &session);
        if result.is_ok() {
            session.record_archive_output_complete(0);
        }
        result
    }
}

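/// Async entry point: runs the blocking download/extract pipeline on a
/// dedicated blocking task.
///
/// Illustrative call (the surrounding variable names are assumptions, not part
/// of this module):
/// ```ignore
/// stream_and_extract(url, &data_dir, None, /* resumable */ true, None, cancel_token).await?;
/// ```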
pub(crate) async fn stream_and_extract(
    url: &str,
    target_dir: &Path,
    shared: Option<Arc<SharedProgress>>,
    resumable: bool,
    request_limiter: Option<Arc<DownloadRequestLimiter>>,
    cancel_token: CancellationToken,
) -> Result<()> {
    let target_dir = target_dir.to_path_buf();
    let url = url.to_string();
    task::spawn_blocking(move || {
        blocking_download_and_extract(
            &url,
            &target_dir,
            shared,
            resumable,
            request_limiter,
            cancel_token,
        )
    })
    .await??;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compression_format_detection() {
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.lz4"),
            Ok(CompressionFormat::Lz4)
        ));
        assert!(matches!(
            CompressionFormat::from_url("file:///path/to/snapshot.tar.zst"),
            Ok(CompressionFormat::Zstd)
        ));
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
    }
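
    // Added sketch: `from_url` parses the URL before inspecting the extension,
    // so query strings should not defeat detection. This exercises the
    // `Url::parse(...).map(|u| u.path()...)` branch above.
    #[test]
    fn test_compression_format_detection_with_query() {
        assert!(matches!(
            CompressionFormat::from_url("https://example.com/snapshot.tar.lz4?token=abc"),
            Ok(CompressionFormat::Lz4)
        ));
    }

    // Added sketch: round-trips a tiny in-memory `.tar.lz4` through
    // `extract_archive_raw`. Assumes `tempfile` is available as a
    // dev-dependency; adjust if the workspace uses a different temp-dir helper.
    #[test]
    fn test_extract_archive_raw_lz4_roundtrip() {
        use std::io::Write;

        // Build a one-file tar archive in memory.
        let mut builder = tar::Builder::new(Vec::new());
        let data = b"hello";
        let mut header = tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_mode(0o644);
        builder.append_data(&mut header, "dir/file.txt", &data[..]).unwrap();
        let tar_bytes = builder.into_inner().unwrap();

        // Compress it with lz4 and extract into a temp dir.
        let mut encoder = lz4::EncoderBuilder::new().build(Vec::new()).unwrap();
        encoder.write_all(&tar_bytes).unwrap();
        let (lz4_bytes, result) = encoder.finish();
        result.unwrap();

        let target = tempfile::tempdir().unwrap();
        extract_archive_raw(&lz4_bytes[..], CompressionFormat::Lz4, target.path(), None).unwrap();
        assert_eq!(std::fs::read(target.path().join("dir/file.txt")).unwrap(), &data[..]);
    }
}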
490}