1use super::queue::BodiesRequestQueue;
2use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::Stream;
6use futures_util::StreamExt;
7use reth_config::BodiesConfig;
8use reth_consensus::{Consensus, ConsensusError};
9use reth_network_p2p::{
10 bodies::{
11 client::BodiesClient,
12 downloader::{BodyDownloader, BodyDownloaderResult},
13 response::BlockResponse,
14 },
15 error::{DownloadError, DownloadResult},
16};
17use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader};
18use reth_storage_api::HeaderProvider;
19use reth_tasks::{TaskSpawner, TokioTaskExecutor};
20use std::{
21 cmp::Ordering,
22 collections::BinaryHeap,
23 fmt::Debug,
24 mem,
25 ops::RangeInclusive,
26 pin::Pin,
27 sync::Arc,
28 task::{Context, Poll},
29};
30use tracing::info;
31
32#[must_use = "Stream does nothing unless polled"]
36#[derive(Debug)]
37pub struct BodiesDownloader<
38 B: Block,
39 C: BodiesClient<Body = B::Body>,
40 Provider: HeaderProvider<Header = B::Header>,
41> {
42 client: Arc<C>,
44 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
46 provider: Provider,
48 request_limit: u64,
50 stream_batch_size: usize,
52 concurrent_requests_range: RangeInclusive<usize>,
54 max_buffered_blocks_size_bytes: usize,
56 buffered_blocks_size_bytes: usize,
58 download_range: RangeInclusive<BlockNumber>,
60 latest_queued_block_number: Option<BlockNumber>,
62 in_progress_queue: BodiesRequestQueue<B, C>,
64 buffered_responses: BinaryHeap<OrderedBodiesResponse<B>>,
66 queued_bodies: Vec<BlockResponse<B>>,
68 metrics: BodyDownloaderMetrics,
70}
71
72impl<B, C, Provider> BodiesDownloader<B, C, Provider>
73where
74 B: Block,
75 C: BodiesClient<Body = B::Body> + 'static,
76 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
77{
78 fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
80 let start_at = match self.in_progress_queue.last_requested_block_number {
81 Some(num) => num + 1,
82 None => *self.download_range.start(),
83 };
84 let items_left = (self.download_range.end() + 1).saturating_sub(start_at);
86 let limit = items_left.min(self.request_limit);
87 self.query_headers(start_at..=*self.download_range.end(), limit)
88 }
89
90 fn query_headers(
101 &self,
102 range: RangeInclusive<BlockNumber>,
103 max_non_empty: u64,
104 ) -> DownloadResult<Option<Vec<SealedHeader<B::Header>>>> {
105 if range.is_empty() || max_non_empty == 0 {
106 return Ok(None)
107 }
108
109 let mut collected = 0;
115 let mut non_empty_headers = 0;
116 let headers = self.provider.sealed_headers_while(range.clone(), |header| {
117 let should_take = range.contains(&header.number()) &&
118 non_empty_headers < max_non_empty &&
119 collected < self.stream_batch_size;
120
121 if should_take {
122 collected += 1;
123 if !header.is_empty() {
124 non_empty_headers += 1;
125 }
126 true
127 } else {
128 false
129 }
130 })?;
131
132 Ok(Some(headers).filter(|h| !h.is_empty()))
133 }
134
135 const fn next_expected_block_number(&self) -> BlockNumber {
137 match self.latest_queued_block_number {
138 Some(num) => num + 1,
139 None => *self.download_range.start(),
140 }
141 }
142
143 #[inline]
148 fn concurrent_request_limit(&self) -> usize {
149 let num_peers = self.client.num_connected_peers();
150
151 let max_requests = num_peers.max(*self.concurrent_requests_range.start());
152
153 if num_peers < *self.concurrent_requests_range.start() {
155 return max_requests
156 }
157
158 max_requests.min(*self.concurrent_requests_range.end())
159 }
160
161 const fn has_buffer_capacity(&self) -> bool {
163 self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes
164 }
165
166 fn is_terminated(&self) -> bool {
168 let nothing_to_request = self.download_range.is_empty() ||
170 self.in_progress_queue
172 .last_requested_block_number.is_some_and(|last| last == *self.download_range.end());
173
174 nothing_to_request &&
175 self.in_progress_queue.is_empty() &&
176 self.buffered_responses.is_empty() &&
177 self.queued_bodies.is_empty()
178 }
179
180 fn clear(&mut self) {
184 self.download_range = RangeInclusive::new(1, 0);
185 self.latest_queued_block_number.take();
186 self.in_progress_queue.clear();
187 self.queued_bodies = Vec::new();
188 self.buffered_responses = BinaryHeap::new();
189 self.buffered_blocks_size_bytes = 0;
190
191 self.metrics.in_flight_requests.set(0.);
193 self.metrics.buffered_responses.set(0.);
194 self.metrics.buffered_blocks.set(0.);
195 self.metrics.buffered_blocks_size_bytes.set(0.);
196 self.metrics.queued_blocks.set(0.);
197 }
198
199 fn queue_bodies(&mut self, bodies: Vec<BlockResponse<B>>) {
201 self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
202 self.queued_bodies.extend(bodies);
203 self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
204 }
205
206 fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B>> {
208 let resp = self.buffered_responses.pop()?;
209 self.metrics.buffered_responses.decrement(1.);
210 self.buffered_blocks_size_bytes -= resp.size();
211 self.metrics.buffered_blocks.decrement(resp.len() as f64);
212 self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
213 Some(resp)
214 }
215
216 fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B>>) {
218 let size = response.iter().map(BlockResponse::size).sum::<usize>() +
220 response.capacity() * mem::size_of::<BlockResponse<B>>();
221
222 let response = OrderedBodiesResponse { resp: response, size };
223 let response_len = response.len();
224
225 self.buffered_blocks_size_bytes += size;
226 self.buffered_responses.push(response);
227
228 self.metrics.buffered_blocks.increment(response_len as f64);
229 self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
230 self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
231 }
232
233 fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<B>>> {
235 if let Some(next) = self.buffered_responses.peek() {
236 let expected = self.next_expected_block_number();
237 let next_block_range = next.block_range();
238
239 if next_block_range.contains(&expected) {
240 return self.pop_buffered_response().map(|buffered| {
241 buffered
242 .resp
243 .into_iter()
244 .skip_while(|b| b.block_number() < expected)
245 .take_while(|b| self.download_range.contains(&b.block_number()))
246 .collect()
247 })
248 }
249
250 if *next_block_range.end() < expected {
252 self.pop_buffered_response();
253 }
254 }
255 None
256 }
257
258 fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<B>>> {
261 if self.queued_bodies.len() >= self.stream_batch_size {
262 let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
263 self.queued_bodies.shrink_to_fit();
264 self.metrics.total_flushed.increment(next_batch.len() as u64);
265 self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
266 return Some(next_batch)
267 }
268 None
269 }
270
271 fn can_submit_new_request(&self) -> bool {
276 self.queued_bodies.len() < 4 * self.stream_batch_size &&
280 self.has_buffer_capacity() &&
281 self.in_progress_queue.len() < self.concurrent_request_limit()
282 }
283}
284
285impl<B, C, Provider> BodiesDownloader<B, C, Provider>
286where
287 B: Block + 'static,
288 C: BodiesClient<Body = B::Body> + 'static,
289 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
290{
291 pub fn into_task(self) -> TaskDownloader<B> {
293 self.into_task_with(&TokioTaskExecutor::default())
294 }
295
296 pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<B>
298 where
299 S: TaskSpawner,
300 {
301 TaskDownloader::spawn_with(self, spawner)
302 }
303}
304
305impl<B, C, Provider> BodyDownloader for BodiesDownloader<B, C, Provider>
306where
307 B: Block + 'static,
308 C: BodiesClient<Body = B::Body> + 'static,
309 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
310{
311 type Block = B;
312
313 fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
320 if range.is_empty() {
322 tracing::error!(target: "downloaders::bodies", ?range, "Bodies download range is invalid (empty)");
323 return Err(DownloadError::InvalidBodyRange { range })
324 }
325
326 let is_current_range_subset = self.download_range.contains(range.start()) &&
328 *range.end() == *self.download_range.end();
329 if is_current_range_subset {
330 tracing::trace!(target: "downloaders::bodies", ?range, "Download range already in progress");
331 return Ok(())
333 }
334
335 let count = *range.end() - *range.start() + 1; let is_next_consecutive_range = *range.start() == *self.download_range.end() + 1;
338 if is_next_consecutive_range {
339 tracing::trace!(target: "downloaders::bodies", ?range, "New download range set");
341 info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
342 self.download_range = range;
343 return Ok(())
344 }
345
346 tracing::trace!(target: "downloaders::bodies", ?range, prev_range = ?self.download_range, "Download range reset");
349 info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
350 self.clear();
351 self.download_range = range;
352 Ok(())
353 }
354}
355
356impl<B, C, Provider> Stream for BodiesDownloader<B, C, Provider>
357where
358 B: Block + 'static,
359 C: BodiesClient<Body = B::Body> + 'static,
360 Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
361{
362 type Item = BodyDownloaderResult<B>;
363
364 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365 let this = self.get_mut();
366 if this.is_terminated() {
367 return Poll::Ready(None)
368 }
369 loop {
371 if let Some(next_batch) = this.try_split_next_batch() {
373 return Poll::Ready(Some(Ok(next_batch)))
374 }
375
376 while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
378 this.metrics.in_flight_requests.decrement(1.);
379 match response {
380 Ok(response) => {
381 this.buffer_bodies_response(response);
382 }
383 Err(error) => {
384 tracing::debug!(target: "downloaders::bodies", %error, "Request failed");
385 this.clear();
386 return Poll::Ready(Some(Err(error)))
387 }
388 };
389 }
390
391 let mut new_request_submitted = false;
393 'inner: while this.can_submit_new_request() {
395 match this.next_headers_request() {
396 Ok(Some(request)) => {
397 this.metrics.in_flight_requests.increment(1.);
398 this.in_progress_queue.push_new_request(
399 Arc::clone(&this.client),
400 Arc::clone(&this.consensus),
401 request,
402 );
403 new_request_submitted = true;
404 }
405 Ok(None) => break 'inner,
406 Err(error) => {
407 tracing::error!(target: "downloaders::bodies", %error, "Failed to download from next request");
408 this.clear();
409 return Poll::Ready(Some(Err(error)))
410 }
411 };
412 }
413
414 while let Some(buf_response) = this.try_next_buffered() {
415 this.queue_bodies(buf_response);
416 }
417
418 this.buffered_responses.shrink_to_fit();
420
421 if !new_request_submitted {
422 break
423 }
424 }
425
426 if this.in_progress_queue.is_empty() {
428 if this.queued_bodies.is_empty() {
429 return Poll::Ready(None)
430 }
431 let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
432 let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
433 this.queued_bodies.shrink_to_fit();
434 this.metrics.total_flushed.increment(next_batch.len() as u64);
435 this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
436 return Poll::Ready(Some(Ok(next_batch)))
437 }
438
439 Poll::Pending
440 }
441}
442
443#[derive(Debug)]
444struct OrderedBodiesResponse<B: Block> {
445 resp: Vec<BlockResponse<B>>,
446 size: usize,
448}
449
450impl<B: Block> OrderedBodiesResponse<B> {
451 #[inline]
452 fn len(&self) -> usize {
453 self.resp.len()
454 }
455
456 #[inline]
460 const fn size(&self) -> usize {
461 self.size
462 }
463}
464
465impl<B: Block> OrderedBodiesResponse<B> {
466 fn first_block_number(&self) -> u64 {
471 self.resp.first().expect("is not empty").block_number()
472 }
473
474 fn block_range(&self) -> RangeInclusive<u64> {
479 self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
480 }
481}
482
483impl<B: Block> PartialEq for OrderedBodiesResponse<B> {
484 fn eq(&self, other: &Self) -> bool {
485 self.first_block_number() == other.first_block_number()
486 }
487}
488
489impl<B: Block> Eq for OrderedBodiesResponse<B> {}
490
491impl<B: Block> PartialOrd for OrderedBodiesResponse<B> {
492 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
493 Some(self.cmp(other))
494 }
495}
496
497impl<B: Block> Ord for OrderedBodiesResponse<B> {
498 fn cmp(&self, other: &Self) -> Ordering {
499 self.first_block_number().cmp(&other.first_block_number()).reverse()
500 }
501}
502
503#[derive(Debug, Clone)]
505pub struct BodiesDownloaderBuilder {
506 pub request_limit: u64,
508 pub stream_batch_size: usize,
510 pub max_buffered_blocks_size_bytes: usize,
512 pub concurrent_requests_range: RangeInclusive<usize>,
514}
515
516impl BodiesDownloaderBuilder {
517 pub fn new(config: BodiesConfig) -> Self {
520 Self::default()
521 .with_stream_batch_size(config.downloader_stream_batch_size)
522 .with_request_limit(config.downloader_request_limit)
523 .with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes)
524 .with_concurrent_requests_range(
525 config.downloader_min_concurrent_requests..=
526 config.downloader_max_concurrent_requests,
527 )
528 }
529}
530
531impl Default for BodiesDownloaderBuilder {
532 fn default() -> Self {
533 Self {
534 request_limit: 200,
535 stream_batch_size: 1_000,
536 max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, concurrent_requests_range: 5..=100,
538 }
539 }
540}
541
542impl BodiesDownloaderBuilder {
543 pub const fn with_request_limit(mut self, request_limit: u64) -> Self {
545 self.request_limit = request_limit;
546 self
547 }
548
549 pub const fn with_stream_batch_size(mut self, stream_batch_size: usize) -> Self {
551 self.stream_batch_size = stream_batch_size;
552 self
553 }
554
555 pub const fn with_concurrent_requests_range(
557 mut self,
558 concurrent_requests_range: RangeInclusive<usize>,
559 ) -> Self {
560 self.concurrent_requests_range = concurrent_requests_range;
561 self
562 }
563
564 pub const fn with_max_buffered_blocks_size_bytes(
566 mut self,
567 max_buffered_blocks_size_bytes: usize,
568 ) -> Self {
569 self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes;
570 self
571 }
572
573 pub fn build<B, C, Provider>(
575 self,
576 client: C,
577 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
578 provider: Provider,
579 ) -> BodiesDownloader<B, C, Provider>
580 where
581 B: Block,
582 C: BodiesClient<Body = B::Body> + 'static,
583 Provider: HeaderProvider<Header = B::Header>,
584 {
585 let Self {
586 request_limit,
587 stream_batch_size,
588 concurrent_requests_range,
589 max_buffered_blocks_size_bytes,
590 } = self;
591 let metrics = BodyDownloaderMetrics::default();
592 let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
593 BodiesDownloader {
594 client: Arc::new(client),
595 consensus,
596 provider,
597 request_limit,
598 stream_batch_size,
599 max_buffered_blocks_size_bytes,
600 concurrent_requests_range,
601 in_progress_queue,
602 metrics,
603 download_range: RangeInclusive::new(1, 0),
604 latest_queued_block_number: None,
605 buffered_responses: Default::default(),
606 queued_bodies: Default::default(),
607 buffered_blocks_size_bytes: 0,
608 }
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::*;
615 use crate::{
616 bodies::test_utils::{insert_headers, zip_blocks},
617 test_utils::{generate_bodies, TestBodiesClient},
618 };
619 use alloy_primitives::B256;
620 use assert_matches::assert_matches;
621 use reth_chainspec::MAINNET;
622 use reth_consensus::test_utils::TestConsensus;
623 use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
624 use reth_provider::{
625 providers::StaticFileProvider, test_utils::MockNodeTypesWithDB, ProviderFactory,
626 };
627 use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
628 use std::collections::HashMap;
629
630 #[tokio::test]
633 async fn streams_bodies_in_order() {
634 let db = create_test_rw_db();
636 let (headers, mut bodies) = generate_bodies(0..=19);
637
638 insert_headers(db.db(), &headers);
639
640 let client = Arc::new(
641 TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
642 );
643 let (_static_dir, static_dir_path) = create_test_static_files_dir();
644
645 let mut downloader = BodiesDownloaderBuilder::default()
646 .build::<reth_ethereum_primitives::Block, _, _>(
647 client.clone(),
648 Arc::new(TestConsensus::default()),
649 ProviderFactory::<MockNodeTypesWithDB>::new(
650 db,
651 MAINNET.clone(),
652 StaticFileProvider::read_write(static_dir_path).unwrap(),
653 ),
654 );
655 downloader.set_download_range(0..=19).expect("failed to set download range");
656
657 assert_matches!(
658 downloader.next().await,
659 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
660 );
661 assert_eq!(client.times_requested(), 1);
662 }
663
664 #[tokio::test]
667 async fn requests_correct_number_of_times() {
668 let db = create_test_rw_db();
670 let mut rng = generators::rng();
671 let blocks = random_block_range(
672 &mut rng,
673 0..=199,
674 BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..2, ..Default::default() },
675 );
676
677 let headers = blocks.iter().map(|block| block.clone_sealed_header()).collect::<Vec<_>>();
678 let bodies = blocks
679 .into_iter()
680 .map(|block| (block.hash(), block.into_body()))
681 .collect::<HashMap<_, _>>();
682
683 insert_headers(db.db(), &headers);
684
685 let request_limit = 10;
686 let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
687 let (_static_dir, static_dir_path) = create_test_static_files_dir();
688
689 let mut downloader = BodiesDownloaderBuilder::default()
690 .with_request_limit(request_limit)
691 .build::<reth_ethereum_primitives::Block, _, _>(
692 client.clone(),
693 Arc::new(TestConsensus::default()),
694 ProviderFactory::<MockNodeTypesWithDB>::new(
695 db,
696 MAINNET.clone(),
697 StaticFileProvider::read_write(static_dir_path).unwrap(),
698 ),
699 );
700 downloader.set_download_range(0..=199).expect("failed to set download range");
701
702 let _ = downloader.collect::<Vec<_>>().await;
703 assert_eq!(client.times_requested(), 20);
704 }
705
706 #[tokio::test]
709 async fn streams_bodies_in_order_after_range_reset() {
710 let db = create_test_rw_db();
712 let (headers, mut bodies) = generate_bodies(0..=99);
713
714 insert_headers(db.db(), &headers);
715
716 let stream_batch_size = 20;
717 let request_limit = 10;
718 let client = Arc::new(
719 TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
720 );
721 let (_static_dir, static_dir_path) = create_test_static_files_dir();
722 let mut downloader = BodiesDownloaderBuilder::default()
723 .with_stream_batch_size(stream_batch_size)
724 .with_request_limit(request_limit)
725 .build::<reth_ethereum_primitives::Block, _, _>(
726 client.clone(),
727 Arc::new(TestConsensus::default()),
728 ProviderFactory::<MockNodeTypesWithDB>::new(
729 db,
730 MAINNET.clone(),
731 StaticFileProvider::read_write(static_dir_path).unwrap(),
732 ),
733 );
734
735 let mut range_start = 0;
736 while range_start < 100 {
737 downloader.set_download_range(range_start..=99).expect("failed to set download range");
738
739 assert_matches!(
740 downloader.next().await,
741 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(range_start as usize).take(stream_batch_size), &mut bodies))
742 );
743 assert!(downloader.latest_queued_block_number >= Some(range_start));
744 range_start += stream_batch_size as u64;
745 }
746 }
747
748 #[tokio::test]
751 async fn can_download_new_range_after_termination() {
752 let db = create_test_rw_db();
754 let (headers, mut bodies) = generate_bodies(0..=199);
755
756 insert_headers(db.db(), &headers);
757
758 let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
759 let (_static_dir, static_dir_path) = create_test_static_files_dir();
760
761 let mut downloader = BodiesDownloaderBuilder::default()
762 .with_stream_batch_size(100)
763 .build::<reth_ethereum_primitives::Block, _, _>(
764 client.clone(),
765 Arc::new(TestConsensus::default()),
766 ProviderFactory::<MockNodeTypesWithDB>::new(
767 db,
768 MAINNET.clone(),
769 StaticFileProvider::read_write(static_dir_path).unwrap(),
770 ),
771 );
772
773 downloader.set_download_range(0..=99).expect("failed to set download range");
775 assert_matches!(
776 downloader.next().await,
777 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
778 );
779
780 assert!(downloader.next().await.is_none());
782
783 downloader.set_download_range(100..=199).expect("failed to set download range");
785 assert_matches!(
786 downloader.next().await,
787 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies))
788 );
789 }
790
791 #[tokio::test]
793 async fn can_download_after_exceeding_limit() {
794 let db = create_test_rw_db();
796 let (headers, mut bodies) = generate_bodies(0..=199);
797
798 insert_headers(db.db(), &headers);
799
800 let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
801
802 let (_static_dir, static_dir_path) = create_test_static_files_dir();
803 let mut downloader = BodiesDownloaderBuilder::default()
806 .with_stream_batch_size(10)
807 .with_request_limit(1)
808 .with_max_buffered_blocks_size_bytes(1)
809 .build::<reth_ethereum_primitives::Block, _, _>(
810 client.clone(),
811 Arc::new(TestConsensus::default()),
812 ProviderFactory::<MockNodeTypesWithDB>::new(
813 db,
814 MAINNET.clone(),
815 StaticFileProvider::read_write(static_dir_path).unwrap(),
816 ),
817 );
818
819 downloader.set_download_range(0..=199).expect("failed to set download range");
821 let mut header = 0;
822 while let Some(Ok(resp)) = downloader.next().await {
823 assert_eq!(resp, zip_blocks(headers.iter().skip(header).take(resp.len()), &mut bodies));
824 header += resp.len();
825 }
826 }
827
828 #[tokio::test]
830 async fn can_tolerate_empty_responses() {
831 let db = create_test_rw_db();
833 let (headers, mut bodies) = generate_bodies(0..=99);
834
835 insert_headers(db.db(), &headers);
836
837 let client = Arc::new(
839 TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2),
840 );
841 let (_static_dir, static_dir_path) = create_test_static_files_dir();
842
843 let mut downloader = BodiesDownloaderBuilder::default()
844 .with_request_limit(3)
845 .with_stream_batch_size(100)
846 .build::<reth_ethereum_primitives::Block, _, _>(
847 client.clone(),
848 Arc::new(TestConsensus::default()),
849 ProviderFactory::<MockNodeTypesWithDB>::new(
850 db,
851 MAINNET.clone(),
852 StaticFileProvider::read_write(static_dir_path).unwrap(),
853 ),
854 );
855
856 downloader.set_download_range(0..=99).expect("failed to set download range");
858 assert_matches!(
859 downloader.next().await,
860 Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
861 );
862 }
863}