// reth_era_downloader/client.rs
use alloy_primitives::{hex, hex::ToHexExt};
use bytes::Bytes;
use eyre::{eyre, OptionExt};
use futures_util::{stream::StreamExt, Stream, TryStreamExt};
use reqwest::{Client, IntoUrl, Url};
use sha2::{Digest, Sha256};
use std::{future::Future, path::Path, str::FromStr};
use tokio::{
    fs::{self, File},
    io::{self, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWriteExt},
    join, try_join,
};
13
14pub trait HttpClient {
16 fn get<U: IntoUrl + Send + Sync>(
18 &self,
19 url: U,
20 ) -> impl Future<
21 Output = eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Send + Sync + Unpin>,
22 > + Send
23 + Sync;
24}
25
26impl HttpClient for Client {
27 async fn get<U: IntoUrl + Send + Sync>(
28 &self,
29 url: U,
30 ) -> eyre::Result<impl Stream<Item = eyre::Result<Bytes>> + Unpin> {
31 let response = Self::get(self, url).send().await?;
32
33 Ok(response.bytes_stream().map_err(|e| eyre::Error::new(e)))
34 }
35}
36
37#[derive(Debug, Clone)]
40pub struct EraClient<Http> {
41 client: Http,
42 url: Url,
43 folder: Box<Path>,
44}
45
46impl<Http: HttpClient + Clone> EraClient<Http> {
47 const CHECKSUMS: &'static str = "checksums.txt";
48
49 pub fn new(client: Http, url: Url, folder: impl Into<Box<Path>>) -> Self {
51 Self { client, url, folder: folder.into() }
52 }
53
54 pub async fn download_to_file(&mut self, url: impl IntoUrl) -> eyre::Result<Box<Path>> {
57 let path = self.folder.to_path_buf();
58
59 let url = url.into_url()?;
60 let client = self.client.clone();
61 let file_name = url
62 .path_segments()
63 .ok_or_eyre("cannot-be-a-base")?
64 .next_back()
65 .ok_or_eyre("empty path segments")?;
66 let path = path.join(file_name);
67
68 if !self.is_downloaded(file_name, &path).await? {
69 let number = self
70 .file_name_to_number(file_name)
71 .ok_or_eyre("Cannot parse number from file name")?;
72
73 let mut tries = 1..3;
74 let mut actual_checksum: eyre::Result<_>;
75 loop {
76 actual_checksum = async {
77 let mut file = File::create(&path).await?;
78 let mut stream = client.get(url.clone()).await?;
79 let mut hasher = Sha256::new();
80
81 while let Some(item) = stream.next().await.transpose()? {
82 io::copy(&mut item.as_ref(), &mut file).await?;
83 hasher.update(item);
84 }
85
86 Ok(hasher.finalize().to_vec())
87 }
88 .await;
89
90 if actual_checksum.is_ok() || tries.next().is_none() {
91 break;
92 }
93 }
94
95 self.assert_checksum(number, actual_checksum?)
96 .await
97 .map_err(|e| eyre!("{e} for {file_name} at {}", path.display()))?;
98 }
99
100 Ok(path.into_boxed_path())
101 }
102
103 pub async fn recover_index(&self) -> Option<usize> {
105 let mut max = None;
106
107 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
108 while let Ok(Some(entry)) = dir.next_entry().await {
109 if let Some(name) = entry.file_name().to_str() {
110 if let Some(number) = self.file_name_to_number(name) {
111 if max.is_none() || matches!(max, Some(max) if number > max) {
112 max.replace(number + 1);
113 }
114 }
115 }
116 }
117 }
118
119 max
120 }
121
122 pub async fn delete_outside_range(&self, index: usize, max_files: usize) -> eyre::Result<()> {
124 let last = index + max_files;
125
126 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
127 while let Ok(Some(entry)) = dir.next_entry().await {
128 if let Some(name) = entry.file_name().to_str() {
129 if let Some(number) = self.file_name_to_number(name) {
130 if number < index || number >= last {
131 eprintln!("Deleting file {}", entry.path().display());
132 eprintln!("{number} < {index} || {number} >= {last}");
133 reth_fs_util::remove_file(entry.path())?;
134 }
135 }
136 }
137 }
138 }
139
140 Ok(())
141 }
142
143 pub async fn url(&self, number: usize) -> eyre::Result<Option<Url>> {
145 Ok(self.number_to_file_name(number).await?.map(|name| self.url.join(&name)).transpose()?)
146 }
147
148 pub async fn files_count(&self) -> usize {
150 let mut count = 0usize;
151
152 if let Ok(mut dir) = fs::read_dir(&self.folder).await {
153 while let Ok(Some(entry)) = dir.next_entry().await {
154 if entry.path().extension() == Some("era1".as_ref()) {
155 count += 1;
156 }
157 }
158 }
159
160 count
161 }
162
163 pub async fn fetch_file_list(&self) -> eyre::Result<()> {
165 let (mut index, mut checksums) = try_join!(
166 self.client.get(self.url.clone()),
167 self.client.get(self.url.clone().join(Self::CHECKSUMS)?),
168 )?;
169
170 let index_path = self.folder.to_path_buf().join("index.html");
171 let checksums_path = self.folder.to_path_buf().join(Self::CHECKSUMS);
172
173 let (mut index_file, mut checksums_file) =
174 try_join!(File::create(&index_path), File::create(&checksums_path))?;
175
176 loop {
177 let (index, checksums) = join!(index.next(), checksums.next());
178 let (index, checksums) = (index.transpose()?, checksums.transpose()?);
179
180 if index.is_none() && checksums.is_none() {
181 break;
182 }
183 let index_file = &mut index_file;
184 let checksums_file = &mut checksums_file;
185
186 try_join!(
187 async move {
188 if let Some(index) = index {
189 io::copy(&mut index.as_ref(), index_file).await?;
190 }
191 Ok::<(), eyre::Error>(())
192 },
193 async move {
194 if let Some(checksums) = checksums {
195 io::copy(&mut checksums.as_ref(), checksums_file).await?;
196 }
197 Ok::<(), eyre::Error>(())
198 },
199 )?;
200 }
201
202 let file = File::open(&index_path).await?;
203 let reader = io::BufReader::new(file);
204 let mut lines = reader.lines();
205
206 let path = self.folder.to_path_buf().join("index");
207 let file = File::create(&path).await?;
208 let mut writer = io::BufWriter::new(file);
209
210 while let Some(line) = lines.next_line().await? {
211 if let Some(j) = line.find(".era1") {
212 if let Some(i) = line[..j].rfind(|c: char| !c.is_alphanumeric() && c != '-') {
213 let era = &line[i + 1..j + 5];
214 writer.write_all(era.as_bytes()).await?;
215 writer.write_all(b"\n").await?;
216 }
217 }
218 }
219 writer.flush().await?;
220
221 Ok(())
222 }
223
224 pub async fn number_to_file_name(&self, number: usize) -> eyre::Result<Option<String>> {
226 let path = self.folder.to_path_buf().join("index");
227 let file = File::open(&path).await?;
228 let reader = io::BufReader::new(file);
229 let mut lines = reader.lines();
230 for _ in 0..number {
231 lines.next_line().await?;
232 }
233
234 Ok(lines.next_line().await?)
235 }
236
237 async fn is_downloaded(&self, name: &str, path: impl AsRef<Path>) -> eyre::Result<bool> {
238 let path = path.as_ref();
239
240 match File::open(path).await {
241 Ok(file) => {
242 let number = self
243 .file_name_to_number(name)
244 .ok_or_else(|| eyre!("Cannot parse ERA number from {name}"))?;
245
246 let actual_checksum = checksum(file).await?;
247 let is_verified = self.verify_checksum(number, actual_checksum).await?;
248
249 if !is_verified {
250 fs::remove_file(path).await?;
251 }
252
253 Ok(is_verified)
254 }
255 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(false),
256 Err(e) => Err(e)?,
257 }
258 }
259
260 async fn verify_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<bool> {
265 Ok(actual_checksum == self.expected_checksum(number).await?)
266 }
267
268 async fn assert_checksum(&self, number: usize, actual_checksum: Vec<u8>) -> eyre::Result<()> {
273 let expected_checksum = self.expected_checksum(number).await?;
274
275 if actual_checksum == expected_checksum {
276 Ok(())
277 } else {
278 Err(eyre!(
279 "Checksum mismatch, got: {}, expected: {}",
280 actual_checksum.encode_hex(),
281 expected_checksum.encode_hex()
282 ))
283 }
284 }
285
286 async fn expected_checksum(&self, number: usize) -> eyre::Result<Vec<u8>> {
290 let file = File::open(self.folder.join(Self::CHECKSUMS)).await?;
291 let reader = io::BufReader::new(file);
292 let mut lines = reader.lines();
293
294 for _ in 0..number {
295 lines.next_line().await?;
296 }
297 let expected_checksum =
298 lines.next_line().await?.ok_or_else(|| eyre!("Missing hash for number {number}"))?;
299 let expected_checksum = hex::decode(expected_checksum)?;
300
301 Ok(expected_checksum)
302 }
303
304 fn file_name_to_number(&self, file_name: &str) -> Option<usize> {
305 file_name.split('-').nth(1).and_then(|v| usize::from_str(v).ok())
306 }
307}
308
309async fn checksum(mut reader: impl AsyncRead + Unpin) -> eyre::Result<Vec<u8>> {
310 let mut hasher = Sha256::new();
311
312 let mut data = vec![0; 64 * 1024];
314
315 loop {
316 let len = reader.read(&mut data).await?;
318 if len == 0 {
319 break;
320 } hasher.update(&data[..len]);
324 }
325
326 let hash = hasher.finalize().to_vec();
328
329 Ok(hash)
330}
331
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use test_case::test_case;

    impl EraClient<Client> {
        /// Builds a client with a placeholder URL and an empty folder path, for tests that only
        /// exercise pure helpers.
        fn empty() -> Self {
            let url = Url::from_str("file:///").unwrap();
            Self::new(Client::new(), url, PathBuf::new())
        }
    }

    #[test_case("mainnet-00600-a81ae85f.era1", Some(600))]
    #[test_case("mainnet-00000-a81ae85f.era1", Some(0))]
    #[test_case("00000-a81ae85f.era1", None)]
    #[test_case("", None)]
    fn test_file_name_to_number(file_name: &str, expected_number: Option<usize>) {
        assert_eq!(EraClient::empty().file_name_to_number(file_name), expected_number);
    }
}