reth_era_downloader/
client.rs

1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use reth_era::common::file_ops::EraFileType;
7use sha2::{Digest, Sha256};
8use std::{future::Future, path::Path, str::FromStr};
9use tokio::{
10    fs::{self, File},
11    io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
12    try_join,
13};
14
/// File name, inside the client's download folder, under which the fetched index page (the
/// listing served at the client's base URL) is stored before file names are extracted from it.
const INDEX_HTML_FILE: &str = "index.html";
17
18/// Accesses the network over HTTP.
19pub trait HttpClient {
20    /// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
21    fn get<U: IntoUrl + Send + Sync>(
22        &self,
23        url: U,
24    ) -> impl Future<
25        Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
26    > + Send
27           + Sync;
28}
29
30impl HttpClient for Client {
31    async fn get<U: IntoUrl + Send + Sync>(
32        &self,
33        url: U,
34    ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
35        let response = Self::get(self, url).send().await?;
36
37        Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
38    }
39}
40
/// An HTTP client with features for downloading ERA files from an external HTTP accessible
/// endpoint.
#[derive(Debug, Clone)]
pub struct EraClient<Http> {
    /// HTTP transport used for every request the client makes.
    client: Http,
    /// Base URL of the remote listing; file names and the checksum file are resolved against it.
    url: Url,
    /// Local directory holding the fetched index, checksums and downloaded files.
    folder: Box<Path>,
    /// Kind of ERA file this client handles, detected from `url` when the client is constructed.
    era_type: EraFileType,
}
50
51impl<Http: HttpClient + Clone> EraClient<Http> {
52    const CHECKSUMS: &'static str = "checksums.txt";
53
54    /// Constructs [`EraClient`] using `client` to download from `url` into `folder`.
55    pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
56        let era_type = EraFileType::from_url(url.as_str());
57        Self { client, url, folder: folder.into(), era_type }
58    }
59
60    /// Performs a GET request on `url` and stores the response body into a file located within
61    /// the `folder`.
62    pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
63        let path = self.folder.to_path_buf();
64
65        let url = url.into_url()?;
66        let client = self.client.clone();
67        let file_name = url
68            .path_segments()
69            .ok_or_eyre("cannot-be-a-base")?
70            .next_back()
71            .ok_or_eyre("empty path segments")?;
72        let path = path.join(file_name);
73
74        if !self.is_downloaded(file_name, &path).await? {
75            let number = self
76                .file_name_to_number(file_name)
77                .ok_or_eyre("Cannot parse number from file name")?;
78
79            let mut tries = 1..3;
80            let mut actual_checksum: eyre::Result<_>;
81            loop {
82                actual_checksum = async {
83                    let mut file = File::create(&path).await?;
84                    let mut stream = client.get(url.clone()).await?;
85                    let mut hasher = Sha256::new();
86
87                    while let Some(item) = stream.next().await.transpose()? {
88                        io::copy(&mut item.as_ref(), &mut file).await?;
89                        hasher.update(item);
90                    }
91
92                    Ok(hasher.finalize().to_vec())
93                }
94                .await;
95
96                if actual_checksum.is_ok() || tries.next().is_none() {
97                    break;
98                }
99            }
100
101            if self.era_type == EraFileType::Era1 {
102                self.assert_checksum(number, actual_checksum?)
103                    .await
104                    .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
105            }
106        }
107
108        Ok(path.into_boxed_path())
109    }
110
111    /// Recovers index of file following the latest downloaded file from a different run.
112    pub async fn recover_index(&self) -> Option<usize> {
113        let mut max = None;
114
115        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
116            while let Ok(Some(entry)) = dir.next_entry().await {
117                if let Some(name) = entry.file_name().to_str() &&
118                    let Some(number) = self.file_name_to_number(name) &&
119                    (max.is_none() || matches!(max, Some(max) if number > max))
120                {
121                    max.replace(number + 1);
122                }
123            }
124        }
125
126        max
127    }
128
129    /// Deletes files that are outside-of the working range.
130    pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
131        let last = index + max_files;
132
133        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
134            while let Ok(Some(entry)) = dir.next_entry().await {
135                if let Some(name) = entry.file_name().to_str() &&
136                    let Some(number) = self.file_name_to_number(name) &&
137                    (number < index || number >= last)
138                {
139                    reth_fs_util::remove_file(entry.path())?;
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Returns a download URL for the file corresponding to `number`.
148    pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
149        Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
150    }
151
152    /// Returns the number of files in the `folder`.
153    pub async fn files_count(&self) -> usize {
154        let mut count = 0usize;
155
156        let file_extension = self.era_type.extension().trim_start_matches('.');
157
158        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
159            while let Ok(Some(entry)) = dir.next_entry().await {
160                if entry.path().extension() == Some(file_extension.as_ref()) {
161                    count += 1;
162                }
163            }
164        }
165
166        count
167    }
168
169    /// Fetches the list of ERA1/ERA files from `url` and stores it in a file located within
170    /// `folder`.
171    /// For era files, checksum.txt file does not exist, so the checksum verification is
172    /// skipped.
173    pub async fn fetch_file_list(&self) -> eyre::Result<()> {
174        let index_path = self.folder.to_path_buf().join(INDEX_HTML_FILE);
175        let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
176
177        // Only for era1, we download also checksums file
178        if self.era_type == EraFileType::Era1 {
179            let checksums_url = self.url.join(Self::CHECKSUMS)?;
180            try_join!(
181                self.download_file_to_path(self.url.clone(), &index_path),
182                self.download_file_to_path(checksums_url, &checksums_path)
183            )?;
184        } else {
185            // Download only index file
186            self.download_file_to_path(self.url.clone(), &index_path).await?;
187        }
188
189        // Parse and extract era filenames from index.html
190        self.extract_era_filenames(&index_path).await?;
191
192        Ok(())
193    }
194
195    /// Extracts ERA filenames from `index.html` and writes them to the index file
196    async fn extract_era_filenames(&self, index_path: &Path) -> eyre::Result<()> {
197        let file = File::open(index_path).await?;
198        let reader = io::BufReader::new(file);
199        let mut lines = reader.lines();
200
201        let path = self.folder.to_path_buf().join("index");
202        let file = File::create(&path).await?;
203        let mut writer = io::BufWriter::new(file);
204
205        let ext = self.era_type.extension();
206        let ext_len = ext.len();
207
208        while let Some(line) = lines.next_line().await? {
209            if let Some(j) = line.find(ext) &&
210                let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-')
211            {
212                let era = &line[i + 1..j + ext_len];
213                writer.write_all(era.as_bytes()).await?;
214                writer.write_all(b"\n").await?;
215            }
216        }
217
218        writer.flush().await?;
219        Ok(())
220    }
221
222    // Helper to download a file to a specified path
223    async fn download_file_to_path(&self, url: Url, path: &Path) -> eyre::Result<()> {
224        let mut stream = self.client.get(url).await?;
225        let mut file = File::create(path).await?;
226
227        while let Some(item) = stream.next().await.transpose()? {
228            io::copy(&mut item.as_ref(), &mut file).await?;
229        }
230
231        Ok(())
232    }
233
234    /// Returns ERA1/ERA file name that is ordered at `number`.
235    pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
236        let path = self.folder.to_path_buf().join("index");
237        let file = File::open(&path).await?;
238        let reader = io::BufReader::new(file);
239        let mut lines = reader.lines();
240        for _ in 0..number {
241            lines.next_line().await?;
242        }
243
244        Ok(lines.next_line().await?)
245    }
246
247    async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
248        let path = path.as_ref();
249
250        match File::open(path).await {
251            Ok(file) => {
252                if self.era_type == EraFileType::Era1 {
253                    let number = self
254                        .file_name_to_number(name)
255                        .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
256
257                    let actual_checksum = checksum(file).await?;
258                    let is_verified = self.verify_checksum(number, actual_checksum).await?;
259
260                    if !is_verified {
261                        fs::remove_file(path).await?;
262                    }
263
264                    Ok(is_verified)
265                } else {
266                    // For era files, we skip checksum verification, as checksum.txt does not exist
267                    Ok(true)
268                }
269            }
270            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
271            Err(e) => Err(e)?,
272        }
273    }
274
275    /// Returns `true` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
276    /// `number` based on the [file list].
277    ///
278    /// [file list]: Self::fetch_file_list
279    async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
280        Ok(actual_checksum == self.expected_checksum(number).await?)
281    }
282
283    /// Returns `Ok` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
284    /// `number` based on the [file list].
285    ///
286    /// [file list]: Self::fetch_file_list
287    async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
288        let expected_checksum = self.expected_checksum(number).await?;
289
290        if actual_checksum == expected_checksum {
291            Ok(())
292        } else {
293            Err(eyre!(
294                "Checksum mismatch, got: {}, expected: {}",
295                actual_checksum.encode_hex(),
296                expected_checksum.encode_hex()
297            ))
298        }
299    }
300
301    /// Returns SHA-256 checksum for ERA1 file indexed by `number` based on the [file list].
302    ///
303    /// [file list]: Self::fetch_file_list
304    async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
305        let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
306        let reader = io::BufReader::new(file);
307        let mut lines = reader.lines();
308
309        for _ in 0..number {
310            lines.next_line().await?;
311        }
312        let expected_checksum =
313            lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
314        let expected_checksum = hex::decode(expected_checksum)?;
315
316        Ok(expected_checksum)
317    }
318
319    fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
320        file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
321    }
322}
323
324async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
325    let mut hasher = Sha256::new();
326
327    // Create a buffer to read data into, sized for performance.
328    let mut data = vec![0; 64 * 1024];
329
330    loop {
331        // Read data from the reader into the buffer.
332        let len = reader.read(&mut data).await?;
333        if len == 0 {
334            break;
335        } // Exit loop if no more data.
336
337        // Update the hash with the data read.
338        hasher.update(&data[..len]);
339    }
340
341    // Finalize the hash after all data has been processed.
342    let hash = hasher.finalize().to_vec();
343
344    Ok(hash)
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client with no network or filesystem setup, for testing pure helpers.
        fn empty() -> Self {
            Self::new(Client::new(), Url::from_str("file:///").unwrap(), PathBuf::new())
        }
    }

    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("mainnet-01234-4b363db9.era", Some(1234); "era file name")]
    #[test_case("mainnet-xyz-a81ae85f.era1", None; "non numeric segment")]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        let client = EraClient::empty();

        let actual_number = client.file_name_to_number(file_name);

        assert_eq!(actual_number, expected_number);
    }
}