reth_era_downloader/
client.rs

1use alloy_primitives::{hex, hex::ToHexExt};
2use bytes::Bytes;
3use eyre::{eyre, OptionExt};
4use futures_util::{stream::StreamExt, Stream, TryStreamExt};
5use reqwest::{Client, IntoUrl, Url};
6use sha2::{Digest, Sha256};
7use std::{future::Future, path::Path, str::FromStr};
8use tokio::{
9    fs::{self, File},
10    io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
11    join, try_join,
12};
13
14/// Accesses the network over HTTP.
15pub trait HttpClient {
16    /// Makes an HTTP GET request to `url`. Returns a stream of response body bytes.
17    fn get<U: IntoUrl + Send + Sync>(
18        &self,
19        url: U,
20    ) -> impl Future<
21        Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
22    > + Send
23           + Sync;
24}
25
26impl HttpClient for Client {
27    async fn get<U: IntoUrl + Send + Sync>(
28        &self,
29        url: U,
30    ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
31        let response = Self::get(self, url).send().await?;
32
33        Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
34    }
35}
36
37/// An HTTP client with features for downloading ERA files from an external HTTP accessible
38/// endpoint.
39#[derive(Debug, Clone)]
40pub struct EraClient<Http> {
41    client: Http,
42    url: Url,
43    folder: Box<Path>,
44}
45
46impl<Http: HttpClient + Clone> EraClient<Http> {
47    const CHECKSUMS: &'static str = "checksums.txt";
48
49    /// Constructs [`EraClient`] using `client` to download from `url` into `folder`.
50    pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
51        Self { client, url, folder: folder.into() }
52    }
53
54    /// Performs a GET request on `url` and stores the response body into a file located within
55    /// the `folder`.
56    pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
57        let path = self.folder.to_path_buf();
58
59        let url = url.into_url()?;
60        let client = self.client.clone();
61        let file_name = url
62            .path_segments()
63            .ok_or_eyre("cannot-be-a-base")?
64            .next_back()
65            .ok_or_eyre("empty path segments")?;
66        let path = path.join(file_name);
67
68        if !self.is_downloaded(file_name, &path).await? {
69            let number = self
70                .file_name_to_number(file_name)
71                .ok_or_eyre("Cannot parse number from file name")?;
72
73            let mut tries = 1..3;
74            let mut actual_checksum: eyre::Result<_>;
75            loop {
76                actual_checksum = async {
77                    let mut file = File::create(&path).await?;
78                    let mut stream = client.get(url.clone()).await?;
79                    let mut hasher = Sha256::new();
80
81                    while let Some(item) = stream.next().await.transpose()? {
82                        io::copy(&mut item.as_ref(), &mut file).await?;
83                        hasher.update(item);
84                    }
85
86                    Ok(hasher.finalize().to_vec())
87                }
88                .await;
89
90                if actual_checksum.is_ok() || tries.next().is_none() {
91                    break;
92                }
93            }
94
95            self.assert_checksum(number, actual_checksum?)
96                .await
97                .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
98        }
99
100        Ok(path.into_boxed_path())
101    }
102
103    /// Recovers index of file following the latest downloaded file from a different run.
104    pub async fn recover_index(&self) -> Option<usize> {
105        let mut max = None;
106
107        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
108            while let Ok(Some(entry)) = dir.next_entry().await {
109                if let Some(name) = entry.file_name().to_str() {
110                    if let Some(number) = self.file_name_to_number(name) {
111                        if max.is_none() || matches!(max, Some(max) if number > max) {
112                            max.replace(number + 1);
113                        }
114                    }
115                }
116            }
117        }
118
119        max
120    }
121
122    /// Deletes files that are outside-of the working range.
123    pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
124        let last = index + max_files;
125
126        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
127            while let Ok(Some(entry)) = dir.next_entry().await {
128                if let Some(name) = entry.file_name().to_str() {
129                    if let Some(number) = self.file_name_to_number(name) {
130                        if number < index || number >= last {
131                            eprintln!("Deleting file {}", entry.path().display());
132                            eprintln!("{number} < {index} || {number} >= {last}");
133                            reth_fs_util::remove_file(entry.path())?;
134                        }
135                    }
136                }
137            }
138        }
139
140        Ok(())
141    }
142
143    /// Returns a download URL for the file corresponding to `number`.
144    pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
145        Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
146    }
147
148    /// Returns the number of files in the `folder`.
149    pub async fn files_count(&self) -> usize {
150        let mut count = 0usize;
151
152        if let Ok(mut dir) = fs::read_dir(&self.folder).await {
153            while let Ok(Some(entry)) = dir.next_entry().await {
154                if entry.path().extension() == Some("era1".as_ref()) {
155                    count += 1;
156                }
157            }
158        }
159
160        count
161    }
162
163    /// Fetches the list of ERA1 files from `url` and stores it in a file located within `folder`.
164    pub async fn fetch_file_list(&self) -> eyre::Result<()> {
165        let (mut index, mut checksums) = try_join!(
166            self.client.get(self.url.clone()),
167            self.client.get(self.url.clone().join(Self::CHECKSUMS)?),
168        )?;
169
170        let index_path = self.folder.to_path_buf().join("index.html");
171        let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
172
173        let (mut index_file, mut checksums_file) =
174            try_join!(File::create(&index_path), File::create(&checksums_path))?;
175
176        loop {
177            let (index, checksums) = join!(index.next(), checksums.next());
178            let (index, checksums) = (index.transpose()?, checksums.transpose()?);
179
180            if index.is_none() && checksums.is_none() {
181                break;
182            }
183            let index_file = &mut index_file;
184            let checksums_file = &mut checksums_file;
185
186            try_join!(
187                async move {
188                    if let Some(index) = index {
189                        io::copy(&mut index.as_ref(), index_file).await?;
190                    }
191                    Ok::<(), eyre::Error>(())
192                },
193                async move {
194                    if let Some(checksums) = checksums {
195                        io::copy(&mut checksums.as_ref(), checksums_file).await?;
196                    }
197                    Ok::<(), eyre::Error>(())
198                },
199            )?;
200        }
201
202        let file = File::open(&index_path).await?;
203        let reader = io::BufReader::new(file);
204        let mut lines = reader.lines();
205
206        let path = self.folder.to_path_buf().join("index");
207        let file = File::create(&path).await?;
208        let mut writer = io::BufWriter::new(file);
209
210        while let Some(line) = lines.next_line().await? {
211            if let Some(j) = line.find(".era1") {
212                if let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-') {
213                    let era = &line[i + 1..j + 5];
214                    writer.write_all(era.as_bytes()).await?;
215                    writer.write_all(b"\n").await?;
216                }
217            }
218        }
219        writer.flush().await?;
220
221        Ok(())
222    }
223
224    /// Returns ERA1 file name that is ordered at `number`.
225    pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
226        let path = self.folder.to_path_buf().join("index");
227        let file = File::open(&path).await?;
228        let reader = io::BufReader::new(file);
229        let mut lines = reader.lines();
230        for _ in 0..number {
231            lines.next_line().await?;
232        }
233
234        Ok(lines.next_line().await?)
235    }
236
237    async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
238        let path = path.as_ref();
239
240        match File::open(path).await {
241            Ok(file) => {
242                let number = self
243                    .file_name_to_number(name)
244                    .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
245
246                let actual_checksum = checksum(file).await?;
247                let is_verified = self.verify_checksum(number, actual_checksum).await?;
248
249                if !is_verified {
250                    fs::remove_file(path).await?;
251                }
252
253                Ok(is_verified)
254            }
255            Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
256            Err(e) => Err(e)?,
257        }
258    }
259
260    /// Returns `true` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
261    /// `number` based on the [file list].
262    ///
263    /// [file list]: Self::fetch_file_list
264    async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
265        Ok(actual_checksum == self.expected_checksum(number).await?)
266    }
267
268    /// Returns `Ok` if `actual_checksum` matches expected checksum of the ERA1 file indexed by
269    /// `number` based on the [file list].
270    ///
271    /// [file list]: Self::fetch_file_list
272    async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
273        let expected_checksum = self.expected_checksum(number).await?;
274
275        if actual_checksum == expected_checksum {
276            Ok(())
277        } else {
278            Err(eyre!(
279                "Checksum mismatch, got: {}, expected: {}",
280                actual_checksum.encode_hex(),
281                expected_checksum.encode_hex()
282            ))
283        }
284    }
285
286    /// Returns SHA-256 checksum for ERA1 file indexed by `number` based on the [file list].
287    ///
288    /// [file list]: Self::fetch_file_list
289    async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
290        let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
291        let reader = io::BufReader::new(file);
292        let mut lines = reader.lines();
293
294        for _ in 0..number {
295            lines.next_line().await?;
296        }
297        let expected_checksum =
298            lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
299        let expected_checksum = hex::decode(expected_checksum)?;
300
301        Ok(expected_checksum)
302    }
303
304    fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
305        file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
306    }
307}
308
309async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
310    let mut hasher = Sha256::new();
311
312    // Create a buffer to read data into, sized for performance.
313    let mut data = vec![0; 64 * 1024];
314
315    loop {
316        // Read data from the reader into the buffer.
317        let len = reader.read(&mut data).await?;
318        if len == 0 {
319            break;
320        } // Exit loop if no more data.
321
322        // Update the hash with the data read.
323        hasher.update(&data[..len]);
324    }
325
326    // Finalize the hash after all data has been processed.
327    let hash = hasher.finalize().to_vec();
328
329    Ok(hash)
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client around a placeholder URL and an empty folder path.
        fn empty() -> Self {
            let url = Url::from_str("file:///").unwrap();

            Self::new(Client::new(), url, PathBuf::new())
        }
    }

    /// Verifies which number, if any, is parsed from a file name.
    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        let parsed = EraClient::empty().file_name_to_number(file_name);

        assert_eq!(parsed, expected_number);
    }
}