Skip to main content

reth_era_downloader/
fs.rs

1use crate::{EraMeta, BLOCKS_PER_FILE};
2use alloy_primitives::{hex, hex::ToHexExt, BlockNumber};
3use eyre::{eyre, OptionExt};
4use futures_util::{stream, Stream};
5use reth_era::common::file_ops::EraFileType;
6use reth_fs_util as fs;
7use sha2::{Digest, Sha256};
8use std::{fmt::Debug, io, io::BufRead, path::Path, str::FromStr};
9
10/// Creates a new ordered asynchronous [`Stream`] of ERA1 files read from `dir`.
11pub fn read_dir(
12    dir: impl AsRef<Path> + Send + Sync + 'static,
13    start_from: BlockNumber,
14) -> eyre::Result<impl Stream<Item = eyre::Result<EraLocalMeta>> + Send + Sync + 'static + Unpin> {
15    let mut checksums = None;
16
17    // read all the files in the given dir and also read the checksums file
18    let mut entries = fs::read_dir(dir)?
19        .filter_map(|entry| {
20            (|| {
21                let path = entry?.path();
22
23                if let Some(name) = path.file_name().and_then(|name| name.to_str()) &&
24                    matches!(
25                        EraFileType::from_filename(name),
26                        Some(EraFileType::Era1 | EraFileType::Ere)
27                    )
28                {
29                    let parts = name.split('-').collect::<Vec<_>>();
30
31                    if parts.len() == 3 {
32                        let number = usize::from_str(parts[1])?;
33
34                        return Ok(Some((number, path.into_boxed_path())));
35                    }
36                }
37
38                if path.file_name() == Some("checksums.txt".as_ref()) {
39                    let file = fs::open(path)?;
40                    let reader = io::BufReader::new(file);
41                    let lines = reader.lines();
42                    checksums = Some(lines);
43                }
44
45                Ok(None)
46            })()
47            .transpose()
48        })
49        .collect::<eyre::Result<Vec<_>>>()?;
50    let mut checksums = checksums.ok_or_eyre("Missing file `checksums.txt` in the `dir`")?;
51
52    let start_index = start_from as usize / BLOCKS_PER_FILE;
53    for _ in 0..start_index {
54        // skip the first entries in the checksums iterator so that both iters align
55        checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
56    }
57
58    entries.sort_by_key(|(left, _)| *left);
59
60    Ok(stream::iter(entries.into_iter().skip_while(move |(n, _)| *n < start_index).map(
61        move |(_, path)| {
62            let expected_checksum =
63                checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
64            let expected_checksum = hex::decode(expected_checksum)?;
65
66            let mut hasher = Sha256::new();
67            let mut reader = io::BufReader::new(fs::open(&path)?);
68
69            io::copy(&mut reader, &mut hasher)?;
70            let actual_checksum = hasher.finalize().to_vec();
71
72            if actual_checksum != expected_checksum {
73                return Err(eyre!(
74                    "Checksum mismatch, got: {}, expected: {}",
75                    actual_checksum.encode_hex(),
76                    expected_checksum.encode_hex()
77                ));
78            }
79
80            Ok(EraLocalMeta::new(path))
81        },
82    )))
83}
84
/// Describes an ERA file that lives on the local file-system and is treated as read-only.
#[derive(Debug)]
pub struct EraLocalMeta {
    /// Location of the ERA file on disk.
    path: Box<Path>,
}
90
91impl EraLocalMeta {
92    const fn new(path: Box<Path>) -> Self {
93        Self { path }
94    }
95}
96
97impl<T: AsRef<Path>> PartialEq<T> for EraLocalMeta {
98    fn eq(&self, other: &T) -> bool {
99        self.as_ref().eq(other.as_ref())
100    }
101}
102
103impl AsRef<Path> for EraLocalMeta {
104    fn as_ref(&self) -> &Path {
105        self.path.as_ref()
106    }
107}
108
109impl EraMeta for EraLocalMeta {
110    /// A no-op.
111    fn mark_as_processed(&self) -> eyre::Result<()> {
112        Ok(())
113    }
114
115    fn path(&self) -> &Path {
116        &self.path
117    }
118}