1use super::{
2 progress::{
3 ArchiveDownloadProgress, DownloadProgress, DownloadRequestLimiter, SharedProgress,
4 SharedProgressWriter,
5 },
6 session::DownloadSession,
7 RETRY_BACKOFF_SECS,
8};
9use eyre::Result;
10use reqwest::{blocking::Client as BlockingClient, header::RANGE, StatusCode};
11use reth_cli_util::cancellation::CancellationToken;
12use reth_fs_util as fs;
13use std::{
14 any::Any,
15 collections::VecDeque,
16 fs::OpenOptions,
17 io::{self, BufWriter, Read, Write},
18 path::{Path, PathBuf},
19 sync::{
20 atomic::{AtomicBool, AtomicU64, Ordering},
21 Arc, Mutex,
22 },
23 time::Duration,
24};
25use tracing::info;
26use url::Url;
27
/// Number of attempts for a single piece before the segmented download is abandoned.
const SEGMENT_RETRY_ATTEMPTS: u32 = 3;

/// Minimum remote archive size (128 MiB) before a segmented download is considered.
const SEGMENTED_DOWNLOAD_MIN_FILE_SIZE: u64 = 128 * 1024 * 1024;

/// Piece size (32 MiB) used for archives smaller than 2 GiB.
const SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE: u64 = 32 * 1024 * 1024;
/// Piece size (64 MiB) used for archives of 2 GiB or larger.
const SEGMENTED_DOWNLOAD_LARGE_PIECE_SIZE: u64 = 64 * 1024 * 1024;

/// Upper bound, in seconds, on the exponential backoff between piece retry attempts.
const SEGMENTED_DOWNLOAD_MAX_BACKOFF_SECS: u64 = 30;

/// Per-request timeout, in seconds, for individual piece downloads.
const SEGMENTED_DOWNLOAD_REQUEST_TIMEOUT_SECS: u64 = 120;
45
/// Resolved on-disk locations for a single archive download.
#[derive(Debug, Clone)]
struct DownloadPaths {
    /// File name derived from the URL's final path segment (or a fallback).
    file_name: String,
    /// Destination path once the download completes successfully.
    final_path: PathBuf,
    /// In-progress path (`<file_name>.part`) written while downloading.
    part_path: PathBuf,
}
56
57impl DownloadPaths {
58 fn from_url(url: &str, target_dir: &Path) -> Self {
60 let file_name = Url::parse(url)
61 .ok()
62 .and_then(|u| u.path_segments()?.next_back().map(|s| s.to_string()))
63 .unwrap_or_else(|| "snapshot.tar".to_string());
64
65 Self {
66 final_path: target_dir.join(&file_name),
67 part_path: target_dir.join(format!("{file_name}.part")),
68 file_name,
69 }
70 }
71
72 fn file_name(&self) -> &str {
74 &self.file_name
75 }
76
77 fn final_path(&self) -> &Path {
79 &self.final_path
80 }
81
82 fn part_path(&self) -> &Path {
84 &self.part_path
85 }
86
87 fn finalize(&self) -> Result<()> {
89 fs::rename(&self.part_path, &self.final_path)?;
90 Ok(())
91 }
92
93 fn cleanup_partial(&self) {
95 let _ = fs::remove_file(&self.part_path);
96 }
97
98 fn cleanup_all(&self) {
100 let _ = fs::remove_file(&self.final_path);
101 self.cleanup_partial();
102 }
103}
104
/// Downloads a remote archive to disk, choosing between a sequential and a
/// segmented (parallel) strategy based on server capabilities and file size.
pub(crate) struct ArchiveFetcher {
    /// Source URL of the archive.
    url: String,
    /// On-disk paths derived from the URL.
    paths: DownloadPaths,
    /// Shared session state (progress, cancellation, request limiting).
    session: DownloadSession,
}
114
impl ArchiveFetcher {
    /// Creates a fetcher for `url`, deriving download paths inside `target_dir`.
    pub(crate) fn new(url: impl Into<String>, target_dir: &Path, session: DownloadSession) -> Self {
        let url = url.into();
        let paths = DownloadPaths::from_url(&url, target_dir);
        Self { url, paths, session }
    }

    /// Downloads the archive and returns its final path and size.
    ///
    /// Prefers a segmented download when the session provides a request
    /// limiter and the server honors Range requests; otherwise falls back to
    /// a sequential download with up to `super::MAX_DOWNLOAD_RETRIES` retries.
    pub(crate) fn download(
        &self,
        download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        // Without a request limiter we cannot bound concurrent requests, so
        // skip probing entirely and download sequentially.
        let Some(request_limiter) = self.session.request_limiter() else {
            return self.download_sequential(super::MAX_DOWNLOAD_RETRIES, download_progress)
        };

        let client = BlockingClient::builder().connect_timeout(Duration::from_secs(30)).build()?;
        let probe = self.probe(&client)?;

        match choose_fetch_strategy(probe, request_limiter.max_concurrency()) {
            FetchStrategy::Sequential(reason) => {
                self.log_sequential_fallback(reason, probe.total_size);
                self.download_sequential(super::MAX_DOWNLOAD_RETRIES, download_progress)
            }
            FetchStrategy::Segmented(plan) => {
                self.download_segmented(probe.total_size, plan, download_progress)
            }
        }
    }

    /// Removes both the finished archive and any partial download from disk.
    pub(crate) fn cleanup_downloaded_files(&self) {
        self.paths.cleanup_all();
    }

    /// Probes the server with a 1-byte Range request (`bytes=0-0`) to learn
    /// the archive size and whether Range requests are honored.
    ///
    /// A `206 Partial Content` response parses the total from `Content-Range`
    /// (`bytes 0-0/<total>`); anything else falls back to a `HEAD` request and
    /// `Content-Length`. Errors if no size can be determined either way.
    fn probe(&self, client: &BlockingClient) -> Result<RemoteArchiveProbe> {
        let probe = client
            .get(&self.url)
            .header(RANGE, "bytes=0-0")
            .send()
            .and_then(|response| response.error_for_status());

        let (supports_ranges, total_size) = match probe {
            Ok(response) if response.status() == StatusCode::PARTIAL_CONTENT => {
                // `Content-Range: bytes 0-0/<total>` — take the part after '/'.
                let total = response
                    .headers()
                    .get("Content-Range")
                    .and_then(|value| value.to_str().ok())
                    .and_then(|value| value.split('/').next_back())
                    .and_then(|value| value.parse::<u64>().ok());
                (true, total)
            }
            _ => {
                let response = client.head(&self.url).send()?.error_for_status()?;
                (false, response.content_length())
            }
        };

        Ok(RemoteArchiveProbe {
            total_size: total_size.ok_or_else(|| eyre::eyre!("Server did not return file size"))?,
            supports_ranges,
        })
    }

    /// Downloads the archive in a single stream, resuming from the existing
    /// `.part` file via a Range request on each retry attempt.
    ///
    /// Progress is reported either through the shared session progress (quiet
    /// mode) or a local `ProgressWriter`. Returns the finalized archive or the
    /// last error after `max_download_retries` failed attempts.
    fn download_sequential(
        &self,
        max_download_retries: u32,
        mut download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        let quiet = self.quiet();

        if !quiet {
            info!(target: "reth::cli", file = %self.paths.file_name(), "Connecting to download server");
        }

        let client = BlockingClient::builder().timeout(Duration::from_secs(30)).build()?;
        let mut total_size: Option<u64> = None;
        let mut last_error: Option<eyre::Error> = None;

        for attempt in 1..=max_download_retries {
            let existing_size =
                fs::metadata(self.paths.part_path()).map(|meta| meta.len()).unwrap_or(0);

            // A previous attempt may have completed the file just before
            // failing (e.g. on flush); finalize instead of re-downloading.
            if let Some(total) = total_size &&
                existing_size >= total
            {
                return self.finalize_download(total)
            }

            if attempt > 1 {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Retry attempt {}/{} - resuming from {} bytes",
                    attempt, max_download_retries, existing_size
                );
            }

            let mut request = client.get(&self.url);
            if existing_size > 0 {
                request = request.header(RANGE, format!("bytes={existing_size}-"));
                if !quiet && attempt == 1 {
                    info!(target: "reth::cli", file = %self.paths.file_name(), "Resuming from {} bytes", existing_size);
                }
            }

            // Hold a request permit (if the session limits concurrency) for
            // the duration of this attempt.
            let _request_permit = self
                .session
                .request_limiter()
                .map(|limiter| {
                    limiter.acquire(self.session.progress(), self.session.cancel_token())
                })
                .transpose()?;

            let response = match request.send().and_then(|response| response.error_for_status()) {
                Ok(response) => response,
                Err(error) => {
                    last_error = Some(error.into());
                    if attempt < max_download_retries {
                        info!(target: "reth::cli",
                            file = %self.paths.file_name(),
                            "Download failed, retrying in {RETRY_BACKOFF_SECS}s..."
                        );
                        std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                    }
                    continue;
                }
            };

            // 206 means the server honored our Range header and we can append;
            // 200 means it sent the whole file and we must start over.
            let is_partial = response.status() == StatusCode::PARTIAL_CONTENT;
            let size = if is_partial {
                response
                    .headers()
                    .get("Content-Range")
                    .and_then(|value| value.to_str().ok())
                    .and_then(|value| value.split('/').next_back())
                    .and_then(|value| value.parse().ok())
            } else {
                response.content_length()
            };

            if total_size.is_none() {
                total_size = size;
                if !quiet && let Some(size) = size {
                    info!(target: "reth::cli",
                        file = %self.paths.file_name(),
                        size = %DownloadProgress::format_size(size),
                        "Downloading"
                    );
                }
            }

            let current_total = total_size.ok_or_else(|| {
                eyre::eyre!("Server did not provide Content-Length or Content-Range header")
            })?;

            // Append to the partial file when resuming; otherwise (re)create it.
            let file = if is_partial && existing_size > 0 {
                OpenOptions::new()
                    .append(true)
                    .open(self.paths.part_path())
                    .map_err(|error| fs::FsPathError::open(error, self.paths.part_path()))?
            } else {
                fs::create_file(self.paths.part_path())?
            };

            let start_offset = if is_partial { existing_size } else { 0 };
            let mut reader = response;

            let copy_result;
            let flush_result;

            if let Some(progress) = self.session.progress() {
                // Quiet mode: report into the shared session progress and,
                // when provided, the per-archive progress callback.
                let mut on_written = |bytes| {
                    if let Some(download_progress) = download_progress.as_deref_mut() {
                        download_progress.record_downloaded(bytes);
                    }
                };
                let mut writer = SharedProgressWriter {
                    inner: BufWriter::new(file),
                    progress: Arc::clone(progress),
                    on_written: Some(&mut on_written),
                };
                copy_result = io::copy(&mut reader, &mut writer);
                flush_result = writer.inner.flush();
            } else {
                // Interactive mode: local progress display that also honors
                // cancellation mid-copy.
                let mut progress = DownloadProgress::new(current_total);
                progress.downloaded = start_offset;
                let mut writer = ProgressWriter {
                    inner: BufWriter::new(file),
                    progress,
                    cancel_token: self.session.cancel_token().clone(),
                };
                copy_result = io::copy(&mut reader, &mut writer);
                flush_result = writer.inner.flush();
                // Terminate the in-place progress line.
                println!();
            }

            if let Err(error) = copy_result.and(flush_result) {
                last_error = Some(error.into());
                if attempt < max_download_retries {
                    info!(target: "reth::cli",
                        file = %self.paths.file_name(),
                        "Download interrupted, retrying in {RETRY_BACKOFF_SECS}s..."
                    );
                    std::thread::sleep(Duration::from_secs(RETRY_BACKOFF_SECS));
                }
                continue;
            }

            return self.finalize_download(current_total)
        }

        Err(last_error.unwrap_or_else(|| {
            eyre::eyre!("Download failed after {} attempts", max_download_retries)
        }))
    }

    /// Runs the segmented download according to `plan`.
    fn download_segmented(
        &self,
        total_size: u64,
        plan: SegmentedDownloadPlan,
        download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Result<DownloadedArchive> {
        let request_limiter = self.session.require_request_limiter()?;
        info!(target: "reth::cli",
            total_size = %DownloadProgress::format_size(total_size),
            piece_size = %DownloadProgress::format_size(plan.piece_size),
            pieces = plan.piece_count,
            workers = plan.worker_count,
            max_concurrent_requests = request_limiter.max_concurrency(),
            "Starting queued segmented download"
        );

        SegmentedDownload::new(
            self.url.clone(),
            self.paths.clone(),
            total_size,
            plan,
            self.session.clone(),
            download_progress,
        )
        .run()
    }

    /// Logs why the segmented path was not taken; `TooSmall` is silent since
    /// small files are expected to go sequential.
    fn log_sequential_fallback(&self, reason: SequentialDownloadFallback, total_size: u64) {
        match reason {
            SequentialDownloadFallback::NoRangeSupport => {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Server does not support Range requests, falling back to sequential download"
                );
            }
            SequentialDownloadFallback::EmptyFile => {
                info!(target: "reth::cli",
                    file = %self.paths.file_name(),
                    "Remote archive is empty, falling back to sequential download"
                );
            }
            SequentialDownloadFallback::TooSmall => {
                let _ = total_size;
            }
        }
    }

    /// Promotes the `.part` file to its final path and reports completion.
    fn finalize_download(&self, size: u64) -> Result<DownloadedArchive> {
        self.paths.finalize()?;
        if !self.quiet() {
            info!(target: "reth::cli", file = %self.paths.file_name(), "Download complete");
        }
        Ok(DownloadedArchive { path: self.paths.final_path().to_path_buf(), size })
    }

    /// Whether per-file console output should be suppressed (a shared session
    /// progress reporter is active instead).
    fn quiet(&self) -> bool {
        self.session.progress().is_some()
    }
}
397
/// Result of a completed archive download.
#[derive(Debug, Clone)]
pub(crate) struct DownloadedArchive {
    /// Final on-disk location of the archive.
    pub(crate) path: PathBuf,
    /// Size of the archive in bytes.
    pub(crate) size: u64,
}
406
/// Server capabilities discovered by [`ArchiveFetcher::probe`].
#[derive(Debug, Clone, Copy)]
struct RemoteArchiveProbe {
    /// Total archive size in bytes, as reported by the server.
    total_size: u64,
    /// Whether the server answered a Range request with `206 Partial Content`.
    supports_ranges: bool,
}
415
/// Reason the segmented strategy was rejected in favor of a sequential download.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SequentialDownloadFallback {
    /// The server did not honor the probing Range request.
    NoRangeSupport,
    /// The remote archive reports a size of zero bytes.
    EmptyFile,
    /// The archive is below the segmented-download size threshold.
    TooSmall,
}
426
/// Download strategy selected by [`choose_fetch_strategy`].
#[derive(Debug)]
enum FetchStrategy {
    /// Single-stream download, with the reason segmentation was rejected.
    Sequential(SequentialDownloadFallback),
    /// Parallel piece-wise download following the given plan.
    Segmented(SegmentedDownloadPlan),
}
435
436fn choose_fetch_strategy(probe: RemoteArchiveProbe, max_workers: usize) -> FetchStrategy {
438 if !probe.supports_ranges {
439 return FetchStrategy::Sequential(SequentialDownloadFallback::NoRangeSupport)
440 }
441
442 if probe.total_size == 0 {
443 return FetchStrategy::Sequential(SequentialDownloadFallback::EmptyFile)
444 }
445
446 plan_segmented_download(probe.total_size, max_workers)
447 .map(FetchStrategy::Segmented)
448 .unwrap_or(FetchStrategy::Sequential(SequentialDownloadFallback::TooSmall))
449}
450
/// Writer adapter that feeds byte counts into a local [`DownloadProgress`]
/// display and aborts writes once cancellation is requested.
struct ProgressWriter<W> {
    /// Underlying destination (typically a `BufWriter<File>`).
    inner: W,
    /// Local progress tracker updated after every successful write.
    progress: DownloadProgress,
    /// Checked before each write to stop promptly on cancellation.
    cancel_token: CancellationToken,
}
461
462impl<W: Write> Write for ProgressWriter<W> {
463 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
465 if self.cancel_token.is_cancelled() {
466 return Err(io::Error::new(io::ErrorKind::Interrupted, "download cancelled"));
467 }
468 let n = self.inner.write(buf)?;
469 let _ = self.progress.update(n as u64);
470 Ok(n)
471 }
472
473 fn flush(&mut self) -> io::Result<()> {
475 self.inner.flush()
476 }
477}
478
/// Inclusive byte range (`start..=end`) of one download piece.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct DownloadPiece {
    /// First byte offset of the piece.
    start: u64,
    /// Last byte offset of the piece (inclusive).
    end: u64,
}
487
/// Precomputed layout for a segmented download.
#[derive(Debug)]
struct SegmentedDownloadPlan {
    /// Size of each piece in bytes (the final piece may be smaller).
    piece_size: u64,
    /// Total number of pieces.
    piece_count: usize,
    /// Number of worker threads to spawn.
    worker_count: usize,
    /// FIFO queue of pieces to be claimed by workers.
    pieces: VecDeque<DownloadPiece>,
}
500
/// One-shot executor for a queued, multi-threaded segmented download.
struct SegmentedDownload {
    /// Source URL of the archive.
    url: String,
    /// On-disk paths for the archive and its `.part` file.
    paths: DownloadPaths,
    /// Total archive size in bytes (the `.part` file is pre-sized to this).
    total_size: u64,
    /// Piece layout and worker budget.
    plan: SegmentedDownloadPlan,
    /// Shared session state (progress, cancellation, request limiting).
    session: DownloadSession,
}
514
/// Borrowed, copyable bundle of everything a worker thread needs.
#[derive(Clone, Copy)]
struct SegmentedWorkerContext<'a> {
    /// Source URL of the archive.
    url: &'a str,
    /// Path of the pre-sized `.part` file workers write into.
    part_path: &'a Path,
    /// Shared session progress, if progress reporting is enabled.
    shared: Option<&'a Arc<SharedProgress>>,
    /// Limiter bounding concurrent HTTP requests across workers.
    request_limiter: &'a DownloadRequestLimiter,
    /// Token checked to stop work on cancellation.
    cancel_token: &'a CancellationToken,
}
529
impl SegmentedDownload {
    /// Bundles the parameters for [`Self::run`].
    ///
    /// NOTE(review): `_download_progress` is accepted but ignored here, unlike
    /// the sequential path which forwards per-archive byte counts — confirm
    /// whether segmented downloads should report to it as well.
    fn new(
        url: String,
        paths: DownloadPaths,
        total_size: u64,
        plan: SegmentedDownloadPlan,
        session: DownloadSession,
        _download_progress: Option<&mut ArchiveDownloadProgress<'_>>,
    ) -> Self {
        Self { url, paths, total_size, plan, session }
    }

    /// Executes the segmented download: pre-sizes the `.part` file, spawns
    /// scoped worker threads that claim pieces from a shared queue, then
    /// finalizes the file or cleans up on failure.
    ///
    /// Any worker error or panic is recorded as a terminal failure; the first
    /// recorded error is returned and the partial file is removed.
    fn run(self) -> Result<DownloadedArchive> {
        let Self { url, paths, total_size, plan, session } = self;
        {
            // Pre-allocate the full file so workers can write their pieces at
            // arbitrary offsets concurrently.
            let file = fs::create_file(paths.part_path())?;
            file.set_len(total_size)?;
        }

        let worker_count = plan.worker_count;
        let state = Arc::new(SegmentedDownloadState::new(plan.pieces));
        let terminal_failure = Arc::new(TerminalFailure::default());
        // Bytes accounted to the shared progress for completed pieces; rolled
        // back from the "active" counter once the download ends either way.
        let piece_progress_bytes = Arc::new(AtomicU64::new(0));
        let worker_client = BlockingClient::builder()
            .connect_timeout(Duration::from_secs(30))
            .timeout(Duration::from_secs(SEGMENTED_DOWNLOAD_REQUEST_TIMEOUT_SECS))
            .build()?;
        let request_limiter = Arc::clone(session.require_request_limiter()?);
        let shared = session.progress();
        let cancel_token = session.cancel_token();
        let url = url.as_str();
        let worker_context = SegmentedWorkerContext {
            url,
            part_path: paths.part_path(),
            shared,
            request_limiter: request_limiter.as_ref(),
            cancel_token,
        };

        // Scoped threads let workers borrow the context without 'static bounds.
        std::thread::scope(|scope| {
            let mut handles = Vec::with_capacity(worker_count);

            for _ in 0..worker_count {
                let state = Arc::clone(&state);
                let terminal_failure = Arc::clone(&terminal_failure);
                let piece_progress_bytes = Arc::clone(&piece_progress_bytes);
                let client = worker_client.clone();

                handles.push(scope.spawn(move || {
                    Self::worker_loop(
                        &client,
                        worker_context,
                        state,
                        terminal_failure,
                        piece_progress_bytes,
                    );
                }));
            }

            for handle in handles {
                // A panicking worker is converted into a terminal failure so
                // the download fails cleanly instead of propagating the panic.
                if let Err(payload) = handle.join() {
                    state.note_terminal_failure();
                    terminal_failure.record(eyre::eyre!(
                        "Segmented download worker panicked: {}",
                        panic_payload_message(payload)
                    ));
                }
            }
        });

        if let Some(error) = terminal_failure.take() {
            if let Some(shared) = shared {
                shared.sub_active_download_bytes(piece_progress_bytes.load(Ordering::Relaxed));
            }
            paths.cleanup_partial();
            return Err(error.wrap_err("Parallel download failed"))
        }

        if let Some(shared) = shared {
            shared.sub_active_download_bytes(piece_progress_bytes.load(Ordering::Relaxed));
            shared.record_archive_download_complete(total_size);
        }

        paths.finalize()?;
        info!(target: "reth::cli", file = %paths.file_name(), "Download complete");
        Ok(DownloadedArchive { path: paths.final_path().to_path_buf(), size: total_size })
    }

    /// Worker body: opens the `.part` file for positional writes and keeps
    /// claiming pieces until the queue drains, cancellation fires, or a piece
    /// fails terminally (which also stops the other workers via `state`).
    fn worker_loop(
        client: &BlockingClient,
        context: SegmentedWorkerContext<'_>,
        state: Arc<SegmentedDownloadState>,
        terminal_failure: Arc<TerminalFailure>,
        piece_progress_bytes: Arc<AtomicU64>,
    ) {
        let file = match OpenOptions::new().write(true).open(context.part_path) {
            Ok(file) => file,
            Err(error) => {
                state.note_terminal_failure();
                terminal_failure.record(error.into());
                return;
            }
        };

        while let Some(piece) = state.next_piece(context.cancel_token) {
            if let Err(error) = Self::download_piece_with_retries(
                client,
                context.url,
                &file,
                piece,
                context.shared,
                &piece_progress_bytes,
                context.request_limiter,
                context.cancel_token,
            ) {
                state.note_terminal_failure();
                terminal_failure.record(error);
                return;
            }
        }
    }

    /// Downloads a single piece, retrying retryable failures up to
    /// [`SEGMENT_RETRY_ATTEMPTS`] times with exponential backoff. A request
    /// permit is acquired per attempt so retries also respect the limiter.
    #[expect(clippy::too_many_arguments)]
    fn download_piece_with_retries(
        client: &BlockingClient,
        url: &str,
        file: &std::fs::File,
        piece: DownloadPiece,
        shared: Option<&Arc<SharedProgress>>,
        piece_progress_bytes: &AtomicU64,
        request_limiter: &DownloadRequestLimiter,
        cancel_token: &CancellationToken,
    ) -> Result<()> {
        for attempt in 1..=SEGMENT_RETRY_ATTEMPTS {
            if cancel_token.is_cancelled() {
                return Err(eyre::eyre!("Download cancelled"))
            }

            let _request_permit = request_limiter.acquire(shared, cancel_token)?;
            match Self::download_piece_once(
                client,
                url,
                file,
                piece,
                shared,
                piece_progress_bytes,
                cancel_token,
            ) {
                Ok(()) => return Ok(()),
                // Retryable and attempts remain: back off (longer if throttled)
                // and try again.
                Err(PieceAttemptFailure::Retryable { error: _, throttled })
                    if attempt < SEGMENT_RETRY_ATTEMPTS =>
                {
                    std::thread::sleep(piece_retry_backoff(attempt, throttled));
                }
                Err(PieceAttemptFailure::Retryable { error, .. }) => return Err(error),
                Err(PieceAttemptFailure::Terminal(error)) => return Err(error),
            }
        }

        // Unreachable in practice: the last attempt returns above either way.
        Err(eyre::eyre!("Piece download failed after {SEGMENT_RETRY_ATTEMPTS} attempts"))
    }

    /// Performs one attempt at downloading `piece` and writing it at its byte
    /// offset via positional writes (`write_all_at`, Unix-only).
    ///
    /// Requires a `206 Partial Content` response; a `200 OK` (Range ignored)
    /// is classified retryable, other unexpected statuses terminal. A short
    /// body is retryable; completed pieces are added to the shared "active
    /// download bytes" counter and `piece_progress_bytes`.
    fn download_piece_once(
        client: &BlockingClient,
        url: &str,
        file: &std::fs::File,
        piece: DownloadPiece,
        shared: Option<&Arc<SharedProgress>>,
        piece_progress_bytes: &AtomicU64,
        cancel_token: &CancellationToken,
    ) -> std::result::Result<(), PieceAttemptFailure> {
        use std::os::unix::fs::FileExt;

        let expected_len = piece.end - piece.start + 1;

        let response = match client
            .get(url)
            .header(RANGE, format!("bytes={}-{}", piece.start, piece.end))
            .send()
        {
            Ok(response) if response.status() == StatusCode::PARTIAL_CONTENT => response,
            Ok(response) if should_retry_piece_status(response.status()) => {
                return Err(PieceAttemptFailure::Retryable {
                    error: eyre::eyre!(
                        "Server returned {} for piece {}-{}",
                        response.status(),
                        piece.start,
                        piece.end
                    ),
                    throttled: is_throttle_piece_status(response.status()),
                });
            }
            Ok(response) => {
                return Err(PieceAttemptFailure::Terminal(eyre::eyre!(
                    "Server returned {} instead of 206 for Range request",
                    response.status()
                )));
            }
            Err(error) => {
                return Err(PieceAttemptFailure::Retryable {
                    throttled: is_throttle_piece_error(&error),
                    error: error.into(),
                });
            }
        };

        let mut buf = [0u8; 64 * 1024];
        // Cap the read at the piece length in case the server over-delivers.
        let mut reader = response.take(expected_len);
        let mut offset = piece.start;

        loop {
            if cancel_token.is_cancelled() {
                return Err(PieceAttemptFailure::Terminal(eyre::eyre!("Download cancelled")));
            }

            match reader.read(&mut buf) {
                Ok(0) => break,
                Ok(n) => {
                    file.write_all_at(&buf[..n], offset)
                        .map_err(|error| PieceAttemptFailure::Terminal(error.into()))?;
                    offset += n as u64;
                    if let Some(progress) = shared {
                        // NOTE(review): fetched bytes are recorded per chunk,
                        // so a retried piece counts its bytes again — presumed
                        // intentional (tracks network traffic, not file bytes).
                        progress.record_session_fetched_bytes(n as u64);
                    }
                }
                Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
                Err(error) => {
                    return Err(PieceAttemptFailure::Retryable {
                        throttled: error.kind() == io::ErrorKind::TimedOut,
                        error: error.into(),
                    });
                }
            }
        }

        let downloaded_len = offset - piece.start;
        if downloaded_len == expected_len {
            if let Some(progress) = shared {
                progress.add_active_download_bytes(expected_len);
            }
            piece_progress_bytes.fetch_add(expected_len, Ordering::Relaxed);
            return Ok(())
        }

        Err(PieceAttemptFailure::Retryable {
            error: eyre::eyre!(
                "Piece {}-{} ended early: expected {} bytes, downloaded {}",
                piece.start,
                piece.end,
                expected_len,
                downloaded_len
            ),
            throttled: false,
        })
    }
}
794
/// Shared worker state: the piece queue plus a sticky failure flag that makes
/// all workers stop claiming new pieces once any of them fails terminally.
struct SegmentedDownloadState {
    /// Remaining pieces, claimed FIFO by worker threads.
    pieces: Mutex<VecDeque<DownloadPiece>>,
    /// Set once on terminal failure; never cleared.
    failed: AtomicBool,
}
804
805impl SegmentedDownloadState {
806 fn new(pieces: VecDeque<DownloadPiece>) -> Self {
808 Self { pieces: Mutex::new(pieces), failed: AtomicBool::new(false) }
809 }
810
811 fn next_piece(&self, cancel_token: &CancellationToken) -> Option<DownloadPiece> {
813 if cancel_token.is_cancelled() || self.failed.load(Ordering::Relaxed) {
814 return None;
815 }
816
817 self.pieces.lock().unwrap().pop_front()
818 }
819
820 fn note_terminal_failure(&self) {
822 self.failed.store(true, Ordering::Relaxed);
823 }
824}
825
/// First-error-wins slot shared by all workers of a segmented download.
#[derive(Default)]
struct TerminalFailure {
    /// The first recorded error, if any; later errors are dropped.
    error: Mutex<Option<eyre::Error>>,
}
832
833impl TerminalFailure {
834 fn record(&self, error: eyre::Error) {
836 let mut slot = self.error.lock().unwrap();
837 if slot.is_none() {
838 *slot = Some(error);
839 }
840 }
841
842 fn take(&self) -> Option<eyre::Error> {
844 self.error.lock().unwrap().take()
845 }
846}
847
848fn build_download_pieces(total_size: u64, piece_size: u64) -> VecDeque<DownloadPiece> {
850 let mut pieces = VecDeque::new();
851 let mut start = 0;
852
853 while start < total_size {
854 let end = (start + piece_size).min(total_size) - 1;
855 pieces.push_back(DownloadPiece { start, end });
856 start = end + 1;
857 }
858
859 pieces
860}
861
862fn segmented_piece_size(total_size: u64) -> u64 {
867 if total_size < 2 * 1024 * 1024 * 1024 {
868 SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE
869 } else {
870 SEGMENTED_DOWNLOAD_LARGE_PIECE_SIZE
871 }
872}
873
874fn plan_segmented_download(total_size: u64, max_workers: usize) -> Option<SegmentedDownloadPlan> {
879 if max_workers == 0 || total_size < SEGMENTED_DOWNLOAD_MIN_FILE_SIZE {
880 return None;
881 }
882
883 let piece_size = segmented_piece_size(total_size);
884 if total_size <= piece_size {
885 return None;
886 }
887
888 let pieces = build_download_pieces(total_size, piece_size);
889 let piece_count = pieces.len();
890 let worker_count = max_workers.min(piece_count).max(1);
891
892 Some(SegmentedDownloadPlan { piece_size, piece_count, worker_count, pieces })
893}
894
895fn piece_retry_backoff(attempt: u32, throttled: bool) -> Duration {
897 let base = if throttled { 2 } else { RETRY_BACKOFF_SECS };
898 let multiplier = 1u64 << attempt.saturating_sub(1).min(3);
899 Duration::from_secs(base.saturating_mul(multiplier).min(SEGMENTED_DOWNLOAD_MAX_BACKOFF_SECS))
900}
901
902fn is_retryable_piece_status(status: StatusCode) -> bool {
904 matches!(
905 status,
906 StatusCode::REQUEST_TIMEOUT |
907 StatusCode::TOO_MANY_REQUESTS |
908 StatusCode::INTERNAL_SERVER_ERROR |
909 StatusCode::BAD_GATEWAY |
910 StatusCode::SERVICE_UNAVAILABLE |
911 StatusCode::GATEWAY_TIMEOUT
912 )
913}
914
915fn should_retry_piece_status(status: StatusCode) -> bool {
917 status == StatusCode::OK || is_retryable_piece_status(status)
918}
919
920fn is_throttle_piece_status(status: StatusCode) -> bool {
922 matches!(
923 status,
924 StatusCode::REQUEST_TIMEOUT |
925 StatusCode::TOO_MANY_REQUESTS |
926 StatusCode::SERVICE_UNAVAILABLE |
927 StatusCode::GATEWAY_TIMEOUT
928 )
929}
930
931fn is_throttle_piece_error(error: &reqwest::Error) -> bool {
933 error.is_timeout() || matches!(error.status(), Some(status) if is_throttle_piece_status(status))
934}
935
/// Outcome classification for a failed piece download attempt.
enum PieceAttemptFailure {
    /// Worth retrying; `throttled` selects the gentler backoff schedule.
    Retryable { error: eyre::Error, throttled: bool },
    /// Fatal for the whole segmented download (e.g. cancellation, write error).
    Terminal(eyre::Error),
}
943
/// Extracts a human-readable message from a panic payload, falling back to a
/// generic string when the payload is neither a `&'static str` nor a `String`.
fn panic_payload_message(payload: Box<dyn Any + Send + 'static>) -> String {
    match payload.downcast::<String>() {
        Ok(message) => *message,
        Err(payload) => payload
            .downcast::<&'static str>()
            .map(|message| (*message).to_string())
            .unwrap_or_else(|_| "unknown panic payload".to_string()),
    }
}
954
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest::StatusCode;

    // Files below the 128 MiB threshold must not be segmented.
    #[test]
    fn segmented_plan_skips_small_files() {
        assert!(plan_segmented_download(SEGMENTED_DOWNLOAD_MIN_FILE_SIZE - 1, 16).is_none());
    }

    // 512 MiB (< 2 GiB) uses 32 MiB pieces, and the worker count is capped by
    // the number of pieces rather than the requested maximum.
    #[test]
    fn segmented_plan_uses_large_pieces_and_adaptive_workers() {
        let total_size = 512 * 1024 * 1024;
        let plan = plan_segmented_download(total_size, 32).unwrap();

        assert_eq!(plan.piece_size, SEGMENTED_DOWNLOAD_SMALL_PIECE_SIZE);
        assert_eq!(plan.piece_count, 16);
        assert_eq!(plan.worker_count, 16);
    }

    // Pieces must tile the whole file with inclusive, non-overlapping ranges,
    // including a short final piece.
    #[test]
    fn build_download_pieces_covers_entire_file() {
        let pieces = build_download_pieces(10, 4).into_iter().collect::<Vec<_>>();

        assert_eq!(
            pieces,
            vec![
                DownloadPiece { start: 0, end: 3 },
                DownloadPiece { start: 4, end: 7 },
                DownloadPiece { start: 8, end: 9 },
            ]
        );
    }

    // 200 OK (server ignored Range) is retryable; genuine client errors are not.
    #[test]
    fn piece_status_retry_policy_retries_200_ok() {
        assert!(should_retry_piece_status(StatusCode::OK));
        assert!(should_retry_piece_status(StatusCode::TOO_MANY_REQUESTS));
        assert!(!should_retry_piece_status(StatusCode::NOT_FOUND));
    }

    // Range support plus a large-enough file selects the segmented strategy.
    #[test]
    fn choose_fetch_strategy_uses_segmented_when_ranges_are_supported() {
        let strategy = choose_fetch_strategy(
            RemoteArchiveProbe { total_size: 512 * 1024 * 1024, supports_ranges: true },
            16,
        );

        assert!(matches!(strategy, FetchStrategy::Segmented(_)));
    }

    // Without Range support the fetcher must fall back to sequential.
    #[test]
    fn choose_fetch_strategy_falls_back_without_ranges() {
        let strategy = choose_fetch_strategy(
            RemoteArchiveProbe { total_size: 512 * 1024 * 1024, supports_ranges: false },
            16,
        );

        assert!(matches!(
            strategy,
            FetchStrategy::Sequential(SequentialDownloadFallback::NoRangeSupport)
        ));
    }
}