reth_cli_commands/download/archive.rs

1use super::{
2    extract::{extract_archive_raw, streaming_download_and_extract, CompressionFormat},
3    fetch::ArchiveFetcher,
4    manifest::SnapshotArchive,
5    planning::{PlannedArchive, PlannedDownloads},
6    progress::{
7        spawn_progress_display, ArchiveDownloadProgress, ArchiveExtractionProgress,
8        ArchiveVerificationProgress, DownloadRequestLimiter, SharedProgress,
9    },
10    session::{ArchiveProcessContext, DownloadSession},
11    verify::OutputVerifier,
12    MAX_DOWNLOAD_RETRIES, RETRY_BACKOFF_SECS,
13};
14use eyre::Result;
15use futures::stream::{self, StreamExt};
16use reth_cli_util::cancellation::CancellationToken;
17use reth_fs_util as fs;
18use std::{
19    path::Path,
20    sync::{atomic::Ordering, Arc},
21    time::Duration,
22};
23use tokio::task;
24use tracing::{debug, info, warn};
25
/// Name of the directory, created inside the download target directory, where
/// archives are staged before extraction (see `run_modular_downloads`).
const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
27
28/// Runs all planned modular archive downloads for one command invocation.
29pub(crate) async fn run_modular_downloads(
30    planned_downloads: PlannedDownloads,
31    target_dir: &Path,
32    download_concurrency: usize,
33    cancel_token: CancellationToken,
34) -> Result<()> {
35    let download_cache_dir = target_dir.join(DOWNLOAD_CACHE_DIR);
36    fs::create_dir_all(&download_cache_dir)?;
37
38    let shared = SharedProgress::new(
39        planned_downloads.total_download_size,
40        planned_downloads.total_output_size,
41        planned_downloads.total_archives() as u64,
42        cancel_token.clone(),
43    );
44    let session = DownloadSession::new(
45        Some(Arc::clone(&shared)),
46        Some(DownloadRequestLimiter::new(download_concurrency)),
47        cancel_token,
48    );
49    let ctx =
50        ArchiveProcessContext::new(target_dir.to_path_buf(), Some(download_cache_dir), session);
51
52    ModularDownloadJob::new(ctx, download_concurrency).run(planned_downloads).await
53}
54
/// Schedules modular archive work for one run of `reth download`.
///
/// Each planned archive is handed to an [`ArchiveProcessor`] on the blocking
/// pool, with at most `archive_concurrency` archives in flight at once.
struct ModularDownloadJob {
    /// Shared paths and session state for each archive in this job.
    ctx: ArchiveProcessContext,
    /// Maximum number of archives processed at once.
    archive_concurrency: usize,
}
62
63impl ModularDownloadJob {
64    /// Creates the modular download job for one command run.
65    const fn new(ctx: ArchiveProcessContext, archive_concurrency: usize) -> Self {
66        Self { ctx, archive_concurrency }
67    }
68
69    /// Runs all planned archives and waits for the shared progress task to finish.
70    async fn run(self, planned_downloads: PlannedDownloads) -> Result<()> {
71        let shared = Arc::clone(
72            self.ctx.session().progress().expect("modular downloads always use shared progress"),
73        );
74        let progress_handle = spawn_progress_display(Arc::clone(&shared));
75        let ctx = self.ctx.clone();
76        let results: Vec<Result<()>> = stream::iter(planned_downloads.archives)
77            .map(move |archive| {
78                let ctx = ctx.clone();
79                async move { Self::process_archive(ctx, archive).await }
80            })
81            .buffer_unordered(self.archive_concurrency)
82            .collect()
83            .await;
84
85        shared.done.store(true, Ordering::Relaxed);
86        let _ = progress_handle.await;
87
88        for result in results {
89            result?;
90        }
91
92        Ok(())
93    }
94
95    /// Runs one archive on the blocking pool so fetch and extraction stay off the async executor.
96    async fn process_archive(ctx: ArchiveProcessContext, archive: PlannedArchive) -> Result<()> {
97        task::spawn_blocking(move || ArchiveProcessor::new(archive, ctx).run()).await??;
98        Ok(())
99    }
100}
101
/// Explicit retry states for one modular archive.
///
/// Transitions are driven by `ArchiveProcessor::run`: on success the path is
/// `RunAttempt -> VerifyOutputs -> Complete`; fetch or verification failures go
/// through `RetryAttempt`, which either loops back to `RunAttempt` or ends in
/// `Fail` once retries are exhausted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveAttemptState {
    /// Start or restart one full archive attempt.
    RunAttempt,
    /// Check whether the extracted outputs verify.
    VerifyOutputs,
    /// Wait and decide whether another full attempt should run.
    RetryAttempt,
    /// Finish successfully.
    Complete,
    /// Stop with an error after retries are exhausted.
    Fail,
}
116
/// Processes one modular archive from reuse check through extraction and verification.
///
/// Runs synchronously on the blocking pool (see `ModularDownloadJob::process_archive`).
struct ArchiveProcessor {
    /// The concrete archive and component being processed.
    archive: PlannedArchive,
    /// Shared paths and session state for this archive attempt.
    ctx: ArchiveProcessContext,
}
124
impl ArchiveProcessor {
    /// Creates a processor for one archive and the shared download context.
    fn new(archive: PlannedArchive, ctx: ArchiveProcessContext) -> Self {
        Self { archive, ctx }
    }

    /// Runs the archive retry state machine until outputs are verified or retries are exhausted.
    ///
    /// See [`ArchiveAttemptState`] for the transitions. This method executes on the
    /// blocking pool (via `ModularDownloadJob::process_archive`), so the blocking
    /// backoff sleep between attempts is acceptable here.
    fn run(self) -> Result<()> {
        let archive = self.archive();
        // Fast path: if every declared output already verifies, skip the download entirely.
        if self.try_reuse_outputs()? {
            info!(target: "reth::cli", file = %archive.file_name, component = %self.archive.component, "Skipping already verified plain files");
            return Ok(());
        }

        let mode = ArchiveMode::new(&self.ctx)?;
        let format = CompressionFormat::from_url(&archive.file_name)?;
        let mut attempt = 1;
        // Most recent fetch error; reported if all retries are exhausted.
        let mut last_error: Option<eyre::Error> = None;
        let mut state = ArchiveAttemptState::RunAttempt;

        loop {
            match state {
                ArchiveAttemptState::RunAttempt => {
                    // Each attempt starts from a clean slate: drop partial outputs first.
                    self.cleanup_outputs();

                    if attempt > 1 {
                        info!(target: "reth::cli",
                            file = %archive.file_name,
                            component = %self.archive.component,
                            attempt,
                            max = MAX_DOWNLOAD_RETRIES,
                            "Retrying archive from scratch"
                        );
                    }

                    match self.run_attempt(mode, format) {
                        Ok(()) => state = ArchiveAttemptState::VerifyOutputs,
                        // Only the cached mode retries fetch failures; streaming errors abort.
                        Err(error) if mode.retries_fetch_errors() => {
                            warn!(target: "reth::cli",
                                file = %archive.file_name,
                                component = %self.archive.component,
                                attempt,
                                err = %format_args!("{error:#}"),
                                "Archive attempt failed, retrying from scratch"
                            );
                            last_error = Some(error);
                            state = ArchiveAttemptState::RetryAttempt;
                        }
                        Err(error) => return Err(error),
                    }
                }
                ArchiveAttemptState::VerifyOutputs => {
                    if self.verify_outputs_with_progress()? {
                        state = ArchiveAttemptState::Complete;
                    } else {
                        // Extraction succeeded but outputs do not match: treat as a failed attempt.
                        warn!(target: "reth::cli", file = %archive.file_name, component = %self.archive.component, attempt, "Archive extracted, but output verification failed, retrying");
                        state = ArchiveAttemptState::RetryAttempt;
                    }
                }
                ArchiveAttemptState::RetryAttempt => {
                    if attempt >= MAX_DOWNLOAD_RETRIES {
                        state = ArchiveAttemptState::Fail;
                    } else {
                        // Blocking sleep is fine: this runs inside `spawn_blocking`.
                        std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                        attempt += 1;
                        state = ArchiveAttemptState::RunAttempt;
                    }
                }
                ArchiveAttemptState::Complete => return Ok(()),
                ArchiveAttemptState::Fail => {
                    // Prefer wrapping the last fetch error for context; otherwise the
                    // failure was verification-only and gets its own message.
                    if let Some(error) = last_error {
                        return Err(error.wrap_err(format!(
                            "Failed after {} attempts for {}",
                            MAX_DOWNLOAD_RETRIES, archive.file_name
                        )));
                    }

                    eyre::bail!(
                        "Failed integrity validation after {} attempts for {}",
                        MAX_DOWNLOAD_RETRIES,
                        archive.file_name
                    )
                }
            }
        }
    }

    /// Returns the concrete archive being fetched or verified.
    fn archive(&self) -> &SnapshotArchive {
        &self.archive.archive
    }

    /// Returns the verifier for this archive's output files.
    fn output_verifier(&self) -> OutputVerifier<'_> {
        OutputVerifier::new(self.ctx.target_dir())
    }

    /// Returns `true` if this archive can be reused from existing verified outputs.
    /// Returns `false` if a fresh archive attempt is still needed.
    fn try_reuse_outputs(&self) -> Result<bool> {
        if self.verify_outputs()? {
            // Count the reused archive toward the shared progress totals.
            self.mark_complete();
            return Ok(true);
        }

        Ok(false)
    }

    /// Removes any partial outputs before a fresh archive attempt.
    fn cleanup_outputs(&self) {
        self.output_verifier().cleanup(&self.archive().output_files);
    }

    /// Returns `true` if all declared plain outputs verify.
    /// Returns `false` if any output is missing or does not match.
    fn verify_outputs(&self) -> Result<bool> {
        self.output_verifier().verify(&self.archive().output_files)
    }

    /// Records archive completion in shared progress once outputs verify.
    fn mark_complete(&self) {
        self.ctx.session().record_reused_archive(self.archive().size, self.archive().output_size());
    }

    /// Executes one archive attempt according to the selected cache-vs-stream mode.
    fn run_attempt(&self, mode: ArchiveMode, format: CompressionFormat) -> Result<()> {
        mode.execute(self, format)
    }

    /// Downloads the archive into the cache, then extracts from the cached file.
    ///
    /// The cached download is cleaned up on every exit path, whether the download
    /// or the subsequent extraction fails.
    fn run_cached_attempt(&self, format: CompressionFormat) -> Result<()> {
        let cache_dir =
            self.ctx.cache_dir().ok_or_else(|| eyre::eyre!("Missing download cache directory"))?;
        let fetcher =
            ArchiveFetcher::new(self.archive().url.clone(), cache_dir, self.ctx.session().clone());

        if self.archive.ty == super::manifest::SnapshotComponentType::State {
            debug!(target: "reth::cli", url = %self.archive().url, "Downloading state snapshot archive");
        }

        let download_result = {
            let mut download_progress = ArchiveDownloadProgress::new(self.ctx.session().progress());
            let result = fetcher.download(Some(&mut download_progress));
            // Only finalize progress when bytes were actually tracked for this
            // download; otherwise the progress display was never involved.
            if let Ok(ref downloaded) = result &&
                download_progress.has_tracked_bytes()
            {
                download_progress.complete(downloaded.size);
            }
            result
        };

        let downloaded = match download_result {
            Ok(downloaded) => downloaded,
            Err(error) => {
                // Drop partially downloaded files so the next attempt starts clean.
                fetcher.cleanup_downloaded_files();
                return Err(error);
            }
        };

        info!(target: "reth::cli",
            file = %self.archive().file_name,
            component = %self.archive.component,
            size = %super::progress::DownloadProgress::format_size(downloaded.size),
            "Archive download complete"
        );

        let extract_result = self.extract_cached_archive(&downloaded.path, format);
        // The cached archive file is no longer needed once extraction has run.
        fetcher.cleanup_downloaded_files();
        extract_result
    }

    /// Streams the archive directly into extraction without keeping a cached copy.
    fn run_streaming_attempt(&self, format: CompressionFormat) -> Result<()> {
        // NOTE(review): bound but never used here — presumably held for its
        // construction/drop side effects on shared progress; confirm against
        // `ArchiveDownloadProgress`.
        let _download_progress = ArchiveDownloadProgress::new(self.ctx.session().progress());
        streaming_download_and_extract(
            &self.archive().url,
            format,
            self.ctx.target_dir(),
            self.ctx.session(),
        )
    }

    /// Extracts a cached archive file while updating shared extraction activity.
    fn extract_cached_archive(&self, archive_path: &Path, format: CompressionFormat) -> Result<()> {
        let mut extraction_progress = ArchiveExtractionProgress::new(self.ctx.session().progress());
        let file = fs::open(archive_path)?;
        let result = extract_archive_raw(
            file,
            format,
            self.ctx.target_dir(),
            Some(&mut extraction_progress),
        );
        // Always mark the extraction activity finished, even when extraction failed.
        extraction_progress.finish();
        result
    }

    /// Returns `true` if all declared plain outputs verify while updating shared verification
    /// progress.
    fn verify_outputs_with_progress(&self) -> Result<bool> {
        let mut verification_progress =
            ArchiveVerificationProgress::new(self.ctx.session().progress());
        let verified = self
            .output_verifier()
            .verify_with_progress(&self.archive().output_files, Some(&mut verification_progress))?;
        if verified {
            verification_progress.complete(self.archive().output_size());
        }
        Ok(verified)
    }
}
335
/// Chooses whether an archive attempt uses the cache or streams directly.
///
/// `Cached` is selected whenever the process context provides a cache directory;
/// only cached attempts retry fetch errors (see `retries_fetch_errors`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveMode {
    /// Download the archive to the cache, then extract it.
    Cached,
    /// Stream the archive directly into extraction.
    Streaming,
}
344
345impl ArchiveMode {
346    /// Picks the archive mode from the process context.
347    fn new(ctx: &ArchiveProcessContext) -> Result<Self> {
348        if ctx.cache_dir().is_some() {
349            ctx.session().require_request_limiter()?;
350            return Ok(Self::Cached)
351        }
352
353        Ok(Self::Streaming)
354    }
355
356    /// Returns `true` when fetch failures should retry the whole archive attempt.
357    const fn retries_fetch_errors(&self) -> bool {
358        matches!(self, Self::Cached)
359    }
360
361    /// Runs the selected archive mode for a single attempt.
362    fn execute(&self, processor: &ArchiveProcessor, format: CompressionFormat) -> Result<()> {
363        match self {
364            Self::Cached => processor.run_cached_attempt(format),
365            Self::Streaming => processor.run_streaming_attempt(format),
366        }
367    }
368}