reth_downloaders/bodies/bodies.rs

use super::queue::BodiesRequestQueue;
use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use futures::Stream;
use futures_util::StreamExt;
use reth_config::BodiesConfig;
use reth_consensus::Consensus;
use reth_network_p2p::{
    bodies::{
        client::BodiesClient,
        downloader::{BodyDownloader, BodyDownloaderResult},
        response::BlockResponse,
    },
    error::{DownloadError, DownloadResult},
};
use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader};
use reth_storage_api::HeaderProvider;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
    cmp::Ordering,
    collections::BinaryHeap,
    fmt::Debug,
    ops::RangeInclusive,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use tracing::info;

/// Downloads bodies in batches.
///
/// All blocks in a batch are fetched at the same time.
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct BodiesDownloader<
    B: Block,
    C: BodiesClient<Body = B::Body>,
    Provider: HeaderProvider<Header = B::Header>,
> {
    /// The bodies client
    client: Arc<C>,
    /// The consensus client
    consensus: Arc<dyn Consensus<B>>,
    /// The database handle
    provider: Provider,
    /// The maximum number of non-empty blocks per request
    request_limit: u64,
    /// The maximum number of block bodies returned at once from the stream
    stream_batch_size: usize,
    /// The allowed range for number of concurrent requests.
    concurrent_requests_range: RangeInclusive<usize>,
    /// Maximum number of bytes of received blocks to buffer internally.
    max_buffered_blocks_size_bytes: usize,
    /// Current estimated size of buffered blocks in bytes.
    buffered_blocks_size_bytes: usize,
    /// The range of block numbers for body download.
    download_range: RangeInclusive<BlockNumber>,
    /// The latest block number returned.
    latest_queued_block_number: Option<BlockNumber>,
    /// Requests in progress
    in_progress_queue: BodiesRequestQueue<B, C>,
    /// Buffered responses
    buffered_responses: BinaryHeap<OrderedBodiesResponse<B>>,
    /// Queued body responses that can be returned for insertion into the database.
    queued_bodies: Vec<BlockResponse<B>>,
    /// The bodies downloader metrics.
    metrics: BodyDownloaderMetrics,
}

impl<B, C, Provider> BodiesDownloader<B, C, Provider>
where
    B: Block,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    /// Returns the next contiguous request.
    fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
        let start_at = match self.in_progress_queue.last_requested_block_number {
            Some(num) => num + 1,
            None => *self.download_range.start(),
        };
        // as the range is inclusive, we need to add 1 to the end.
        let items_left = (self.download_range.end() + 1).saturating_sub(start_at);
        let limit = items_left.min(self.request_limit);
        self.query_headers(start_at..=*self.download_range.end(), limit)
    }

    /// Retrieves a batch of headers from the database starting at the provided block number.
    ///
    /// This method returns the batch as soon as one of the conditions below
    /// is fulfilled:
    ///     1. The number of non-empty headers in the batch equals the requested maximum.
    ///     2. The total number of headers in the batch (both empty and non-empty) is greater than
    ///        or equal to the stream batch size.
    ///     3. The downloader reached the end of the range.
    ///
    /// NOTE: The batches returned have a variable length.
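    ///
    /// A worked example of the stop conditions (a sketch, not taken from real data): with
    /// `max_non_empty = 2` and a stream batch size of 5, a range of headers
    /// `[empty, non-empty, empty, non-empty, empty, ...]` yields a batch of 4 headers, because
    /// collection stops right after the second non-empty header has been taken.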
    fn query_headers(
        &self,
        range: RangeInclusive<BlockNumber>,
        max_non_empty: u64,
    ) -> DownloadResult<Option<Vec<SealedHeader<B::Header>>>> {
        if range.is_empty() || max_non_empty == 0 {
            return Ok(None)
        }

        // Collect headers while
        //      1. Current block number is in range
        //      2. The number of non empty headers is less than maximum
        //      3. The total number of headers is less than the stream batch size (this is only
        //         relevant if the range consists entirely of empty headers)
        let mut collected = 0;
        let mut non_empty_headers = 0;
        let headers = self.provider.sealed_headers_while(range.clone(), |header| {
            let should_take = range.contains(&header.number()) &&
                non_empty_headers < max_non_empty &&
                collected < self.stream_batch_size;

            if should_take {
                collected += 1;
                if !header.is_empty() {
                    non_empty_headers += 1;
                }
                true
            } else {
                false
            }
        })?;

        Ok(Some(headers).filter(|h| !h.is_empty()))
    }

    /// Get the next expected block number for queueing.
    const fn next_expected_block_number(&self) -> BlockNumber {
        match self.latest_queued_block_number {
            Some(num) => num + 1,
            None => *self.download_range.start(),
        }
    }

    /// Max requests to handle at the same time.
    ///
    /// This depends on the number of active peers but will always be within the configured
    /// `min_concurrent_requests..=max_concurrent_requests` range.
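    ///
    /// A worked example of the clamping below (a sketch assuming the default `5..=100` range):
    /// - 3 connected peers: `3.max(5)` = 5 concurrent requests
    /// - 42 connected peers: `42.max(5).min(100)` = 42 concurrent requests
    /// - 250 connected peers: `250.max(5).min(100)` = 100 concurrent requests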
    #[inline]
    fn concurrent_request_limit(&self) -> usize {
        let num_peers = self.client.num_connected_peers();

        let max_requests = num_peers.max(*self.concurrent_requests_range.start());

        // if we're only connected to a few peers, we keep it low
        if num_peers < *self.concurrent_requests_range.start() {
            return max_requests
        }

        max_requests.min(*self.concurrent_requests_range.end())
    }

    /// Returns true if the size of buffered blocks is lower than the configured maximum
    const fn has_buffer_capacity(&self) -> bool {
        self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes
    }

    // Check if the stream is terminated
    fn is_terminated(&self) -> bool {
        // There is nothing to request if the range is empty
        let nothing_to_request = self.download_range.is_empty() ||
            // or all blocks have already been requested.
            self.in_progress_queue
                .last_requested_block_number.is_some_and(|last| last == *self.download_range.end());

        nothing_to_request &&
            self.in_progress_queue.is_empty() &&
            self.buffered_responses.is_empty() &&
            self.queued_bodies.is_empty()
    }

    /// Clear all download related data.
    ///
    /// Should be invoked upon encountering a fatal error.
    fn clear(&mut self) {
        self.download_range = RangeInclusive::new(1, 0);
        self.latest_queued_block_number.take();
        self.in_progress_queue.clear();
        self.queued_bodies = Vec::new();
        self.buffered_responses = BinaryHeap::new();
        self.buffered_blocks_size_bytes = 0;

        // reset metrics
        self.metrics.in_flight_requests.set(0.);
        self.metrics.buffered_responses.set(0.);
        self.metrics.buffered_blocks.set(0.);
        self.metrics.buffered_blocks_size_bytes.set(0.);
        self.metrics.queued_blocks.set(0.);
    }

    /// Queues bodies and sets the latest queued block number
    fn queue_bodies(&mut self, bodies: Vec<BlockResponse<B>>) {
        self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
        self.queued_bodies.extend(bodies);
        self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
    }

    /// Removes the next response from the buffer.
    fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B>> {
        let resp = self.buffered_responses.pop()?;
        self.metrics.buffered_responses.decrement(1.);
        self.buffered_blocks_size_bytes -= resp.size();
        self.metrics.buffered_blocks.decrement(resp.len() as f64);
        self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
        Some(resp)
    }

    /// Adds a new response to the internal buffer
    fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B>>) {
        let size = response.iter().map(BlockResponse::size).sum::<usize>();

        let response = OrderedBodiesResponse { resp: response, size };
        let response_len = response.len();

        self.buffered_blocks_size_bytes += size;
        self.buffered_responses.push(response);

        self.metrics.buffered_blocks.increment(response_len as f64);
        self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
        self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
    }

    /// Returns blocks from the next buffered response if it contains the next expected block
    /// number.
    fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<B>>> {
        if let Some(next) = self.buffered_responses.peek() {
            let expected = self.next_expected_block_number();
            let next_block_range = next.block_range();

            if next_block_range.contains(&expected) {
                return self.pop_buffered_response().map(|buffered| {
                    buffered
                        .resp
                        .into_iter()
                        .skip_while(|b| b.block_number() < expected)
                        .take_while(|b| self.download_range.contains(&b.block_number()))
                        .collect()
                })
            }

            // Drop buffered response since we passed that range
            if *next_block_range.end() < expected {
                self.pop_buffered_response();
            }
        }
        None
    }

    /// Returns the next batch of block bodies that can be returned if enough bodies have been
    /// queued.
    fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<B>>> {
        if self.queued_bodies.len() >= self.stream_batch_size {
            let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
            self.queued_bodies.shrink_to_fit();
            self.metrics.total_flushed.increment(next_batch.len() as u64);
            self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
            return Some(next_batch)
        }
        None
    }

    /// Checks whether a new request can be submitted. This implements back pressure to prevent
    /// overwhelming the system and causing memory overload.
    ///
    /// Returns true if a new request can be submitted.
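    ///
    /// For example (assuming the default `stream_batch_size` of 1_000), no new request is issued
    /// once 4_000 or more bodies are already queued, the buffered responses exceed the configured
    /// byte limit, or the number of in-flight requests reaches the concurrency limit.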
    fn can_submit_new_request(&self) -> bool {
        // requests are issued in order but not necessarily finished in order, so the queued bodies
        // can grow large if a certain request is slow, so we limit the followup requests if the
        // queued bodies grew too large
        self.queued_bodies.len() < 4 * self.stream_batch_size &&
            self.has_buffer_capacity() &&
            self.in_progress_queue.len() < self.concurrent_request_limit()
    }
}

impl<B, C, Provider> BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    /// Spawns the downloader task via [`tokio::task::spawn`]
    pub fn into_task(self) -> TaskDownloader<B> {
        self.into_task_with(&TokioTaskExecutor::default())
    }

    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner.
    pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<B>
    where
        S: TaskSpawner,
    {
        TaskDownloader::spawn_with(self, spawner)
    }
}

impl<B, C, Provider> BodyDownloader for BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    type Block = B;

    /// Set a new download range (inclusive).
    ///
    /// If the provided range is a suffix of the current range with the same end block, the
    /// existing download already covers it and the call is a no-op.
    /// If the range starts immediately after the current range, it is treated as the next
    /// consecutive range and appended without resetting the in-flight state.
    /// For all other ranges, the downloader state is cleared and the new range replaces the old
    /// one.
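    ///
    /// A worked example (a sketch): with a current range of `0..=100`, setting `50..=100` is a
    /// no-op, setting `101..=200` is treated as the next consecutive range, and setting `25..=75`
    /// clears the in-flight state and replaces the range.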
    fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
        // Check if the range is valid.
        if range.is_empty() {
            tracing::error!(target: "downloaders::bodies", ?range, "Bodies download range is invalid (empty)");
            return Err(DownloadError::InvalidBodyRange { range })
        }

        // Check if the provided range is the subset of the existing range.
        let is_current_range_subset = self.download_range.contains(range.start()) &&
            *range.end() == *self.download_range.end();
        if is_current_range_subset {
            tracing::trace!(target: "downloaders::bodies", ?range, "Download range already in progress");
            // The current range already includes requested.
            return Ok(())
        }

        // Check if the provided range is the next expected range.
        let count = *range.end() - *range.start() + 1; // range is inclusive
        let is_next_consecutive_range = *range.start() == *self.download_range.end() + 1;
        if is_next_consecutive_range {
            // New range received.
            tracing::trace!(target: "downloaders::bodies", ?range, "New download range set");
            info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
            self.download_range = range;
            return Ok(())
        }

        // The block range is reset. This can happen either after unwind or after the bodies were
        // written by external services (e.g. BlockchainTree).
        tracing::trace!(target: "downloaders::bodies", ?range, prev_range = ?self.download_range, "Download range reset");
        info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
        // Increment out-of-order requests metric if the new start is below the last returned block
        if let Some(last_returned) = self.latest_queued_block_number &&
            *range.start() < last_returned
        {
            self.metrics.out_of_order_requests.increment(1);
        }
        self.clear();
        self.download_range = range;
        Ok(())
    }
}

impl<B, C, Provider> Stream for BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    type Item = BodyDownloaderResult<B>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.is_terminated() {
            return Poll::Ready(None)
        }
        // Submit new requests and poll any in progress
        loop {
            // Yield next batch if ready
            if let Some(next_batch) = this.try_split_next_batch() {
                return Poll::Ready(Some(Ok(next_batch)))
            }

            // Poll requests
            while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                match response {
                    Ok(response) => {
                        this.buffer_bodies_response(response);
                    }
                    Err(error) => {
                        tracing::debug!(target: "downloaders::bodies", %error, "Request failed");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // Loop exit condition
            let mut new_request_submitted = false;
            // Submit new requests
            'inner: while this.can_submit_new_request() {
                match this.next_headers_request() {
                    Ok(Some(request)) => {
                        this.metrics.in_flight_requests.increment(1.);
                        this.in_progress_queue.push_new_request(
                            Arc::clone(&this.client),
                            Arc::clone(&this.consensus),
                            request,
                        );
                        new_request_submitted = true;
                    }
                    Ok(None) => break 'inner,
                    Err(error) => {
                        tracing::error!(target: "downloaders::bodies", %error, "Failed to download from next request");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            while let Some(buf_response) = this.try_next_buffered() {
                this.queue_bodies(buf_response);
            }

            // shrink the buffer so that it doesn't grow indefinitely
            this.buffered_responses.shrink_to_fit();

            if !new_request_submitted {
                break
            }
        }

        // All requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            if this.queued_bodies.is_empty() {
                return Poll::Ready(None)
            }
            let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
            let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
            this.queued_bodies.shrink_to_fit();
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}

#[derive(Debug)]
struct OrderedBodiesResponse<B: Block> {
    resp: Vec<BlockResponse<B>>,
    /// The total size of the response in bytes
    size: usize,
}

impl<B: Block> OrderedBodiesResponse<B> {
    #[inline]
    const fn len(&self) -> usize {
        self.resp.len()
    }

    /// Returns the size of the response in bytes
    ///
    /// See [`BlockResponse::size`]
    #[inline]
    const fn size(&self) -> usize {
        self.size
    }
}

impl<B: Block> OrderedBodiesResponse<B> {
    /// Returns the block number of the first element
    ///
    /// # Panics
    /// If the response vec is empty.
    fn first_block_number(&self) -> u64 {
        self.resp.first().expect("is not empty").block_number()
    }

    /// Returns the range of the block numbers in the response
    ///
    /// # Panics
    /// If the response vec is empty.
    fn block_range(&self) -> RangeInclusive<u64> {
        self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
    }
}

impl<B: Block> PartialEq for OrderedBodiesResponse<B> {
    fn eq(&self, other: &Self) -> bool {
        self.first_block_number() == other.first_block_number()
    }
}

impl<B: Block> Eq for OrderedBodiesResponse<B> {}

impl<B: Block> PartialOrd for OrderedBodiesResponse<B> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<B: Block> Ord for OrderedBodiesResponse<B> {
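    // The ordering is reversed so that the response with the lowest starting block number is
    // popped first from the `BinaryHeap` (a max-heap), effectively turning it into a min-heap
    // keyed by the first block number.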
    fn cmp(&self, other: &Self) -> Ordering {
        self.first_block_number().cmp(&other.first_block_number()).reverse()
    }
}

/// Builder for [`BodiesDownloader`].
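///
/// # Example
///
/// A minimal usage sketch (it mirrors the tests at the bottom of this file and is not compiled as
/// a doc test; `client`, `consensus` and `provider` are placeholders):
///
/// ```ignore
/// let mut downloader = BodiesDownloaderBuilder::default()
///     .with_stream_batch_size(100)
///     .with_request_limit(20)
///     .build::<reth_ethereum_primitives::Block, _, _>(client, consensus, provider);
///
/// // Request bodies for an inclusive block range and poll the stream for batches.
/// downloader.set_download_range(0..=99)?;
/// while let Some(batch) = downloader.next().await {
///     let bodies = batch?;
///     // insert `bodies` into the database ...
/// }
/// ```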
#[derive(Debug, Clone)]
pub struct BodiesDownloaderBuilder {
    /// The batch size of non-empty blocks per request
    pub request_limit: u64,
    /// The maximum number of block bodies returned at once from the stream
    pub stream_batch_size: usize,
    /// Maximum number of bytes of received bodies to buffer internally.
    pub max_buffered_blocks_size_bytes: usize,
    /// The maximum number of requests to send concurrently.
    pub concurrent_requests_range: RangeInclusive<usize>,
}

impl BodiesDownloaderBuilder {
    /// Creates a new [`BodiesDownloaderBuilder`] with configurations based on the provided
    /// [`BodiesConfig`].
    pub fn new(config: BodiesConfig) -> Self {
        Self::default()
            .with_stream_batch_size(config.downloader_stream_batch_size)
            .with_request_limit(config.downloader_request_limit)
            .with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes)
            .with_concurrent_requests_range(
                config.downloader_min_concurrent_requests..=
                    config.downloader_max_concurrent_requests,
            )
    }
}

impl Default for BodiesDownloaderBuilder {
    fn default() -> Self {
        Self {
            request_limit: 200,
            stream_batch_size: 1_000,
            max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB
            concurrent_requests_range: 5..=100,
        }
    }
}

impl BodiesDownloaderBuilder {
    /// Set request batch size on the downloader.
    pub const fn with_request_limit(mut self, request_limit: u64) -> Self {
        self.request_limit = request_limit;
        self
    }

    /// Set stream batch size on the downloader.
    pub const fn with_stream_batch_size(mut self, stream_batch_size: usize) -> Self {
        self.stream_batch_size = stream_batch_size;
        self
    }

    /// Set concurrent requests range on the downloader.
    pub const fn with_concurrent_requests_range(
        mut self,
        concurrent_requests_range: RangeInclusive<usize>,
    ) -> Self {
        self.concurrent_requests_range = concurrent_requests_range;
        self
    }

    /// Set max buffered block bytes on the downloader.
    pub const fn with_max_buffered_blocks_size_bytes(
        mut self,
        max_buffered_blocks_size_bytes: usize,
    ) -> Self {
        self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes;
        self
    }

    /// Consume self and return the concurrent downloader.
    pub fn build<B, C, Provider>(
        self,
        client: C,
        consensus: Arc<dyn Consensus<B>>,
        provider: Provider,
    ) -> BodiesDownloader<B, C, Provider>
    where
        B: Block,
        C: BodiesClient<Body = B::Body> + 'static,
        Provider: HeaderProvider<Header = B::Header>,
    {
        let Self {
            request_limit,
            stream_batch_size,
            concurrent_requests_range,
            max_buffered_blocks_size_bytes,
        } = self;
        let metrics = BodyDownloaderMetrics::default();
        let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
        BodiesDownloader {
            client: Arc::new(client),
            consensus,
            provider,
            request_limit,
            stream_batch_size,
            max_buffered_blocks_size_bytes,
            concurrent_requests_range,
            in_progress_queue,
            metrics,
            download_range: RangeInclusive::new(1, 0),
            latest_queued_block_number: None,
            buffered_responses: Default::default(),
            queued_bodies: Default::default(),
            buffered_blocks_size_bytes: 0,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::test_utils::{insert_headers, zip_blocks},
        test_utils::{generate_bodies, TestBodiesClient},
    };
    use alloy_primitives::{map::B256Map, B256};
    use assert_matches::assert_matches;
    use reth_consensus::test_utils::TestConsensus;
    use reth_provider::test_utils::create_test_provider_factory;
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

    // Check that the blocks are emitted in order of block number, not in order of
    // first-downloaded
    #[tokio::test]
    async fn streams_bodies_in_order() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(&factory, &headers);

        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );

        let mut downloader = BodiesDownloaderBuilder::default()
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
        assert_eq!(client.times_requested(), 1);
    }

    // Check that the number of times requested equals the number of headers divided by the
    // request limit.
    #[tokio::test]
    async fn requests_correct_number_of_times() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let mut rng = generators::rng();
        let blocks = random_block_range(
            &mut rng,
            0..=199,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..2, ..Default::default() },
        );

        let headers = blocks.iter().map(|block| block.clone_sealed_header()).collect::<Vec<_>>();
        let bodies = blocks
            .into_iter()
            .map(|block| (block.hash(), block.into_body()))
            .collect::<B256Map<_>>();

        insert_headers(&factory, &headers);

        let request_limit = 10;
        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );
        downloader.set_download_range(0..=199).expect("failed to set download range");

        let _ = downloader.collect::<Vec<_>>().await;
        assert_eq!(client.times_requested(), 20);
    }

    // Check that bodies are returned in correct order
    // after resetting the download range multiple times.
    #[tokio::test]
    async fn streams_bodies_in_order_after_range_reset() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(&factory, &headers);

        let stream_batch_size = 20;
        let request_limit = 10;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(stream_batch_size)
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        let mut range_start = 0;
        while range_start < 100 {
            downloader.set_download_range(range_start..=99).expect("failed to set download range");

            assert_matches!(
                downloader.next().await,
                Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(range_start as usize).take(stream_batch_size), &mut bodies))
            );
            assert!(downloader.latest_queued_block_number >= Some(range_start));
            range_start += stream_batch_size as u64;
        }
    }

    // Check that the downloader picks up the new range and downloads bodies after the previous
    // range was completed.
    #[tokio::test]
    async fn can_download_new_range_after_termination() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(&factory, &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            factory,
        );

        // Set and download the first range
        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );

        // Check that the stream is terminated
        assert!(downloader.next().await.is_none());

        // Set and download the second range
        downloader.set_download_range(100..=199).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies))
        );
    }

    // Check that the downloader continues after the size limit is reached.
    #[tokio::test]
    async fn can_download_after_exceeding_limit() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(&factory, &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        // Set the max buffered block size to 1 byte, to make sure that every response exceeds the
        // limit
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(10)
            .with_request_limit(1)
            .with_max_buffered_blocks_size_bytes(1)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        // Set and download the entire range
        downloader.set_download_range(0..=199).expect("failed to set download range");
        let mut header = 0;
        while let Some(Ok(resp)) = downloader.next().await {
            assert_eq!(resp, zip_blocks(headers.iter().skip(header).take(resp.len()), &mut bodies));
            header += resp.len();
        }
    }

    // Check that the downloader can tolerate a few completely empty responses
    #[tokio::test]
    async fn can_tolerate_empty_responses() {
        // Generate some random blocks
        let factory = create_test_provider_factory();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(&factory, &headers);

        // respond with empty bodies for every other request.
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2),
        );

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(3)
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                factory,
            );

        // Download the requested range
        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );
    }
}