1use crate::{client::HttpClient, EraClient, BLOCKS_PER_FILE};
2use alloy_primitives::BlockNumber;
3use futures_util::{stream::FuturesOrdered, FutureExt, Stream, StreamExt};
4use reqwest::Url;
5use reth_fs_util as fs;
6use std::{
7 collections::VecDeque,
8 fmt::{Debug, Formatter},
9 future::Future,
10 path::Path,
11 pin::Pin,
12 task::{Context, Poll},
13};
14
/// Tuning knobs for [`EraStream`].
///
/// Start from [`EraStreamConfig::default`] and adjust it with the `with_*` / `start_from`
/// builder methods.
#[derive(Debug, Clone)]
pub struct EraStreamConfig {
    /// Target number of ERA files present at once: the count reported by the client's
    /// `files_count` plus downloads still in flight is kept at or below this value (except while
    /// re-requesting a recovered range).
    max_files: usize,
    /// Upper bound on how many downloads are polled at the same time.
    max_concurrent_downloads: usize,
    /// Index of the first ERA file to request; `None` means index 0.
    start_from: Option<usize>,
}
29
30impl Default for EraStreamConfig {
31 fn default() -> Self {
32 Self { max_files: 5, max_concurrent_downloads: 3, start_from: None }
33 }
34}
35
36impl EraStreamConfig {
37 pub const fn with_max_files(mut self, max_files: usize) -> Self {
39 self.max_files = max_files;
40 self
41 }
42
43 pub const fn with_max_concurrent_downloads(mut self, max_concurrent_downloads: usize) -> Self {
45 self.max_concurrent_downloads = max_concurrent_downloads;
46 self
47 }
48
49 pub const fn start_from(mut self, block_number: BlockNumber) -> Self {
51 self.start_from.replace(block_number as usize / BLOCKS_PER_FILE);
52 self
53 }
54}
55
56#[derive(Debug)]
73pub struct EraStream<Http> {
74 download_stream: DownloadStream,
75 starting_stream: StartingStream<Http>,
76}
77
78impl<Http> EraStream<Http> {
79 pub fn new(client: EraClient<Http>, config: EraStreamConfig) -> Self {
82 Self {
83 download_stream: DownloadStream {
84 downloads: Default::default(),
85 scheduled: Default::default(),
86 max_concurrent_downloads: config.max_concurrent_downloads,
87 ended: false,
88 },
89 starting_stream: StartingStream {
90 client,
91 files_count: Box::pin(async move { usize::MAX }),
92 next_url: Box::pin(async move { Ok(None) }),
93 delete_outside_range: Box::pin(async move { Ok(()) }),
94 recover_index: Box::pin(async move { None }),
95 fetch_file_list: Box::pin(async move { Ok(()) }),
96 state: Default::default(),
97 max_files: config.max_files,
98 index: config.start_from.unwrap_or_default(),
99 last: None,
100 downloading: 0,
101 },
102 }
103 }
104}
105
106pub trait EraMeta: Debug {
108 fn mark_as_processed(&self) -> eyre::Result<()>;
114
115 fn path(&self) -> &Path;
119}
120
121impl<T: EraMeta> EraMeta for Box<T> {
122 fn mark_as_processed(&self) -> eyre::Result<()> {
123 T::mark_as_processed(self)
124 }
125
126 fn path(&self) -> &Path {
127 T::path(self)
128 }
129}
130
/// An ERA file that was downloaded from a remote host into a local path.
///
/// Marking it as processed removes the local file.
#[derive(Debug)]
pub struct EraRemoteMeta {
    /// Local path of the downloaded file.
    path: Box<Path>,
}
137
138impl EraRemoteMeta {
139 const fn new(path: Box<Path>) -> Self {
140 Self { path }
141 }
142}
143
144impl AsRef<Path> for EraRemoteMeta {
145 fn as_ref(&self) -> &Path {
146 self.path.as_ref()
147 }
148}
149
150impl EraMeta for EraRemoteMeta {
151 fn mark_as_processed(&self) -> eyre::Result<()> {
153 Ok(fs::remove_file(&self.path)?)
154 }
155
156 fn path(&self) -> &Path {
157 &self.path
158 }
159}
160
161impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for EraStream<Http> {
162 type Item = eyre::Result<EraRemoteMeta>;
163
164 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165 if let Poll::Ready(fut) = self.starting_stream.poll_next_unpin(cx) {
166 if let Some(fut) = fut {
167 self.download_stream.scheduled.push_back(fut);
168 } else {
169 self.download_stream.ended = true;
170 }
171 }
172
173 let poll = self.download_stream.poll_next_unpin(cx);
174
175 if poll.is_ready() {
176 self.starting_stream.downloaded();
177 }
178
179 poll
180 }
181}
182
183type DownloadFuture =
184 Pin<Box<dyn Future<Output = eyre::Result<EraRemoteMeta>> + Send + Sync + 'static>>;
185
186struct DownloadStream {
187 downloads: FuturesOrdered<DownloadFuture>,
188 scheduled: VecDeque<DownloadFuture>,
189 max_concurrent_downloads: usize,
190 ended: bool,
191}
192
193impl Debug for DownloadStream {
194 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
195 write!(f, "DownloadStream({})", self.downloads.len())
196 }
197}
198
199impl Stream for DownloadStream {
200 type Item = eyre::Result<EraRemoteMeta>;
201
202 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203 for _ in 0..self.max_concurrent_downloads - self.downloads.len() {
204 if let Some(fut) = self.scheduled.pop_front() {
205 self.downloads.push_back(fut);
206 }
207 }
208
209 let ended = self.ended;
210 let poll = self.downloads.poll_next_unpin(cx);
211
212 if matches!(poll, Poll::Ready(None)) && !ended {
213 cx.waker().wake_by_ref();
214 return Poll::Pending;
215 }
216
217 poll
218 }
219}
220
221struct StartingStream<Http> {
222 client: EraClient<Http>,
223 files_count: Pin<Box<dyn Future<Output = usize> + Send + Sync + 'static>>,
224 next_url: Pin<Box<dyn Future<Output = eyre::Result<Option<Url>>> + Send + Sync + 'static>>,
225 delete_outside_range: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
226 recover_index: Pin<Box<dyn Future<Output = Option<usize>> + Send + Sync + 'static>>,
227 fetch_file_list: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
228 state: State,
229 max_files: usize,
230 index: usize,
231 last: Option<usize>,
232 downloading: usize,
233}
234
235impl<Http> Debug for StartingStream<Http> {
236 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
237 write!(
238 f,
239 "StartingStream{{ max_files: {}, index: {}, downloading: {} }}",
240 self.max_files, self.index, self.downloading
241 )
242 }
243}
244
/// Step of the [`StartingStream`] pipeline.
#[derive(Debug, PartialEq, Default)]
enum State {
    /// Nothing has been started yet.
    #[default]
    Initial,
    /// Waiting for `fetch_file_list`.
    FetchFileList,
    /// Waiting for `delete_outside_range`.
    DeleteOutsideRange,
    /// Waiting for `recover_index`.
    RecoverIndex,
    /// Waiting for `files_count`.
    CountFiles,
    /// Up to this many more files may be requested before the files are counted again.
    Missing(usize),
    /// Waiting for the next file's URL; carries the `Missing` budget that was current when the
    /// request was issued.
    NextUrl(usize),
}
256
impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for StartingStream<Http> {
    /// Each item is a future that either downloads one file or resolves to a step's error.
    type Item = DownloadFuture;

    /// Drives the pipeline `FetchFileList -> DeleteOutsideRange -> RecoverIndex -> CountFiles ->
    /// Missing/NextUrl`.
    ///
    /// The `if` blocks below run in order and each may change `state`, so several steps can
    /// complete within a single call. A failing step is re-armed (so it is retried on the next
    /// poll) and its error is emitted as an already-failed `DownloadFuture`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.state == State::Initial {
            self.fetch_file_list();
        }

        if self.state == State::FetchFileList &&
            let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx)
        {
            match result {
                Ok(_) => self.delete_outside_range(),
                Err(e) => {
                    // Re-arm the step so the next poll retries it, and surface the error.
                    self.fetch_file_list();

                    return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                }
            }
        }

        if self.state == State::DeleteOutsideRange &&
            let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx)
        {
            match result {
                Ok(_) => self.recover_index(),
                Err(e) => {
                    // Re-arm the step so the next poll retries it, and surface the error.
                    self.delete_outside_range();

                    return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                }
            }
        }

        if self.state == State::RecoverIndex &&
            let Poll::Ready(last) = self.recover_index.poll_unpin(cx)
        {
            self.last = last;
            self.count_files();
        }

        // `downloaded` is the value returned by `client.files_count()` (presumably the number of
        // files currently present locally — TODO confirm against EraClient).
        // The budget is whatever is left of `max_files` after those files and in-flight requests,
        // but never less than the distance from `index` up to the recovered `last` index, so a
        // recovered range is requested even if that exceeds `max_files`.
        if self.state == State::CountFiles &&
            let Poll::Ready(downloaded) = self.files_count.poll_unpin(cx)
        {
            let max_missing = self
                .max_files
                .saturating_sub(downloaded + self.downloading)
                .max(self.last.unwrap_or_default().saturating_sub(self.index));
            self.state = State::Missing(max_missing);
        }

        if let State::Missing(max_missing) = self.state {
            if max_missing > 0 {
                // Claim the next file index and count it as in flight before its URL is known.
                let index = self.index;
                self.index += 1;
                self.downloading += 1;
                self.next_url(index, max_missing);
            } else {
                // NOTE(review): this re-arms `files_count`, but the `CountFiles` check above has
                // already run, so the new future is not polled in this call and the function
                // returns `Poll::Pending` without this stream having registered a waker. Progress
                // then depends on something else waking the task (e.g. an in-flight download, or
                // the self-wake in `DownloadStream::poll_next` when nothing is in flight).
                // Confirm this throttling is intentional.
                self.count_files();
            }
        }

        // `url.transpose()` maps `Ok(None)` to `None` (end of stream), `Err(e)` to a future that
        // fails with `e` via `url?`, and `Ok(Some(url))` to a future that downloads `url`.
        // NOTE(review): on `Ok(None)` the `downloading` counter incremented above is not matched
        // by any download future.
        if let State::NextUrl(max_missing) = self.state &&
            let Poll::Ready(url) = self.next_url.poll_unpin(cx)
        {
            self.state = State::Missing(max_missing - 1);

            return Poll::Ready(url.transpose().map(|url| -> DownloadFuture {
                let mut client = self.client.clone();

                Box::pin(async move { client.download_to_file(url?).await.map(EraRemoteMeta::new) })
            }));
        }

        Poll::Pending
    }
}
334
335impl<Http> StartingStream<Http> {
336 const fn downloaded(&mut self) {
337 self.downloading = self.downloading.saturating_sub(1);
338 }
339}
340
341impl<Http: HttpClient + Clone + Send + Sync + 'static> StartingStream<Http> {
342 fn fetch_file_list(&mut self) {
343 let client = self.client.clone();
344
345 Pin::new(&mut self.fetch_file_list)
346 .set(Box::pin(async move { client.fetch_file_list().await }));
347
348 self.state = State::FetchFileList;
349 }
350
351 fn delete_outside_range(&mut self) {
352 let index = self.index;
353 let max_files = self.max_files;
354 let client = self.client.clone();
355
356 Pin::new(&mut self.delete_outside_range)
357 .set(Box::pin(async move { client.delete_outside_range(index, max_files).await }));
358
359 self.state = State::DeleteOutsideRange;
360 }
361
362 fn recover_index(&mut self) {
363 let client = self.client.clone();
364
365 Pin::new(&mut self.recover_index)
366 .set(Box::pin(async move { client.recover_index().await }));
367
368 self.state = State::RecoverIndex;
369 }
370
371 fn count_files(&mut self) {
372 let client = self.client.clone();
373
374 Pin::new(&mut self.files_count).set(Box::pin(async move { client.files_count().await }));
375
376 self.state = State::CountFiles;
377 }
378
379 fn next_url(&mut self, index: usize, max_missing: usize) {
380 let client = self.client.clone();
381
382 Pin::new(&mut self.next_url).set(Box::pin(async move { client.url(index).await }));
383
384 self.state = State::NextUrl(max_missing);
385 }
386}