reth_stages/stages/s3/downloader/
meta.rs

1use super::{error::DownloaderError, RemainingChunkRange};
2use serde::{Deserialize, Serialize};
3use std::{
4    fs::File,
5    ops::RangeInclusive,
6    path::{Path, PathBuf},
7};
8use tracing::info;
9
/// Tracks download progress and manages chunked downloads for resumable file transfers.
///
/// The file is split into chunks and the remaining byte range of each chunk is tracked, so a
/// partially completed download can be described by the ranges that are still missing.
#[derive(Debug)]
pub struct Metadata {
    /// Total file size in bytes.
    pub total_size: usize,
    /// Number of bytes downloaded so far.
    pub downloaded: usize,
    /// Download chunk size. Default 150MB.
    pub chunk_size: usize,
    /// Remaining download ranges for each chunk.
    /// - `Some(RangeInclusive)`: range to be downloaded.
    /// - `None`: Chunk fully downloaded.
    chunks: Vec<Option<RangeInclusive<usize>>>,
    /// Path with the stored metadata.
    path: PathBuf,
}
26
27impl Metadata {
28    /// Build a [`Metadata`] using a builder.
29    pub fn builder(data_file: &Path) -> MetadataBuilder {
30        MetadataBuilder::new(Self::file_path(data_file))
31    }
32
33    /// Returns the metadata file path of a data file: `{data_file}.metadata`
34    pub fn file_path(data_file: &Path) -> PathBuf {
35        data_file.with_file_name(format!(
36            "{}.metadata",
37            data_file.file_name().unwrap_or_default().to_string_lossy()
38        ))
39    }
40
41    /// Returns a list of all chunks with their remaining ranges to be downloaded:
42    /// `RemainingChunkRange`.
43    pub fn needed_ranges(&self) -> Vec<RemainingChunkRange> {
44        self.chunks
45            .iter()
46            .enumerate()
47            .filter(|(_, remaining)| remaining.is_some())
48            .map(|(index, remaining)| {
49                let range = remaining.as_ref().expect("qed");
50                RemainingChunkRange { index, start: *range.start(), end: *range.end() }
51            })
52            .collect()
53    }
54
55    /// Updates a downloaded chunk.
56    pub fn update_chunk(
57        &mut self,
58        index: usize,
59        downloaded_bytes: usize,
60    ) -> Result<(), DownloaderError> {
61        self.downloaded += downloaded_bytes;
62
63        let num_chunks = self.chunks.len();
64        if index >= self.chunks.len() {
65            return Err(DownloaderError::InvalidChunk(index, num_chunks))
66        }
67
68        // Update chunk with downloaded range
69        if let Some(range) = &self.chunks[index] {
70            let start = range.start() + downloaded_bytes;
71            if start > *range.end() {
72                self.chunks[index] = None;
73            } else {
74                self.chunks[index] = Some(start..=*range.end());
75            }
76        }
77
78        let file = self.path.file_stem().unwrap_or_default().to_string_lossy().into_owned();
79        info!(
80            target: "sync::stages::s3::downloader",
81            file,
82            "{}/{}", self.downloaded / 1024 / 1024, self.total_size / 1024 / 1024);
83
84        self.commit()
85    }
86
87    /// Commits the [`Metadata`] to file.
88    pub fn commit(&self) -> Result<(), DownloaderError> {
89        Ok(reth_fs_util::atomic_write_file(&self.path, |file| {
90            bincode::serialize_into(file, &MetadataFile::from(self))
91        })?)
92    }
93
94    /// Loads a [`Metadata`] file from disk using the target data file.
95    pub fn load(data_file: &Path) -> Result<Self, DownloaderError> {
96        let metadata_file_path = Self::file_path(data_file);
97        let MetadataFile { total_size, downloaded, chunk_size, chunks } =
98            bincode::deserialize_from(File::open(&metadata_file_path)?)?;
99
100        Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path })
101    }
102
103    /// Returns true if we have downloaded all chunks.
104    pub fn is_done(&self) -> bool {
105        !self.chunks.iter().any(|c| c.is_some())
106    }
107
108    /// Deletes [`Metadata`] file from disk.
109    pub fn delete(self) -> Result<(), DownloaderError> {
110        Ok(reth_fs_util::remove_file(&self.path)?)
111    }
112}
113
/// A builder that can configure [`Metadata`].
#[derive(Debug)]
pub struct MetadataBuilder {
    /// Path with the stored metadata.
    metadata_path: PathBuf,
    /// Total file size. Must be set to a non-zero value before building.
    total_size: Option<usize>,
    /// Download chunk size. Default 150MB.
    chunk_size: usize,
}
124
125impl MetadataBuilder {
126    const fn new(metadata_path: PathBuf) -> Self {
127        Self {
128            metadata_path,
129            total_size: None,
130            chunk_size: 150 * (1024 * 1024), // 150MB
131        }
132    }
133
134    pub const fn with_total_size(mut self, total_size: usize) -> Self {
135        self.total_size = Some(total_size);
136        self
137    }
138
139    pub const fn with_chunk_size(mut self, chunk_size: usize) -> Self {
140        self.chunk_size = chunk_size;
141        self
142    }
143
144    /// Returns a [Metadata] if
145    pub fn build(&self) -> Result<Metadata, DownloaderError> {
146        match &self.total_size {
147            Some(total_size) if *total_size > 0 => {
148                let chunks = (0..*total_size)
149                    .step_by(self.chunk_size)
150                    .map(|start| {
151                        Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1))
152                    })
153                    .collect();
154
155                let metadata = Metadata {
156                    path: self.metadata_path.clone(),
157                    total_size: *total_size,
158                    downloaded: 0,
159                    chunk_size: self.chunk_size,
160                    chunks,
161                };
162                metadata.commit()?;
163
164                Ok(metadata)
165            }
166            _ => Err(DownloaderError::InvalidMetadataTotalSize(self.total_size)),
167        }
168    }
169}
170
/// Helper type that can serialize and deserialize [`Metadata`] to disk.
///
/// Mirrors the fields of [`Metadata`] except for the metadata file path.
#[derive(Debug, Serialize, Deserialize)]
struct MetadataFile {
    /// Total file size in bytes.
    total_size: usize,
    /// Number of bytes downloaded so far.
    downloaded: usize,
    /// Download chunk size. Default 150MB.
    chunk_size: usize,
    /// Remaining download ranges for each chunk.
    /// - `Some(RangeInclusive)`: range to be downloaded.
    /// - `None`: Chunk fully downloaded.
    chunks: Vec<Option<RangeInclusive<usize>>>,
}
185
186impl From<&Metadata> for MetadataFile {
187    fn from(metadata: &Metadata) -> Self {
188        Self {
189            total_size: metadata.total_size,
190            downloaded: metadata.downloaded,
191            chunk_size: metadata.chunk_size,
192            chunks: metadata.chunks.clone(),
193        }
194    }
195}