1use eyre::Result;
2use reth_cli_util::cancellation::CancellationToken;
3use std::{
4 io::{self, Read, Write},
5 sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 Arc, Condvar, Mutex,
8 },
9 time::{Duration, Instant},
10};
11use tracing::info;
12
13const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
14
15pub(crate) struct DownloadProgress {
17 pub(crate) downloaded: u64,
19 total_size: u64,
21 last_displayed: Instant,
23 started_at: Instant,
25}
26
27impl DownloadProgress {
28 pub(crate) fn new(total_size: u64) -> Self {
30 let now = Instant::now();
31 Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
32 }
33
34 pub(crate) fn format_size(size: u64) -> String {
36 let mut size = size as f64;
37 let mut unit_index = 0;
38
39 while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
40 size /= 1024.0;
41 unit_index += 1;
42 }
43
44 format!("{:.2} {}", size, BYTE_UNITS[unit_index])
45 }
46
47 pub(crate) fn format_duration(duration: Duration) -> String {
49 let secs = duration.as_secs();
50 if secs < 60 {
51 format!("{secs}s")
52 } else if secs < 3600 {
53 format!("{}m {}s", secs / 60, secs % 60)
54 } else {
55 format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
56 }
57 }
58
59 pub(crate) fn update(&mut self, chunk_size: u64) -> Result<()> {
61 self.downloaded += chunk_size;
62
63 if self.last_displayed.elapsed() >= Duration::from_millis(100) {
64 let formatted_downloaded = Self::format_size(self.downloaded);
65 let formatted_total = Self::format_size(self.total_size);
66 let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
67
68 let elapsed = self.started_at.elapsed();
69 let eta = if self.downloaded > 0 {
70 let remaining = self.total_size.saturating_sub(self.downloaded);
71 let speed = self.downloaded as f64 / elapsed.as_secs_f64();
72 if speed > 0.0 {
73 Duration::from_secs_f64(remaining as f64 / speed)
74 } else {
75 Duration::ZERO
76 }
77 } else {
78 Duration::ZERO
79 };
80 let eta_str = Self::format_duration(eta);
81
82 print!(
83 "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str} ",
84 );
85 io::stdout().flush()?;
86 self.last_displayed = Instant::now();
87 }
88
89 Ok(())
90 }
91}
92
93#[derive(Debug, Clone, Copy)]
94struct PhaseStart {
95 started_at: Instant,
96 baseline_bytes: u64,
97}
98
99pub(crate) struct SharedProgress {
101 pub(crate) session_fetched_bytes: AtomicU64,
103 pub(crate) completed_download_bytes: AtomicU64,
105 pub(crate) active_download_bytes: AtomicU64,
107 pub(crate) total_download_bytes: u64,
109 pub(crate) completed_output_bytes: AtomicU64,
111 pub(crate) active_extracted_output_bytes: AtomicU64,
113 pub(crate) active_verified_output_bytes: AtomicU64,
115 pub(crate) total_output_bytes: u64,
117 pub(crate) total_archives: u64,
119 pub(crate) started_at: Instant,
121 extraction_phase: Mutex<Option<PhaseStart>>,
123 verification_phase: Mutex<Option<PhaseStart>>,
125 pub(crate) archives_done: AtomicU64,
127 pub(crate) active_downloads: AtomicU64,
129 pub(crate) active_download_requests: AtomicU64,
131 pub(crate) active_extractions: AtomicU64,
133 pub(crate) active_verifications: AtomicU64,
135 pub(crate) done: AtomicBool,
137 cancel_token: CancellationToken,
139}
140
141impl SharedProgress {
142 pub(crate) fn new(
144 total_download_bytes: u64,
145 total_output_bytes: u64,
146 total_archives: u64,
147 cancel_token: CancellationToken,
148 ) -> Arc<Self> {
149 Arc::new(Self {
150 session_fetched_bytes: AtomicU64::new(0),
151 completed_download_bytes: AtomicU64::new(0),
152 active_download_bytes: AtomicU64::new(0),
153 total_download_bytes,
154 completed_output_bytes: AtomicU64::new(0),
155 active_extracted_output_bytes: AtomicU64::new(0),
156 active_verified_output_bytes: AtomicU64::new(0),
157 total_output_bytes,
158 total_archives,
159 started_at: Instant::now(),
160 extraction_phase: Mutex::new(None),
161 verification_phase: Mutex::new(None),
162 archives_done: AtomicU64::new(0),
163 active_downloads: AtomicU64::new(0),
164 active_download_requests: AtomicU64::new(0),
165 active_extractions: AtomicU64::new(0),
166 active_verifications: AtomicU64::new(0),
167 done: AtomicBool::new(false),
168 cancel_token,
169 })
170 }
171
172 pub(crate) fn is_cancelled(&self) -> bool {
174 self.cancel_token.is_cancelled()
175 }
176
177 pub(crate) fn record_session_fetched_bytes(&self, bytes: u64) {
179 self.session_fetched_bytes.fetch_add(bytes, Ordering::Relaxed);
180 }
181
182 pub(crate) fn add_active_download_bytes(&self, bytes: u64) {
183 self.active_download_bytes.fetch_add(bytes, Ordering::Relaxed);
184 }
185
186 pub(crate) fn sub_active_download_bytes(&self, bytes: u64) {
187 sub_bytes(&self.active_download_bytes, bytes);
188 }
189
190 fn add_active_extracted_output_bytes(&self, bytes: u64) {
191 self.active_extracted_output_bytes.fetch_add(bytes, Ordering::Relaxed);
192 }
193
194 fn sub_active_extracted_output_bytes(&self, bytes: u64) {
195 sub_bytes(&self.active_extracted_output_bytes, bytes);
196 }
197
198 fn add_active_verified_output_bytes(&self, bytes: u64) {
199 self.active_verified_output_bytes.fetch_add(bytes, Ordering::Relaxed);
200 }
201
202 fn sub_active_verified_output_bytes(&self, bytes: u64) {
203 sub_bytes(&self.active_verified_output_bytes, bytes);
204 }
205
206 pub(crate) fn record_reused_archive(&self, download_bytes: u64, output_bytes: u64) {
208 self.completed_download_bytes.fetch_add(download_bytes, Ordering::Relaxed);
209 self.completed_output_bytes.fetch_add(output_bytes, Ordering::Relaxed);
210 self.archives_done.fetch_add(1, Ordering::Relaxed);
211 }
212
213 pub(crate) fn record_archive_download_complete(&self, bytes: u64) {
215 self.completed_download_bytes.fetch_add(bytes, Ordering::Relaxed);
216 }
217
218 pub(crate) fn record_archive_output_complete(&self, bytes: u64) {
220 self.completed_output_bytes.fetch_add(bytes, Ordering::Relaxed);
221 self.archives_done.fetch_add(1, Ordering::Relaxed);
222 }
223
224 pub(crate) fn logical_downloaded_bytes(&self) -> u64 {
226 (self.completed_download_bytes.load(Ordering::Relaxed) +
227 self.active_download_bytes.load(Ordering::Relaxed))
228 .min(self.total_download_bytes)
229 }
230
231 pub(crate) fn verified_output_bytes(&self) -> u64 {
233 self.completed_output_bytes.load(Ordering::Relaxed).min(self.total_output_bytes)
234 }
235
236 pub(crate) fn extracting_output_bytes(&self) -> u64 {
238 (self.completed_output_bytes.load(Ordering::Relaxed) +
239 self.active_extracted_output_bytes.load(Ordering::Relaxed))
240 .min(self.total_output_bytes)
241 }
242
243 pub(crate) fn verifying_output_bytes(&self) -> u64 {
245 (self.completed_output_bytes.load(Ordering::Relaxed) +
246 self.active_verified_output_bytes.load(Ordering::Relaxed))
247 .min(self.total_output_bytes)
248 }
249
250 fn restart_phase(slot: &Mutex<Option<PhaseStart>>, baseline_bytes: u64) {
251 *slot.lock().unwrap() = Some(PhaseStart { started_at: Instant::now(), baseline_bytes });
252 }
253
254 fn phase_eta(
255 slot: &Mutex<Option<PhaseStart>>,
256 current_bytes: u64,
257 total_bytes: u64,
258 ) -> Option<Duration> {
259 let phase = *slot.lock().unwrap();
260 let phase = phase?;
261 let done = current_bytes.saturating_sub(phase.baseline_bytes);
262 let total = total_bytes.saturating_sub(phase.baseline_bytes);
263 eta_from_progress(phase.started_at.elapsed(), done, total)
264 }
265
266 fn extraction_eta(&self, current_bytes: u64) -> Option<Duration> {
267 Self::phase_eta(&self.extraction_phase, current_bytes, self.total_output_bytes)
268 }
269
270 fn verification_eta(&self, current_bytes: u64) -> Option<Duration> {
271 Self::phase_eta(&self.verification_phase, current_bytes, self.total_output_bytes)
272 }
273
274 pub(crate) fn download_started(&self) {
276 self.active_downloads.fetch_add(1, Ordering::Relaxed);
277 }
278
279 pub(crate) fn download_finished(&self) {
281 sub_bytes(&self.active_downloads, 1);
282 }
283
284 pub(crate) fn request_started(&self) {
286 self.active_download_requests.fetch_add(1, Ordering::Relaxed);
287 }
288
289 pub(crate) fn request_finished(&self) {
291 sub_bytes(&self.active_download_requests, 1);
292 }
293
294 pub(crate) fn extraction_started(&self) {
296 if self.active_extractions.fetch_add(1, Ordering::Relaxed) == 0 {
297 Self::restart_phase(
298 &self.extraction_phase,
299 self.completed_output_bytes.load(Ordering::Relaxed),
300 );
301 }
302 }
303
304 pub(crate) fn extraction_finished(&self) {
306 sub_bytes(&self.active_extractions, 1);
307 }
308
309 pub(crate) fn verification_started(&self) {
311 if self.active_verifications.fetch_add(1, Ordering::Relaxed) == 0 {
312 Self::restart_phase(
313 &self.verification_phase,
314 self.completed_output_bytes.load(Ordering::Relaxed),
315 );
316 }
317 }
318
319 pub(crate) fn verification_finished(&self) {
321 sub_bytes(&self.active_verifications, 1);
322 }
323}
324
325fn sub_bytes(counter: &AtomicU64, bytes: u64) {
326 let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
327 Some(current.saturating_sub(bytes))
328 });
329}
330
331fn eta_from_progress(elapsed: Duration, done: u64, total: u64) -> Option<Duration> {
332 if done == 0 || done >= total {
333 return None;
334 }
335
336 let secs = elapsed.as_secs_f64();
337 if secs <= 0.0 {
338 return None;
339 }
340
341 let speed = done as f64 / secs;
342 if speed <= 0.0 {
343 return None;
344 }
345
346 Some(Duration::from_secs_f64((total - done) as f64 / speed))
347}
348
349fn format_percent(done: u64, total: u64) -> String {
350 if total == 0 {
351 return "100.0%".to_string();
352 }
353
354 format!("{:.1}%", (done as f64 / total as f64) * 100.0)
355}
356
357fn format_eta(eta: Option<Duration>) -> String {
358 eta.map(DownloadProgress::format_duration).unwrap_or_else(|| "unknown".to_string())
359}
360
361pub(crate) struct DownloadRequestLimiter {
366 limit: usize,
368 active: Mutex<usize>,
370 notify: Condvar,
372}
373
374impl DownloadRequestLimiter {
375 pub(crate) fn new(limit: usize) -> Arc<Self> {
377 Arc::new(Self { limit: limit.max(1), active: Mutex::new(0), notify: Condvar::new() })
378 }
379
380 pub(crate) fn max_concurrency(&self) -> usize {
382 self.limit
383 }
384
385 pub(crate) fn acquire<'a>(
386 &'a self,
387 progress: Option<&'a Arc<SharedProgress>>,
388 cancel_token: &CancellationToken,
389 ) -> Result<DownloadRequestPermit<'a>> {
390 let mut active = self.active.lock().unwrap();
391 loop {
392 if cancel_token.is_cancelled() {
393 return Err(eyre::eyre!("Download cancelled"));
394 }
395
396 if *active < self.limit {
397 *active += 1;
398 if let Some(progress) = progress {
399 progress.request_started();
400 }
401 return Ok(DownloadRequestPermit { limiter: self, progress });
402 }
403
404 let (next_active, _) =
407 self.notify.wait_timeout(active, Duration::from_millis(100)).unwrap();
408 active = next_active;
409 }
410 }
411}
412
413pub(crate) struct DownloadRequestPermit<'a> {
418 limiter: &'a DownloadRequestLimiter,
420 progress: Option<&'a Arc<SharedProgress>>,
422}
423
424impl Drop for DownloadRequestPermit<'_> {
425 fn drop(&mut self) {
427 let mut active = self.limiter.active.lock().unwrap();
428 *active = active.saturating_sub(1);
429 drop(active);
430 self.limiter.notify.notify_one();
431
432 if let Some(progress) = self.progress {
433 progress.request_finished();
434 }
435 }
436}
437
438pub(crate) struct ArchiveDownloadProgress<'a> {
440 progress: Option<&'a Arc<SharedProgress>>,
441 downloaded: u64,
442 completed: bool,
443}
444
445impl<'a> ArchiveDownloadProgress<'a> {
446 pub(crate) fn new(progress: Option<&'a Arc<SharedProgress>>) -> Self {
448 if let Some(progress) = progress {
449 progress.download_started();
450 }
451 Self { progress, downloaded: 0, completed: false }
452 }
453
454 pub(crate) fn record_downloaded(&mut self, bytes: u64) {
456 self.downloaded += bytes;
457 if let Some(progress) = self.progress {
458 progress.add_active_download_bytes(bytes);
459 }
460 }
461
462 pub(crate) fn has_tracked_bytes(&self) -> bool {
464 self.downloaded > 0
465 }
466
467 pub(crate) fn complete(&mut self, total_bytes: u64) {
469 if self.completed {
470 return;
471 }
472 if let Some(progress) = self.progress {
473 progress.sub_active_download_bytes(self.downloaded);
474 progress.record_archive_download_complete(total_bytes);
475 }
476 self.downloaded = 0;
477 self.completed = true;
478 }
479}
480
481impl Drop for ArchiveDownloadProgress<'_> {
482 fn drop(&mut self) {
483 if let Some(progress) = self.progress {
484 progress.sub_active_download_bytes(self.downloaded);
485 progress.download_finished();
486 }
487 }
488}
489
490pub(crate) struct ArchiveExtractionProgress {
492 progress: Option<Arc<SharedProgress>>,
493 extracted: Arc<AtomicU64>,
494 finished: bool,
495}
496
497#[derive(Clone)]
499pub(crate) struct ArchiveExtractionProgressHandle {
500 progress: Arc<SharedProgress>,
501 extracted: Arc<AtomicU64>,
502}
503
504impl ArchiveExtractionProgress {
505 pub(crate) fn new(progress: Option<&Arc<SharedProgress>>) -> Self {
507 if let Some(progress) = progress {
508 progress.extraction_started();
509 }
510 Self {
511 progress: progress.cloned(),
512 extracted: Arc::new(AtomicU64::new(0)),
513 finished: false,
514 }
515 }
516
517 pub(crate) fn handle(&self) -> Option<ArchiveExtractionProgressHandle> {
519 Some(ArchiveExtractionProgressHandle {
520 progress: Arc::clone(self.progress.as_ref()?),
521 extracted: Arc::clone(&self.extracted),
522 })
523 }
524
525 pub(crate) fn record_extracted(&mut self, bytes: u64) {
527 if let Some(handle) = self.handle() {
528 handle.record_extracted(bytes);
529 }
530 }
531
532 pub(crate) fn finish(&mut self) {
534 if self.finished {
535 return;
536 }
537 if let Some(progress) = &self.progress {
538 progress.sub_active_extracted_output_bytes(self.extracted.swap(0, Ordering::Relaxed));
539 }
540 self.finished = true;
541 }
542}
543
544impl Drop for ArchiveExtractionProgress {
545 fn drop(&mut self) {
546 if let Some(progress) = &self.progress {
547 progress.sub_active_extracted_output_bytes(self.extracted.swap(0, Ordering::Relaxed));
548 progress.extraction_finished();
549 }
550 }
551}
552
553impl ArchiveExtractionProgressHandle {
554 pub(crate) fn record_extracted(&self, bytes: u64) {
556 self.extracted.fetch_add(bytes, Ordering::Relaxed);
557 self.progress.add_active_extracted_output_bytes(bytes);
558 }
559}
560
561pub(crate) struct ArchiveVerificationProgress<'a> {
563 progress: Option<&'a Arc<SharedProgress>>,
564 verified: u64,
565 completed: bool,
566}
567
568impl<'a> ArchiveVerificationProgress<'a> {
569 pub(crate) fn new(progress: Option<&'a Arc<SharedProgress>>) -> Self {
571 if let Some(progress) = progress {
572 progress.verification_started();
573 }
574 Self { progress, verified: 0, completed: false }
575 }
576
577 pub(crate) fn record_verified(&mut self, bytes: u64) {
579 self.verified += bytes;
580 if let Some(progress) = self.progress {
581 progress.add_active_verified_output_bytes(bytes);
582 }
583 }
584
585 pub(crate) fn complete(&mut self, total_bytes: u64) {
587 if self.completed {
588 return;
589 }
590 if let Some(progress) = self.progress {
591 progress.sub_active_verified_output_bytes(self.verified);
592 progress.record_archive_output_complete(total_bytes);
593 }
594 self.verified = 0;
595 self.completed = true;
596 }
597}
598
599impl Drop for ArchiveVerificationProgress<'_> {
600 fn drop(&mut self) {
601 if let Some(progress) = self.progress {
602 progress.sub_active_verified_output_bytes(self.verified);
603 progress.verification_finished();
604 }
605 }
606}
607
608pub(crate) struct ProgressReader<R> {
610 reader: R,
612 progress: DownloadProgress,
614 cancel_token: CancellationToken,
616}
617
618impl<R: Read> ProgressReader<R> {
619 pub(crate) fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
621 Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
622 }
623}
624
625impl<R: Read> Read for ProgressReader<R> {
626 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
628 if self.cancel_token.is_cancelled() {
629 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
630 }
631 let bytes = self.reader.read(buf)?;
632 if bytes > 0 &&
633 let Err(error) = self.progress.update(bytes as u64)
634 {
635 return Err(io::Error::other(error));
636 }
637 Ok(bytes)
638 }
639}
640
641pub(crate) struct SharedProgressWriter<'a, W> {
644 pub(crate) inner: W,
646 pub(crate) progress: Arc<SharedProgress>,
648 pub(crate) on_written: Option<&'a mut dyn FnMut(u64)>,
650}
651
652impl<W: Write> Write for SharedProgressWriter<'_, W> {
653 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
655 if self.progress.is_cancelled() {
656 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
657 }
658 let n = self.inner.write(buf)?;
659 self.progress.record_session_fetched_bytes(n as u64);
660 if let Some(on_written) = self.on_written.as_deref_mut() {
661 on_written(n as u64);
662 }
663 Ok(n)
664 }
665
666 fn flush(&mut self) -> io::Result<()> {
668 self.inner.flush()
669 }
670}
671
672pub(crate) struct SharedProgressReader<R> {
675 pub(crate) inner: R,
677 pub(crate) progress: Arc<SharedProgress>,
679}
680
681impl<R: Read> Read for SharedProgressReader<R> {
682 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
684 if self.progress.is_cancelled() {
685 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
686 }
687 let n = self.inner.read(buf)?;
688 self.progress.record_session_fetched_bytes(n as u64);
689 Ok(n)
690 }
691}
692
693pub(crate) fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
696 tokio::spawn(async move {
697 let mut interval = tokio::time::interval(Duration::from_secs(3));
698 interval.tick().await;
699 loop {
700 interval.tick().await;
701
702 if progress.done.load(Ordering::Relaxed) {
703 break;
704 }
705
706 let download_total = progress.total_download_bytes;
707 let output_total = progress.total_output_bytes;
708 if download_total == 0 && output_total == 0 {
709 continue;
710 }
711
712 let done = progress.archives_done.load(Ordering::Relaxed);
713 let all = progress.total_archives;
714 let active_downloads = progress.active_downloads.load(Ordering::Relaxed);
715 let active_requests = progress.active_download_requests.load(Ordering::Relaxed);
716 let active_extractions = progress.active_extractions.load(Ordering::Relaxed);
717 let active_verifications = progress.active_verifications.load(Ordering::Relaxed);
718 let downloaded = progress.logical_downloaded_bytes();
719 let extracted = progress.extracting_output_bytes();
720 let verified = progress.verifying_output_bytes();
721 let elapsed = DownloadProgress::format_duration(progress.started_at.elapsed());
722 let download_total_display = DownloadProgress::format_size(download_total);
723 let output_total_display = DownloadProgress::format_size(output_total);
724 let downloaded_display = DownloadProgress::format_size(downloaded);
725 let extracted_display = DownloadProgress::format_size(extracted);
726 let active_download_phase = active_downloads > 0 || active_requests > 0;
727
728 if active_download_phase {
729 info!(target: "reth::cli",
730 archives = format_args!("{done}/{all}"),
731 progress = %format_percent(downloaded, download_total),
732 elapsed = %elapsed,
733 eta = %format_eta(eta_from_progress(progress.started_at.elapsed(), downloaded, download_total)),
734 bytes = format_args!("{downloaded_display}/{download_total_display}"),
735 "Downloading snapshot archives"
736 );
737 } else if active_extractions > 0 {
738 info!(target: "reth::cli",
739 archives = format_args!("{done}/{all}"),
740 progress = %format_percent(extracted, output_total),
741 elapsed = %elapsed,
742 eta = %format_eta(progress.extraction_eta(extracted)),
743 bytes = format_args!("{extracted_display}/{output_total_display}"),
744 "Extracting snapshot archives"
745 );
746 } else if active_verifications > 0 {
747 info!(target: "reth::cli",
748 archives = format_args!("{done}/{all}"),
749 progress = %format_percent(verified, output_total),
750 elapsed = %elapsed,
751 eta = %format_eta(progress.verification_eta(verified)),
752 bytes = format_args!("{}/{output_total_display}", DownloadProgress::format_size(verified)),
753 "Verifying snapshot archives"
754 );
755 } else {
756 continue;
757 }
758 }
759
760 let completed = progress.verified_output_bytes();
761 let completed_display = DownloadProgress::format_size(completed);
762 let output_total = DownloadProgress::format_size(progress.total_output_bytes);
763 info!(target: "reth::cli",
764 archives = format_args!("{}/{}", progress.total_archives, progress.total_archives),
765 progress = "100.0%",
766 elapsed = %DownloadProgress::format_duration(progress.started_at.elapsed()),
767 eta = "0s",
768 bytes = format_args!("{completed_display}/{output_total}"),
769 "Snapshot archive processing complete"
770 );
771 })
772}
773
774#[cfg(test)]
775mod tests {
776 use super::*;
777 use std::sync::atomic::Ordering;
778
779 #[test]
780 fn shared_progress_separates_session_fetch_from_logical_progress() {
781 let progress = SharedProgress::new(10, 20, 1, CancellationToken::new());
782
783 progress.record_session_fetched_bytes(10);
784 progress.record_session_fetched_bytes(10);
785 progress.record_archive_download_complete(10);
786 progress.record_archive_output_complete(20);
787
788 assert_eq!(progress.session_fetched_bytes.load(Ordering::Relaxed), 20);
789 assert_eq!(progress.logical_downloaded_bytes(), 10);
790 assert_eq!(progress.verified_output_bytes(), 20);
791 assert_eq!(progress.archives_done.load(Ordering::Relaxed), 1);
792 }
793
794 #[test]
795 fn archive_download_progress_rolls_back_unfinished_attempts() {
796 let progress = SharedProgress::new(10, 20, 1, CancellationToken::new());
797
798 {
799 let mut download = ArchiveDownloadProgress::new(Some(&progress));
800 download.record_downloaded(4);
801 assert_eq!(progress.logical_downloaded_bytes(), 4);
802 }
803
804 assert_eq!(progress.logical_downloaded_bytes(), 0);
805 assert_eq!(progress.active_downloads.load(Ordering::Relaxed), 0);
806 }
807
808 #[test]
809 fn extraction_phase_baseline_restarts_after_idle() {
810 let progress = SharedProgress::new(10, 100, 1, CancellationToken::new());
811
812 progress.extraction_started();
813 assert_eq!(progress.extraction_phase.lock().unwrap().as_ref().unwrap().baseline_bytes, 0);
814
815 progress.completed_output_bytes.store(25, Ordering::Relaxed);
816 progress.extraction_started();
817 assert_eq!(progress.extraction_phase.lock().unwrap().as_ref().unwrap().baseline_bytes, 0);
818
819 progress.extraction_finished();
820 progress.extraction_finished();
821 progress.extraction_started();
822 assert_eq!(progress.extraction_phase.lock().unwrap().as_ref().unwrap().baseline_bytes, 25);
823 }
824
825 #[test]
826 fn verification_phase_baseline_restarts_after_idle() {
827 let progress = SharedProgress::new(10, 100, 1, CancellationToken::new());
828
829 progress.verification_started();
830 assert_eq!(progress.verification_phase.lock().unwrap().as_ref().unwrap().baseline_bytes, 0);
831
832 progress.completed_output_bytes.store(40, Ordering::Relaxed);
833 progress.verification_started();
834 assert_eq!(progress.verification_phase.lock().unwrap().as_ref().unwrap().baseline_bytes, 0);
835
836 progress.verification_finished();
837 progress.verification_finished();
838 progress.verification_started();
839 assert_eq!(
840 progress.verification_phase.lock().unwrap().as_ref().unwrap().baseline_bytes,
841 40
842 );
843 }
844}