1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use reth_era::common::file_ops::EraFileType;
7use sha2::{Digest, Sha256};
8use std::{future::Future, path::Path, str::FromStr};
9use tokio::{
10 fs::{self, File},
11 io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
12 try_join,
13};
14
/// File name, inside the download folder, under which the page fetched from the base URL is
/// stored before ERA file names are extracted from it.
const INDEX_HTML_FILE: &str = "index.html";
17
/// Minimal HTTP abstraction used by [`EraClient`]: a single streaming GET operation.
pub trait HttpClient {
    /// Issues a GET request to `url`.
    ///
    /// The returned future resolves to a stream that yields the response body as a sequence of
    /// [`Bytes`] chunks. Both establishing the request and reading any individual chunk may
    /// fail, hence the nested `eyre::Result`s.
    fn get<U: IntoUrl + Send + Sync>(
        &self,
        url: U,
    ) -> impl Future<
        Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
    > + Send
           + Sync;
}
29
30impl HttpClient for Client {
31 async fn get<U: IntoUrl + Send + Sync>(
32 &self,
33 url: U,
34 ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
35 let response = Self::get(self, url).send().await?;
36
37 Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
38 }
39}
40
/// Downloads ERA files over HTTP from a base URL into a local folder.
#[derive(Debug, Clone)]
pub struct EraClient<Http> {
    /// HTTP client used for every request.
    client: Http,
    /// Base URL; file names and the checksums file name are joined onto it.
    url: Url,
    /// Local folder that receives the downloaded files, the index and the checksums.
    folder: Box<Path>,
    /// Kind of ERA file being handled; decides which file extensions are recognised and
    /// whether checksums are downloaded and verified.
    era_type: EraFileType,
}
50
51impl<Http: HttpClient + Clone> EraClient<Http> {
    /// Name of the checksums file, used both as the path joined onto the base URL and as the
    /// file name inside the local folder.
    const CHECKSUMS: &'static str = "checksums.txt";
53
54 pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
59 let era_type = EraFileType::from_url(url.as_str());
60 Self { client, url, folder: folder.into(), era_type }
61 }
62
63 pub const fn with_era_type(mut self, era_type: EraFileType) -> Self {
65 self.era_type = era_type;
66 self
67 }
68
69 pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
72 let path = self.folder.to_path_buf();
73
74 let url = url.into_url()?;
75 let client = self.client.clone();
76 let file_name = url
77 .path_segments()
78 .ok_or_eyre("cannot-be-a-base")?
79 .next_back()
80 .ok_or_eyre("empty path segments")?;
81 let path = path.join(file_name);
82
83 if !self.is_downloaded(file_name, &path).await? {
84 let number = self
85 .file_name_to_number(file_name)
86 .ok_or_eyre("Cannot parse number from file name")?;
87
88 let mut tries = 1..3;
89 let mut actual_checksum: eyre::Result<_>;
90 loop {
91 actual_checksum = async {
92 let mut file = File::create(&path).await?;
93 let mut stream = client.get(url.clone()).await?;
94 let mut hasher = Sha256::new();
95
96 while let Some(item) = stream.next().await.transpose()? {
97 io::copy(&mut item.as_ref(), &mut file).await?;
98 hasher.update(item);
99 }
100
101 Ok(hasher.finalize().to_vec())
102 }
103 .await;
104
105 if actual_checksum.is_ok() || tries.next().is_none() {
106 break;
107 }
108 }
109
110 if self.era_type.has_checksums() {
111 self.assert_checksum(number, actual_checksum?)
112 .await
113 .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
114 }
115 }
116
117 Ok(path.into_boxed_path())
118 }
119
120 pub async fn recover_index(&self) -> Option<usize> {
122 let mut max = None;
123
124 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
125 while let Ok(Some(entry)) = dir.next_entry().await {
126 if let Some(name) = entry.file_name().to_str() &&
127 let Some(number) = self.file_name_to_number(name) &&
128 (max.is_none() || matches!(max, Some(max) if number > max))
129 {
130 max.replace(number + 1);
131 }
132 }
133 }
134
135 max
136 }
137
138 pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
140 let last = index + max_files;
141
142 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
143 while let Ok(Some(entry)) = dir.next_entry().await {
144 if let Some(name) = entry.file_name().to_str() &&
145 let Some(number) = self.file_name_to_number(name) &&
146 (number < index || number >= last)
147 {
148 reth_fs_util::remove_file_if_exists(entry.path())?;
149 }
150 }
151 }
152
153 Ok(())
154 }
155
156 pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
158 Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
159 }
160
161 pub async fn files_count(&self) -> usize {
163 let mut count = 0usize;
164
165 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
166 while let Ok(Some(entry)) = dir.next_entry().await {
167 if let Some(ext) = entry.path().extension().and_then(|ext| ext.to_str()) &&
168 self.era_type
169 .extensions()
170 .iter()
171 .any(|valid| valid.trim_start_matches('.') == ext)
172 {
173 count += 1;
174 }
175 }
176 }
177
178 count
179 }
180
181 pub async fn fetch_file_list(&self) -> eyre::Result<()> {
186 let index_path = self.folder.to_path_buf().join(INDEX_HTML_FILE);
187 let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
188
189 if self.era_type.has_checksums() {
191 let checksums_url = self.url.join(Self::CHECKSUMS)?;
192 try_join!(
193 self.download_file_to_path(self.url.clone(), &index_path),
194 self.download_file_to_path(checksums_url, &checksums_path)
195 )?;
196 } else {
197 self.download_file_to_path(self.url.clone(), &index_path).await?;
199 }
200
201 self.extract_era_filenames(&index_path).await?;
203
204 Ok(())
205 }
206
207 async fn extract_era_filenames(&self, index_path: &Path) -> eyre::Result<()> {
209 let file = File::open(index_path).await?;
210 let reader = io::BufReader::new(file);
211 let mut lines = reader.lines();
212
213 let path = self.folder.to_path_buf().join("index");
214 let file = File::create(&path).await?;
215 let mut writer = io::BufWriter::new(file);
216
217 while let Some(line) = lines.next_line().await? {
218 if let Some(era) = extract_era_filename(&line, self.era_type.extensions()) {
219 writer.write_all(era.as_bytes()).await?;
220 writer.write_all(b"\n").await?;
221 }
222 }
223
224 writer.flush().await?;
225 Ok(())
226 }
227
228 async fn download_file_to_path(&self, url: Url, path: &Path) -> eyre::Result<()> {
230 let mut stream = self.client.get(url).await?;
231 let mut file = File::create(path).await?;
232
233 while let Some(item) = stream.next().await.transpose()? {
234 io::copy(&mut item.as_ref(), &mut file).await?;
235 }
236
237 Ok(())
238 }
239
240 pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
242 let path = self.folder.to_path_buf().join("index");
243 let file = File::open(&path).await?;
244 let reader = io::BufReader::new(file);
245 let mut lines = reader.lines();
246 for _ in 0..number {
247 lines.next_line().await?;
248 }
249
250 Ok(lines.next_line().await?)
251 }
252
253 async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
254 let path = path.as_ref();
255
256 match File::open(path).await {
257 Ok(file) => {
258 if self.era_type.has_checksums() {
259 let number = self
260 .file_name_to_number(name)
261 .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
262
263 let actual_checksum = checksum(file).await?;
264 let is_verified = self.verify_checksum(number, actual_checksum).await?;
265
266 if !is_verified {
267 fs::remove_file(path).await?;
268 }
269
270 Ok(is_verified)
271 } else {
272 Ok(true)
274 }
275 }
276 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
277 Err(e) => Err(e)?,
278 }
279 }
280
281 async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
286 Ok(actual_checksum == self.expected_checksum(number).await?)
287 }
288
289 async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
294 let expected_checksum = self.expected_checksum(number).await?;
295
296 if actual_checksum == expected_checksum {
297 Ok(())
298 } else {
299 Err(eyre!(
300 "Checksum mismatch, got: {}, expected: {}",
301 actual_checksum.encode_hex(),
302 expected_checksum.encode_hex()
303 ))
304 }
305 }
306
307 async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
311 let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
312 let reader = io::BufReader::new(file);
313 let mut lines = reader.lines();
314
315 for _ in 0..number {
316 lines.next_line().await?;
317 }
318 let expected_checksum =
319 lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
320 let expected_checksum = hex::decode(expected_checksum)?;
321
322 Ok(expected_checksum)
323 }
324
325 fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
326 file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
327 }
328}
329
/// Extracts an ERA file name from one line of a listing, if the line contains one.
///
/// `extensions` are tried in order. For the first extension that occurs in `line` and is
/// preceded by a delimiter, the text between that delimiter and the end of the extension is
/// returned. A delimiter is any character that is neither alphanumeric nor `-`; a candidate
/// with no delimiter in front of it is skipped.
///
/// The slice starts one *character* after the delimiter rather than one byte, so a multi-byte
/// delimiter (for example `«` or a non-breaking space) cannot make the slice start inside a
/// UTF-8 sequence and panic.
fn extract_era_filename<'a>(line: &'a str, extensions: &[&str]) -> Option<&'a str> {
    extensions.iter().find_map(|ext| {
        let ext_start = line.find(ext)?;
        let (delimiter_at, delimiter) = line[..ext_start]
            .char_indices()
            .rev()
            .find(|&(_, c)| !c.is_alphanumeric() && c != '-')?;

        Some(&line[delimiter_at + delimiter.len_utf8()..ext_start + ext.len()])
    })
}
343
344async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
345 let mut hasher = Sha256::new();
346
347 let mut data = vec![0; 64 * 1024];
349
350 loop {
351 let len = reader.read(&mut data).await?;
353 if len == 0 {
354 break;
355 } hasher.update(&data[..len]);
359 }
360
361 let hash = hasher.finalize().to_vec();
363
364 Ok(hash)
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Client pointing at a dummy URL and an empty folder path, for tests that never touch
        /// the network or the file system.
        fn empty() -> Self {
            let url = Url::from_str("file:///").unwrap();
            Self::new(Client::new(), url, PathBuf::new())
        }
    }

    // The number is the second `-`-separated part of the name.
    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        let parsed = EraClient::empty().file_name_to_number(file_name);

        assert_eq!(parsed, expected_number);
    }

    #[test_case(
        "<a href=\"mainnet-00000-a6860fef.erae\">", &[".erae", ".ere"],
        Some("mainnet-00000-a6860fef.erae"); "erae anchor not clipped to ere"
    )]
    #[test_case(
        "  \"name\": \"mainnet-00001-05c64fc4.erae\",", &[".erae", ".ere"],
        Some("mainnet-00001-05c64fc4.erae"); "erae json entry"
    )]
    #[test_case(
        "<a href=\"mainnet-00600-a81ae85f.era1\">", &[".era1"],
        Some("mainnet-00600-a81ae85f.era1"); "era1 anchor"
    )]
    #[test_case("<a href=\"checksums.txt\">", &[".erae", ".ere"], None; "no era file on line")]
    fn test_extract_era_filename(line: &str, exts: &[&str], expected: Option<&str>) {
        let extracted = extract_era_filename(line, exts);

        assert_eq!(extracted, expected);
    }

    #[test]
    fn test_with_era_type_overrides_auto_detection() {
        let base_url = Url::from_str("https://example.com/").unwrap();
        let detected = EraClient::new(Client::new(), base_url, PathBuf::new());
        assert_eq!(detected.era_type, EraFileType::Era);

        // An explicit type replaces the one derived from the URL.
        let overridden = detected.with_era_type(EraFileType::Era1);
        assert_eq!(overridden.era_type, EraFileType::Era1);
    }
}