//! Archive fetch logic for the `download` CLI command
//! (`reth_cli_commands/download/fetch.rs`).
1use super::{
2    progress::{
3        ArchiveDownloadProgress, DownloadProgress, DownloadRequestLimiter, SharedProgress,
4        SharedProgressWriter,
5    },
6    session::DownloadSession,
7    RETRY_BACKOFF_SECS,
8};
9use eyre::Result;
10use reqwest::{blocking::Client as BlockingClient, header::RANGE, StatusCode};
11use reth_cli_util::cancellation::CancellationToken;
12use reth_fs_util as fs;
13use std::{
14    any::Any,
15    collections::VecDeque,
16    fs::OpenOptions,
17    io::{self, BufWriter, Read, Write},
18    path::{Path, PathBuf},
19    sync::{
20        atomic::{AtomicBool, AtomicU64, Ordering},
21        Arc, Mutex,
22    },
23    time::Duration,
24};
25use tracing::info;
26use url::Url;
27
/// Maximum retry attempts for a single download segment (piece).
const SEGMENT_RETRY_ATTEMPTS: u32 = 3;

/// Minimum archive size that benefits from segmented downloads.
///
/// Archives smaller than 128 MiB fall back to the sequential path.
const SEGMENTED_DOWNLOAD_MIN_FILE_SIZE: u64 = 128 * 1024 * 1024;

/// Piece sizes are large so big downloads do not create too many requests while
/// still giving multiple workers enough work to do.
///
/// 32 MiB pieces are used for smaller large files, 64 MiB for bigger ones.
const SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE: u64 = 32 * 1024 * 1024;
const SEGMENTED_DOWNLOAD_LARGE_PIECE_SIZE: u64 = 64 * 1024 * 1024;

/// Cap exponential piece retry backoff to avoid overly long stalls.
const SEGMENTED_DOWNLOAD_MAX_BACKOFF_SECS: u64 = 30;

/// Segmented piece requests should time out quickly enough to recover from slow or stalled
/// requests.
const SEGMENTED_DOWNLOAD_REQUEST_TIMEOUT_SECS: u64 = 120;
45
46/// Paths for one downloaded archive and its `.part` file.
47#[derive(Debug, Clone)]
48struct DownloadPaths {
49    /// User-facing archive file name derived from the URL.
50    file_name: String,
51    /// Final path for the completed archive file.
52    final_path: PathBuf,
53    /// Temporary path used while the archive is still downloading.
54    part_path: PathBuf,
55}
56
57impl DownloadPaths {
58    /// Builds the final and partial download paths from the archive URL.
59    fn from_url(url: &str, target_dir: &Path) -> Self {
60        let file_name = Url::parse(url)
61            .ok()
62            .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
63            .unwrap_or_else(|| "snapshot.tar".to_string());
64
65        Self {
66            final_path: target_dir.join(&file_name),
67            part_path: target_dir.join(format!("{file_name}.part")),
68            file_name,
69        }
70    }
71
72    /// Returns the user-facing file name derived from the archive URL.
73    fn file_name(&self) -> &str {
74        &self.file_name
75    }
76
77    /// Returns the final on-disk path for the completed archive.
78    fn final_path(&self) -> &Path {
79        &self.final_path
80    }
81
82    /// Returns the partial download path used while the archive is still in flight.
83    fn part_path(&self) -> &Path {
84        &self.part_path
85    }
86
87    /// Promotes the partial file into the final archive path.
88    fn finalize(&self) -> Result<()> {
89        fs::rename(&self.part_path, &self.final_path)?;
90        Ok(())
91    }
92
93    /// Removes only the partial `.part` file for the current archive.
94    fn cleanup_partial(&self) {
95        let _ = fs::remove_file(&self.part_path);
96    }
97
98    /// Removes both final and partial archive files so a fresh attempt can restart cleanly.
99    fn cleanup_all(&self) {
100        let _ = fs::remove_file(&self.final_path);
101        self.cleanup_partial();
102    }
103}
104
/// Fetches one archive to disk and chooses sequential or segmented download.
///
/// The segmented path is only used when the session provides a request limiter
/// and the remote source supports byte-range requests; otherwise downloads go
/// through the single-stream sequential path.
pub(crate) struct ArchiveFetcher {
    /// Remote archive URL.
    url: String,
    /// On-disk paths used for this archive download.
    paths: DownloadPaths,
    /// Shared command-scoped download state.
    session: DownloadSession,
}
114
impl ArchiveFetcher {
    /// Creates a fetcher for one archive URL under the given target directory.
    pub(crate) fn new(url: impl Into<String>, target_dir: &Path, session: DownloadSession) -> Self {
        let url = url.into();
        let paths = DownloadPaths::from_url(&url, target_dir);
        Self { url, paths, session }
    }

    /// Downloads the archive using the best strategy supported by the remote source.
    ///
    /// When no request limiter is configured the sequential path is used
    /// directly; otherwise the remote source is probed and
    /// [`choose_fetch_strategy`] decides between sequential and segmented.
    pub(crate) fn download(
        &self,
        download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        // Without a limiter there is no concurrency budget, so skip the probe.
        let Some(request_limiter) = self.session.request_limiter() else {
            return self.download_sequential(super::MAX_DOWNLOAD_RETRIES, download_progress)
        };

        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
        let probe = self.probe(&client)?;

        match choose_fetch_strategy(probe, request_limiter.max_concurrency()) {
            FetchStrategy::Sequential(reason) => {
                self.log_sequential_fallback(reason, probe.total_size);
                self.download_sequential(super::MAX_DOWNLOAD_RETRIES, download_progress)
            }
            FetchStrategy::Segmented(plan) => {
                self.download_segmented(probe.total_size, plan, download_progress)
            }
        }
    }

    /// Removes any archive files created by this fetcher.
    pub(crate) fn cleanup_downloaded_files(&self) {
        self.paths.cleanup_all();
    }

    /// Probes the remote source for file size and HTTP range support.
    ///
    /// Sends a one-byte ranged GET (`bytes=0-0`): a `206 Partial Content`
    /// reply proves range support and carries the total size in
    /// `Content-Range`. Any other outcome falls back to a HEAD request and
    /// `Content-Length`.
    fn probe(&self, client: &BlockingClient) -> Result<RemoteArchiveProbe> {
        let probe = client
            .get(&self.url)
            .header(RANGE, "bytes=0-0")
            .send()
            .and_then(|response| response.error_for_status());

        let (supports_ranges, total_size) = match probe {
            Ok(response) if response.status() == StatusCode::PARTIAL_CONTENT => {
                // `Content-Range: bytes 0-0/<total>` — total is after the '/'.
                let total = response
                    .headers()
                    .get("Content-Range")
                    .and_then(|value| value.to_str().ok())
                    .and_then(|value| value.split('/').next_back())
                    .and_then(|value| value.parse::<u64>().ok());
                (true, total)
            }
            _ => {
                let response = client.head(&self.url).send()?.error_for_status()?;
                (false, response.content_length())
            }
        };

        Ok(RemoteArchiveProbe {
            total_size: total_size.ok_or_else(|| eyre::eyre!("Server did not return file size"))?,
            supports_ranges,
        })
    }

    /// Downloads the archive as a single resumable stream using one request at a time.
    ///
    /// Each attempt resumes from the current size of the `.part` file via a
    /// `Range` request when possible; failed attempts sleep for
    /// [`RETRY_BACKOFF_SECS`] before retrying, up to `max_download_retries`.
    fn download_sequential(
        &self,
        max_download_retries: u32,
        mut download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        let quiet = self.quiet();

        if !quiet {
            info!(target: "reth::cli", file = %self.paths.file_name(), "Connecting to download server");
        }

        let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
        let mut total_size: Option<u64> = None;
        let mut last_error: Option<eyre::Error> = None;

        for attempt in 1..=max_download_retries {
            // Resume point: whatever made it into the `.part` file so far.
            let existing_size =
                fs::metadata(self.paths.part_path()).map(|meta| meta.len()).unwrap_or(0);

            // A previous attempt may have fetched the full body but failed
            // afterwards; finalize instead of re-downloading.
            if let Some(total) = total_size &&
                existing_size >= total
            {
                return self.finalize_download(total)
            }

            if attempt > 1 {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Retry attempt {}/{} - resuming from {} bytes",
                    attempt, max_download_retries, existing_size
                );
            }

            let mut request = client.get(&self.url);
            if existing_size > 0 {
                request = request.header(RANGE, format!("bytes={existing_size}-"));
                if !quiet && attempt == 1 {
                    info!(target: "reth::cli", file = %self.paths.file_name(), "Resuming from {} bytes", existing_size);
                }
            }

            // Count this request against the shared in-flight request cap when
            // one is configured; the permit is held for the whole attempt.
            let _request_permit = self
                .session
                .request_limiter()
                .map(|limiter| {
                    limiter.acquire(self.session.progress(), self.session.cancel_token())
                })
                .transpose()?;

            let response = match request.send().and_then(|response| response.error_for_status()) {
                Ok(response) => response,
                Err(error) => {
                    last_error = Some(error.into());
                    if attempt < max_download_retries {
                        info!(target: "reth::cli",
                            file = %self.paths.file_name(),
                            "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                        );
                        std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                    }
                    continue;
                }
            };

            // 206 means the server honored the Range request; the total then
            // comes from `Content-Range`, otherwise from `Content-Length`.
            let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
            let size = if is_partial {
                response
                    .headers()
                    .get("Content-Range")
                    .and_then(|value| value.to_str().ok())
                    .and_then(|value| value.split('/').next_back())
                    .and_then(|value| value.parse().ok())
            } else {
                response.content_length()
            };

            if total_size.is_none() {
                total_size = size;
                if !quiet && let Some(size) = size {
                    info!(target: "reth::cli",
                        file = %self.paths.file_name(),
                        size = %DownloadProgress::format_size(size),
                        "Downloading"
                    );
                }
            }

            let current_total = total_size.ok_or_else(|| {
                eyre::eyre!("Server did not provide Content-Length or Content-Range header")
            })?;

            // Append only when the server actually honored the Range request;
            // a plain 200 response restarts the file from scratch.
            let file = if is_partial && existing_size > 0 {
                OpenOptions::new()
                    .append(true)
                    .open(self.paths.part_path())
                    .map_err(|error| fs::FsPathError::open(error, self.paths.part_path()))?
            } else {
                fs::create_file(self.paths.part_path())?
            };

            let start_offset = if is_partial { existing_size } else { 0 };
            let mut reader = response;

            let copy_result;
            let flush_result;

            if let Some(progress) = self.session.progress() {
                // Shared-progress path: bytes also flow into the per-archive
                // tracker via the `on_written` callback.
                let mut on_written = |bytes| {
                    if let Some(download_progress) = download_progress.as_deref_mut() {
                        download_progress.record_downloaded(bytes);
                    }
                };
                let mut writer = SharedProgressWriter {
                    inner: BufWriter::new(file),
                    progress: Arc::clone(progress),
                    on_written: Some(&mut on_written),
                };
                copy_result = io::copy(&mut reader, &mut writer);
                flush_result = writer.inner.flush();
            } else {
                // Legacy path: the local progress tracker is seeded with the
                // resume offset so the counter starts where the file left off.
                let mut progress = DownloadProgress::new(current_total);
                progress.downloaded = start_offset;
                let mut writer = ProgressWriter {
                    inner: BufWriter::new(file),
                    progress,
                    cancel_token: self.session.cancel_token().clone(),
                };
                copy_result = io::copy(&mut reader, &mut writer);
                flush_result = writer.inner.flush();
                // Terminate the progress output line before further logging.
                println!();
            }

            if let Err(error) = copy_result.and(flush_result) {
                last_error = Some(error.into());
                if attempt < max_download_retries {
                    info!(target: "reth::cli",
                        file = %self.paths.file_name(),
                        "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                    );
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }

            return self.finalize_download(current_total)
        }

        Err(last_error.unwrap_or_else(|| {
            eyre::eyre!("Download failed after {} attempts", max_download_retries)
        }))
    }

    /// Downloads the archive by splitting it into large range-request pieces.
    fn download_segmented(
        &self,
        total_size: u64,
        plan: SegmentedDownloadPlan,
        download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        let request_limiter = self.session.require_request_limiter()?;
        info!(target: "reth::cli",
            total_size = %DownloadProgress::format_size(total_size),
            piece_size = %DownloadProgress::format_size(plan.piece_size),
            pieces = plan.piece_count,
            workers = plan.worker_count,
            max_concurrent_requests = request_limiter.max_concurrency(),
            "Starting queued segmented download"
        );

        SegmentedDownload::new(
            self.url.clone(),
            self.paths.clone(),
            total_size,
            plan,
            self.session.clone(),
            download_progress,
        )
        .run()
    }

    /// Logs why this archive must fall back to the sequential fetch path.
    fn log_sequential_fallback(&self, reason: SequentialDownloadFallback, total_size: u64) {
        match reason {
            SequentialDownloadFallback::NoRangeSupport => {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Server does not support Range requests, falling back to sequential download"
                );
            }
            SequentialDownloadFallback::EmptyFile => {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Remote archive is empty, falling back to sequential download"
                );
            }
            SequentialDownloadFallback::TooSmall => {
                // Small archives fall back silently; the size is deliberately unused.
                let _ = total_size;
            }
        }
    }

    /// Finalizes the downloaded archive and returns its on-disk location and size.
    fn finalize_download(&self, size: u64) -> Result<DownloadedArchive> {
        self.paths.finalize()?;
        if !self.quiet() {
            info!(target: "reth::cli", file = %self.paths.file_name(), "Download complete");
        }
        Ok(DownloadedArchive { path: self.paths.final_path().to_path_buf(), size })
    }

    /// Returns `true` when this fetch should stay quiet because shared progress is active.
    fn quiet(&self) -> bool {
        self.session.progress().is_some()
    }
}
397
/// The final path and size of one archive fetched to disk.
///
/// Produced once the `.part` file has been promoted to its final path.
#[derive(Debug, Clone)]
pub(crate) struct DownloadedArchive {
    /// Final on-disk path for the downloaded archive.
    pub(crate) path: PathBuf,
    /// Total archive size in bytes.
    pub(crate) size: u64,
}
406
/// Remote metadata used to choose between sequential and segmented download.
///
/// Produced by [`ArchiveFetcher::probe`] and consumed by [`choose_fetch_strategy`].
#[derive(Debug, Clone, Copy)]
struct RemoteArchiveProbe {
    /// Total archive size reported by the remote source.
    total_size: u64,
    /// Whether the remote source supports byte-range requests.
    supports_ranges: bool,
}
415
/// Reasons the fetcher may choose the sequential download path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SequentialDownloadFallback {
    /// The remote source does not support byte-range requests.
    NoRangeSupport,
    /// The remote source reported an empty archive.
    EmptyFile,
    /// The archive is too small to benefit from segmented download.
    /// This fallback is logged silently (no info message).
    TooSmall,
}
426
/// The fetch strategy chosen after probing the remote source.
#[derive(Debug)]
enum FetchStrategy {
    /// Use the single-stream download path, with the reason for the fallback.
    Sequential(SequentialDownloadFallback),
    /// Use the segmented download path with the given piece/worker plan.
    Segmented(SegmentedDownloadPlan),
}
435
436/// Chooses the fetch strategy from the remote probe and available worker budget.
437fn choose_fetch_strategy(probe: RemoteArchiveProbe, max_workers: usize) -> FetchStrategy {
438    if !probe.supports_ranges {
439        return FetchStrategy::Sequential(SequentialDownloadFallback::NoRangeSupport)
440    }
441
442    if probe.total_size == 0 {
443        return FetchStrategy::Sequential(SequentialDownloadFallback::EmptyFile)
444    }
445
446    plan_segmented_download(probe.total_size, max_workers)
447        .map(FetchStrategy::Segmented)
448        .unwrap_or(FetchStrategy::Sequential(SequentialDownloadFallback::TooSmall))
449}
450
/// Wrapper that tracks download progress while writing data.
/// Used with [`io::copy`] to display progress during downloads.
///
/// This is the legacy per-download tracker used by the sequential path when no
/// shared session progress is active.
struct ProgressWriter<W> {
    /// Wrapped writer receiving downloaded bytes.
    inner: W,
    /// Per-download progress tracker for the legacy path.
    progress: DownloadProgress,
    /// Cancellation token checked between writes.
    cancel_token: CancellationToken,
}
461
462impl<W: Write> Write for ProgressWriter<W> {
463    /// Writes bytes, checks cancellation, and updates local download progress.
464    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
465        if self.cancel_token.is_cancelled() {
466            return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
467        }
468        let n = self.inner.write(buf)?;
469        let _ = self.progress.update(n as u64);
470        Ok(n)
471    }
472
473    /// Flushes the wrapped writer.
474    fn flush(&mut self) -> io::Result<()> {
475        self.inner.flush()
476    }
477}
478
/// One queued byte range for a segmented archive download.
///
/// Both bounds are inclusive, matching the HTTP `Range: bytes=start-end` form.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct DownloadPiece {
    /// Inclusive start byte for this piece.
    start: u64,
    /// Inclusive end byte for this piece.
    end: u64,
}
487
/// Fixed plan for a segmented archive: piece size, piece count, and worker count.
///
/// Built by `plan_segmented_download` before the segmented path starts.
#[derive(Debug)]
struct SegmentedDownloadPlan {
    /// Bytes assigned to each piece, except possibly the last.
    piece_size: u64,
    /// Number of pieces created for this archive.
    piece_count: usize,
    /// Number of worker threads used for this archive.
    worker_count: usize,
    /// Queue of pieces to download.
    pieces: VecDeque<DownloadPiece>,
}
500
/// Runs the segmented download workers and piece retries for one archive.
#[derive(Debug)]
struct SegmentedDownload {
    /// Remote archive URL.
    url: String,
    /// On-disk paths used for this archive download.
    paths: DownloadPaths,
    /// Total archive size in bytes.
    total_size: u64,
    /// Piece and worker plan for this archive.
    plan: SegmentedDownloadPlan,
    /// Shared command-scoped download state.
    session: DownloadSession,
}
514
/// Shared inputs each segmented download worker needs while draining the piece queue.
///
/// `Copy` so each spawned worker can take its own value of the borrowed context.
#[derive(Clone, Copy)]
struct SegmentedWorkerContext<'a> {
    /// Remote archive URL.
    url: &'a str,
    /// Partial file path where pieces are written.
    part_path: &'a Path,
    /// Shared progress counters for the whole command, when enabled.
    shared: Option<&'a Arc<SharedProgress>>,
    /// Shared cap for in-flight HTTP requests.
    request_limiter: &'a DownloadRequestLimiter,
    /// Cancellation token shared by the whole command.
    cancel_token: &'a CancellationToken,
}
529
impl SegmentedDownload {
    /// Creates the segmented download state for one archive.
    ///
    /// NOTE(review): the per-archive `download_progress` is currently unused on
    /// the segmented path; only the session-wide shared progress is updated.
    fn new(
        url: String,
        paths: DownloadPaths,
        total_size: u64,
        plan: SegmentedDownloadPlan,
        session: DownloadSession,
        _download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Self {
        Self { url, paths, total_size, plan, session }
    }

    /// Runs the segmented download to completion or returns the first fatal error.
    fn run(self) -> Result<DownloadedArchive> {
        let Self { url, paths, total_size, plan, session } = self;
        // Preallocate the full `.part` file so workers can write their pieces
        // at absolute offsets without coordination.
        {
            let file = fs::create_file(paths.part_path())?;
            file.set_len(total_size)?;
        }

        let worker_count = plan.worker_count;
        let state = Arc::new(SegmentedDownloadState::new(plan.pieces));
        let terminal_failure = Arc::new(TerminalFailure::default());
        // Bytes from fully completed pieces; subtracted from the shared
        // counters again once the archive finishes or fails.
        let piece_progress_bytes = Arc::new(AtomicU64::new(0));
        let worker_client = BlockingClient::builder()
            .connect_timeout(Duration::from_secs(30))
            .timeout(Duration::from_secs(SEGMENTED_DOWNLOAD_REQUEST_TIMEOUT_SECS))
            .build()?;
        let request_limiter = Arc::clone(session.require_request_limiter()?);
        let shared = session.progress();
        let cancel_token = session.cancel_token();
        let url = url.as_str();
        let worker_context = SegmentedWorkerContext {
            url,
            part_path: paths.part_path(),
            shared,
            request_limiter: request_limiter.as_ref(),
            cancel_token,
        };

        // Scoped threads let workers borrow the context without `'static`.
        std::thread::scope(|scope| {
            let mut handles = Vec::with_capacity(worker_count);

            for _ in 0..worker_count {
                let state = Arc::clone(&state);
                let terminal_failure = Arc::clone(&terminal_failure);
                let piece_progress_bytes = Arc::clone(&piece_progress_bytes);
                let client = worker_client.clone();

                handles.push(scope.spawn(move || {
                    Self::worker_loop(
                        &client,
                        worker_context,
                        state,
                        terminal_failure,
                        piece_progress_bytes,
                    );
                }));
            }

            for handle in handles {
                // A panicking worker is treated like any other fatal failure.
                if let Err(payload) = handle.join() {
                    state.note_terminal_failure();
                    terminal_failure.record(eyre::eyre!(
                        "Segmented download worker panicked: {}",
                        panic_payload_message(payload)
                    ));
                }
            }
        });

        if let Some(error) = terminal_failure.take() {
            if let Some(shared) = shared {
                // Roll back the in-flight byte count for the failed archive.
                shared.sub_active_download_bytes(piece_progress_bytes.load(Ordering::Relaxed));
            }
            paths.cleanup_partial();
            return Err(error.wrap_err("Parallel download failed"))
        }

        if let Some(shared) = shared {
            // Swap the per-piece running total for the completed archive size.
            shared.sub_active_download_bytes(piece_progress_bytes.load(Ordering::Relaxed));
            shared.record_archive_download_complete(total_size);
        }

        paths.finalize()?;
        info!(target: "reth::cli", file = %paths.file_name(), "Download complete");
        Ok(DownloadedArchive { path: paths.final_path().to_path_buf(), size: total_size })
    }

    /// Runs one worker until there are no pieces left or another worker fails.
    ///
    /// Each worker opens its own write handle; pieces are written with
    /// positional writes so no file offset is shared between workers.
    fn worker_loop(
        client: &BlockingClient,
        context: SegmentedWorkerContext<'_>,
        state: Arc<SegmentedDownloadState>,
        terminal_failure: Arc<TerminalFailure>,
        piece_progress_bytes: Arc<AtomicU64>,
    ) {
        let file = match OpenOptions::new().write(true).open(context.part_path) {
            Ok(file) => file,
            Err(error) => {
                state.note_terminal_failure();
                terminal_failure.record(error.into());
                return;
            }
        };

        while let Some(piece) = state.next_piece(context.cancel_token) {
            if let Err(error) = Self::download_piece_with_retries(
                client,
                context.url,
                &file,
                piece,
                context.shared,
                &piece_progress_bytes,
                context.request_limiter,
                context.cancel_token,
            ) {
                // First fatal error wins; stop this worker and flag the rest.
                state.note_terminal_failure();
                terminal_failure.record(error);
                return;
            }
        }
    }

    /// Downloads one queued piece with per-piece retry/backoff.
    ///
    /// Each attempt acquires a permit from the shared request limit so whole-file and
    /// piece downloads use the same fixed number of HTTP request slots.
    #[expect(clippy::too_many_arguments)]
    fn download_piece_with_retries(
        client: &BlockingClient,
        url: &str,
        file: &std::fs::File,
        piece: DownloadPiece,
        shared: Option<&Arc<SharedProgress>>,
        piece_progress_bytes: &AtomicU64,
        request_limiter: &DownloadRequestLimiter,
        cancel_token: &CancellationToken,
    ) -> Result<()> {
        for attempt in 1..=SEGMENT_RETRY_ATTEMPTS {
            if cancel_token.is_cancelled() {
                return Err(eyre::eyre!("Download cancelled"))
            }

            let _request_permit = request_limiter.acquire(shared, cancel_token)?;
            match Self::download_piece_once(
                client,
                url,
                file,
                piece,
                shared,
                piece_progress_bytes,
                cancel_token,
            ) {
                Ok(()) => return Ok(()),
                // Retryable failures back off (longer when throttled) and loop.
                Err(PieceAttemptFailure::Retryable { error: _, throttled })
                    if attempt < SEGMENT_RETRY_ATTEMPTS =>
                {
                    std::thread::sleep(piece_retry_backoff(attempt, throttled));
                }
                Err(PieceAttemptFailure::Retryable { error, .. }) => return Err(error),
                Err(PieceAttemptFailure::Terminal(error)) => return Err(error),
            }
        }

        // Defensive fallback: every branch on the final attempt returns above.
        Err(eyre::eyre!("Piece download failed after {SEGMENT_RETRY_ATTEMPTS} attempts"))
    }

    /// Downloads one queued piece once.
    fn download_piece_once(
        client: &BlockingClient,
        url: &str,
        file: &std::fs::File,
        piece: DownloadPiece,
        shared: Option<&Arc<SharedProgress>>,
        piece_progress_bytes: &AtomicU64,
        cancel_token: &CancellationToken,
    ) -> std::result::Result<(), PieceAttemptFailure> {
        // `write_all_at` is Unix-only; this path does not compile on other targets.
        use std::os::unix::fs::FileExt;

        let expected_len = piece.end - piece.start + 1;

        // Anything other than 206 means the Range request was not honored.
        let response = match client
            .get(url)
            .header(RANGE, format!("bytes={}-{}", piece.start, piece.end))
            .send()
        {
            Ok(response) if response.status() == StatusCode::PARTIAL_CONTENT => response,
            Ok(response) if should_retry_piece_status(response.status()) => {
                return Err(PieceAttemptFailure::Retryable {
                    error: eyre::eyre!(
                        "Server returned {} for piece {}-{}",
                        response.status(),
                        piece.start,
                        piece.end
                    ),
                    throttled: is_throttle_piece_status(response.status()),
                });
            }
            Ok(response) => {
                return Err(PieceAttemptFailure::Terminal(eyre::eyre!(
                    "Server returned {} instead of 206 for Range request",
                    response.status()
                )));
            }
            Err(error) => {
                return Err(PieceAttemptFailure::Retryable {
                    throttled: is_throttle_piece_error(&error),
                    error: error.into(),
                });
            }
        };

        let mut buf = [0u8; 64 * 1024];
        // Never read past this piece, even if the server sends extra bytes.
        let mut reader = response.take(expected_len);
        let mut offset = piece.start;

        loop {
            if cancel_token.is_cancelled() {
                return Err(PieceAttemptFailure::Terminal(eyre::eyre!("Download cancelled")));
            }

            match reader.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => {
                    // Positional write at the piece's absolute offset.
                    file.write_all_at(&buf[..n], offset)
                        .map_err(|error| PieceAttemptFailure::Terminal(error.into()))?;
                    offset += n as u64;
                    if let Some(progress) = shared {
                        progress.record_session_fetched_bytes(n as u64);
                    }
                }
                Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
                Err(error) => {
                    return Err(PieceAttemptFailure::Retryable {
                        throttled: error.kind() == io::ErrorKind::TimedOut,
                        error: error.into(),
                    });
                }
            }
        }

        // Only fully downloaded pieces count toward active download bytes.
        let downloaded_len = offset - piece.start;
        if downloaded_len == expected_len {
            if let Some(progress) = shared {
                progress.add_active_download_bytes(expected_len);
            }
            piece_progress_bytes.fetch_add(expected_len, Ordering::Relaxed);
            return Ok(())
        }

        Err(PieceAttemptFailure::Retryable {
            error: eyre::eyre!(
                "Piece {}-{} ended early: expected {} bytes, downloaded {}",
                piece.start,
                piece.end,
                expected_len,
                downloaded_len
            ),
            throttled: false,
        })
    }
}
794
795/// Shared queue state for one segmented archive download.
796///
797/// Workers pull pieces until the queue is empty or one worker fails the whole attempt.
798struct SegmentedDownloadState {
799    /// Remaining pieces waiting to be downloaded.
800    pieces: Mutex<VecDeque<DownloadPiece>>,
801    /// Set once a worker hits a fatal error.
802    failed: AtomicBool,
803}
804
805impl SegmentedDownloadState {
806    /// Creates the shared queue state for one segmented archive attempt.
807    fn new(pieces: VecDeque<DownloadPiece>) -> Self {
808        Self { pieces: Mutex::new(pieces), failed: AtomicBool::new(false) }
809    }
810
811    /// Returns the next piece unless cancellation or a fatal error stopped the attempt.
812    fn next_piece(&self, cancel_token: &CancellationToken) -> Option<DownloadPiece> {
813        if cancel_token.is_cancelled() || self.failed.load(Ordering::Relaxed) {
814            return None;
815        }
816
817        self.pieces.lock().unwrap().pop_front()
818    }
819
820    /// Marks the entire segmented attempt as failed so workers stop taking more pieces.
821    fn note_terminal_failure(&self) {
822        self.failed.store(true, Ordering::Relaxed);
823    }
824}
825
/// Stores the first fatal error seen across segmented download workers.
#[derive(Default)]
struct TerminalFailure {
    /// First fatal worker error, if any; later errors from other workers are discarded.
    error: Mutex<Option<eyre::Error>>,
}
832
833impl TerminalFailure {
834    /// Stores the first fatal error and ignores later ones from other workers.
835    fn record(&self, error: eyre::Error) {
836        let mut slot = self.error.lock().unwrap();
837        if slot.is_none() {
838            *slot = Some(error);
839        }
840    }
841
842    /// Returns the stored fatal error after worker execution finishes.
843    fn take(&self) -> Option<eyre::Error> {
844        self.error.lock().unwrap().take()
845    }
846}
847
848/// Splits an archive into contiguous byte ranges for segmented download.
849fn build_download_pieces(total_size: u64, piece_size: u64) -> VecDeque<DownloadPiece> {
850    let mut pieces = VecDeque::new();
851    let mut start = 0;
852
853    while start < total_size {
854        let end = (start + piece_size).min(total_size) - 1;
855        pieces.push_back(DownloadPiece { start, end });
856        start = end + 1;
857    }
858
859    pieces
860}
861
862/// Chooses the fixed piece size for a large archive.
863///
864/// Smaller large files use 32 MiB pieces so there are enough pieces for several workers.
865/// Very large files use 64 MiB pieces to keep the request count down.
866fn segmented_piece_size(total_size: u64) -> u64 {
867    if total_size < 2 * 1024 * 1024 * 1024 {
868        SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE
869    } else {
870        SEGMENTED_DOWNLOAD_LARGE_PIECE_SIZE
871    }
872}
873
874/// Builds the segmented download plan for one archive.
875///
876/// Small files stay single-stream. Larger files are split into fixed pieces and
877/// can use up to the shared request limit.
878fn plan_segmented_download(total_size: u64, max_workers: usize) -> Option<SegmentedDownloadPlan> {
879    if max_workers == 0 || total_size < SEGMENTED_DOWNLOAD_MIN_FILE_SIZE {
880        return None;
881    }
882
883    let piece_size = segmented_piece_size(total_size);
884    if total_size <= piece_size {
885        return None;
886    }
887
888    let pieces = build_download_pieces(total_size, piece_size);
889    let piece_count = pieces.len();
890    let worker_count = max_workers.min(piece_count).max(1);
891
892    Some(SegmentedDownloadPlan { piece_size, piece_count, worker_count, pieces })
893}
894
895/// Returns the retry backoff for one piece attempt.
896fn piece_retry_backoff(attempt: u32, throttled: bool) -> Duration {
897    let base = if throttled { 2 } else { RETRY_BACKOFF_SECS };
898    let multiplier = 1u64 << attempt.saturating_sub(1).min(3);
899    Duration::from_secs(base.saturating_mul(multiplier).min(SEGMENTED_DOWNLOAD_MAX_BACKOFF_SECS))
900}
901
902/// Returns whether an HTTP status should retry the current piece.
903fn is_retryable_piece_status(status: StatusCode) -> bool {
904    matches!(
905        status,
906        StatusCode::REQUEST_TIMEOUT |
907            StatusCode::TOO_MANY_REQUESTS |
908            StatusCode::INTERNAL_SERVER_ERROR |
909            StatusCode::BAD_GATEWAY |
910            StatusCode::SERVICE_UNAVAILABLE |
911            StatusCode::GATEWAY_TIMEOUT
912    )
913}
914
915/// Returns whether a piece request should retry after the given status.
916fn should_retry_piece_status(status: StatusCode) -> bool {
917    status == StatusCode::OK || is_retryable_piece_status(status)
918}
919
920/// Returns whether an HTTP status looks like throttling or timeout.
921fn is_throttle_piece_status(status: StatusCode) -> bool {
922    matches!(
923        status,
924        StatusCode::REQUEST_TIMEOUT |
925            StatusCode::TOO_MANY_REQUESTS |
926            StatusCode::SERVICE_UNAVAILABLE |
927            StatusCode::GATEWAY_TIMEOUT
928    )
929}
930
931/// Returns whether a reqwest error looks like throttling or timeout.
932fn is_throttle_piece_error(error: &reqwest::Error) -> bool {
933    error.is_timeout() || matches!(error.status(), Some(status) if is_throttle_piece_status(status))
934}
935
/// The failure outcome of one piece download attempt.
enum PieceAttemptFailure {
    /// The piece can be retried; `throttled` marks timeout/throttle failures and
    /// selects the throttle backoff base (see `piece_retry_backoff`).
    Retryable { error: eyre::Error, throttled: bool },
    /// The piece failed in a way that should stop the whole archive download.
    Terminal(eyre::Error),
}
943
/// Converts a thread panic payload into a readable message.
///
/// Payloads produced by `panic!` are either a `String` or a `&'static str`; any
/// other payload type yields a generic placeholder.
fn panic_payload_message(payload: Box<dyn Any + Send + 'static>) -> String {
    // Consume the box for `String` payloads instead of cloning the message.
    match payload.downcast::<String>() {
        Ok(message) => *message,
        Err(payload) => payload
            .downcast_ref::<&'static str>()
            .map(|message| (*message).to_string())
            .unwrap_or_else(|| "unknown panic payload".to_string()),
    }
}
954
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::StatusCode;

    #[test]
    fn segmented_plan_skips_small_files() {
        // One byte below the threshold must stay single-stream regardless of workers.
        assert!(plan_segmented_download(SEGMENTED_DOWNLOAD_MIN_FILE_SIZE - 1, 16).is_none());
    }

    #[test]
    fn segmented_plan_uses_small_pieces_and_adaptive_workers() {
        // 512 MiB is below the 2 GiB cutoff, so the plan picks 32 MiB pieces and
        // caps the workers at the resulting piece count rather than the requested 32.
        let total_size = 512 * 1024 * 1024;
        let plan = plan_segmented_download(total_size, 32).unwrap();

        assert_eq!(plan.piece_size, SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE);
        assert_eq!(plan.piece_count, 16);
        assert_eq!(plan.worker_count, 16);
    }

    #[test]
    fn build_download_pieces_covers_entire_file() {
        // Pieces are contiguous inclusive ranges; the last one may be short.
        let pieces = build_download_pieces(10, 4).into_iter().collect::<Vec<_>>();

        assert_eq!(
            pieces,
            vec![
                DownloadPiece { start: 0, end: 3 },
                DownloadPiece { start: 4, end: 7 },
                DownloadPiece { start: 8, end: 9 },
            ]
        );
    }

    #[test]
    fn piece_status_retry_policy_retries_200_ok() {
        // 200 OK means the server ignored the Range header, so the piece retries.
        assert!(should_retry_piece_status(StatusCode::OK));
        assert!(should_retry_piece_status(StatusCode::TOO_MANY_REQUESTS));
        assert!(!should_retry_piece_status(StatusCode::NOT_FOUND));
    }

    #[test]
    fn choose_fetch_strategy_uses_segmented_when_ranges_are_supported() {
        let strategy = choose_fetch_strategy(
            RemoteArchiveProbe { total_size: 512 * 1024 * 1024, supports_ranges: true },
            16,
        );

        assert!(matches!(strategy, FetchStrategy::Segmented(_)));
    }

    #[test]
    fn choose_fetch_strategy_falls_back_without_ranges() {
        let strategy = choose_fetch_strategy(
            RemoteArchiveProbe { total_size: 512 * 1024 * 1024, supports_ranges: false },
            16,
        );

        assert!(matches!(
            strategy,
            FetchStrategy::Sequential(SequentialDownloadFallback::NoRangeSupport)
        ));
    }
}