reth_stages/stages/s3/downloader/meta.rs
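//! Metadata for a chunked, resumable download in the S3 stage, persisted next to the data file.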
use super::{error::DownloaderError, RemainingChunkRange};
use serde::{Deserialize, Serialize};
use std::{
    fs::File,
    ops::RangeInclusive,
    path::{Path, PathBuf},
};
use tracing::info;

/// Metadata about a chunked download, kept on disk so an interrupted download can be resumed.
#[derive(Debug)]
pub struct Metadata {
    /// Total size of the data file in bytes.
    pub total_size: usize,
    /// Number of bytes downloaded so far.
    pub downloaded: usize,
    /// Size of each download chunk in bytes.
    pub chunk_size: usize,
    /// Remaining byte range per chunk: `Some(range)` while bytes are still missing,
    /// `None` once the chunk is fully downloaded.
    chunks: Vec<Option<RangeInclusive<usize>>>,
    /// Path of the metadata file on disk.
    path: PathBuf,
}

impl Metadata {
    /// Returns a builder for the metadata file that accompanies `data_file`.
    pub fn builder(data_file: &Path) -> MetadataBuilder {
        MetadataBuilder::new(Self::file_path(data_file))
    }

    /// Returns the metadata file path derived from the data file path (`<data_file>.metadata`).
    pub fn file_path(data_file: &Path) -> PathBuf {
        data_file.with_file_name(format!(
            "{}.metadata",
            data_file.file_name().unwrap_or_default().to_string_lossy()
        ))
    }

    /// Returns the remaining byte ranges that still need to be downloaded, along with their
    /// chunk indices.
    pub fn needed_ranges(&self) -> Vec<RemainingChunkRange> {
        self.chunks
            .iter()
            .enumerate()
            .filter(|(_, remaining)| remaining.is_some())
            .map(|(index, remaining)| {
                let range = remaining.as_ref().expect("qed");
                RemainingChunkRange { index, start: *range.start(), end: *range.end() }
            })
            .collect()
    }

    /// Registers `downloaded_bytes` against the chunk at `index`, advancing its remaining range
    /// and persisting the updated metadata to disk.
    pub fn update_chunk(
        &mut self,
        index: usize,
        downloaded_bytes: usize,
    ) -> Result<(), DownloaderError> {
        self.downloaded += downloaded_bytes;

        let num_chunks = self.chunks.len();
        if index >= self.chunks.len() {
            return Err(DownloaderError::InvalidChunk(index, num_chunks))
        }

        // Advance the chunk's start by the downloaded bytes; the chunk is complete once the
        // start passes its end.
        if let Some(range) = &self.chunks[index] {
            let start = range.start() + downloaded_bytes;
            if start > *range.end() {
                self.chunks[index] = None;
            } else {
                self.chunks[index] = Some(start..=*range.end());
            }
        }

        // Log progress in MiB.
        let file = self.path.file_stem().unwrap_or_default().to_string_lossy().into_owned();
        info!(
            target: "sync::stages::s3::downloader",
            file,
            "{}/{}", self.downloaded / 1024 / 1024, self.total_size / 1024 / 1024);

        self.commit()
    }

    /// Atomically writes the serialized metadata to its file on disk.
    pub fn commit(&self) -> Result<(), DownloaderError> {
        Ok(reth_fs_util::atomic_write_file(&self.path, |file| {
            bincode::serialize_into(file, &MetadataFile::from(self))
        })?)
    }

    /// Loads the metadata that accompanies `data_file` from disk.
    pub fn load(data_file: &Path) -> Result<Self, DownloaderError> {
        let metadata_file_path = Self::file_path(data_file);
        let MetadataFile { total_size, downloaded, chunk_size, chunks } =
            bincode::deserialize_from(File::open(&metadata_file_path)?)?;

        Ok(Self { total_size, downloaded, chunk_size, chunks, path: metadata_file_path })
    }

    /// Returns true if all chunks have been fully downloaded.
    pub fn is_done(&self) -> bool {
        !self.chunks.iter().any(|c| c.is_some())
    }

    /// Deletes the metadata file from disk.
    pub fn delete(self) -> Result<(), DownloaderError> {
        Ok(reth_fs_util::remove_file(&self.path)?)
    }
}

/// Builder for [`Metadata`].
#[derive(Debug)]
pub struct MetadataBuilder {
    /// Path of the metadata file.
    metadata_path: PathBuf,
    /// Total size of the data file in bytes.
    total_size: Option<usize>,
    /// Download chunk size in bytes.
    chunk_size: usize,
}

impl MetadataBuilder {
    const fn new(metadata_path: PathBuf) -> Self {
        Self {
            metadata_path,
            total_size: None,
            chunk_size: 150 * (1024 * 1024), // default chunk size: 150 MiB
        }
    }

    /// Sets the total size of the data file in bytes.
    pub const fn with_total_size(mut self, total_size: usize) -> Self {
        self.total_size = Some(total_size);
        self
    }

    /// Sets the download chunk size in bytes.
    pub const fn with_chunk_size(mut self, chunk_size: usize) -> Self {
        self.chunk_size = chunk_size;
        self
    }

    /// Splits the total size into chunk-sized inclusive ranges, writes the initial metadata file
    /// to disk and returns the resulting [`Metadata`].
    pub fn build(&self) -> Result<Metadata, DownloaderError> {
        match &self.total_size {
            Some(total_size) if *total_size > 0 => {
                let chunks = (0..*total_size)
                    .step_by(self.chunk_size)
                    .map(|start| {
                        Some(start..=(start + self.chunk_size).min(*total_size).saturating_sub(1))
                    })
                    .collect();

                let metadata = Metadata {
                    path: self.metadata_path.clone(),
                    total_size: *total_size,
                    downloaded: 0,
                    chunk_size: self.chunk_size,
                    chunks,
                };
                metadata.commit()?;

                Ok(metadata)
            }
            _ => Err(DownloaderError::InvalidMetadataTotalSize(self.total_size)),
        }
    }
}

/// Serializable representation of [`Metadata`], excluding the on-disk path.
#[derive(Debug, Serialize, Deserialize)]
struct MetadataFile {
    /// Total size of the data file in bytes.
    total_size: usize,
    /// Number of bytes downloaded so far.
    downloaded: usize,
    /// Download chunk size in bytes.
    chunk_size: usize,
    /// Remaining byte range per chunk, `None` once fully downloaded.
    chunks: Vec<Option<RangeInclusive<usize>>>,
}

impl From<&Metadata> for MetadataFile {
    fn from(metadata: &Metadata) -> Self {
        Self {
            total_size: metadata.total_size,
            downloaded: metadata.downloaded,
            chunk_size: metadata.chunk_size,
            chunks: metadata.chunks.clone(),
        }
    }
}
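
// The sketch below is not part of the original module: a minimal usage example, written as a
// test, that exercises the builder, chunk updates and the completion check. It assumes only the
// APIs defined above plus `std::env::temp_dir()`; the file name and sizes are arbitrary.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn chunked_download_lifecycle() {
        // Hypothetical data file path; only the derived `<name>.metadata` file is written.
        let data_file = std::env::temp_dir().join("meta_rs_usage_sketch.bin");

        // 250 bytes split into chunks of 100 bytes: [0..=99], [100..=199], [200..=249].
        let mut metadata = Metadata::builder(&data_file)
            .with_total_size(250)
            .with_chunk_size(100)
            .build()
            .expect("valid total size");

        assert_eq!(metadata.needed_ranges().len(), 3);
        assert!(!metadata.is_done());

        // Report every chunk as fully downloaded; `update_chunk` persists after each call.
        for range in metadata.needed_ranges() {
            let len = range.end - range.start + 1;
            metadata.update_chunk(range.index, len).expect("valid chunk index");
        }

        assert!(metadata.is_done());
        assert_eq!(metadata.downloaded, 250);
        metadata.delete().expect("metadata file removed");
    }
}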