reth_downloaders/bodies/
queue.rs
1use super::request::BodiesRequestFuture;
2use crate::metrics::BodyDownloaderMetrics;
3use alloy_consensus::BlockHeader;
4use alloy_primitives::BlockNumber;
5use futures::{stream::FuturesUnordered, Stream};
6use futures_util::StreamExt;
7use reth_consensus::{Consensus, ConsensusError};
8use reth_network_p2p::{
9 bodies::{client::BodiesClient, response::BlockResponse},
10 error::DownloadResult,
11};
12use reth_primitives_traits::{Block, SealedHeader};
13use std::{
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17};
18
19#[derive(Debug)]
22pub(crate) struct BodiesRequestQueue<B: Block, C: BodiesClient<Body = B::Body>> {
23 inner: FuturesUnordered<BodiesRequestFuture<B, C>>,
25 metrics: BodyDownloaderMetrics,
27 pub(crate) last_requested_block_number: Option<BlockNumber>,
29}
30
31impl<B, C> BodiesRequestQueue<B, C>
32where
33 B: Block,
34 C: BodiesClient<Body = B::Body> + 'static,
35{
36 pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
38 Self { metrics, inner: Default::default(), last_requested_block_number: None }
39 }
40
41 pub(crate) fn is_empty(&self) -> bool {
43 self.inner.is_empty()
44 }
45
46 pub(crate) fn len(&self) -> usize {
48 self.inner.len()
49 }
50
51 pub(crate) fn clear(&mut self) {
53 self.inner.clear();
54 self.last_requested_block_number.take();
55 }
56 pub(crate) fn push_new_request(
59 &mut self,
60 client: Arc<C>,
61 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
62 request: Vec<SealedHeader<B::Header>>,
63 ) {
64 self.last_requested_block_number = request
66 .last()
67 .map(|last| match self.last_requested_block_number {
68 Some(num) => last.number().max(num),
69 None => last.number(),
70 })
71 .or(self.last_requested_block_number);
72
73 self.inner.push(
75 BodiesRequestFuture::new(client, consensus, self.metrics.clone()).with_headers(request),
76 )
77 }
78}
79
80impl<B, C> Stream for BodiesRequestQueue<B, C>
81where
82 B: Block + 'static,
83 C: BodiesClient<Body = B::Body> + 'static,
84{
85 type Item = DownloadResult<Vec<BlockResponse<B>>>;
86
87 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88 self.get_mut().inner.poll_next_unpin(cx)
89 }
90}