Skip to main content

reth_era_downloader/
client.rs

1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use reth_era::common::file_ops::EraFileType;
7use sha2::{Digest, Sha256};
8use std::{future::Future, path::Path, str::FromStr};
9use tokio::{
10    fs::{self, File},
11    io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
12    try_join,
13};
14
/// File name under which the fetched index page is saved inside the download folder.
const INDEX_HTML_FILE: &str = "index.html";
17
18/// Accesses the network over HTTP.
19pub trait HttpClient {
20    /// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
21    fn get<U: IntoUrl + Send + Sync>(
22        &self,
23        url: U,
24    ) -> impl Future<
25        Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
26    > + Send
27           + Sync;
28}
29
30impl HttpClient for Client {
31    async fn get<U: IntoUrl + Send + Sync>(
32        &self,
33        url: U,
34    ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
35        let response = Self::get(self, url).send().await?;
36
37        Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
38    }
39}
40
/// An HTTP client with features for downloading ERA files from an external, HTTP-accessible
/// endpoint.
#[derive(Debug, Clone)]
pub struct EraClient<Http> {
    /// Transport used to issue GET requests.
    client: Http,
    /// Base URL of the remote directory; the checksums file name and file names read from the
    /// local index are resolved against it.
    url: Url,
    /// Local directory that downloaded files, the index page, the extracted `index` list and
    /// the checksums file are written to.
    folder: Box<Path>,
    /// Kind of files served at `url`; decides which extensions are extracted from the index
    /// and whether checksums are downloaded and verified.
    era_type: EraFileType,
}
50
51impl<Http: HttpClient + Clone> EraClient<Http> {
52    const CHECKSUMS: &'static str = "checksums.txt";
53
54    /// Constructs [`EraClient`] using `client` to download from `url` into `folder`.
55    ///
56    /// The file type is auto-detected from the URL. Use
57    /// [`with_era_type`](Self::with_era_type) to override.
58    pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
59        let era_type = EraFileType::from_url(url.as_str());
60        Self { client, url, folder: folder.into(), era_type }
61    }
62
63    /// Override the auto-detected [`EraFileType`].
64    pub const fn with_era_type(mut self, era_type: EraFileType) -> Self {
65        self.era_type = era_type;
66        self
67    }
68
69    /// Performs a GET request on `url` and stores the response body into a file located within
70    /// the `folder`.
71    pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
72        let path = self.folder.to_path_buf();
73
74        let url = url.into_url()?;
75        let client = self.client.clone();
76        let file_name = url
77            .path_segments()
78            .ok_or_eyre("cannot-be-a-base")?
79            .next_back()
80            .ok_or_eyre("empty path segments")?;
81        let path = path.join(file_name);
82
83        if !self.is_downloaded(file_name, &path).await? {
84            let number = self
85                .file_name_to_number(file_name)
86                .ok_or_eyre("Cannot parse number from file name")?;
87
88            let mut tries = 1..3;
89            let mut actual_checksum: eyre::Result<_>;
90            loop {
91                actual_checksum = async {
92                    let mut file = File::create(&path).await?;
93                    let mut stream = client.get(url.clone()).await?;
94                    let mut hasher = Sha256::new();
95
96                    while let Some(item) = stream.next().await.transpose()? {
97                        io::copy(&mut item.as_ref(), &mut file).await?;
98                        hasher.update(item);
99                    }
100
101                    Ok(hasher.finalize().to_vec())
102                }
103                .await;
104
105                if actual_checksum.is_ok() || tries.next().is_none() {
106                    break;
107                }
108            }
109
110            if self.era_type.has_checksums() {
111                self.assert_checksum(number, actual_checksum?)
112                    .await
113                    .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
114            }
115        }
116
117        Ok(path.into_boxed_path())
118    }
119
120    /// Recovers index of file following the latest downloaded file from a different run.
121    pub async fn recover_index(&self) -> Option<usize> {
122        let mut max = None;
123
124        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
125            while let Ok(Some(entry)) = dir.next_entry().await {
126                if let Some(name) = entry.file_name().to_str() &&
127                    let Some(number) = self.file_name_to_number(name) &&
128                    (max.is_none() || matches!(max, Some(max) if number > max))
129                {
130                    max.replace(number + 1);
131                }
132            }
133        }
134
135        max
136    }
137
138    /// Deletes files that are outside-of the working range.
139    pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
140        let last = index + max_files;
141
142        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
143            while let Ok(Some(entry)) = dir.next_entry().await {
144                if let Some(name) = entry.file_name().to_str() &&
145                    let Some(number) = self.file_name_to_number(name) &&
146                    (number < index || number >= last)
147                {
148                    reth_fs_util::remove_file_if_exists(entry.path())?;
149                }
150            }
151        }
152
153        Ok(())
154    }
155
156    /// Returns a download URL for the file corresponding to `number`.
157    pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
158        Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
159    }
160
161    /// Returns the number of files in the `folder`.
162    pub async fn files_count(&self) -> usize {
163        let mut count = 0usize;
164
165        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
166            while let Ok(Some(entry)) = dir.next_entry().await {
167                if let Some(ext) = entry.path().extension().and_then(|ext| ext.to_str()) &&
168                    self.era_type
169                        .extensions()
170                        .iter()
171                        .any(|valid| valid.trim_start_matches('.') == ext)
172                {
173                    count += 1;
174                }
175            }
176        }
177
178        count
179    }
180
181    /// Fetches the list of ERA1/ERA files from `url` and stores it in a file located within
182    /// `folder`.
183    /// For era files, checksum.txt file does not exist, so the checksum verification is
184    /// skipped.
185    pub async fn fetch_file_list(&self) -> eyre::Result<()> {
186        let index_path = self.folder.to_path_buf().join(INDEX_HTML_FILE);
187        let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
188
189        // Only for files that ship checksums (era1, ere) we also download the checksums file.
190        if self.era_type.has_checksums() {
191            let checksums_url = self.url.join(Self::CHECKSUMS)?;
192            try_join!(
193                self.download_file_to_path(self.url.clone(), &index_path),
194                self.download_file_to_path(checksums_url, &checksums_path)
195            )?;
196        } else {
197            // Download only index file
198            self.download_file_to_path(self.url.clone(), &index_path).await?;
199        }
200
201        // Parse and extract era filenames from index.html
202        self.extract_era_filenames(&index_path).await?;
203
204        Ok(())
205    }
206
207    /// Extracts ERA filenames from `index.html` and writes them to the index file
208    async fn extract_era_filenames(&self, index_path: &Path) -> eyre::Result<()> {
209        let file = File::open(index_path).await?;
210        let reader = io::BufReader::new(file);
211        let mut lines = reader.lines();
212
213        let path = self.folder.to_path_buf().join("index");
214        let file = File::create(&path).await?;
215        let mut writer = io::BufWriter::new(file);
216
217        while let Some(line) = lines.next_line().await? {
218            if let Some(era) = extract_era_filename(&line, self.era_type.extensions()) {
219                writer.write_all(era.as_bytes()).await?;
220                writer.write_all(b"\n").await?;
221            }
222        }
223
224        writer.flush().await?;
225        Ok(())
226    }
227
228    // Helper to download a file to a specified path
229    async fn download_file_to_path(&self, url: Url, path: &Path) -> eyre::Result<()> {
230        let mut stream = self.client.get(url).await?;
231        let mut file = File::create(path).await?;
232
233        while let Some(item) = stream.next().await.transpose()? {
234            io::copy(&mut item.as_ref(), &mut file).await?;
235        }
236
237        Ok(())
238    }
239
240    /// Returns ERA1/ERA file name that is ordered at `number`.
241    pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
242        let path = self.folder.to_path_buf().join("index");
243        let file = File::open(&path).await?;
244        let reader = io::BufReader::new(file);
245        let mut lines = reader.lines();
246        for _ in 0..number {
247            lines.next_line().await?;
248        }
249
250        Ok(lines.next_line().await?)
251    }
252
253    async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
254        let path = path.as_ref();
255
256        match File::open(path).await {
257            Ok(file) => {
258                if self.era_type.has_checksums() {
259                    let number = self
260                        .file_name_to_number(name)
261                        .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
262
263                    let actual_checksum = checksum(file).await?;
264                    let is_verified = self.verify_checksum(number, actual_checksum).await?;
265
266                    if !is_verified {
267                        fs::remove_file(path).await?;
268                    }
269
270                    Ok(is_verified)
271                } else {
272                    // For era files there is no checksums.txt, so verification is skipped.
273                    Ok(true)
274                }
275            }
276            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
277            Err(e) => Err(e)?,
278        }
279    }
280
281    /// Returns `true` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
282    /// `number` based on the [file list].
283    ///
284    /// [file list]: Self::fetch_file_list
285    async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
286        Ok(actual_checksum == self.expected_checksum(number).await?)
287    }
288
289    /// Returns `Ok` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
290    /// `number` based on the [file list].
291    ///
292    /// [file list]: Self::fetch_file_list
293    async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
294        let expected_checksum = self.expected_checksum(number).await?;
295
296        if actual_checksum == expected_checksum {
297            Ok(())
298        } else {
299            Err(eyre!(
300                "Checksum mismatch, got: {}, expected: {}",
301                actual_checksum.encode_hex(),
302                expected_checksum.encode_hex()
303            ))
304        }
305    }
306
307    /// Returns SHA-256 checksum for ERA1 file indexed by `number` based on the [file list].
308    ///
309    /// [file list]: Self::fetch_file_list
310    async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
311        let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
312        let reader = io::BufReader::new(file);
313        let mut lines = reader.lines();
314
315        for _ in 0..number {
316            lines.next_line().await?;
317        }
318        let expected_checksum =
319            lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
320        let expected_checksum = hex::decode(expected_checksum)?;
321
322        Ok(expected_checksum)
323    }
324
325    fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
326        file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
327    }
328}
329
/// Extracts an era filename ending in one of `extensions` from a single index line.
///
/// `extensions` are tried in order; pass them longest-first so `.ere` never matches inside `.erae`.
/// For the first extension that occurs on the line, the returned name is the run of
/// alphanumeric and `-` characters directly before that first occurrence, plus the extension
/// itself. The run may start at the beginning of the line, e.g. in a plain-text listing.
/// Returns `None` if no extension occurs on the line.
fn extract_era_filename<'a>(line: &'a str, extensions: &[&str]) -> Option<&'a str> {
    extensions.iter().find_map(|ext| {
        let end = line.find(ext)?;
        // The name starts right after the last character that cannot be part of it, or at the
        // start of the line if there is none. Stepping over that character by its UTF-8 width
        // keeps the slice on a char boundary even for non-ASCII delimiters such as `“`.
        let start = line[..end]
            .char_indices()
            .rev()
            .find(|&(_, c)| !c.is_alphanumeric() && c != '-')
            .map_or(0, |(i, c)| i + c.len_utf8());
        Some(&line[start..end + ext.len()])
    })
}
343
344async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
345    let mut hasher = Sha256::new();
346
347    // Create a buffer to read data into, sized for performance.
348    let mut data = vec![0; 64 * 1024];
349
350    loop {
351        // Read data from the reader into the buffer.
352        let len = reader.read(&mut data).await?;
353        if len == 0 {
354            break;
355        } // Exit loop if no more data.
356
357        // Update the hash with the data read.
358        hasher.update(&data[..len]);
359    }
360
361    // Finalize the hash after all data has been processed.
362    let hash = hasher.finalize().to_vec();
363
364    Ok(hash)
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client with a placeholder `file:///` URL and an empty folder path; used by
        /// `test_file_name_to_number`, which only exercises file-name parsing.
        fn empty() -> Self {
            Self::new(Client::new(), Url::from_str("file:///").unwrap(), PathBuf::new())
        }
    }

    // The number is the second `-`-separated component; names lacking one yield `None`.
    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        let client = EraClient::empty();

        let actual_number = client.file_name_to_number(file_name);

        assert_eq!(actual_number, expected_number);
    }

    // `.erae` lines must yield the full `.erae` name, never the `.ere` prefix inside it.
    #[test_case(
        "<a href=\"mainnet-00000-a6860fef.erae\">", &[".erae", ".ere"],
        Some("mainnet-00000-a6860fef.erae"); "erae anchor not clipped to ere"
    )]
    #[test_case(
        "    \"name\": \"mainnet-00001-05c64fc4.erae\",", &[".erae", ".ere"],
        Some("mainnet-00001-05c64fc4.erae"); "erae json entry"
    )]
    #[test_case(
        "<a href=\"mainnet-00600-a81ae85f.era1\">", &[".era1"],
        Some("mainnet-00600-a81ae85f.era1"); "era1 anchor"
    )]
    #[test_case("<a href=\"checksums.txt\">", &[".erae", ".ere"], None; "no era file on line")]
    fn test_extract_era_filename(line: &str, exts: &[&str], expected: Option<&str>) {
        assert_eq!(extract_era_filename(line, exts), expected);
    }

    #[test]
    fn test_with_era_type_overrides_auto_detection() {
        // URL without "era1" auto-detects as Era
        let client = EraClient::new(
            Client::new(),
            Url::from_str("https://example.com/").unwrap(),
            PathBuf::new(),
        );
        assert_eq!(client.era_type, EraFileType::Era);

        // with_era_type overrides to Era1
        let client = client.with_era_type(EraFileType::Era1);
        assert_eq!(client.era_type, EraFileType::Era1);
    }
}