reth_era_downloader/
fs.rs

1use crate::{EraMeta, BLOCKS_PER_FILE};
2use alloy_primitives::{hex, hex::ToHexExt, BlockNumber};
3use eyre::{eyre, OptionExt};
4use futures_util::{stream, Stream};
5use reth_fs_util as fs;
6use sha2::{Digest, Sha256};
7use std::{fmt::Debug, io, io::BufRead, path::Path, str::FromStr};
8
9/// Creates a new ordered asynchronous [`Stream`] of ERA1 files read from `dir`.
10pub fn read_dir(
11    dir: impl AsRef<Path> + Send + Sync + 'static,
12    start_from: BlockNumber,
13) -> eyre::Result<impl Stream<Item = eyre::Result<EraLocalMeta>> + Send + Sync + 'static + Unpin> {
14    let mut checksums = None;
15
16    // read all the files in the given dir and also read the checksums file
17    let mut entries = fs::read_dir(dir)?
18        .filter_map(|entry| {
19            (|| {
20                let path = entry?.path();
21
22                if path.extension() == Some("era1".as_ref()) &&
23                    let Some(last) = path.components().next_back()
24                {
25                    let str = last.as_os_str().to_string_lossy().to_string();
26                    let parts = str.split('-').collect::<Vec<_>>();
27
28                    if parts.len() == 3 {
29                        let number = usize::from_str(parts[1])?;
30
31                        return Ok(Some((number, path.into_boxed_path())));
32                    }
33                }
34
35                if path.file_name() == Some("checksums.txt".as_ref()) {
36                    let file = fs::open(path)?;
37                    let reader = io::BufReader::new(file);
38                    let lines = reader.lines();
39                    checksums = Some(lines);
40                }
41
42                Ok(None)
43            })()
44            .transpose()
45        })
46        .collect::<eyre::Result<Vec<_>>>()?;
47    let mut checksums = checksums.ok_or_eyre("Missing file `checksums.txt` in the `dir`")?;
48
49    let start_index = start_from as usize / BLOCKS_PER_FILE;
50    for _ in 0..start_index {
51        // skip the first entries in the checksums iterator so that both iters align
52        checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
53    }
54
55    entries.sort_by(|(left, _), (right, _)| left.cmp(right));
56
57    Ok(stream::iter(entries.into_iter().skip_while(move |(n, _)| *n < start_index).map(
58        move |(_, path)| {
59            let expected_checksum =
60                checksums.next().transpose()?.ok_or_eyre("Got less checksums than ERA files")?;
61            let expected_checksum = hex::decode(expected_checksum)?;
62
63            let mut hasher = Sha256::new();
64            let mut reader = io::BufReader::new(fs::open(&path)?);
65
66            io::copy(&mut reader, &mut hasher)?;
67            let actual_checksum = hasher.finalize().to_vec();
68
69            if actual_checksum != expected_checksum {
70                return Err(eyre!(
71                    "Checksum mismatch, got: {}, expected: {}",
72                    actual_checksum.encode_hex(),
73                    expected_checksum.encode_hex()
74                ));
75            }
76
77            Ok(EraLocalMeta::new(path))
78        },
79    )))
80}
81
/// Metadata describing an ERA file that resides on the local file-system.
///
/// The file is treated as read-only.
#[derive(Debug)]
pub struct EraLocalMeta {
    /// Location of the ERA file.
    path: Box<Path>,
}
87
88impl EraLocalMeta {
89    const fn new(path: Box<Path>) -> Self {
90        Self { path }
91    }
92}
93
94impl<T: AsRef<Path>> PartialEq<T> for EraLocalMeta {
95    fn eq(&self, other: &T) -> bool {
96        self.as_ref().eq(other.as_ref())
97    }
98}
99
100impl AsRef<Path> for EraLocalMeta {
101    fn as_ref(&self) -> &Path {
102        self.path.as_ref()
103    }
104}
105
106impl EraMeta for EraLocalMeta {
107    /// A no-op.
108    fn mark_as_processed(&self) -> eyre::Result<()> {
109        Ok(())
110    }
111
112    fn path(&self) -> &Path {
113        &self.path
114    }
115}