reth_era_downloader/
client.rs1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use reth_era::common::file_ops::EraFileType;
7use sha2::{Digest, Sha256};
8use std::{future::Future, path::Path, str::FromStr};
9use tokio::{
10 fs::{self, File},
11 io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
12 try_join,
13};
14
/// Local file name under which the remote directory listing page is stored.
const INDEX_HTML_FILE: &str = "index.html";
17
18pub trait HttpClient {
20 fn get<U: IntoUrl + Send + Sync>(
22 &self,
23 url: U,
24 ) -> impl Future<
25 Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
26 > + Send
27 + Sync;
28}
29
30impl HttpClient for Client {
31 async fn get<U: IntoUrl + Send + Sync>(
32 &self,
33 url: U,
34 ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
35 let response = Self::get(self, url).send().await?;
36
37 Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
38 }
39}
40
41#[derive(Debug, Clone)]
44pub struct EraClient<Http> {
45 client: Http,
46 url: Url,
47 folder: Box<Path>,
48 era_type: EraFileType,
49}
50
51impl<Http: HttpClient + Clone> EraClient<Http> {
52 const CHECKSUMS: &'static str = "checksums.txt";
53
54 pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
56 let era_type = EraFileType::from_url(url.as_str());
57 Self { client, url, folder: folder.into(), era_type }
58 }
59
60 pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
63 let path = self.folder.to_path_buf();
64
65 let url = url.into_url()?;
66 let client = self.client.clone();
67 let file_name = url
68 .path_segments()
69 .ok_or_eyre("cannot-be-a-base")?
70 .next_back()
71 .ok_or_eyre("empty path segments")?;
72 let path = path.join(file_name);
73
74 if !self.is_downloaded(file_name, &path).await? {
75 let number = self
76 .file_name_to_number(file_name)
77 .ok_or_eyre("Cannot parse number from file name")?;
78
79 let mut tries = 1..3;
80 let mut actual_checksum: eyre::Result<_>;
81 loop {
82 actual_checksum = async {
83 let mut file = File::create(&path).await?;
84 let mut stream = client.get(url.clone()).await?;
85 let mut hasher = Sha256::new();
86
87 while let Some(item) = stream.next().await.transpose()? {
88 io::copy(&mut item.as_ref(), &mut file).await?;
89 hasher.update(item);
90 }
91
92 Ok(hasher.finalize().to_vec())
93 }
94 .await;
95
96 if actual_checksum.is_ok() || tries.next().is_none() {
97 break;
98 }
99 }
100
101 if self.era_type == EraFileType::Era1 {
102 self.assert_checksum(number, actual_checksum?)
103 .await
104 .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
105 }
106 }
107
108 Ok(path.into_boxed_path())
109 }
110
111 pub async fn recover_index(&self) -> Option<usize> {
113 let mut max = None;
114
115 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
116 while let Ok(Some(entry)) = dir.next_entry().await {
117 if let Some(name) = entry.file_name().to_str() &&
118 let Some(number) = self.file_name_to_number(name) &&
119 (max.is_none() || matches!(max, Some(max) if number > max))
120 {
121 max.replace(number + 1);
122 }
123 }
124 }
125
126 max
127 }
128
129 pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
131 let last = index + max_files;
132
133 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
134 while let Ok(Some(entry)) = dir.next_entry().await {
135 if let Some(name) = entry.file_name().to_str() &&
136 let Some(number) = self.file_name_to_number(name) &&
137 (number < index || number >= last)
138 {
139 reth_fs_util::remove_file(entry.path())?;
140 }
141 }
142 }
143
144 Ok(())
145 }
146
147 pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
149 Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
150 }
151
152 pub async fn files_count(&self) -> usize {
154 let mut count = 0usize;
155
156 let file_extension = self.era_type.extension().trim_start_matches('.');
157
158 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
159 while let Ok(Some(entry)) = dir.next_entry().await {
160 if entry.path().extension() == Some(file_extension.as_ref()) {
161 count += 1;
162 }
163 }
164 }
165
166 count
167 }
168
169 pub async fn fetch_file_list(&self) -> eyre::Result<()> {
174 let index_path = self.folder.to_path_buf().join(INDEX_HTML_FILE);
175 let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
176
177 if self.era_type == EraFileType::Era1 {
179 let checksums_url = self.url.join(Self::CHECKSUMS)?;
180 try_join!(
181 self.download_file_to_path(self.url.clone(), &index_path),
182 self.download_file_to_path(checksums_url, &checksums_path)
183 )?;
184 } else {
185 self.download_file_to_path(self.url.clone(), &index_path).await?;
187 }
188
189 self.extract_era_filenames(&index_path).await?;
191
192 Ok(())
193 }
194
195 async fn extract_era_filenames(&self, index_path: &Path) -> eyre::Result<()> {
197 let file = File::open(index_path).await?;
198 let reader = io::BufReader::new(file);
199 let mut lines = reader.lines();
200
201 let path = self.folder.to_path_buf().join("index");
202 let file = File::create(&path).await?;
203 let mut writer = io::BufWriter::new(file);
204
205 let ext = self.era_type.extension();
206 let ext_len = ext.len();
207
208 while let Some(line) = lines.next_line().await? {
209 if let Some(j) = line.find(ext) &&
210 let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-')
211 {
212 let era = &line[i + 1..j + ext_len];
213 writer.write_all(era.as_bytes()).await?;
214 writer.write_all(b"\n").await?;
215 }
216 }
217
218 writer.flush().await?;
219 Ok(())
220 }
221
222 async fn download_file_to_path(&self, url: Url, path: &Path) -> eyre::Result<()> {
224 let mut stream = self.client.get(url).await?;
225 let mut file = File::create(path).await?;
226
227 while let Some(item) = stream.next().await.transpose()? {
228 io::copy(&mut item.as_ref(), &mut file).await?;
229 }
230
231 Ok(())
232 }
233
234 pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
236 let path = self.folder.to_path_buf().join("index");
237 let file = File::open(&path).await?;
238 let reader = io::BufReader::new(file);
239 let mut lines = reader.lines();
240 for _ in 0..number {
241 lines.next_line().await?;
242 }
243
244 Ok(lines.next_line().await?)
245 }
246
247 async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
248 let path = path.as_ref();
249
250 match File::open(path).await {
251 Ok(file) => {
252 if self.era_type == EraFileType::Era1 {
253 let number = self
254 .file_name_to_number(name)
255 .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
256
257 let actual_checksum = checksum(file).await?;
258 let is_verified = self.verify_checksum(number, actual_checksum).await?;
259
260 if !is_verified {
261 fs::remove_file(path).await?;
262 }
263
264 Ok(is_verified)
265 } else {
266 Ok(true)
268 }
269 }
270 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
271 Err(e) => Err(e)?,
272 }
273 }
274
275 async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
280 Ok(actual_checksum == self.expected_checksum(number).await?)
281 }
282
283 async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
288 let expected_checksum = self.expected_checksum(number).await?;
289
290 if actual_checksum == expected_checksum {
291 Ok(())
292 } else {
293 Err(eyre!(
294 "Checksum mismatch, got: {}, expected: {}",
295 actual_checksum.encode_hex(),
296 expected_checksum.encode_hex()
297 ))
298 }
299 }
300
301 async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
305 let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
306 let reader = io::BufReader::new(file);
307 let mut lines = reader.lines();
308
309 for _ in 0..number {
310 lines.next_line().await?;
311 }
312 let expected_checksum =
313 lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
314 let expected_checksum = hex::decode(expected_checksum)?;
315
316 Ok(expected_checksum)
317 }
318
319 fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
320 file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
321 }
322}
323
324async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
325 let mut hasher = Sha256::new();
326
327 let mut data = vec![0; 64 * 1024];
329
330 loop {
331 let len = reader.read(&mut data).await?;
333 if len == 0 {
334 break;
335 } hasher.update(&data[..len]);
339 }
340
341 let hash = hasher.finalize().to_vec();
343
344 Ok(hash)
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client with a placeholder URL and folder, enough for the pure helpers.
        fn empty() -> Self {
            let placeholder = Url::from_str("file:///").unwrap();
            Self::new(Client::new(), placeholder, PathBuf::new())
        }
    }

    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        assert_eq!(EraClient::empty().file_name_to_number(file_name), expected_number);
    }
}