reth_era_downloader/
fs.rs

1use crate::{EraMeta, BLOCKS_PER_FILE};
2use alloy_primitives::{hex, hex::ToHexExt, BlockNumber};
3use eyre::{eyre, OptionExt};
4use futures_util::{stream, Stream};
5use reth_fs_util as fs;
6use sha2::{Digest, Sha256};
7use std::{fmt::Debug, io, io::BufRead, path::Path, str::FromStr};
8
9/// Creates a new ordered asynchronous [`Stream`] of ERA1 files read from `dir`.
10pub fn read_dir(
11    dir: impl AsRef<Path> + Send + Sync + 'static,
12    start_from: BlockNumber,
13) -> eyre::Result<impl Stream<Item = eyre::Result<EraLocalMeta>> + Send + Sync + 'static + Unpin> {
14    let mut checksums = None;
15    let mut entries = fs::read_dir(dir)?
16        .filter_map(|entry| {
17            (|| {
18                let path = entry?.path();
19
20                if path.extension() == Some("era1".as_ref()) {
21                    if let Some(last) = path.components().next_back() {
22                        let str = last.as_os_str().to_string_lossy().to_string();
23                        let parts = str.split('-').collect::<Vec<_>>();
24
25                        if parts.len() == 3 {
26                            let number = usize::from_str(parts[1])?;
27
28                            return Ok(Some((number, path.into_boxed_path())));
29                        }
30                    }
31                }
32                if path.file_name() == Some("checksums.txt".as_ref()) {
33                    let file = fs::open(path)?;
34                    let reader = io::BufReader::new(file);
35                    let lines = reader.lines();
36                    checksums = Some(lines);
37                }
38
39                Ok(None)
40            })()
41            .transpose()
42        })
43        .collect::<eyre::Result<Vec<_>>>()?;
44    let mut checksums = checksums.ok_or_eyre("Missing file `checksums.txt` in the `dir`")?;
45
46    entries.sort_by(|(left, _), (right, _)| left.cmp(right));
47
48    Ok(stream::iter(entries.into_iter().skip(start_from as usize / BLOCKS_PER_FILE).map(
49        move |(_, path)| {
50            let expected_checksum =
51                checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
52            let expected_checksum = hex::decode(expected_checksum)?;
53
54            let mut hasher = Sha256::new();
55            let mut reader = io::BufReader::new(fs::open(&path)?);
56
57            io::copy(&mut reader, &mut hasher)?;
58            let actual_checksum = hasher.finalize().to_vec();
59
60            if actual_checksum != expected_checksum {
61                return Err(eyre!(
62                    "Checksum mismatch, got: {}, expected: {}",
63                    actual_checksum.encode_hex(),
64                    expected_checksum.encode_hex()
65                ));
66            }
67
68            Ok(EraLocalMeta::new(path))
69        },
70    )))
71}
72
73/// Contains information about an ERA file that is on the local file-system and is read-only.
74#[derive(Debug)]
75pub struct EraLocalMeta {
76    path: Box<Path>,
77}
78
79impl EraLocalMeta {
80    const fn new(path: Box<Path>) -> Self {
81        Self { path }
82    }
83}
84
85impl<T: AsRef<Path>> PartialEq<T> for EraLocalMeta {
86    fn eq(&self, other: &T) -> bool {
87        self.as_ref().eq(other.as_ref())
88    }
89}
90
91impl AsRef<Path> for EraLocalMeta {
92    fn as_ref(&self) -> &Path {
93        self.path.as_ref()
94    }
95}
96
97impl EraMeta for EraLocalMeta {
98    /// A no-op.
99    fn mark_as_processed(&self) -> eyre::Result<()> {
100        Ok(())
101    }
102
103    fn path(&self) -> &Path {
104        &self.path
105    }
106}