Skip to main content

reth_downloaders/bodies/
queue.rs

1use super::request::BodiesRequestFuture;
2use crate::metrics::BodyDownloaderMetrics;
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::{stream::FuturesUnordered, Stream};
6use futures_util::StreamExt;
7use reth_consensus::Consensus;
8use reth_network_p2p::{
9    bodies::{client::BodiesClient, response::BlockResponse},
10    error::DownloadResult,
11};
12use reth_primitives_traits::{Block, SealedHeader};
13use std::{
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17};
18
19/// The wrapper around [`FuturesUnordered`] that keeps information
20/// about the blocks currently being requested.
21#[derive(Debug)]
22pub(crate) struct BodiesRequestQueue<B: Block, C: BodiesClient<Body = B::Body>> {
23    /// Inner body request queue.
24    inner: FuturesUnordered<BodiesRequestFuture<B, C>>,
25    /// The downloader metrics.
26    metrics: BodyDownloaderMetrics,
27    /// Last requested block number.
28    pub(crate) last_requested_block_number: Option<BlockNumber>,
29}
30
31impl<B, C> BodiesRequestQueue<B, C>
32where
33    B: Block,
34    C: BodiesClient<Body = B::Body> + 'static,
35{
36    /// Create new instance of request queue.
37    pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
38        Self { metrics, inner: Default::default(), last_requested_block_number: None }
39    }
40
41    /// Returns `true` if the queue is empty.
42    pub(crate) fn is_empty(&self) -> bool {
43        self.inner.is_empty()
44    }
45
46    /// Returns the number of queued requests.
47    pub(crate) fn len(&self) -> usize {
48        self.inner.len()
49    }
50
51    /// Clears the inner queue and related data.
52    pub(crate) fn clear(&mut self) {
53        self.inner.clear();
54        self.last_requested_block_number.take();
55        self.metrics.clear();
56    }
57    /// Add new request to the queue.
58    /// Expects a sorted list of headers.
59    pub(crate) fn push_new_request(
60        &mut self,
61        client: Arc<C>,
62        consensus: Arc<dyn Consensus<B>>,
63        request: Vec<SealedHeader<B::Header>>,
64    ) {
65        // Set last max requested block number
66        self.last_requested_block_number = request
67            .last()
68            .map(|last| match self.last_requested_block_number {
69                Some(num) => last.number().max(num),
70                None => last.number(),
71            })
72            .or(self.last_requested_block_number);
73
74        // Create request and push into the queue.
75        self.inner.push(
76            BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),
77        )
78    }
79}
80
81impl<B, C> Stream for BodiesRequestQueue<B, C>
82where
83    B: Block + 'static,
84    C: BodiesClient<Body = B::Body> + 'static,
85{
86    type Item = DownloadResult<Vec<BlockResponse<B>>>;
87
88    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89        self.get_mut().inner.poll_next_unpin(cx)
90    }
91}