// reth_downloaders/bodies/bodies.rs

use super::queue::BodiesRequestQueue;
use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
use alloy_consensus::BlockHeader;
use alloy_primitives::BlockNumber;
use futures::Stream;
use futures_util::StreamExt;
use reth_config::BodiesConfig;
use reth_consensus::{Consensus, ConsensusError};
use reth_network_p2p::{
    bodies::{
        client::BodiesClient,
        downloader::{BodyDownloader, BodyDownloaderResult},
        response::BlockResponse,
    },
    error::{DownloadError, DownloadResult},
};
use reth_primitives_traits::{size::InMemorySize, Block, SealedHeader};
use reth_storage_api::HeaderProvider;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use std::{
    cmp::Ordering,
    collections::BinaryHeap,
    fmt::Debug,
    mem,
    ops::RangeInclusive,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};
use tracing::info;

/// Downloads bodies in batches.
///
/// All blocks in a batch are fetched at the same time.
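///
/// # Example
///
/// A rough usage sketch (construction elided, see [`BodiesDownloaderBuilder`]; the async
/// context and error handling are assumed):
///
/// ```ignore
/// // Tell the downloader which blocks to fetch, then drain the stream of body batches.
/// downloader.set_download_range(0..=99)?;
/// while let Some(batch) = downloader.next().await {
///     let bodies = batch?;
///     // persist `bodies` ...
/// }
/// ```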
#[must_use = "Stream does nothing unless polled"]
#[derive(Debug)]
pub struct BodiesDownloader<
    B: Block,
    C: BodiesClient<Body = B::Body>,
    Provider: HeaderProvider<Header = B::Header>,
> {
    /// The bodies client
    client: Arc<C>,
    /// The consensus client
    consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
    /// The database handle
    provider: Provider,
    /// The maximum number of non-empty blocks per request
    request_limit: u64,
    /// The maximum number of block bodies returned at once from the stream
    stream_batch_size: usize,
    /// The allowed range for number of concurrent requests.
    concurrent_requests_range: RangeInclusive<usize>,
    /// Maximum number of bytes of received blocks to buffer internally.
    max_buffered_blocks_size_bytes: usize,
    /// Current estimated size of buffered blocks in bytes.
    buffered_blocks_size_bytes: usize,
    /// The range of block numbers for body download.
    download_range: RangeInclusive<BlockNumber>,
    /// The latest block number returned.
    latest_queued_block_number: Option<BlockNumber>,
    /// Requests in progress
    in_progress_queue: BodiesRequestQueue<B, C>,
    /// Buffered responses
    buffered_responses: BinaryHeap<OrderedBodiesResponse<B>>,
    /// Queued body responses that can be returned for insertion into the database.
    queued_bodies: Vec<BlockResponse<B>>,
    /// The bodies downloader metrics.
    metrics: BodyDownloaderMetrics,
}

impl<B, C, Provider> BodiesDownloader<B, C, Provider>
where
    B: Block,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    /// Returns the next contiguous request.
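    ///
    /// For example, with a download range of `0..=199`, a request limit of 50, and block 99 as
    /// the last requested block, the next request starts at block 100 and covers at most 50
    /// non-empty bodies.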
    fn next_headers_request(&self) -> DownloadResult<Option<Vec<SealedHeader<Provider::Header>>>> {
        let start_at = match self.in_progress_queue.last_requested_block_number {
            Some(num) => num + 1,
            None => *self.download_range.start(),
        };
        // as the range is inclusive, we need to add 1 to the end.
        let items_left = (self.download_range.end() + 1).saturating_sub(start_at);
        let limit = items_left.min(self.request_limit);
        self.query_headers(start_at..=*self.download_range.end(), limit)
    }

    /// Retrieve a batch of headers from the database starting from the provided block number.
    ///
    /// This method returns the batch as soon as one of the conditions below is fulfilled:
    ///     1. The number of non-empty headers in the batch equals the requested maximum.
    ///     2. The total number of headers in the batch (both empty and non-empty) is greater than
    ///        or equal to the stream batch size.
    ///     3. The downloader has reached the end of the range.
    ///
    /// NOTE: The batches returned have a variable length.
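    ///
    /// For example, with `max_non_empty = 2` and a stream batch size of 4, a range starting with
    /// the headers [empty, non-empty, empty, non-empty, non-empty] yields the first four headers
    /// (two of them non-empty) and stops before the fifth.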
    fn query_headers(
        &self,
        range: RangeInclusive<BlockNumber>,
        max_non_empty: u64,
    ) -> DownloadResult<Option<Vec<SealedHeader<B::Header>>>> {
        if range.is_empty() || max_non_empty == 0 {
            return Ok(None)
        }

        // Collect headers while
        //      1. Current block number is in range
        //      2. The number of non empty headers is less than maximum
        //      3. The total number of headers is less than the stream batch size (this is only
        //         relevant if the range consists entirely of empty headers)
        let mut collected = 0;
        let mut non_empty_headers = 0;
        let headers = self.provider.sealed_headers_while(range.clone(), |header| {
            let should_take = range.contains(&header.number()) &&
                non_empty_headers < max_non_empty &&
                collected < self.stream_batch_size;

            if should_take {
                collected += 1;
                if !header.is_empty() {
                    non_empty_headers += 1;
                }
                true
            } else {
                false
            }
        })?;

        Ok(Some(headers).filter(|h| !h.is_empty()))
    }

    /// Get the next expected block number for queueing.
    const fn next_expected_block_number(&self) -> BlockNumber {
        match self.latest_queued_block_number {
            Some(num) => num + 1,
            None => *self.download_range.start(),
        }
    }

    /// Max requests to handle at the same time.
    ///
    /// This depends on the number of active peers but is always clamped to the configured
    /// [`min_concurrent_requests`..=`max_concurrent_requests`] range.
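    ///
    /// For example, with the default `5..=100` range: 3 connected peers yield 5 concurrent
    /// requests, 42 peers yield 42, and 250 peers are capped at 100.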
    #[inline]
    fn concurrent_request_limit(&self) -> usize {
        let num_peers = self.client.num_connected_peers();

        let max_requests = num_peers.max(*self.concurrent_requests_range.start());

        // if we're only connected to a few peers, we keep it low
        if num_peers < *self.concurrent_requests_range.start() {
            return max_requests
        }

        max_requests.min(*self.concurrent_requests_range.end())
    }

    /// Returns true if the size of buffered blocks is lower than the configured maximum
    const fn has_buffer_capacity(&self) -> bool {
        self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes
    }

    /// Check if the stream is terminated
    fn is_terminated(&self) -> bool {
        // There is nothing to request if the range is empty
        let nothing_to_request = self.download_range.is_empty() ||
            // or all blocks have already been requested.
            self.in_progress_queue
                .last_requested_block_number.is_some_and(|last| last == *self.download_range.end());

        nothing_to_request &&
            self.in_progress_queue.is_empty() &&
            self.buffered_responses.is_empty() &&
            self.queued_bodies.is_empty()
    }

    /// Clear all download-related data.
    ///
    /// Should be invoked upon encountering a fatal error.
    fn clear(&mut self) {
        self.download_range = RangeInclusive::new(1, 0);
        self.latest_queued_block_number.take();
        self.in_progress_queue.clear();
        self.queued_bodies = Vec::new();
        self.buffered_responses = BinaryHeap::new();
        self.buffered_blocks_size_bytes = 0;

        // reset metrics
        self.metrics.in_flight_requests.set(0.);
        self.metrics.buffered_responses.set(0.);
        self.metrics.buffered_blocks.set(0.);
        self.metrics.buffered_blocks_size_bytes.set(0.);
        self.metrics.queued_blocks.set(0.);
    }

    /// Queues bodies and sets the latest queued block number
    fn queue_bodies(&mut self, bodies: Vec<BlockResponse<B>>) {
        self.latest_queued_block_number = Some(bodies.last().expect("is not empty").block_number());
        self.queued_bodies.extend(bodies);
        self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
    }

    /// Removes the next response from the buffer.
    fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse<B>> {
        let resp = self.buffered_responses.pop()?;
        self.metrics.buffered_responses.decrement(1.);
        self.buffered_blocks_size_bytes -= resp.size();
        self.metrics.buffered_blocks.decrement(resp.len() as f64);
        self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
        Some(resp)
    }

    /// Adds a new response to the internal buffer
    fn buffer_bodies_response(&mut self, response: Vec<BlockResponse<B>>) {
        // take into account capacity
        let size = response.iter().map(BlockResponse::size).sum::<usize>() +
            response.capacity() * mem::size_of::<BlockResponse<B>>();

        let response = OrderedBodiesResponse { resp: response, size };
        let response_len = response.len();

        self.buffered_blocks_size_bytes += size;
        self.buffered_responses.push(response);

        self.metrics.buffered_blocks.increment(response_len as f64);
        self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
        self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
    }

    /// Returns a buffered response if its block range contains the next expected block number.
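    ///
    /// For example, if the next expected block is 15 and the response at the top of the heap
    /// covers blocks 10..=20, the bodies for 10..=14 are skipped and those for 15..=20 (clamped
    /// to the download range) are returned. A buffered response that ends before the expected
    /// block is dropped entirely.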
    fn try_next_buffered(&mut self) -> Option<Vec<BlockResponse<B>>> {
        if let Some(next) = self.buffered_responses.peek() {
            let expected = self.next_expected_block_number();
            let next_block_range = next.block_range();

            if next_block_range.contains(&expected) {
                return self.pop_buffered_response().map(|buffered| {
                    buffered
                        .resp
                        .into_iter()
                        .skip_while(|b| b.block_number() < expected)
                        .take_while(|b| self.download_range.contains(&b.block_number()))
                        .collect()
                })
            }

            // Drop buffered response since we passed that range
            if *next_block_range.end() < expected {
                self.pop_buffered_response();
            }
        }
        None
    }

    /// Returns the next batch of block bodies if enough bodies have been queued to fill a full
    /// stream batch.
    fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse<B>>> {
        if self.queued_bodies.len() >= self.stream_batch_size {
            let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
            self.queued_bodies.shrink_to_fit();
            self.metrics.total_flushed.increment(next_batch.len() as u64);
            self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
            return Some(next_batch)
        }
        None
    }

    /// Checks if a new request can be submitted. This implements back pressure so that the
    /// downloader does not overwhelm the system and run out of memory.
    ///
    /// Returns true if a new request can be submitted.
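    ///
    /// For example, with the default stream batch size of 1_000, no new requests are issued once
    /// 4_000 or more bodies are queued, until the stream consumer drains them.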
    fn can_submit_new_request(&self) -> bool {
        // Requests are issued in order but not necessarily completed in order. If a request is
        // slow, the queued bodies can grow large, so we stop issuing follow-up requests once the
        // queue grows too big.
        self.queued_bodies.len() < 4 * self.stream_batch_size &&
            self.has_buffer_capacity() &&
            self.in_progress_queue.len() < self.concurrent_request_limit()
    }
}

impl<B, C, Provider> BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    /// Spawns the downloader task via [`tokio::task::spawn`]
    pub fn into_task(self) -> TaskDownloader<B> {
        self.into_task_with(&TokioTaskExecutor::default())
    }

    /// Convert the downloader into a [`TaskDownloader`] by spawning it via the given spawner.
    pub fn into_task_with<S>(self, spawner: &S) -> TaskDownloader<B>
    where
        S: TaskSpawner,
    {
        TaskDownloader::spawn_with(self, spawner)
    }
}

impl<B, C, Provider> BodyDownloader for BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    type Block = B;

    /// Set a new download range (inclusive).
    ///
    /// If the requested range shares its end with the current one and starts inside it, the
    /// request is a no-op. If it starts right after the current range, the downloader simply
    /// continues with the new bounds. Any other range is treated as a reset: all in-flight
    /// requests and buffered state are cleared before the new range is installed.
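    ///
    /// # Example
    ///
    /// A sketch of how new ranges interact with the rules above (error handling elided):
    ///
    /// ```ignore
    /// downloader.set_download_range(0..=99)?;    // initial range
    /// // ... drain the stream ...
    /// downloader.set_download_range(100..=199)?; // consecutive: downloader continues in place
    /// downloader.set_download_range(50..=80)?;   // anything else: internal state is cleared
    /// ```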
    fn set_download_range(&mut self, range: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
        // Check if the range is valid.
        if range.is_empty() {
            tracing::error!(target: "downloaders::bodies", ?range, "Bodies download range is invalid (empty)");
            return Err(DownloadError::InvalidBodyRange { range })
        }

        // Check if the provided range is a subset of the existing range.
        let is_current_range_subset = self.download_range.contains(range.start()) &&
            *range.end() == *self.download_range.end();
        if is_current_range_subset {
            tracing::trace!(target: "downloaders::bodies", ?range, "Download range already in progress");
            // The current range already includes the requested one.
            return Ok(())
        }

        // Check if the provided range is the next expected range.
        let count = *range.end() - *range.start() + 1; // range is inclusive
        let is_next_consecutive_range = *range.start() == *self.download_range.end() + 1;
        if is_next_consecutive_range {
            // New range received.
            tracing::trace!(target: "downloaders::bodies", ?range, "New download range set");
            info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
            self.download_range = range;
            return Ok(())
        }

        // The block range is reset. This can happen either after unwind or after the bodies were
        // written by external services (e.g. BlockchainTree).
        tracing::trace!(target: "downloaders::bodies", ?range, prev_range = ?self.download_range, "Download range reset");
        info!(target: "downloaders::bodies", count, ?range, "Downloading bodies");
        self.clear();
        self.download_range = range;
        Ok(())
    }
}

impl<B, C, Provider> Stream for BodiesDownloader<B, C, Provider>
where
    B: Block + 'static,
    C: BodiesClient<Body = B::Body> + 'static,
    Provider: HeaderProvider<Header = B::Header> + Unpin + 'static,
{
    type Item = BodyDownloaderResult<B>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if this.is_terminated() {
            return Poll::Ready(None)
        }
        // Submit new requests and poll any in progress
        loop {
            // Yield next batch if ready
            if let Some(next_batch) = this.try_split_next_batch() {
                return Poll::Ready(Some(Ok(next_batch)))
            }

            // Poll requests
            while let Poll::Ready(Some(response)) = this.in_progress_queue.poll_next_unpin(cx) {
                this.metrics.in_flight_requests.decrement(1.);
                match response {
                    Ok(response) => {
                        this.buffer_bodies_response(response);
                    }
                    Err(error) => {
                        tracing::debug!(target: "downloaders::bodies", %error, "Request failed");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            // Loop exit condition
            let mut new_request_submitted = false;
            // Submit new requests
            'inner: while this.can_submit_new_request() {
                match this.next_headers_request() {
                    Ok(Some(request)) => {
                        this.metrics.in_flight_requests.increment(1.);
                        this.in_progress_queue.push_new_request(
                            Arc::clone(&this.client),
                            Arc::clone(&this.consensus),
                            request,
                        );
                        new_request_submitted = true;
                    }
                    Ok(None) => break 'inner,
                    Err(error) => {
                        tracing::error!(target: "downloaders::bodies", %error, "Failed to download from next request");
                        this.clear();
                        return Poll::Ready(Some(Err(error)))
                    }
                };
            }

            while let Some(buf_response) = this.try_next_buffered() {
                this.queue_bodies(buf_response);
            }

            // shrink the buffer so that it doesn't grow indefinitely
            this.buffered_responses.shrink_to_fit();

            if !new_request_submitted {
                break
            }
        }

        // All requests are handled, stream is finished
        if this.in_progress_queue.is_empty() {
            if this.queued_bodies.is_empty() {
                return Poll::Ready(None)
            }
            let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
            let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
            this.queued_bodies.shrink_to_fit();
            this.metrics.total_flushed.increment(next_batch.len() as u64);
            this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
            return Poll::Ready(Some(Ok(next_batch)))
        }

        Poll::Pending
    }
}

#[derive(Debug)]
struct OrderedBodiesResponse<B: Block> {
    resp: Vec<BlockResponse<B>>,
    /// The total size of the response in bytes
    size: usize,
}

impl<B: Block> OrderedBodiesResponse<B> {
    #[inline]
    fn len(&self) -> usize {
        self.resp.len()
    }

    /// Returns the size of the response in bytes
    ///
    /// See [`BlockResponse::size`]
    #[inline]
    const fn size(&self) -> usize {
        self.size
    }
}

impl<B: Block> OrderedBodiesResponse<B> {
    /// Returns the block number of the first element
    ///
    /// # Panics
    /// If the response vec is empty.
    fn first_block_number(&self) -> u64 {
        self.resp.first().expect("is not empty").block_number()
    }

    /// Returns the range of the block numbers in the response
    ///
    /// # Panics
    /// If the response vec is empty.
    fn block_range(&self) -> RangeInclusive<u64> {
        self.first_block_number()..=self.resp.last().expect("is not empty").block_number()
    }
}

impl<B: Block> PartialEq for OrderedBodiesResponse<B> {
    fn eq(&self, other: &Self) -> bool {
        self.first_block_number() == other.first_block_number()
    }
}

impl<B: Block> Eq for OrderedBodiesResponse<B> {}

impl<B: Block> PartialOrd for OrderedBodiesResponse<B> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

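// The ordering is intentionally reversed: `BinaryHeap` is a max-heap, so reversing the comparison
// makes the heap yield the buffered response with the lowest starting block number first, i.e.
// the one closest to the next expected block.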
impl<B: Block> Ord for OrderedBodiesResponse<B> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.first_block_number().cmp(&other.first_block_number()).reverse()
    }
}

/// Builder for [`BodiesDownloader`].
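///
/// # Example
///
/// A minimal configuration sketch; the values below are illustrative, not recommendations:
///
/// ```ignore
/// let builder = BodiesDownloaderBuilder::default()
///     .with_request_limit(100)                 // non-empty bodies per p2p request
///     .with_stream_batch_size(500)             // blocks yielded per stream item
///     .with_concurrent_requests_range(5..=50)  // min..=max in-flight requests
///     .with_max_buffered_blocks_size_bytes(1024 * 1024 * 1024); // ~1 GiB buffer
/// // `builder.build(client, consensus, provider)` then produces the downloader.
/// ```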
#[derive(Debug, Clone)]
pub struct BodiesDownloaderBuilder {
    /// The batch size of non-empty blocks per request
    pub request_limit: u64,
    /// The maximum number of block bodies returned at once from the stream
    pub stream_batch_size: usize,
    /// Maximum number of bytes of received bodies to buffer internally.
    pub max_buffered_blocks_size_bytes: usize,
    /// The allowed range for the number of concurrent requests.
    pub concurrent_requests_range: RangeInclusive<usize>,
}

impl BodiesDownloaderBuilder {
    /// Creates a new [`BodiesDownloaderBuilder`] with configurations based on the provided
    /// [`BodiesConfig`].
    pub fn new(config: BodiesConfig) -> Self {
        Self::default()
            .with_stream_batch_size(config.downloader_stream_batch_size)
            .with_request_limit(config.downloader_request_limit)
            .with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes)
            .with_concurrent_requests_range(
                config.downloader_min_concurrent_requests..=
                    config.downloader_max_concurrent_requests,
            )
    }
}

impl Default for BodiesDownloaderBuilder {
    fn default() -> Self {
        Self {
            request_limit: 200,
            stream_batch_size: 1_000,
            max_buffered_blocks_size_bytes: 2 * 1024 * 1024 * 1024, // ~2GB
            concurrent_requests_range: 5..=100,
        }
    }
}

impl BodiesDownloaderBuilder {
    /// Set request batch size on the downloader.
    pub const fn with_request_limit(mut self, request_limit: u64) -> Self {
        self.request_limit = request_limit;
        self
    }

    /// Set stream batch size on the downloader.
    pub const fn with_stream_batch_size(mut self, stream_batch_size: usize) -> Self {
        self.stream_batch_size = stream_batch_size;
        self
    }

    /// Set concurrent requests range on the downloader.
    pub const fn with_concurrent_requests_range(
        mut self,
        concurrent_requests_range: RangeInclusive<usize>,
    ) -> Self {
        self.concurrent_requests_range = concurrent_requests_range;
        self
    }

    /// Set max buffered block bytes on the downloader.
    pub const fn with_max_buffered_blocks_size_bytes(
        mut self,
        max_buffered_blocks_size_bytes: usize,
    ) -> Self {
        self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes;
        self
    }

    /// Consume self and return the concurrent downloader.
    pub fn build<B, C, Provider>(
        self,
        client: C,
        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
        provider: Provider,
    ) -> BodiesDownloader<B, C, Provider>
    where
        B: Block,
        C: BodiesClient<Body = B::Body> + 'static,
        Provider: HeaderProvider<Header = B::Header>,
    {
        let Self {
            request_limit,
            stream_batch_size,
            concurrent_requests_range,
            max_buffered_blocks_size_bytes,
        } = self;
        let metrics = BodyDownloaderMetrics::default();
        let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
        BodiesDownloader {
            client: Arc::new(client),
            consensus,
            provider,
            request_limit,
            stream_batch_size,
            max_buffered_blocks_size_bytes,
            concurrent_requests_range,
            in_progress_queue,
            metrics,
            download_range: RangeInclusive::new(1, 0),
            latest_queued_block_number: None,
            buffered_responses: Default::default(),
            queued_bodies: Default::default(),
            buffered_blocks_size_bytes: 0,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::test_utils::{insert_headers, zip_blocks},
        test_utils::{generate_bodies, TestBodiesClient},
    };
    use alloy_primitives::B256;
    use assert_matches::assert_matches;
    use reth_chainspec::MAINNET;
    use reth_consensus::test_utils::TestConsensus;
    use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
    use reth_provider::{
        providers::StaticFileProvider, test_utils::MockNodeTypesWithDB, ProviderFactory,
    };
    use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
    use std::collections::HashMap;

    // Check that the blocks are emitted in order of block number, not in order of
    // first-downloaded
    #[tokio::test]
    async fn streams_bodies_in_order() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let (headers, mut bodies) = generate_bodies(0..=19);

        insert_headers(db.db(), &headers);

        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );
        let (_static_dir, static_dir_path) = create_test_static_files_dir();

        let mut downloader = BodiesDownloaderBuilder::default()
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    db,
                    MAINNET.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
            );
        downloader.set_download_range(0..=19).expect("failed to set download range");

        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter(), &mut bodies))
        );
        assert_eq!(client.times_requested(), 1);
    }

    // Check that the number of times requested equals the number of headers divided by the
    // request limit.
    #[tokio::test]
    async fn requests_correct_number_of_times() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let mut rng = generators::rng();
        let blocks = random_block_range(
            &mut rng,
            0..=199,
            BlockRangeParams { parent: Some(B256::ZERO), tx_count: 1..2, ..Default::default() },
        );

        let headers = blocks.iter().map(|block| block.clone_sealed_header()).collect::<Vec<_>>();
        let bodies = blocks
            .into_iter()
            .map(|block| (block.hash(), block.into_body()))
            .collect::<HashMap<_, _>>();

        insert_headers(db.db(), &headers);

        let request_limit = 10;
        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
        let (_static_dir, static_dir_path) = create_test_static_files_dir();

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            ProviderFactory::<MockNodeTypesWithDB>::new(
                db,
                MAINNET.clone(),
                StaticFileProvider::read_write(static_dir_path).unwrap(),
            ),
        );
        downloader.set_download_range(0..=199).expect("failed to set download range");

        let _ = downloader.collect::<Vec<_>>().await;
        assert_eq!(client.times_requested(), 20);
    }

    // Check that bodies are returned in correct order
    // after resetting the download range multiple times.
    #[tokio::test]
    async fn streams_bodies_in_order_after_range_reset() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(db.db(), &headers);

        let stream_batch_size = 20;
        let request_limit = 10;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true),
        );
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(stream_batch_size)
            .with_request_limit(request_limit)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    db,
                    MAINNET.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
            );

        let mut range_start = 0;
        while range_start < 100 {
            downloader.set_download_range(range_start..=99).expect("failed to set download range");

            assert_matches!(
                downloader.next().await,
                Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(range_start as usize).take(stream_batch_size), &mut bodies))
            );
            assert!(downloader.latest_queued_block_number >= Some(range_start));
            range_start += stream_batch_size as u64;
        }
    }

    // Check that the downloader picks up the new range and downloads bodies after previous range
    // was completed.
    #[tokio::test]
    async fn can_download_new_range_after_termination() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(db.db(), &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));
        let (_static_dir, static_dir_path) = create_test_static_files_dir();

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
            client.clone(),
            Arc::new(TestConsensus::default()),
            ProviderFactory::<MockNodeTypesWithDB>::new(
                db,
                MAINNET.clone(),
                StaticFileProvider::read_write(static_dir_path).unwrap(),
            ),
        );

        // Set and download the first range
        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );

        // Check that the stream is terminated
        assert!(downloader.next().await.is_none());

        // Set and download the second range
        downloader.set_download_range(100..=199).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies))
        );
    }

    // Check that the downloader continues after the size limit is reached.
    #[tokio::test]
    async fn can_download_after_exceeding_limit() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let (headers, mut bodies) = generate_bodies(0..=199);

        insert_headers(db.db(), &headers);

        let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone()));

        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        // Set the max buffered block size to 1 byte, to make sure that every response exceeds the
        // limit
        let mut downloader = BodiesDownloaderBuilder::default()
            .with_stream_batch_size(10)
            .with_request_limit(1)
            .with_max_buffered_blocks_size_bytes(1)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    db,
                    MAINNET.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
            );

        // Set and download the entire range
        downloader.set_download_range(0..=199).expect("failed to set download range");
        let mut header = 0;
        while let Some(Ok(resp)) = downloader.next().await {
            assert_eq!(resp, zip_blocks(headers.iter().skip(header).take(resp.len()), &mut bodies));
            header += resp.len();
        }
    }

    // Check that the downloader can tolerate a few completely empty responses
    #[tokio::test]
    async fn can_tolerate_empty_responses() {
        // Generate some random blocks
        let db = create_test_rw_db();
        let (headers, mut bodies) = generate_bodies(0..=99);

        insert_headers(db.db(), &headers);

        // respond with empty bodies for every other request.
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2),
        );
        let (_static_dir, static_dir_path) = create_test_static_files_dir();

        let mut downloader = BodiesDownloaderBuilder::default()
            .with_request_limit(3)
            .with_stream_batch_size(100)
            .build::<reth_ethereum_primitives::Block, _, _>(
                client.clone(),
                Arc::new(TestConsensus::default()),
                ProviderFactory::<MockNodeTypesWithDB>::new(
                    db,
                    MAINNET.clone(),
                    StaticFileProvider::read_write(static_dir_path).unwrap(),
                ),
            );

        // Download the requested range
        downloader.set_download_range(0..=99).expect("failed to set download range");
        assert_matches!(
            downloader.next().await,
            Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies))
        );
    }
}