//! Download and extraction of compressed snapshot archives.
//!
//! Source: `reth_cli_commands/download/extract.rs`

1use super::{
2    fetch::{ArchiveFetcher, DownloadedArchive},
3    progress::{
4        ArchiveExtractionProgress, ArchiveExtractionProgressHandle, DownloadProgress,
5        DownloadRequestLimiter, ProgressReader, SharedProgress, SharedProgressReader,
6    },
7    session::DownloadSession,
8    MAX_DOWNLOAD_RETRIES, RETRY_BACKOFF_SECS,
9};
10use eyre::{Result, WrapErr};
11use lz4::Decoder;
12use reqwest::blocking::Client as BlockingClient;
13use reth_cli_util::cancellation::CancellationToken;
14use reth_fs_util as fs;
15use std::{
16    io::Read,
17    path::{Component, Path, PathBuf},
18    sync::{
19        atomic::{AtomicBool, Ordering},
20        Arc,
21    },
22    thread,
23    time::{Duration, Instant},
24};
25use tar::Archive;
26use tokio::task;
27use tracing::{info, warn};
28use url::Url;
29use zstd::stream::read::Decoder as ZstdDecoder;
30
/// File-name suffix identifying an LZ4-compressed tar archive.
const EXTENSION_TAR_LZ4: &str = ".tar.lz4";
/// File-name suffix identifying a Zstandard-compressed tar archive.
const EXTENSION_TAR_ZSTD: &str = ".tar.zst";
/// Minimum entry size (64 MiB) above which per-file extraction progress is
/// streamed by polling the destination file's on-disk size from a monitor thread.
const STREAMING_EXTRACTION_PROGRESS_MIN_FILE_SIZE: u64 = 64 * 1024 * 1024;
/// How often the extraction progress monitor samples the growing output file.
const EXTRACTION_PROGRESS_POLL_INTERVAL: Duration = Duration::from_millis(100);
35
/// Supported compression formats for snapshots
///
/// Detected from the archive's file extension — see [`CompressionFormat::from_url`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum CompressionFormat {
    /// LZ4-compressed tar archive.
    Lz4,
    /// Zstandard-compressed tar archive.
    Zstd,
}
44
45impl CompressionFormat {
46    /// Detect compression format from file extension
47    pub(crate) fn from_url(url: &str) -> Result<Self> {
48        let path =
49            Url::parse(url).map(|u| u.path().to_string()).unwrap_or_else(|_| url.to_string());
50
51        if path.ends_with(EXTENSION_TAR_LZ4) {
52            Ok(Self::Lz4)
53        } else if path.ends_with(EXTENSION_TAR_ZSTD) {
54            Ok(Self::Zstd)
55        } else {
56            Err(eyre::eyre!(
57                "Unsupported file format. Expected .tar.lz4 or .tar.zst, got: {}",
58                path
59            ))
60        }
61    }
62}
63
64/// Extracts a compressed tar archive to the target directory with progress tracking.
65fn extract_archive<R: Read>(
66    reader: R,
67    total_size: u64,
68    format: CompressionFormat,
69    target_dir: &Path,
70    cancel_token: CancellationToken,
71) -> Result<()> {
72    let progress_reader = ProgressReader::new(reader, total_size, cancel_token);
73
74    match format {
75        CompressionFormat::Lz4 => {
76            let decoder = Decoder::new(progress_reader)?;
77            Archive::new(decoder).unpack(target_dir)?;
78        }
79        CompressionFormat::Zstd => {
80            let decoder = ZstdDecoder::new(progress_reader)?;
81            Archive::new(decoder).unpack(target_dir)?;
82        }
83    }
84
85    println!();
86    Ok(())
87}
88
89/// Extracts a compressed tar archive without progress tracking.
90pub(crate) fn extract_archive_raw<R: Read>(
91    reader: R,
92    format: CompressionFormat,
93    target_dir: &Path,
94    progress: Option<&mut ArchiveExtractionProgress>,
95) -> Result<()> {
96    match format {
97        CompressionFormat::Lz4 => {
98            unpack_archive(Archive::new(Decoder::new(reader)?), target_dir, progress)?;
99        }
100        CompressionFormat::Zstd => {
101            unpack_archive(Archive::new(ZstdDecoder::new(reader)?), target_dir, progress)?;
102        }
103    }
104
105    Ok(())
106}
107
108fn unpack_archive<R: Read>(
109    mut archive: Archive<R>,
110    target_dir: &Path,
111    mut progress: Option<&mut ArchiveExtractionProgress>,
112) -> Result<()> {
113    let entries = archive.entries().wrap_err_with(|| {
114        format!("failed to read archive entries for `{}`", target_dir.display())
115    })?;
116
117    for entry in entries {
118        let mut entry = entry.wrap_err_with(|| {
119            format!("failed to read archive entry for `{}`", target_dir.display())
120        })?;
121        extract_entry_with_progress(&mut entry, target_dir, progress.as_deref_mut())?;
122    }
123
124    Ok(())
125}
126
127fn extract_entry_with_progress<R: Read>(
128    entry: &mut tar::Entry<'_, R>,
129    target_dir: &Path,
130    progress: Option<&mut ArchiveExtractionProgress>,
131) -> Result<()> {
132    let size = entry.header().entry_size().unwrap_or(0);
133    let entry_type = entry.header().entry_type();
134
135    if !entry_type.is_file() || size == 0 {
136        entry.unpack_in(target_dir).wrap_err_with(|| {
137            format!("failed to extract archive into `{}`", target_dir.display())
138        })?;
139        return Ok(())
140    }
141
142    if size < STREAMING_EXTRACTION_PROGRESS_MIN_FILE_SIZE {
143        entry.unpack_in(target_dir).wrap_err_with(|| {
144            format!("failed to extract archive into `{}`", target_dir.display())
145        })?;
146        if let Some(progress) = progress {
147            progress.record_extracted(size);
148        }
149        return Ok(())
150    }
151
152    let Some(progress_handle) = progress.as_ref().and_then(|progress| progress.handle()) else {
153        entry.unpack_in(target_dir).wrap_err_with(|| {
154            format!("failed to extract archive into `{}`", target_dir.display())
155        })?;
156        return Ok(())
157    };
158
159    let Some(entry_path) = entry_destination_path(entry, target_dir)? else {
160        entry.unpack_in(target_dir).wrap_err_with(|| {
161            format!("failed to extract archive into `{}`", target_dir.display())
162        })?;
163        return Ok(())
164    };
165
166    let stop = Arc::new(AtomicBool::new(false));
167    let monitor = spawn_extraction_progress_monitor(entry_path, progress_handle, Arc::clone(&stop));
168    let unpack_result = entry
169        .unpack_in(target_dir)
170        .wrap_err_with(|| format!("failed to extract archive into `{}`", target_dir.display()));
171    stop.store(true, Ordering::Relaxed);
172
173    let monitor_result = monitor.join();
174    unpack_result?;
175
176    monitor_result.map_err(|_| eyre::eyre!("extraction progress monitor panicked"))?;
177    Ok(())
178}
179
180fn entry_destination_path<R: Read>(
181    entry: &tar::Entry<'_, R>,
182    target_dir: &Path,
183) -> Result<Option<PathBuf>> {
184    let mut file_dst = target_dir.to_path_buf();
185    let path = entry.path().wrap_err("invalid path in archive entry")?;
186
187    for part in path.components() {
188        match part {
189            Component::Prefix(..) | Component::RootDir | Component::CurDir => continue,
190            Component::ParentDir => return Ok(None),
191            Component::Normal(part) => file_dst.push(part),
192        }
193    }
194
195    if file_dst == target_dir {
196        return Ok(None)
197    }
198
199    Ok(Some(file_dst))
200}
201
202fn spawn_extraction_progress_monitor(
203    entry_path: PathBuf,
204    progress: ArchiveExtractionProgressHandle,
205    stop: Arc<AtomicBool>,
206) -> thread::JoinHandle<()> {
207    thread::spawn(move || {
208        let mut extracted = 0_u64;
209
210        loop {
211            record_extracted_file_bytes(&entry_path, &progress, &mut extracted);
212            if stop.load(Ordering::Relaxed) {
213                break;
214            }
215            thread::sleep(EXTRACTION_PROGRESS_POLL_INTERVAL);
216        }
217    })
218}
219
220fn record_extracted_file_bytes(
221    entry_path: &Path,
222    progress: &ArchiveExtractionProgressHandle,
223    extracted: &mut u64,
224) {
225    let Ok(meta) = fs::metadata(entry_path) else { return };
226    let len = meta.len();
227    if len > *extracted {
228        progress.record_extracted(len - *extracted);
229        *extracted = len;
230    }
231}
232
233/// Extracts a snapshot from a local file.
234fn extract_from_file(path: &Path, format: CompressionFormat, target_dir: &Path) -> Result<()> {
235    let file = std::fs::File::open(path)?;
236    let total_size = file.metadata()?.len();
237    info!(target: "reth::cli",
238        file = %path.display(),
239        size = %DownloadProgress::format_size(total_size),
240        "Extracting local archive"
241    );
242    let start = Instant::now();
243    extract_archive(file, total_size, format, target_dir, CancellationToken::new())?;
244    info!(target: "reth::cli",
245        file = %path.display(),
246        elapsed = %DownloadProgress::format_duration(start.elapsed()),
247        "Local extraction complete"
248    );
249    Ok(())
250}
251
252/// Streams a remote archive directly into the extractor without writing to disk.
253///
254/// On failure, retries from scratch up to [`MAX_DOWNLOAD_RETRIES`] times.
255pub(crate) fn streaming_download_and_extract(
256    url: &str,
257    format: CompressionFormat,
258    target_dir: &Path,
259    session: &DownloadSession,
260) -> Result<()> {
261    let shared = session.progress();
262    let quiet = session.progress().is_some();
263    let mut last_error: Option<eyre::Error> = None;
264
265    for attempt in 1..=MAX_DOWNLOAD_RETRIES {
266        if attempt > 1 {
267            info!(target: "reth::cli",
268                url = %url,
269                attempt,
270                max = MAX_DOWNLOAD_RETRIES,
271                "Retrying streaming download from scratch"
272            );
273        }
274
275        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
276        let _request_permit = session
277            .request_limiter()
278            .map(|limiter| limiter.acquire(session.progress(), session.cancel_token()))
279            .transpose()?;
280
281        let response = match client.get(url).send().and_then(|r| r.error_for_status()) {
282            Ok(r) => r,
283            Err(error) => {
284                let err = eyre::Error::from(error);
285                if attempt < MAX_DOWNLOAD_RETRIES {
286                    warn!(target: "reth::cli",
287                        url = %url,
288                        attempt,
289                        max = MAX_DOWNLOAD_RETRIES,
290                        err = %err,
291                        "Streaming request failed, retrying"
292                    );
293                }
294                last_error = Some(err);
295                if attempt < MAX_DOWNLOAD_RETRIES {
296                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
297                }
298                continue;
299            }
300        };
301
302        if !quiet && let Some(size) = response.content_length() {
303            info!(target: "reth::cli",
304                url = %url,
305                size = %DownloadProgress::format_size(size),
306                "Streaming archive"
307            );
308        }
309
310        let result = if let Some(progress) = shared {
311            let reader = SharedProgressReader { inner: response, progress: Arc::clone(progress) };
312            extract_archive_raw(reader, format, target_dir, None)
313        } else {
314            let total_size = response.content_length().unwrap_or(0);
315            extract_archive(
316                response,
317                total_size,
318                format,
319                target_dir,
320                session.cancel_token().clone(),
321            )
322        };
323
324        match result {
325            Ok(()) => return Ok(()),
326            Err(error) => {
327                if attempt < MAX_DOWNLOAD_RETRIES {
328                    warn!(target: "reth::cli",
329                        url = %url,
330                        attempt,
331                        max = MAX_DOWNLOAD_RETRIES,
332                        err = %error,
333                        "Streaming extraction failed, retrying"
334                    );
335                }
336                last_error = Some(error);
337                if attempt < MAX_DOWNLOAD_RETRIES {
338                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
339                }
340            }
341        }
342    }
343
344    Err(last_error.unwrap_or_else(|| {
345        eyre::eyre!("Streaming download failed after {MAX_DOWNLOAD_RETRIES} attempts")
346    }))
347}
348
349/// Fetches the snapshot from a remote URL with resume support, then extracts it.
350fn download_and_extract(
351    url: &str,
352    format: CompressionFormat,
353    target_dir: &Path,
354    session: DownloadSession,
355) -> Result<()> {
356    let quiet = session.progress().is_some();
357    let fetcher = ArchiveFetcher::new(url.to_string(), target_dir, session.clone());
358    let DownloadedArchive { path: downloaded_path, size: total_size } = fetcher.download(None)?;
359
360    let file_name =
361        downloaded_path.file_name().map(|f| f.to_string_lossy().to_string()).unwrap_or_default();
362
363    if !quiet {
364        info!(target: "reth::cli",
365            file = %file_name,
366            size = %DownloadProgress::format_size(total_size),
367            "Extracting archive"
368        );
369    }
370    let file = fs::open(&downloaded_path)?;
371
372    if quiet {
373        extract_archive_raw(file, format, target_dir, None)?;
374    } else {
375        extract_archive(file, total_size, format, target_dir, session.cancel_token().clone())?;
376        info!(target: "reth::cli",
377            file = %file_name,
378            "Extraction complete"
379        );
380    }
381
382    fetcher.cleanup_downloaded_files();
383    session.record_archive_output_complete(total_size);
384
385    Ok(())
386}
387
388/// Downloads and extracts a snapshot, blocking until finished.
389///
390/// Supports `file://` URLs for local files and HTTP(S) URLs for remote downloads.
391/// When `resumable` is true, downloads to a `.part` file first with HTTP Range resume
392/// support. Otherwise streams directly into the extractor.
393fn blocking_download_and_extract(
394    url: &str,
395    target_dir: &Path,
396    shared: Option<Arc<SharedProgress>>,
397    resumable: bool,
398    request_limiter: Option<Arc<DownloadRequestLimiter>>,
399    cancel_token: CancellationToken,
400) -> Result<()> {
401    let format = CompressionFormat::from_url(url)?;
402
403    if let Ok(parsed_url) = Url::parse(url) &&
404        parsed_url.scheme() == "file"
405    {
406        let session = DownloadSession::new(shared, request_limiter, cancel_token);
407        let file_path = parsed_url
408            .to_file_path()
409            .map_err(|_| eyre::eyre!("Invalid file:// URL path: {}", url))?;
410        let result = extract_from_file(&file_path, format, target_dir);
411        if result.is_ok() {
412            session.record_archive_output_complete(file_path.metadata()?.len());
413        }
414        result
415    } else if let Some(request_limiter) = request_limiter {
416        download_and_extract(
417            url,
418            format,
419            target_dir,
420            DownloadSession::new(shared, Some(request_limiter), cancel_token),
421        )
422    } else if resumable {
423        let session =
424            DownloadSession::new(shared, Some(DownloadRequestLimiter::new(1)), cancel_token);
425        download_and_extract(url, format, target_dir, session)
426    } else {
427        let session = DownloadSession::new(shared, None, cancel_token);
428        let result = streaming_download_and_extract(url, format, target_dir, &session);
429        if result.is_ok() {
430            session.record_archive_output_complete(0);
431        }
432        result
433    }
434}
435
436/// Downloads and extracts a snapshot archive asynchronously.
437///
438/// When `shared` is provided, download progress is reported to the shared
439/// counter for aggregated display. Otherwise uses a local progress bar.
440/// When `resumable` is true, uses two-phase download with `.part` files.
441pub(crate) async fn stream_and_extract(
442    url: &str,
443    target_dir: &Path,
444    shared: Option<Arc<SharedProgress>>,
445    resumable: bool,
446    request_limiter: Option<Arc<DownloadRequestLimiter>>,
447    cancel_token: CancellationToken,
448) -> Result<()> {
449    let target_dir = target_dir.to_path_buf();
450    let url = url.to_string();
451    task::spawn_blocking(move || {
452        blocking_download_and_extract(
453            &url,
454            &target_dir,
455            shared,
456            resumable,
457            request_limiter,
458            cancel_token,
459        )
460    })
461    .await??;
462
463    Ok(())
464}
465
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compression_format_detection() {
        // Predicates keep each case on one line without deriving PartialEq.
        let lz4 =
            |url: &str| matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Lz4));
        let zstd =
            |url: &str| matches!(CompressionFormat::from_url(url), Ok(CompressionFormat::Zstd));

        assert!(lz4("https://example.com/snapshot.tar.lz4"));
        assert!(zstd("https://example.com/snapshot.tar.zst"));
        assert!(lz4("file:///path/to/snapshot.tar.lz4"));
        assert!(zstd("file:///path/to/snapshot.tar.zst"));
        assert!(CompressionFormat::from_url("https://example.com/snapshot.tar.gz").is_err());
    }
}