reth_downloaders/bodies/
queue.rs

1use super::request::BodiesRequestFuture;
2use crate::metrics::BodyDownloaderMetrics;
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::{stream::FuturesUnordered, Stream};
6use futures_util::StreamExt;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9    bodies::{client::BodiesClient, response::BlockResponse},
10    error::DownloadResult,
11};
12use reth_primitives_traits::{Block, SealedHeader};
13use std::{
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17};
18
19/// The wrapper around [`FuturesUnordered`] that keeps information
20/// about the blocks currently being requested.
21#[derive(Debug)]
22pub(crate) struct BodiesRequestQueue<B: Block, C: BodiesClient<Body = B::Body>> {
23    /// Inner body request queue.
24    inner: FuturesUnordered<BodiesRequestFuture<B, C>>,
25    /// The downloader metrics.
26    metrics: BodyDownloaderMetrics,
27    /// Last requested block number.
28    pub(crate) last_requested_block_number: Option<BlockNumber>,
29}
30
31impl<B, C> BodiesRequestQueue<B, C>
32where
33    B: Block,
34    C: BodiesClient<Body = B::Body> + 'static,
35{
36    /// Create new instance of request queue.
37    pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
38        Self { metrics, inner: Default::default(), last_requested_block_number: None }
39    }
40
41    /// Returns `true` if the queue is empty.
42    pub(crate) fn is_empty(&self) -> bool {
43        self.inner.is_empty()
44    }
45
46    /// Returns the number of queued requests.
47    pub(crate) fn len(&self) -> usize {
48        self.inner.len()
49    }
50
51    /// Clears the inner queue and related data.
52    pub(crate) fn clear(&mut self) {
53        self.inner.clear();
54        self.last_requested_block_number.take();
55    }
56    /// Add new request to the queue.
57    /// Expects a sorted list of headers.
58    pub(crate) fn push_new_request(
59        &mut self,
60        client: Arc<C>,
61        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
62        request: Vec<SealedHeader<B::Header>>,
63    ) {
64        // Set last max requested block number
65        self.last_requested_block_number = request
66            .last()
67            .map(|last| match self.last_requested_block_number {
68                Some(num) => last.number().max(num),
69                None => last.number(),
70            })
71            .or(self.last_requested_block_number);
72
73        // Create request and push into the queue.
74        self.inner.push(
75            BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),
76        )
77    }
78}
79
80impl<B, C> Stream for BodiesRequestQueue<B, C>
81where
82    B: Block + 'static,
83    C: BodiesClient<Body = B::Body> + 'static,
84{
85    type Item = DownloadResult<Vec<BlockResponse<B>>>;
86
87    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        self.get_mut().inner.poll_next_unpin(cx)
89    }
90}