reth_downloaders/bodies/
queue.rs1use super::request::BodiesRequestFuture;
2use crate::metrics::BodyDownloaderMetrics;
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::{stream::FuturesUnordered, Stream};
6use futures_util::StreamExt;
7use reth_consensus::Consensus;
8use reth_network_p2p::{
9 bodies::{client::BodiesClient, response::BlockResponse},
10 error::DownloadResult,
11};
12use reth_primitives_traits::{Block, SealedHeader};
13use std::{
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17};
18
19#[derive(Debug)]
22pub(crate) struct BodiesRequestQueue<B: Block, C: BodiesClient<Body = B::Body>> {
23 inner: FuturesUnordered<BodiesRequestFuture<B, C>>,
25 metrics: BodyDownloaderMetrics,
27 pub(crate) last_requested_block_number: Option<BlockNumber>,
29}
30
31impl<B, C> BodiesRequestQueue<B, C>
32where
33 B: Block,
34 C: BodiesClient<Body = B::Body> + 'static,
35{
36 pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
38 Self { metrics, inner: Default::default(), last_requested_block_number: None }
39 }
40
41 pub(crate) fn is_empty(&self) -> bool {
43 self.inner.is_empty()
44 }
45
46 pub(crate) fn len(&self) -> usize {
48 self.inner.len()
49 }
50
51 pub(crate) fn clear(&mut self) {
53 self.inner.clear();
54 self.last_requested_block_number.take();
55 self.metrics.clear();
56 }
57 pub(crate) fn push_new_request(
60 &mut self,
61 client: Arc<C>,
62 consensus: Arc<dyn Consensus<B>>,
63 request: Vec<SealedHeader<B::Header>>,
64 ) {
65 self.last_requested_block_number = request
67 .last()
68 .map(|last| match self.last_requested_block_number {
69 Some(num) => last.number().max(num),
70 None => last.number(),
71 })
72 .or(self.last_requested_block_number);
73
74 self.inner.push(
76 BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),
77 )
78 }
79}
80
81impl<B, C> Stream for BodiesRequestQueue<B, C>
82where
83 B: Block + 'static,
84 C: BodiesClient<Body = B::Body> + 'static,
85{
86 type Item = DownloadResult<Vec<BlockResponse<B>>>;
87
88 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
89 self.get_mut().inner.poll_next_unpin(cx)
90 }
91}