reth_era_downloader/
stream.rs

1use crate::{client::HttpClient, EraClient, BLOCKS_PER_FILE};
2use alloy_primitives::BlockNumber;
3use futures_util::{stream::FuturesOrdered, FutureExt, Stream, StreamExt};
4use reqwest::Url;
5use reth_fs_util as fs;
6use std::{
7    collections::VecDeque,
8    fmt::{Debug, Formatter},
9    future::Future,
10    path::Path,
11    pin::Pin,
12    task::{Context, Poll},
13};
14
/// Parameters that alter the behavior of [`EraStream`].
///
/// Start from [`EraStreamConfig::default`] and adjust it with the builder-style methods.
///
/// # Examples
/// ```
/// use reth_era_downloader::EraStreamConfig;
///
/// EraStreamConfig::default().with_max_files(10).with_max_concurrent_downloads(2);
/// ```
#[derive(Clone, Debug)]
pub struct EraStreamConfig {
    /// Maximum number of downloaded ERA1 files kept in the download directory.
    max_files: usize,
    /// Maximum number of downloads running at the same time.
    max_concurrent_downloads: usize,
    /// Index of the ERA file to start from, when overridden.
    start_from: Option<usize>,
}
29
30impl Default for EraStreamConfig {
31    fn default() -> Self {
32        Self { max_files: 5, max_concurrent_downloads: 3, start_from: None }
33    }
34}
35
36impl EraStreamConfig {
37    /// The maximum amount of downloaded ERA1 files kept in the download directory.
38    pub const fn with_max_files(mut self, max_files: usize) -> Self {
39        self.max_files = max_files;
40        self
41    }
42
43    /// The maximum amount of downloads happening at the same time.
44    pub const fn with_max_concurrent_downloads(mut self, max_concurrent_downloads: usize) -> Self {
45        self.max_concurrent_downloads = max_concurrent_downloads;
46        self
47    }
48
49    /// Overrides the starting ERA file index to be the first one that contains `block_number`.
50    pub const fn start_from(mut self, block_number: BlockNumber) -> Self {
51        self.start_from.replace(block_number as usize / BLOCKS_PER_FILE);
52        self
53    }
54}
55
56/// An asynchronous stream of ERA1 files.
57///
58/// # Examples
59/// ```
60/// use futures_util::StreamExt;
61/// use reth_era_downloader::{EraMeta, EraStream, HttpClient};
62///
63/// # async fn import(mut stream: EraStream<impl HttpClient + Clone + Send + Sync + 'static + Unpin>) -> eyre::Result<()> {
64/// while let Some(meta) = stream.next().await {
65///     let meta = meta?;
66///     // Process file at `meta.path(): &Path`
67///     meta.mark_as_processed()?;
68/// }
69/// # Ok(())
70/// # }
71/// ```
72#[derive(Debug)]
73pub struct EraStream<Http> {
74    download_stream: DownloadStream,
75    starting_stream: StartingStream<Http>,
76}
77
78impl<Http> EraStream<Http> {
79    /// Constructs a new [`EraStream`] that downloads concurrently up to `max_concurrent_downloads`
80    /// ERA1 files to `client` `folder`, keeping their count up to `max_files`.
81    pub fn new(client: EraClient<Http>, config: EraStreamConfig) -> Self {
82        Self {
83            download_stream: DownloadStream {
84                downloads: Default::default(),
85                scheduled: Default::default(),
86                max_concurrent_downloads: config.max_concurrent_downloads,
87                ended: false,
88            },
89            starting_stream: StartingStream {
90                client,
91                files_count: Box::pin(async move { usize::MAX }),
92                next_url: Box::pin(async move { Ok(None) }),
93                delete_outside_range: Box::pin(async move { Ok(()) }),
94                recover_index: Box::pin(async move { None }),
95                fetch_file_list: Box::pin(async move { Ok(()) }),
96                state: Default::default(),
97                max_files: config.max_files,
98                index: config.start_from.unwrap_or_default(),
99                last: None,
100                downloading: 0,
101            },
102        }
103    }
104}
105
106/// Contains information about an ERA file.
107pub trait EraMeta: Debug {
108    /// Marking this particular ERA file as "processed" lets the caller hint that it is no longer
109    /// going to be using it.
110    ///
111    /// The meaning of that is up to the implementation. The caller should assume that after this
112    /// point is no longer possible to safely read it.
113    fn mark_as_processed(&self) -> eyre::Result<()>;
114
115    /// A path to the era file.
116    ///
117    /// File should be openable and treated as read-only.
118    fn path(&self) -> &Path;
119}
120
121impl<T: EraMeta> EraMeta for Box<T> {
122    fn mark_as_processed(&self) -> eyre::Result<()> {
123        T::mark_as_processed(self)
124    }
125
126    fn path(&self) -> &Path {
127        T::path(self)
128    }
129}
130
/// Describes an ERA file that is hosted remotely and has been downloaded to a temporary local
/// file.
#[derive(Debug)]
pub struct EraRemoteMeta {
    /// Location of the local copy of the remote file.
    path: Box<Path>,
}
137
138impl EraRemoteMeta {
139    const fn new(path: Box<Path>) -> Self {
140        Self { path }
141    }
142}
143
144impl AsRef<Path> for EraRemoteMeta {
145    fn as_ref(&self) -> &Path {
146        self.path.as_ref()
147    }
148}
149
150impl EraMeta for EraRemoteMeta {
151    /// Removes a temporary local file representation of the remotely hosted original.
152    fn mark_as_processed(&self) -> eyre::Result<()> {
153        Ok(fs::remove_file(&self.path)?)
154    }
155
156    fn path(&self) -> &Path {
157        &self.path
158    }
159}
160
161impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for EraStream<Http> {
162    type Item = eyre::Result<EraRemoteMeta>;
163
164    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165        if let Poll::Ready(fut) = self.starting_stream.poll_next_unpin(cx) {
166            if let Some(fut) = fut {
167                self.download_stream.scheduled.push_back(fut);
168            } else {
169                self.download_stream.ended = true;
170            }
171        }
172
173        let poll = self.download_stream.poll_next_unpin(cx);
174
175        if poll.is_ready() {
176            self.starting_stream.downloaded();
177        }
178
179        poll
180    }
181}
182
183type DownloadFuture =
184    Pin<Box<dyn Future<Output = eyre::Result<EraRemoteMeta>> + Send + Sync + 'static>>;
185
186struct DownloadStream {
187    downloads: FuturesOrdered<DownloadFuture>,
188    scheduled: VecDeque<DownloadFuture>,
189    max_concurrent_downloads: usize,
190    ended: bool,
191}
192
193impl Debug for DownloadStream {
194    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
195        write!(f, "DownloadStream({})", self.downloads.len())
196    }
197}
198
199impl Stream for DownloadStream {
200    type Item = eyre::Result<EraRemoteMeta>;
201
202    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203        for _ in 0..self.max_concurrent_downloads - self.downloads.len() {
204            if let Some(fut) = self.scheduled.pop_front() {
205                self.downloads.push_back(fut);
206            }
207        }
208
209        let ended = self.ended;
210        let poll = self.downloads.poll_next_unpin(cx);
211
212        if matches!(poll, Poll::Ready(None)) && !ended {
213            cx.waker().wake_by_ref();
214            return Poll::Pending;
215        }
216
217        poll
218    }
219}
220
221struct StartingStream<Http> {
222    client: EraClient<Http>,
223    files_count: Pin<Box<dyn Future<Output = usize> + Send + Sync + 'static>>,
224    next_url: Pin<Box<dyn Future<Output = eyre::Result<Option<Url>>> + Send + Sync + 'static>>,
225    delete_outside_range: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
226    recover_index: Pin<Box<dyn Future<Output = Option<usize>> + Send + Sync + 'static>>,
227    fetch_file_list: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
228    state: State,
229    max_files: usize,
230    index: usize,
231    last: Option<usize>,
232    downloading: usize,
233}
234
235impl<Http> Debug for StartingStream<Http> {
236    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
237        write!(
238            f,
239            "StartingStream{{ max_files: {}, index: {}, downloading: {} }}",
240            self.max_files, self.index, self.downloading
241        )
242    }
243}
244
/// Phases of the `StartingStream` state machine.
#[derive(Default, Debug, PartialEq)]
enum State {
    /// Nothing has been started yet.
    #[default]
    Initial,
    /// Waiting for `EraClient::fetch_file_list` to resolve.
    FetchFileList,
    /// Waiting for `EraClient::delete_outside_range` to resolve.
    DeleteOutsideRange,
    /// Waiting for `EraClient::recover_index` to resolve.
    RecoverIndex,
    /// Waiting for `EraClient::files_count` to resolve.
    CountFiles,
    /// Ready to schedule; holds how many more files may be scheduled before recounting.
    Missing(usize),
    /// Waiting for the next URL; holds the remaining budget, decremented once it resolves.
    NextUrl(usize),
}
256
/// Drives the setup state machine and yields one [`DownloadFuture`] per ERA file to fetch.
///
/// Phases are checked in order within a single poll, so one call can advance through several
/// of them: `Initial` -> `FetchFileList` -> `DeleteOutsideRange` -> `RecoverIndex` ->
/// `CountFiles` -> `Missing` <-> `NextUrl`, with `Missing(0)` going back to `CountFiles`.
impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for StartingStream<Http> {
    type Item = DownloadFuture;

    /// Yields `Some(future)` for the next file to download, `None` once `EraClient::url`
    /// resolves to `Ok(None)`, and `Pending` otherwise.
    ///
    /// Failures while fetching the file list or deleting out-of-range files are yielded as
    /// already-failed futures, and the failed step is re-armed so the next poll retries it.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.state == State::Initial {
            self.fetch_file_list();
        }

        if self.state == State::FetchFileList {
            if let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx) {
                match result {
                    Ok(_) => self.delete_outside_range(),
                    Err(e) => {
                        // Re-arm the fetch so it is retried, and surface the error to the
                        // consumer as a failed download.
                        // NOTE(review): `downloading` is not incremented for this item, yet
                        // `EraStream` calls `downloaded()` when it is yielded — confirm intended.
                        self.fetch_file_list();

                        return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                    }
                }
            }
        }

        if self.state == State::DeleteOutsideRange {
            if let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx) {
                match result {
                    Ok(_) => self.recover_index(),
                    Err(e) => {
                        // Same retry-and-report handling as for the file list.
                        self.delete_outside_range();

                        return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                    }
                }
            }
        }

        if self.state == State::RecoverIndex {
            if let Poll::Ready(last) = self.recover_index.poll_unpin(cx) {
                self.last = last;
                self.count_files();
            }
        }

        if self.state == State::CountFiles {
            if let Poll::Ready(downloaded) = self.files_count.poll_unpin(cx) {
                // Budget of files to schedule before counting again: enough to fill up to
                // `max_files` after subtracting the reported count and the downloads still in
                // flight, but at least enough to reach the recovered `last` index.
                let max_missing = self
                    .max_files
                    .saturating_sub(downloaded + self.downloading)
                    .max(self.last.unwrap_or_default().saturating_sub(self.index));
                self.state = State::Missing(max_missing);
            }
        }

        if let State::Missing(max_missing) = self.state {
            if max_missing > 0 {
                // Reserve the current index, count it as in flight and resolve its URL.
                let index = self.index;
                self.index += 1;
                self.downloading += 1;
                self.next_url(index, max_missing);
            } else {
                // Budget exhausted: start a fresh count.
                // NOTE(review): this poll then returns `Pending` without polling `files_count`,
                // so no waker is registered by this branch; progress appears to rely on the task
                // being woken elsewhere (e.g. `DownloadStream` waking itself) — confirm.
                self.count_files();
            }
        }

        if let State::NextUrl(max_missing) = self.state {
            if let Poll::Ready(url) = self.next_url.poll_unpin(cx) {
                // Cannot underflow: `NextUrl` is only entered from `Missing` with a budget > 0.
                self.state = State::Missing(max_missing - 1);

                // `transpose` maps `Ok(None)` to `None` (end of stream); an `Err` is deferred
                // into the returned future through `url?`.
                return Poll::Ready(url.transpose().map(|url| -> DownloadFuture {
                    let mut client = self.client.clone();

                    Box::pin(
                        async move { client.download_to_file(url?).await.map(EraRemoteMeta::new) },
                    )
                }));
            }
        }

        Poll::Pending
    }
}
336
337impl<Http> StartingStream<Http> {
338    const fn downloaded(&mut self) {
339        self.downloading = self.downloading.saturating_sub(1);
340    }
341}
342
343impl<Http: HttpClient + Clone + Send + Sync + 'static> StartingStream<Http> {
344    fn fetch_file_list(&mut self) {
345        let client = self.client.clone();
346
347        Pin::new(&mut self.fetch_file_list)
348            .set(Box::pin(async move { client.fetch_file_list().await }));
349
350        self.state = State::FetchFileList;
351    }
352
353    fn delete_outside_range(&mut self) {
354        let index = self.index;
355        let max_files = self.max_files;
356        let client = self.client.clone();
357
358        Pin::new(&mut self.delete_outside_range)
359            .set(Box::pin(async move { client.delete_outside_range(index, max_files).await }));
360
361        self.state = State::DeleteOutsideRange;
362    }
363
364    fn recover_index(&mut self) {
365        let client = self.client.clone();
366
367        Pin::new(&mut self.recover_index)
368            .set(Box::pin(async move { client.recover_index().await }));
369
370        self.state = State::RecoverIndex;
371    }
372
373    fn count_files(&mut self) {
374        let client = self.client.clone();
375
376        Pin::new(&mut self.files_count).set(Box::pin(async move { client.files_count().await }));
377
378        self.state = State::CountFiles;
379    }
380
381    fn next_url(&mut self, index: usize, max_missing: usize) {
382        let client = self.client.clone();
383
384        Pin::new(&mut self.next_url).set(Box::pin(async move { client.url(index).await }));
385
386        self.state = State::NextUrl(max_missing);
387    }
388}