// reth_cli_commands/download/progress.rs

1use eyre::Result;
2use reth_cli_util::cancellation::CancellationToken;
3use std::{
4    io::{self, Read, Write},
5    sync::{
6        atomic::{AtomicBool, AtomicU64, Ordering},
7        Arc, Condvar, Mutex,
8    },
9    time::{Duration, Instant},
10};
11use tracing::info;
12
13const BYTE_UNITS: [&str; 4] = ["B", "KB", "MB", "GB"];
14
15/// Tracks download progress and throttles display updates to every 100ms.
16pub(crate) struct DownloadProgress {
17    /// Bytes copied so far for this single download.
18    pub(crate) downloaded: u64,
19    /// Total bytes expected for this single download.
20    total_size: u64,
21    /// Time when the progress line was last printed.
22    last_displayed: Instant,
23    /// Time when this progress tracker started.
24    started_at: Instant,
25}
26
27impl DownloadProgress {
28    /// Creates new progress tracker with given total size
29    pub(crate) fn new(total_size: u64) -> Self {
30        let now = Instant::now();
31        Self { downloaded: 0, total_size, last_displayed: now, started_at: now }
32    }
33
34    /// Converts bytes to human readable format (B, KB, MB, GB)
35    pub(crate) fn format_size(size: u64) -> String {
36        let mut size = size as f64;
37        let mut unit_index = 0;
38
39        while size >= 1024.0 && unit_index < BYTE_UNITS.len() - 1 {
40            size /= 1024.0;
41            unit_index += 1;
42        }
43
44        format!("{:.2} {}", size, BYTE_UNITS[unit_index])
45    }
46
47    /// Format duration as human readable string
48    pub(crate) fn format_duration(duration: Duration) -> String {
49        let secs = duration.as_secs();
50        if secs < 60 {
51            format!("{secs}s")
52        } else if secs < 3600 {
53            format!("{}m {}s", secs / 60, secs % 60)
54        } else {
55            format!("{}h {}m", secs / 3600, (secs % 3600) / 60)
56        }
57    }
58
59    /// Updates progress bar (for single-archive legacy downloads)
60    pub(crate) fn update(&mut self, chunk_size: u64) -> Result<()> {
61        self.downloaded += chunk_size;
62
63        if self.last_displayed.elapsed() >= Duration::from_millis(100) {
64            let formatted_downloaded = Self::format_size(self.downloaded);
65            let formatted_total = Self::format_size(self.total_size);
66            let progress = (self.downloaded as f64 / self.total_size as f64) * 100.0;
67
68            let elapsed = self.started_at.elapsed();
69            let eta = if self.downloaded > 0 {
70                let remaining = self.total_size.saturating_sub(self.downloaded);
71                let speed = self.downloaded as f64 / elapsed.as_secs_f64();
72                if speed > 0.0 {
73                    Duration::from_secs_f64(remaining as f64 / speed)
74                } else {
75                    Duration::ZERO
76                }
77            } else {
78                Duration::ZERO
79            };
80            let eta_str = Self::format_duration(eta);
81
82            print!(
83                "\rDownloading and extracting... {progress:.2}% ({formatted_downloaded} / {formatted_total}) ETA: {eta_str}     ",
84            );
85            io::stdout().flush()?;
86            self.last_displayed = Instant::now();
87        }
88
89        Ok(())
90    }
91}
92
93#[derive(Debug, Clone, Copy)]
94struct PhaseStart {
95    started_at: Instant,
96    baseline_bytes: u64,
97}
98
99/// Shared progress counters for parallel downloads.
100pub(crate) struct SharedProgress {
101    /// Raw HTTP bytes fetched during this session, including retries.
102    pub(crate) session_fetched_bytes: AtomicU64,
103    /// Compressed bytes from archives that have fully downloaded.
104    pub(crate) completed_download_bytes: AtomicU64,
105    /// Compressed bytes written for currently active archive download attempts.
106    pub(crate) active_download_bytes: AtomicU64,
107    /// Total compressed bytes expected across all planned archives.
108    pub(crate) total_download_bytes: u64,
109    /// Plain-output bytes from archives that have fully verified.
110    pub(crate) completed_output_bytes: AtomicU64,
111    /// Plain-output bytes unpacked by currently active extractions.
112    pub(crate) active_extracted_output_bytes: AtomicU64,
113    /// Plain-output bytes hashed by currently active verifications.
114    pub(crate) active_verified_output_bytes: AtomicU64,
115    /// Total plain-output bytes expected across all planned archives.
116    pub(crate) total_output_bytes: u64,
117    /// Total number of planned archives.
118    pub(crate) total_archives: u64,
119    /// Time when the modular download job started.
120    pub(crate) started_at: Instant,
121    /// Time and baseline when the current extraction phase started.
122    extraction_phase: Mutex<Option<PhaseStart>>,
123    /// Time and baseline when the current verification phase started.
124    verification_phase: Mutex<Option<PhaseStart>>,
125    /// Number of archives that have fully finished.
126    pub(crate) archives_done: AtomicU64,
127    /// Number of archives currently in the fetch phase.
128    pub(crate) active_downloads: AtomicU64,
129    /// Number of in-flight HTTP requests.
130    pub(crate) active_download_requests: AtomicU64,
131    /// Number of archives currently extracting.
132    pub(crate) active_extractions: AtomicU64,
133    /// Number of archives currently verifying extracted outputs.
134    pub(crate) active_verifications: AtomicU64,
135    /// Signals the background progress task to exit.
136    pub(crate) done: AtomicBool,
137    /// Cancellation token shared by the whole command.
138    cancel_token: CancellationToken,
139}
140
141impl SharedProgress {
142    /// Creates the shared progress state for a modular download job.
143    pub(crate) fn new(
144        total_download_bytes: u64,
145        total_output_bytes: u64,
146        total_archives: u64,
147        cancel_token: CancellationToken,
148    ) -> Arc<Self> {
149        Arc::new(Self {
150            session_fetched_bytes: AtomicU64::new(0),
151            completed_download_bytes: AtomicU64::new(0),
152            active_download_bytes: AtomicU64::new(0),
153            total_download_bytes,
154            completed_output_bytes: AtomicU64::new(0),
155            active_extracted_output_bytes: AtomicU64::new(0),
156            active_verified_output_bytes: AtomicU64::new(0),
157            total_output_bytes,
158            total_archives,
159            started_at: Instant::now(),
160            extraction_phase: Mutex::new(None),
161            verification_phase: Mutex::new(None),
162            archives_done: AtomicU64::new(0),
163            active_downloads: AtomicU64::new(0),
164            active_download_requests: AtomicU64::new(0),
165            active_extractions: AtomicU64::new(0),
166            active_verifications: AtomicU64::new(0),
167            done: AtomicBool::new(false),
168            cancel_token,
169        })
170    }
171
172    /// Returns whether the whole command has been cancelled.
173    pub(crate) fn is_cancelled(&self) -> bool {
174        self.cancel_token.is_cancelled()
175    }
176
177    /// Adds raw session traffic bytes without affecting logical progress.
178    pub(crate) fn record_session_fetched_bytes(&self, bytes: u64) {
179        self.session_fetched_bytes.fetch_add(bytes, Ordering::Relaxed);
180    }
181
182    pub(crate) fn add_active_download_bytes(&self, bytes: u64) {
183        self.active_download_bytes.fetch_add(bytes, Ordering::Relaxed);
184    }
185
186    pub(crate) fn sub_active_download_bytes(&self, bytes: u64) {
187        sub_bytes(&self.active_download_bytes, bytes);
188    }
189
190    fn add_active_extracted_output_bytes(&self, bytes: u64) {
191        self.active_extracted_output_bytes.fetch_add(bytes, Ordering::Relaxed);
192    }
193
194    fn sub_active_extracted_output_bytes(&self, bytes: u64) {
195        sub_bytes(&self.active_extracted_output_bytes, bytes);
196    }
197
198    fn add_active_verified_output_bytes(&self, bytes: u64) {
199        self.active_verified_output_bytes.fetch_add(bytes, Ordering::Relaxed);
200    }
201
202    fn sub_active_verified_output_bytes(&self, bytes: u64) {
203        sub_bytes(&self.active_verified_output_bytes, bytes);
204    }
205
206    /// Records an archive whose outputs were already present locally.
207    pub(crate) fn record_reused_archive(&self, download_bytes: u64, output_bytes: u64) {
208        self.completed_download_bytes.fetch_add(download_bytes, Ordering::Relaxed);
209        self.completed_output_bytes.fetch_add(output_bytes, Ordering::Relaxed);
210        self.archives_done.fetch_add(1, Ordering::Relaxed);
211    }
212
213    /// Records an archive whose compressed download completed successfully.
214    pub(crate) fn record_archive_download_complete(&self, bytes: u64) {
215        self.completed_download_bytes.fetch_add(bytes, Ordering::Relaxed);
216    }
217
218    /// Records an archive whose extracted outputs have fully verified.
219    pub(crate) fn record_archive_output_complete(&self, bytes: u64) {
220        self.completed_output_bytes.fetch_add(bytes, Ordering::Relaxed);
221        self.archives_done.fetch_add(1, Ordering::Relaxed);
222    }
223
224    /// Returns logical compressed download progress.
225    pub(crate) fn logical_downloaded_bytes(&self) -> u64 {
226        (self.completed_download_bytes.load(Ordering::Relaxed) +
227            self.active_download_bytes.load(Ordering::Relaxed))
228        .min(self.total_download_bytes)
229    }
230
231    /// Returns verified plain-output bytes.
232    pub(crate) fn verified_output_bytes(&self) -> u64 {
233        self.completed_output_bytes.load(Ordering::Relaxed).min(self.total_output_bytes)
234    }
235
236    /// Returns plain-output bytes currently represented by extraction progress.
237    pub(crate) fn extracting_output_bytes(&self) -> u64 {
238        (self.completed_output_bytes.load(Ordering::Relaxed) +
239            self.active_extracted_output_bytes.load(Ordering::Relaxed))
240        .min(self.total_output_bytes)
241    }
242
243    /// Returns plain-output bytes currently represented by verification progress.
244    pub(crate) fn verifying_output_bytes(&self) -> u64 {
245        (self.completed_output_bytes.load(Ordering::Relaxed) +
246            self.active_verified_output_bytes.load(Ordering::Relaxed))
247        .min(self.total_output_bytes)
248    }
249
250    fn restart_phase(slot: &Mutex<Option<PhaseStart>>, baseline_bytes: u64) {
251        *slot.lock().unwrap() = Some(PhaseStart { started_at: Instant::now(), baseline_bytes });
252    }
253
254    fn phase_eta(
255        slot: &Mutex<Option<PhaseStart>>,
256        current_bytes: u64,
257        total_bytes: u64,
258    ) -> Option<Duration> {
259        let phase = *slot.lock().unwrap();
260        let phase = phase?;
261        let done = current_bytes.saturating_sub(phase.baseline_bytes);
262        let total = total_bytes.saturating_sub(phase.baseline_bytes);
263        eta_from_progress(phase.started_at.elapsed(), done, total)
264    }
265
266    fn extraction_eta(&self, current_bytes: u64) -> Option<Duration> {
267        Self::phase_eta(&self.extraction_phase, current_bytes, self.total_output_bytes)
268    }
269
270    fn verification_eta(&self, current_bytes: u64) -> Option<Duration> {
271        Self::phase_eta(&self.verification_phase, current_bytes, self.total_output_bytes)
272    }
273
274    /// Marks one archive as actively downloading.
275    pub(crate) fn download_started(&self) {
276        self.active_downloads.fetch_add(1, Ordering::Relaxed);
277    }
278
279    /// Marks one archive download as finished.
280    pub(crate) fn download_finished(&self) {
281        sub_bytes(&self.active_downloads, 1);
282    }
283
284    /// Marks one HTTP request as in flight.
285    pub(crate) fn request_started(&self) {
286        self.active_download_requests.fetch_add(1, Ordering::Relaxed);
287    }
288
289    /// Marks one HTTP request as finished.
290    pub(crate) fn request_finished(&self) {
291        sub_bytes(&self.active_download_requests, 1);
292    }
293
294    /// Marks one archive as actively extracting.
295    pub(crate) fn extraction_started(&self) {
296        if self.active_extractions.fetch_add(1, Ordering::Relaxed) == 0 {
297            Self::restart_phase(
298                &self.extraction_phase,
299                self.completed_output_bytes.load(Ordering::Relaxed),
300            );
301        }
302    }
303
304    /// Marks one archive extraction as finished.
305    pub(crate) fn extraction_finished(&self) {
306        sub_bytes(&self.active_extractions, 1);
307    }
308
309    /// Marks one archive as actively verifying outputs.
310    pub(crate) fn verification_started(&self) {
311        if self.active_verifications.fetch_add(1, Ordering::Relaxed) == 0 {
312            Self::restart_phase(
313                &self.verification_phase,
314                self.completed_output_bytes.load(Ordering::Relaxed),
315            );
316        }
317    }
318
319    /// Marks one archive verification as finished.
320    pub(crate) fn verification_finished(&self) {
321        sub_bytes(&self.active_verifications, 1);
322    }
323}
324
/// Atomically subtracts `bytes` from `counter`, clamping the result at zero.
fn sub_bytes(counter: &AtomicU64, bytes: u64) {
    let clamp_down = |current: u64| Some(current.saturating_sub(bytes));
    // The closure always returns `Some`, so the update cannot fail; the previous value is
    // not needed.
    let _previous = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, clamp_down);
}
330
/// Estimates the time remaining by extrapolating the average speed so far.
///
/// `done` bytes were processed in `elapsed`; the result is the time needed for the remaining
/// `total - done` bytes at that same rate.
///
/// Returns `None` when no estimate is possible: nothing done yet, already at or past `total`,
/// no time elapsed, or an estimate too large to represent as a [`Duration`].
fn eta_from_progress(elapsed: Duration, done: u64, total: u64) -> Option<Duration> {
    if done == 0 || done >= total {
        return None;
    }

    let secs = elapsed.as_secs_f64();
    if secs <= 0.0 {
        return None;
    }

    let speed = done as f64 / secs;
    if speed <= 0.0 {
        return None;
    }

    // `Duration::from_secs_f64` panics when the value overflows `Duration` or is not finite,
    // which a very slow start against a very large total can produce. The fallible variant
    // reports that case as "unknown" instead.
    Duration::try_from_secs_f64((total - done) as f64 / speed).ok()
}
348
/// Formats `done / total` as a percentage with one decimal place.
///
/// A zero `total` is reported as `"100.0%"`.
fn format_percent(done: u64, total: u64) -> String {
    match total {
        0 => "100.0%".to_string(),
        _ => {
            let ratio = done as f64 / total as f64;
            format!("{:.1}%", ratio * 100.0)
        }
    }
}
356
357fn format_eta(eta: Option<Duration>) -> String {
358    eta.map(DownloadProgress::format_duration).unwrap_or_else(|| "unknown".to_string())
359}
360
361/// Global request limit for the blocking downloader.
362///
363/// This uses `Mutex + Condvar` because the segmented path runs blocking reqwest
364/// clients on OS threads.
365pub(crate) struct DownloadRequestLimiter {
366    /// Maximum number of in-flight HTTP requests.
367    limit: usize,
368    /// Current number of acquired request slots.
369    active: Mutex<usize>,
370    /// Wakes blocked threads when a slot is released.
371    notify: Condvar,
372}
373
374impl DownloadRequestLimiter {
375    /// Creates the shared request limiter.
376    pub(crate) fn new(limit: usize) -> Arc<Self> {
377        Arc::new(Self { limit: limit.max(1), active: Mutex::new(0), notify: Condvar::new() })
378    }
379
380    /// Returns the configured request limit.
381    pub(crate) fn max_concurrency(&self) -> usize {
382        self.limit
383    }
384
385    pub(crate) fn acquire<'a>(
386        &'a self,
387        progress: Option<&'a Arc<SharedProgress>>,
388        cancel_token: &CancellationToken,
389    ) -> Result<DownloadRequestPermit<'a>> {
390        let mut active = self.active.lock().unwrap();
391        loop {
392            if cancel_token.is_cancelled() {
393                return Err(eyre::eyre!("Download cancelled"));
394            }
395
396            if *active < self.limit {
397                *active += 1;
398                if let Some(progress) = progress {
399                    progress.request_started();
400                }
401                return Ok(DownloadRequestPermit { limiter: self, progress });
402            }
403
404            // Wake periodically so cancellation can interrupt waiters even if
405            // no request finishes.
406            let (next_active, _) =
407                self.notify.wait_timeout(active, Duration::from_millis(100)).unwrap();
408            active = next_active;
409        }
410    }
411}
412
413/// RAII permit for one in-flight HTTP request.
414///
415/// Dropping the permit releases a slot in the shared request limit and updates
416/// the live progress counters.
417pub(crate) struct DownloadRequestPermit<'a> {
418    /// Limiter that owns the request slot.
419    limiter: &'a DownloadRequestLimiter,
420    /// Shared progress counters updated when the permit drops.
421    progress: Option<&'a Arc<SharedProgress>>,
422}
423
424impl Drop for DownloadRequestPermit<'_> {
425    /// Releases the request slot and updates shared progress counters.
426    fn drop(&mut self) {
427        let mut active = self.limiter.active.lock().unwrap();
428        *active = active.saturating_sub(1);
429        drop(active);
430        self.limiter.notify.notify_one();
431
432        if let Some(progress) = self.progress {
433            progress.request_finished();
434        }
435    }
436}
437
438/// Tracks one active archive download attempt.
439pub(crate) struct ArchiveDownloadProgress<'a> {
440    progress: Option<&'a Arc<SharedProgress>>,
441    downloaded: u64,
442    completed: bool,
443}
444
445impl<'a> ArchiveDownloadProgress<'a> {
446    /// Starts tracking one archive download attempt.
447    pub(crate) fn new(progress: Option<&'a Arc<SharedProgress>>) -> Self {
448        if let Some(progress) = progress {
449            progress.download_started();
450        }
451        Self { progress, downloaded: 0, completed: false }
452    }
453
454    /// Adds logical compressed bytes written by this attempt.
455    pub(crate) fn record_downloaded(&mut self, bytes: u64) {
456        self.downloaded += bytes;
457        if let Some(progress) = self.progress {
458            progress.add_active_download_bytes(bytes);
459        }
460    }
461
462    /// Returns whether this tracker has recorded any logical bytes itself.
463    pub(crate) fn has_tracked_bytes(&self) -> bool {
464        self.downloaded > 0
465    }
466
467    /// Moves this archive from active download bytes into completed download bytes.
468    pub(crate) fn complete(&mut self, total_bytes: u64) {
469        if self.completed {
470            return;
471        }
472        if let Some(progress) = self.progress {
473            progress.sub_active_download_bytes(self.downloaded);
474            progress.record_archive_download_complete(total_bytes);
475        }
476        self.downloaded = 0;
477        self.completed = true;
478    }
479}
480
481impl Drop for ArchiveDownloadProgress<'_> {
482    fn drop(&mut self) {
483        if let Some(progress) = self.progress {
484            progress.sub_active_download_bytes(self.downloaded);
485            progress.download_finished();
486        }
487    }
488}
489
490/// Tracks one active archive extraction attempt.
491pub(crate) struct ArchiveExtractionProgress {
492    progress: Option<Arc<SharedProgress>>,
493    extracted: Arc<AtomicU64>,
494    finished: bool,
495}
496
497/// Cloneable handle for reporting extracted bytes from background monitoring.
498#[derive(Clone)]
499pub(crate) struct ArchiveExtractionProgressHandle {
500    progress: Arc<SharedProgress>,
501    extracted: Arc<AtomicU64>,
502}
503
504impl ArchiveExtractionProgress {
505    /// Starts tracking one archive extraction attempt.
506    pub(crate) fn new(progress: Option<&Arc<SharedProgress>>) -> Self {
507        if let Some(progress) = progress {
508            progress.extraction_started();
509        }
510        Self {
511            progress: progress.cloned(),
512            extracted: Arc::new(AtomicU64::new(0)),
513            finished: false,
514        }
515    }
516
517    /// Returns a cloneable handle that can report extraction progress from another thread.
518    pub(crate) fn handle(&self) -> Option<ArchiveExtractionProgressHandle> {
519        Some(ArchiveExtractionProgressHandle {
520            progress: Arc::clone(self.progress.as_ref()?),
521            extracted: Arc::clone(&self.extracted),
522        })
523    }
524
525    /// Adds plain-output bytes extracted by this attempt.
526    pub(crate) fn record_extracted(&mut self, bytes: u64) {
527        if let Some(handle) = self.handle() {
528            handle.record_extracted(bytes);
529        }
530    }
531
532    /// Ends extraction tracking before verification begins.
533    pub(crate) fn finish(&mut self) {
534        if self.finished {
535            return;
536        }
537        if let Some(progress) = &self.progress {
538            progress.sub_active_extracted_output_bytes(self.extracted.swap(0, Ordering::Relaxed));
539        }
540        self.finished = true;
541    }
542}
543
544impl Drop for ArchiveExtractionProgress {
545    fn drop(&mut self) {
546        if let Some(progress) = &self.progress {
547            progress.sub_active_extracted_output_bytes(self.extracted.swap(0, Ordering::Relaxed));
548            progress.extraction_finished();
549        }
550    }
551}
552
553impl ArchiveExtractionProgressHandle {
554    /// Adds plain-output bytes extracted by this attempt.
555    pub(crate) fn record_extracted(&self, bytes: u64) {
556        self.extracted.fetch_add(bytes, Ordering::Relaxed);
557        self.progress.add_active_extracted_output_bytes(bytes);
558    }
559}
560
561/// Tracks one active archive verification attempt.
562pub(crate) struct ArchiveVerificationProgress<'a> {
563    progress: Option<&'a Arc<SharedProgress>>,
564    verified: u64,
565    completed: bool,
566}
567
568impl<'a> ArchiveVerificationProgress<'a> {
569    /// Starts tracking one archive verification attempt.
570    pub(crate) fn new(progress: Option<&'a Arc<SharedProgress>>) -> Self {
571        if let Some(progress) = progress {
572            progress.verification_started();
573        }
574        Self { progress, verified: 0, completed: false }
575    }
576
577    /// Adds plain-output bytes hashed by this verification attempt.
578    pub(crate) fn record_verified(&mut self, bytes: u64) {
579        self.verified += bytes;
580        if let Some(progress) = self.progress {
581            progress.add_active_verified_output_bytes(bytes);
582        }
583    }
584
585    /// Moves this archive from active verification bytes into completed output bytes.
586    pub(crate) fn complete(&mut self, total_bytes: u64) {
587        if self.completed {
588            return;
589        }
590        if let Some(progress) = self.progress {
591            progress.sub_active_verified_output_bytes(self.verified);
592            progress.record_archive_output_complete(total_bytes);
593        }
594        self.verified = 0;
595        self.completed = true;
596    }
597}
598
599impl Drop for ArchiveVerificationProgress<'_> {
600    fn drop(&mut self) {
601        if let Some(progress) = self.progress {
602            progress.sub_active_verified_output_bytes(self.verified);
603            progress.verification_finished();
604        }
605    }
606}
607
608/// Adapter to track progress while reading (used for extraction in legacy path)
609pub(crate) struct ProgressReader<R> {
610    /// Wrapped reader that provides archive bytes.
611    reader: R,
612    /// Per-download progress tracker for legacy paths.
613    progress: DownloadProgress,
614    /// Cancellation token checked between reads.
615    cancel_token: CancellationToken,
616}
617
618impl<R: Read> ProgressReader<R> {
619    /// Wraps a reader with per-download progress tracking.
620    pub(crate) fn new(reader: R, total_size: u64, cancel_token: CancellationToken) -> Self {
621        Self { reader, progress: DownloadProgress::new(total_size), cancel_token }
622    }
623}
624
625impl<R: Read> Read for ProgressReader<R> {
626    /// Reads bytes, checks cancellation, and updates the local progress bar.
627    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
628        if self.cancel_token.is_cancelled() {
629            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
630        }
631        let bytes = self.reader.read(buf)?;
632        if bytes > 0 &&
633            let Err(error) = self.progress.update(bytes as u64)
634        {
635            return Err(io::Error::other(error));
636        }
637        Ok(bytes)
638    }
639}
640
641/// Wrapper that bumps a shared atomic counter while writing data.
642/// Used for parallel downloads where a single display task shows aggregated progress.
643pub(crate) struct SharedProgressWriter<'a, W> {
644    /// Wrapped writer receiving downloaded bytes.
645    pub(crate) inner: W,
646    /// Shared counters updated as bytes are written.
647    pub(crate) progress: Arc<SharedProgress>,
648    /// Optional callback for logical bytes written by the current archive attempt.
649    pub(crate) on_written: Option<&'a mut dyn FnMut(u64)>,
650}
651
652impl<W: Write> Write for SharedProgressWriter<'_, W> {
653    /// Writes bytes and records them in shared progress.
654    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
655        if self.progress.is_cancelled() {
656            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
657        }
658        let n = self.inner.write(buf)?;
659        self.progress.record_session_fetched_bytes(n as u64);
660        if let Some(on_written) = self.on_written.as_deref_mut() {
661            on_written(n as u64);
662        }
663        Ok(n)
664    }
665
666    /// Flushes the wrapped writer.
667    fn flush(&mut self) -> io::Result<()> {
668        self.inner.flush()
669    }
670}
671
672/// Wrapper that bumps a shared atomic counter while reading data.
673/// Used for streaming downloads where a single display task shows aggregated progress.
674pub(crate) struct SharedProgressReader<R> {
675    /// Wrapped reader producing streamed bytes.
676    pub(crate) inner: R,
677    /// Shared counters updated as bytes are read.
678    pub(crate) progress: Arc<SharedProgress>,
679}
680
681impl<R: Read> Read for SharedProgressReader<R> {
682    /// Reads bytes and records them in shared progress.
683    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
684        if self.progress.is_cancelled() {
685            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
686        }
687        let n = self.inner.read(buf)?;
688        self.progress.record_session_fetched_bytes(n as u64);
689        Ok(n)
690    }
691}
692
693/// Spawns a background task that prints aggregated download progress.
694/// Returns a handle; drop it (or call `.abort()`) to stop.
695pub(crate) fn spawn_progress_display(progress: Arc<SharedProgress>) -> tokio::task::JoinHandle<()> {
696    tokio::spawn(async move {
697        let mut interval = tokio::time::interval(Duration::from_secs(3));
698        interval.tick().await;
699        loop {
700            interval.tick().await;
701
702            if progress.done.load(Ordering::Relaxed) {
703                break;
704            }
705
706            let download_total = progress.total_download_bytes;
707            let output_total = progress.total_output_bytes;
708            if download_total == 0 && output_total == 0 {
709                continue;
710            }
711
712            let done = progress.archives_done.load(Ordering::Relaxed);
713            let all = progress.total_archives;
714            let active_downloads = progress.active_downloads.load(Ordering::Relaxed);
715            let active_requests = progress.active_download_requests.load(Ordering::Relaxed);
716            let active_extractions = progress.active_extractions.load(Ordering::Relaxed);
717            let active_verifications = progress.active_verifications.load(Ordering::Relaxed);
718            let downloaded = progress.logical_downloaded_bytes();
719            let extracted = progress.extracting_output_bytes();
720            let verified = progress.verifying_output_bytes();
721            let elapsed = DownloadProgress::format_duration(progress.started_at.elapsed());
722            let download_total_display = DownloadProgress::format_size(download_total);
723            let output_total_display = DownloadProgress::format_size(output_total);
724            let downloaded_display = DownloadProgress::format_size(downloaded);
725            let extracted_display = DownloadProgress::format_size(extracted);
726            let active_download_phase = active_downloads > 0 || active_requests > 0;
727
728            if active_download_phase {
729                info!(target: "reth::cli",
730                    archives = format_args!("{done}/{all}"),
731                    progress = %format_percent(downloaded, download_total),
732                    elapsed = %elapsed,
733                    eta = %format_eta(eta_from_progress(progress.started_at.elapsed(), downloaded, download_total)),
734                    bytes = format_args!("{downloaded_display}/{download_total_display}"),
735                    "Downloading snapshot archives"
736                );
737            } else if active_extractions > 0 {
738                info!(target: "reth::cli",
739                    archives = format_args!("{done}/{all}"),
740                    progress = %format_percent(extracted, output_total),
741                    elapsed = %elapsed,
742                    eta = %format_eta(progress.extraction_eta(extracted)),
743                    bytes = format_args!("{extracted_display}/{output_total_display}"),
744                    "Extracting snapshot archives"
745                );
746            } else if active_verifications > 0 {
747                info!(target: "reth::cli",
748                    archives = format_args!("{done}/{all}"),
749                    progress = %format_percent(verified, output_total),
750                    elapsed = %elapsed,
751                    eta = %format_eta(progress.verification_eta(verified)),
752                    bytes = format_args!("{}/{output_total_display}", DownloadProgress::format_size(verified)),
753                    "Verifying snapshot archives"
754                );
755            } else {
756                continue;
757            }
758        }
759
760        let completed = progress.verified_output_bytes();
761        let completed_display = DownloadProgress::format_size(completed);
762        let output_total = DownloadProgress::format_size(progress.total_output_bytes);
763        info!(target: "reth::cli",
764            archives = format_args!("{}/{}", progress.total_archives, progress.total_archives),
765            progress = "100.0%",
766            elapsed = %DownloadProgress::format_duration(progress.started_at.elapsed()),
767            eta = "0s",
768            bytes = format_args!("{completed_display}/{output_total}"),
769            "Snapshot archive processing complete"
770        );
771    })
772}
773
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// Builds shared progress for a single planned archive with the given byte totals.
    fn single_archive(total_download: u64, total_output: u64) -> Arc<SharedProgress> {
        SharedProgress::new(total_download, total_output, 1, CancellationToken::new())
    }

    /// Reads the baseline stored in a phase slot; panics if the phase never started.
    fn baseline_of(slot: &Mutex<Option<PhaseStart>>) -> u64 {
        let phase = *slot.lock().unwrap();
        phase.unwrap().baseline_bytes
    }

    /// Raw session traffic (including retries) must not inflate logical progress.
    #[test]
    fn shared_progress_separates_session_fetch_from_logical_progress() {
        let progress = single_archive(10, 20);

        for _ in 0..2 {
            progress.record_session_fetched_bytes(10);
        }
        progress.record_archive_download_complete(10);
        progress.record_archive_output_complete(20);

        assert_eq!(progress.session_fetched_bytes.load(Ordering::Relaxed), 20);
        assert_eq!(progress.logical_downloaded_bytes(), 10);
        assert_eq!(progress.verified_output_bytes(), 20);
        assert_eq!(progress.archives_done.load(Ordering::Relaxed), 1);
    }

    /// Dropping an uncompleted download tracker removes its bytes from shared progress.
    #[test]
    fn archive_download_progress_rolls_back_unfinished_attempts() {
        let progress = single_archive(10, 20);

        let mut download = ArchiveDownloadProgress::new(Some(&progress));
        download.record_downloaded(4);
        assert_eq!(progress.logical_downloaded_bytes(), 4);
        drop(download);

        assert_eq!(progress.logical_downloaded_bytes(), 0);
        assert_eq!(progress.active_downloads.load(Ordering::Relaxed), 0);
    }

    /// The extraction baseline only moves when a phase starts with no extraction active.
    #[test]
    fn extraction_phase_baseline_restarts_after_idle() {
        let progress = single_archive(10, 100);

        progress.extraction_started();
        assert_eq!(baseline_of(&progress.extraction_phase), 0);

        // A second concurrent extraction must not reset the running phase.
        progress.completed_output_bytes.store(25, Ordering::Relaxed);
        progress.extraction_started();
        assert_eq!(baseline_of(&progress.extraction_phase), 0);

        // Once idle, the next start picks up the new baseline.
        progress.extraction_finished();
        progress.extraction_finished();
        progress.extraction_started();
        assert_eq!(baseline_of(&progress.extraction_phase), 25);
    }

    /// The verification baseline only moves when a phase starts with no verification active.
    #[test]
    fn verification_phase_baseline_restarts_after_idle() {
        let progress = single_archive(10, 100);

        progress.verification_started();
        assert_eq!(baseline_of(&progress.verification_phase), 0);

        // A second concurrent verification must not reset the running phase.
        progress.completed_output_bytes.store(40, Ordering::Relaxed);
        progress.verification_started();
        assert_eq!(baseline_of(&progress.verification_phase), 0);

        // Once idle, the next start picks up the new baseline.
        progress.verification_finished();
        progress.verification_finished();
        progress.verification_started();
        assert_eq!(baseline_of(&progress.verification_phase), 40);
    }
}
839        assert_eq!(
840            progress.verification_phase.lock().unwrap().as_ref().unwrap().baseline_bytes,
841            40
842        );
843    }
844}