reth_cli_commands/download/
archive.rs1use super::{
2 extract::{extract_archive_raw, streaming_download_and_extract, CompressionFormat},
3 fetch::ArchiveFetcher,
4 manifest::SnapshotArchive,
5 planning::{PlannedArchive, PlannedDownloads},
6 progress::{
7 spawn_progress_display, ArchiveDownloadProgress, ArchiveExtractionProgress,
8 ArchiveVerificationProgress, DownloadRequestLimiter, SharedProgress,
9 },
10 session::{ArchiveProcessContext, DownloadSession},
11 verify::OutputVerifier,
12 MAX_DOWNLOAD_RETRIES, RETRY_BACKOFF_SECS,
13};
14use eyre::Result;
15use futures::stream::{self, StreamExt};
16use reth_cli_util::cancellation::CancellationToken;
17use reth_fs_util as fs;
18use std::{
19 path::Path,
20 sync::{atomic::Ordering, Arc},
21 time::Duration,
22};
23use tokio::task;
24use tracing::{debug, info, warn};
25
26const DOWNLOAD_CACHE_DIR: &str = ".download-cache";
27
28pub(crate) async fn run_modular_downloads(
30 planned_downloads: PlannedDownloads,
31 target_dir: &Path,
32 download_concurrency: usize,
33 cancel_token: CancellationToken,
34) -> Result<()> {
35 let download_cache_dir = target_dir.join(DOWNLOAD_CACHE_DIR);
36 fs::create_dir_all(&download_cache_dir)?;
37
38 let shared = SharedProgress::new(
39 planned_downloads.total_download_size,
40 planned_downloads.total_output_size,
41 planned_downloads.total_archives() as u64,
42 cancel_token.clone(),
43 );
44 let session = DownloadSession::new(
45 Some(Arc::clone(&shared)),
46 Some(DownloadRequestLimiter::new(download_concurrency)),
47 cancel_token,
48 );
49 let ctx =
50 ArchiveProcessContext::new(target_dir.to_path_buf(), Some(download_cache_dir), session);
51
52 ModularDownloadJob::new(ctx, download_concurrency).run(planned_downloads).await
53}
54
/// Drives all planned archives through bounded-concurrency processing.
struct ModularDownloadJob {
    // Shared target/cache directories and download session handed to each
    // per-archive worker.
    ctx: ArchiveProcessContext,
    // Maximum number of archives processed concurrently.
    archive_concurrency: usize,
}
62
63impl ModularDownloadJob {
64 const fn new(ctx: ArchiveProcessContext, archive_concurrency: usize) -> Self {
66 Self { ctx, archive_concurrency }
67 }
68
69 async fn run(self, planned_downloads: PlannedDownloads) -> Result<()> {
71 let shared = Arc::clone(
72 self.ctx.session().progress().expect("modular downloads always use shared progress"),
73 );
74 let progress_handle = spawn_progress_display(Arc::clone(&shared));
75 let ctx = self.ctx.clone();
76 let results: Vec<Result<()>> = stream::iter(planned_downloads.archives)
77 .map(move |archive| {
78 let ctx = ctx.clone();
79 async move { Self::process_archive(ctx, archive).await }
80 })
81 .buffer_unordered(self.archive_concurrency)
82 .collect()
83 .await;
84
85 shared.done.store(true, Ordering::Relaxed);
86 let _ = progress_handle.await;
87
88 for result in results {
89 result?;
90 }
91
92 Ok(())
93 }
94
95 async fn process_archive(ctx: ArchiveProcessContext, archive: PlannedArchive) -> Result<()> {
97 task::spawn_blocking(move || ArchiveProcessor::new(archive, ctx).run()).await??;
98 Ok(())
99 }
100}
101
/// States of the per-archive retry state machine in [`ArchiveProcessor::run`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveAttemptState {
    /// Clean up any prior outputs and perform a download + extract attempt.
    RunAttempt,
    /// Verify the extracted output files.
    VerifyOutputs,
    /// Back off, then retry unless the retry budget is exhausted.
    RetryAttempt,
    /// Archive processed and verified successfully.
    Complete,
    /// All attempts exhausted; surface an error.
    Fail,
}
116
/// Processes a single planned archive (download, extract, verify) on a
/// blocking thread.
struct ArchiveProcessor {
    // The archive to process, including its planned output files.
    archive: PlannedArchive,
    // Target/cache directories and the shared download session.
    ctx: ArchiveProcessContext,
}
124
impl ArchiveProcessor {
    /// Pairs one planned archive with the processing context it runs under.
    fn new(archive: PlannedArchive, ctx: ArchiveProcessContext) -> Self {
        Self { archive, ctx }
    }

    /// Drives one archive to completion.
    ///
    /// Fast path: if the expected output files already verify, the archive is
    /// recorded as reused and skipped. Otherwise a small state machine runs
    /// download + extract + verify attempts, restarting from scratch (with a
    /// backoff) up to `MAX_DOWNLOAD_RETRIES` times.
    fn run(self) -> Result<()> {
        let archive = self.archive();
        if self.try_reuse_outputs()? {
            info!(target: "reth::cli", file = %archive.file_name, component = %self.archive.component, "Skipping already verified plain files");
            return Ok(());
        }

        let mode = ArchiveMode::new(&self.ctx)?;
        let format = CompressionFormat::from_url(&archive.file_name)?;
        let mut attempt = 1;
        // Last download/extract error, kept so the final failure can wrap it.
        let mut last_error: Option<eyre::Error> = None;
        let mut state = ArchiveAttemptState::RunAttempt;

        loop {
            match state {
                ArchiveAttemptState::RunAttempt => {
                    // Remove partial outputs so every attempt starts clean.
                    self.cleanup_outputs();

                    if attempt > 1 {
                        info!(target: "reth::cli",
                            file = %archive.file_name,
                            component = %self.archive.component,
                            attempt,
                            max = MAX_DOWNLOAD_RETRIES,
                            "Retrying archive from scratch"
                        );
                    }

                    match self.run_attempt(mode, format) {
                        Ok(()) => state = ArchiveAttemptState::VerifyOutputs,
                        // Only modes that opt into fetch-error retries (cached)
                        // loop here; other errors are surfaced immediately.
                        Err(error) if mode.retries_fetch_errors() => {
                            warn!(target: "reth::cli",
                                file = %archive.file_name,
                                component = %self.archive.component,
                                attempt,
                                err = %format_args!("{error:#}"),
                                "Archive attempt failed, retrying from scratch"
                            );
                            last_error = Some(error);
                            state = ArchiveAttemptState::RetryAttempt;
                        }
                        Err(error) => return Err(error),
                    }
                }
                ArchiveAttemptState::VerifyOutputs => {
                    if self.verify_outputs_with_progress()? {
                        state = ArchiveAttemptState::Complete;
                    } else {
                        warn!(target: "reth::cli", file = %archive.file_name, component = %self.archive.component, attempt, "Archive extracted, but output verification failed, retrying");
                        state = ArchiveAttemptState::RetryAttempt;
                    }
                }
                ArchiveAttemptState::RetryAttempt => {
                    if attempt >= MAX_DOWNLOAD_RETRIES {
                        state = ArchiveAttemptState::Fail;
                    } else {
                        // Blocking sleep is acceptable: `run` executes on a
                        // `spawn_blocking` thread (see ModularDownloadJob).
                        std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                        attempt += 1;
                        state = ArchiveAttemptState::RunAttempt;
                    }
                }
                ArchiveAttemptState::Complete => return Ok(()),
                ArchiveAttemptState::Fail => {
                    // Prefer wrapping the last fetch/extract error; if attempts
                    // only ever failed output verification, `last_error` is
                    // None and the integrity-specific message is used instead.
                    if let Some(error) = last_error {
                        return Err(error.wrap_err(format!(
                            "Failed after {} attempts for {}",
                            MAX_DOWNLOAD_RETRIES, archive.file_name
                        )));
                    }

                    eyre::bail!(
                        "Failed integrity validation after {} attempts for {}",
                        MAX_DOWNLOAD_RETRIES,
                        archive.file_name
                    )
                }
            }
        }
    }

    /// The underlying snapshot archive metadata.
    fn archive(&self) -> &SnapshotArchive {
        &self.archive.archive
    }

    /// Verifier rooted at the extraction target directory.
    fn output_verifier(&self) -> OutputVerifier<'_> {
        OutputVerifier::new(self.ctx.target_dir())
    }

    /// Returns `true` when the expected output files already verify; also
    /// records the archive as reused for progress accounting.
    fn try_reuse_outputs(&self) -> Result<bool> {
        if self.verify_outputs()? {
            self.mark_complete();
            return Ok(true);
        }

        Ok(false)
    }

    /// Removes any existing output files for this archive.
    fn cleanup_outputs(&self) {
        self.output_verifier().cleanup(&self.archive().output_files);
    }

    /// Verifies the archive's output files without progress reporting.
    fn verify_outputs(&self) -> Result<bool> {
        self.output_verifier().verify(&self.archive().output_files)
    }

    /// Records the archive's download and output sizes with the session as
    /// reused (skipped) work.
    fn mark_complete(&self) {
        self.ctx.session().record_reused_archive(self.archive().size, self.archive().output_size());
    }

    /// Executes one download + extract attempt in the selected mode.
    fn run_attempt(&self, mode: ArchiveMode, format: CompressionFormat) -> Result<()> {
        mode.execute(self, format)
    }

    /// Cached attempt: download the archive into the cache directory, then
    /// extract it. The cached file is cleaned up whether or not extraction
    /// succeeds, and on download failure any partial files are removed so the
    /// next attempt starts clean.
    fn run_cached_attempt(&self, format: CompressionFormat) -> Result<()> {
        let cache_dir =
            self.ctx.cache_dir().ok_or_else(|| eyre::eyre!("Missing download cache directory"))?;
        let fetcher =
            ArchiveFetcher::new(self.archive().url.clone(), cache_dir, self.ctx.session().clone());

        if self.archive.ty == super::manifest::SnapshotComponentType::State {
            debug!(target: "reth::cli", url = %self.archive().url, "Downloading state snapshot archive");
        }

        let download_result = {
            let mut download_progress = ArchiveDownloadProgress::new(self.ctx.session().progress());
            let result = fetcher.download(Some(&mut download_progress));
            // Only report completion when this call actually tracked bytes
            // (NOTE(review): presumably skips archives satisfied entirely from
            // cache — confirm against ArchiveFetcher::download).
            if let Ok(ref downloaded) = result &&
                download_progress.has_tracked_bytes()
            {
                download_progress.complete(downloaded.size);
            }
            result
        };

        let downloaded = match download_result {
            Ok(downloaded) => downloaded,
            Err(error) => {
                fetcher.cleanup_downloaded_files();
                return Err(error);
            }
        };

        info!(target: "reth::cli",
            file = %self.archive().file_name,
            component = %self.archive.component,
            size = %super::progress::DownloadProgress::format_size(downloaded.size),
            "Archive download complete"
        );

        let extract_result = self.extract_cached_archive(&downloaded.path, format);
        // The cached archive is no longer needed once extraction has run.
        fetcher.cleanup_downloaded_files();
        extract_result
    }

    /// Streaming attempt: download and extract in one pass without staging the
    /// archive on disk.
    fn run_streaming_attempt(&self, format: CompressionFormat) -> Result<()> {
        // NOTE(review): bound to a named `_`-prefixed variable so it lives for
        // the whole streaming call — presumably its construction/Drop interacts
        // with shared progress; confirm before removing.
        let _download_progress = ArchiveDownloadProgress::new(self.ctx.session().progress());
        streaming_download_and_extract(
            &self.archive().url,
            format,
            self.ctx.target_dir(),
            self.ctx.session(),
        )
    }

    /// Extracts a previously downloaded archive file into the target
    /// directory, reporting extraction progress; progress is finished even on
    /// extraction error.
    fn extract_cached_archive(&self, archive_path: &Path, format: CompressionFormat) -> Result<()> {
        let mut extraction_progress = ArchiveExtractionProgress::new(self.ctx.session().progress());
        let file = fs::open(archive_path)?;
        let result = extract_archive_raw(
            file,
            format,
            self.ctx.target_dir(),
            Some(&mut extraction_progress),
        );
        extraction_progress.finish();
        result
    }

    /// Verifies extracted outputs while reporting verification progress; the
    /// progress is marked complete only when verification succeeds.
    fn verify_outputs_with_progress(&self) -> Result<bool> {
        let mut verification_progress =
            ArchiveVerificationProgress::new(self.ctx.session().progress());
        let verified = self
            .output_verifier()
            .verify_with_progress(&self.archive().output_files, Some(&mut verification_progress))?;
        if verified {
            verification_progress.complete(self.archive().output_size());
        }
        Ok(verified)
    }
}
335
/// How an archive is fetched and extracted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveMode {
    /// Download into the on-disk cache first, then extract the cached file.
    Cached,
    /// Stream the download and extract on the fly, with no disk staging.
    Streaming,
}
344
345impl ArchiveMode {
346 fn new(ctx: &ArchiveProcessContext) -> Result<Self> {
348 if ctx.cache_dir().is_some() {
349 ctx.session().require_request_limiter()?;
350 return Ok(Self::Cached)
351 }
352
353 Ok(Self::Streaming)
354 }
355
356 const fn retries_fetch_errors(&self) -> bool {
358 matches!(self, Self::Cached)
359 }
360
361 fn execute(&self, processor: &ArchiveProcessor, format: CompressionFormat) -> Result<()> {
363 match self {
364 Self::Cached => processor.run_cached_attempt(format),
365 Self::Streaming => processor.run_streaming_attempt(format),
366 }
367 }
368}