1use crate::{client::HttpClient, EraClient, BLOCKS_PER_FILE};
2use alloy_primitives::BlockNumber;
3use futures_util::{stream::FuturesOrdered, FutureExt, Stream, StreamExt};
4use reqwest::Url;
5use reth_fs_util as fs;
6use std::{
7 collections::VecDeque,
8 fmt::{Debug, Formatter},
9 future::Future,
10 path::Path,
11 pin::Pin,
12 task::{Context, Poll},
13};
14
/// Tunable parameters for an [`EraStream`].
///
/// All fields are private; start from [`Default::default`] and adjust with the builder-style
/// methods on this type.
#[derive(Debug, Clone)]
pub struct EraStreamConfig {
    /// File budget: feeds the stream's "how many more files to request" computation and is
    /// forwarded to the client's `delete_outside_range` call.
    max_files: usize,
    /// Upper bound on the number of download futures polled at the same time.
    max_concurrent_downloads: usize,
    /// Index of the first file to request; `None` makes the stream start at index `0`.
    start_from: Option<usize>,
}
29
30impl Default for EraStreamConfig {
31 fn default() -> Self {
32 Self { max_files: 5, max_concurrent_downloads: 3, start_from: None }
33 }
34}
35
36impl EraStreamConfig {
37 pub const fn with_max_files(mut self, max_files: usize) -> Self {
39 self.max_files = max_files;
40 self
41 }
42
43 pub const fn with_max_concurrent_downloads(mut self, max_concurrent_downloads: usize) -> Self {
45 self.max_concurrent_downloads = max_concurrent_downloads;
46 self
47 }
48
49 pub const fn start_from(mut self, block_number: BlockNumber) -> Self {
51 self.start_from.replace(block_number as usize / BLOCKS_PER_FILE);
52 self
53 }
54}
55
/// Stream that yields one [`EraRemoteMeta`] (or an error) per finished download.
///
/// It is composed of two cooperating inner streams: `starting_stream` produces download
/// futures, and `download_stream` queues them and drives them to completion.
#[derive(Debug)]
pub struct EraStream<Http> {
    /// Queues and polls the download futures.
    download_stream: DownloadStream,
    /// State machine that decides which file to request next and builds its download future.
    starting_stream: StartingStream<Http>,
}
77
78impl<Http> EraStream<Http> {
79 pub fn new(client: EraClient<Http>, config: EraStreamConfig) -> Self {
82 Self {
83 download_stream: DownloadStream {
84 downloads: Default::default(),
85 scheduled: Default::default(),
86 max_concurrent_downloads: config.max_concurrent_downloads,
87 ended: false,
88 },
89 starting_stream: StartingStream {
90 client,
91 files_count: Box::pin(async move { usize::MAX }),
92 next_url: Box::pin(async move { Ok(None) }),
93 delete_outside_range: Box::pin(async move { Ok(()) }),
94 recover_index: Box::pin(async move { None }),
95 fetch_file_list: Box::pin(async move { Ok(()) }),
96 state: Default::default(),
97 max_files: config.max_files,
98 index: config.start_from.unwrap_or_default(),
99 last: None,
100 downloading: 0,
101 },
102 }
103 }
104}
105
/// Handle to a file produced by the stream, exposing where it lives and a way to signal that
/// the consumer is done with it.
pub trait EraMeta: Debug {
    /// Signals that the file has been consumed.
    ///
    /// What this entails is up to the implementation; [`EraRemoteMeta`] removes the file.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation's clean-up step produces.
    fn mark_as_processed(&self) -> eyre::Result<()>;

    /// Returns the path of the file this handle refers to.
    fn path(&self) -> &Path;
}
120
121impl<T: EraMeta> EraMeta for Box<T> {
122 fn mark_as_processed(&self) -> eyre::Result<()> {
123 T::mark_as_processed(self)
124 }
125
126 fn path(&self) -> &Path {
127 T::path(self)
128 }
129}
130
/// Handle to a file that the stream downloaded to local disk.
#[derive(Debug)]
pub struct EraRemoteMeta {
    /// Location of the downloaded file.
    path: Box<Path>,
}
137
138impl EraRemoteMeta {
139 const fn new(path: Box<Path>) -> Self {
140 Self { path }
141 }
142}
143
144impl AsRef<Path> for EraRemoteMeta {
145 fn as_ref(&self) -> &Path {
146 self.path.as_ref()
147 }
148}
149
150impl EraMeta for EraRemoteMeta {
151 fn mark_as_processed(&self) -> eyre::Result<()> {
153 Ok(fs::remove_file(&self.path)?)
154 }
155
156 fn path(&self) -> &Path {
157 &self.path
158 }
159}
160
161impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for EraStream<Http> {
162 type Item = eyre::Result<EraRemoteMeta>;
163
164 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165 if let Poll::Ready(fut) = self.starting_stream.poll_next_unpin(cx) {
166 if let Some(fut) = fut {
167 self.download_stream.scheduled.push_back(fut);
168 } else {
169 self.download_stream.ended = true;
170 }
171 }
172
173 let poll = self.download_stream.poll_next_unpin(cx);
174
175 if poll.is_ready() {
176 self.starting_stream.downloaded();
177 }
178
179 poll
180 }
181}
182
/// Boxed, pinned future that resolves to the handle of one downloaded file, or to the error
/// that prevented the download.
type DownloadFuture =
    Pin<Box<dyn Future<Output = eyre::Result<EraRemoteMeta>> + Send + Sync + 'static>>;
185
/// Drives download futures to completion, keeping a bounded number of them in flight.
struct DownloadStream {
    /// Downloads currently being polled; results are yielded in the order they were pushed.
    downloads: FuturesOrdered<DownloadFuture>,
    /// Downloads waiting for a free slot in `downloads`.
    scheduled: VecDeque<DownloadFuture>,
    /// Maximum number of futures held in `downloads` at once.
    max_concurrent_downloads: usize,
    /// Set once no further downloads will be scheduled; until then an empty `downloads` set
    /// does not end the stream.
    ended: bool,
}
192
193impl Debug for DownloadStream {
194 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
195 write!(f, "DownloadStream({})", self.downloads.len())
196 }
197}
198
199impl Stream for DownloadStream {
200 type Item = eyre::Result<EraRemoteMeta>;
201
202 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
203 for _ in 0..self.max_concurrent_downloads - self.downloads.len() {
204 if let Some(fut) = self.scheduled.pop_front() {
205 self.downloads.push_back(fut);
206 }
207 }
208
209 let ended = self.ended;
210 let poll = self.downloads.poll_next_unpin(cx);
211
212 if matches!(poll, Poll::Ready(None)) && !ended {
213 cx.waker().wake_by_ref();
214 return Poll::Pending;
215 }
216
217 poll
218 }
219}
220
/// State machine that prepares the download directory and then produces one
/// [`DownloadFuture`] per file to fetch.
///
/// Each boxed future field is a slot for the request of one state; the slot is overwritten
/// with a fresh request every time the corresponding state is entered.
struct StartingStream<Http> {
    /// Client cloned into every request future.
    client: EraClient<Http>,
    /// Slot for the `client.files_count()` request.
    files_count: Pin<Box<dyn Future<Output = usize> + Send + Sync + 'static>>,
    /// Slot for the `client.url(index)` request.
    next_url: Pin<Box<dyn Future<Output = eyre::Result<Option<Url>>> + Send + Sync + 'static>>,
    /// Slot for the `client.delete_outside_range(index, max_files)` request.
    delete_outside_range: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
    /// Slot for the `client.recover_index()` request.
    recover_index: Pin<Box<dyn Future<Output = Option<usize>> + Send + Sync + 'static>>,
    /// Slot for the `client.fetch_file_list()` request.
    fetch_file_list: Pin<Box<dyn Future<Output = eyre::Result<()>> + Send + Sync + 'static>>,
    /// Current position in the state machine.
    state: State,
    /// File budget taken from the configuration.
    max_files: usize,
    /// Index of the next file to request.
    index: usize,
    /// Result of the most recent `recover_index` request, if it produced one.
    last: Option<usize>,
    /// Number of downloads handed out that have not yet been reported via `downloaded()`.
    downloading: usize,
}
234
235impl<Http> Debug for StartingStream<Http> {
236 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
237 write!(
238 f,
239 "StartingStream{{ max_files: {}, index: {}, downloading: {} }}",
240 self.max_files, self.index, self.downloading
241 )
242 }
243}
244
/// Steps of the [`StartingStream`] state machine.
#[derive(Debug, PartialEq, Default)]
enum State {
    /// Nothing has been requested yet.
    #[default]
    Initial,
    /// Waiting for the `fetch_file_list` request.
    FetchFileList,
    /// Waiting for the `delete_outside_range` request.
    DeleteOutsideRange,
    /// Waiting for the `recover_index` request.
    RecoverIndex,
    /// Waiting for the `files_count` request.
    CountFiles,
    /// Ready to request the next file; the payload is how many more files may still be
    /// requested before the files are counted again.
    Missing(usize),
    /// Waiting for the `url` request of the next file; the payload is carried over from
    /// [`State::Missing`].
    NextUrl(usize),
}
256
impl<Http: HttpClient + Clone + Send + Sync + 'static + Unpin> Stream for StartingStream<Http> {
    type Item = DownloadFuture;

    /// Advances the state machine and yields the next download future once its URL is known.
    ///
    /// The state checks are sequential `if`s rather than an `else if` chain, so a single call
    /// falls through as many states as have their request already completed.
    ///
    /// Returns `Poll::Ready(None)` when the URL request resolves to `Ok(None)`. Failures of
    /// the preparation requests are yielded as a future that resolves to that error.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // First poll: kick off the file list request.
        if self.state == State::Initial {
            self.fetch_file_list();
        }

        if self.state == State::FetchFileList {
            if let Poll::Ready(result) = self.fetch_file_list.poll_unpin(cx) {
                match result {
                    Ok(_) => self.delete_outside_range(),
                    Err(e) => {
                        // Re-arm the same request so the next poll retries it, and hand the
                        // error to the consumer as an already-failed download.
                        self.fetch_file_list();

                        return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                    }
                }
            }
        }

        if self.state == State::DeleteOutsideRange {
            if let Poll::Ready(result) = self.delete_outside_range.poll_unpin(cx) {
                match result {
                    Ok(_) => self.recover_index(),
                    Err(e) => {
                        // Same retry-and-report scheme as for the file list request.
                        self.delete_outside_range();

                        return Poll::Ready(Some(Box::pin(async move { Err(e) })));
                    }
                }
            }
        }

        if self.state == State::RecoverIndex {
            if let Poll::Ready(last) = self.recover_index.poll_unpin(cx) {
                self.last = last;
                self.count_files();
            }
        }

        if self.state == State::CountFiles {
            if let Poll::Ready(downloaded) = self.files_count.poll_unpin(cx) {
                // Number of files that may be requested now: the larger of
                //  - the file budget minus the counted files and the downloads in flight, and
                //  - the distance from the next index to the recovered `last` index (0 when
                //    nothing was recovered).
                let max_missing = self
                    .max_files
                    .saturating_sub(downloaded + self.downloading)
                    .max(self.last.unwrap_or_default().saturating_sub(self.index));
                self.state = State::Missing(max_missing);
            }
        }

        if let State::Missing(max_missing) = self.state {
            if max_missing > 0 {
                // Claim the next index and resolve its URL.
                let index = self.index;
                self.index += 1;
                self.downloading += 1;
                self.next_url(index, max_missing);
            } else {
                // Nothing may be requested right now: count the files again.
                // NOTE(review): the freshly armed `files_count` future is not polled in this
                // call, so the `Poll::Pending` below is returned without a wake-up registered
                // for it here; this appears to rely on the caller polling again — confirm.
                self.count_files();
            }
        }

        if let State::NextUrl(max_missing) = self.state {
            if let Poll::Ready(url) = self.next_url.poll_unpin(cx) {
                // `NextUrl` is only entered with `max_missing > 0`, so this cannot underflow.
                self.state = State::Missing(max_missing - 1);

                // `Ok(None)` becomes `None` and ends the stream; an `Err` is carried into the
                // download future, where `url?` surfaces it.
                return Poll::Ready(url.transpose().map(|url| -> DownloadFuture {
                    let mut client = self.client.clone();

                    Box::pin(
                        async move { client.download_to_file(url?).await.map(EraRemoteMeta::new) },
                    )
                }));
            }
        }

        Poll::Pending
    }
}
336
337impl<Http> StartingStream<Http> {
338 const fn downloaded(&mut self) {
339 self.downloading = self.downloading.saturating_sub(1);
340 }
341}
342
343impl<Http: HttpClient + Clone + Send + Sync + 'static> StartingStream<Http> {
344 fn fetch_file_list(&mut self) {
345 let client = self.client.clone();
346
347 Pin::new(&mut self.fetch_file_list)
348 .set(Box::pin(async move { client.fetch_file_list().await }));
349
350 self.state = State::FetchFileList;
351 }
352
353 fn delete_outside_range(&mut self) {
354 let index = self.index;
355 let max_files = self.max_files;
356 let client = self.client.clone();
357
358 Pin::new(&mut self.delete_outside_range)
359 .set(Box::pin(async move { client.delete_outside_range(index, max_files).await }));
360
361 self.state = State::DeleteOutsideRange;
362 }
363
364 fn recover_index(&mut self) {
365 let client = self.client.clone();
366
367 Pin::new(&mut self.recover_index)
368 .set(Box::pin(async move { client.recover_index().await }));
369
370 self.state = State::RecoverIndex;
371 }
372
373 fn count_files(&mut self) {
374 let client = self.client.clone();
375
376 Pin::new(&mut self.files_count).set(Box::pin(async move { client.files_count().await }));
377
378 self.state = State::CountFiles;
379 }
380
381 fn next_url(&mut self, index: usize, max_missing: usize) {
382 let client = self.client.clone();
383
384 Pin::new(&mut self.next_url).set(Box::pin(async move { client.url(index).await }));
385
386 self.state = State::NextUrl(max_missing);
387 }
388}