reth_downloaders/bodies/
request.rs

1use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
2use alloy_consensus::BlockHeader;
3use alloy_primitives::B256;
4use futures::{Future, FutureExt};
5use reth_consensus::{Consensus, ConsensusError};
6use reth_network_p2p::{
7    bodies::{client::BodiesClient, response::BlockResponse},
8    error::{DownloadError, DownloadResult},
9    priority::Priority,
10};
11use reth_network_peers::{PeerId, WithPeerId};
12use reth_primitives_traits::{Block, GotExpected, InMemorySize, SealedBlock, SealedHeader};
13use std::{
14    collections::VecDeque,
15    pin::Pin,
16    sync::Arc,
17    task::{ready, Context, Poll},
18};
19
20/// Body request implemented as a [Future].
21///
22/// The future will poll the underlying request until fulfilled.
23/// If the response arrived with insufficient number of bodies, the future
24/// will issue another request until all bodies are collected.
25///
26/// It then proceeds to verify the downloaded bodies. In case of a validation error,
27/// the future will start over.
28///
29/// The future will filter out any empty headers (see [`alloy_consensus::Header::is_empty`]) from
30/// the request. If [`BodiesRequestFuture`] was initialized with all empty headers, no request will
31/// be dispatched and they will be immediately returned upon polling.
32///
33/// NB: This assumes that peers respond with bodies in the order that they were requested.
34/// This is a reasonable assumption to make as that's [what Geth
35/// does](https://github.com/ethereum/go-ethereum/blob/f53ff0ff4a68ffc56004ab1d5cc244bcb64d3277/les/server_requests.go#L245).
36/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
37/// that try to give us bodies that do not match the requested order are going to be penalized
38/// and eventually disconnected.
39pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
40    client: Arc<C>,
41    consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
42    metrics: BodyDownloaderMetrics,
43    /// Metrics for individual responses. This can be used to observe how the size (in bytes) of
44    /// responses change while bodies are being downloaded.
45    response_metrics: ResponseMetrics,
46    // Headers to download. The collection is shrunk as responses are buffered.
47    pending_headers: VecDeque<SealedHeader<B::Header>>,
48    /// Internal buffer for all blocks
49    buffer: Vec<BlockResponse<B>>,
50    fut: Option<C::Output>,
51    /// Tracks how many bodies we requested in the last request.
52    last_request_len: Option<usize>,
53}
54
55impl<B, C> BodiesRequestFuture<B, C>
56where
57    B: Block,
58    C: BodiesClient<Body = B::Body> + 'static,
59{
60    /// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
61    pub(crate) fn new(
62        client: Arc<C>,
63        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
64        metrics: BodyDownloaderMetrics,
65    ) -> Self {
66        Self {
67            client,
68            consensus,
69            metrics,
70            response_metrics: Default::default(),
71            pending_headers: Default::default(),
72            buffer: Default::default(),
73            last_request_len: None,
74            fut: None,
75        }
76    }
77
78    pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<B::Header>>) -> Self {
79        self.buffer.reserve_exact(headers.len());
80        self.pending_headers = VecDeque::from(headers);
81        // Submit the request only if there are any headers to download.
82        // Otherwise, the future will immediately be resolved.
83        if let Some(req) = self.next_request() {
84            self.submit_request(req, Priority::Normal);
85        }
86        self
87    }
88
89    fn on_error(&mut self, error: DownloadError, peer_id: Option<PeerId>) {
90        self.metrics.increment_errors(&error);
91        tracing::debug!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
92        if let Some(peer_id) = peer_id {
93            self.client.report_bad_message(peer_id);
94        }
95        self.submit_request(
96            self.next_request().expect("existing hashes to resubmit"),
97            Priority::High,
98        );
99    }
100
101    /// Retrieve header hashes for the next request.
102    fn next_request(&self) -> Option<Vec<B256>> {
103        let mut hashes =
104            self.pending_headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
105        hashes.peek().is_some().then(|| hashes.collect())
106    }
107
108    /// Submit the request with the given priority.
109    fn submit_request(&mut self, req: Vec<B256>, priority: Priority) {
110        tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
111        let client = Arc::clone(&self.client);
112        self.last_request_len = Some(req.len());
113        self.fut = Some(client.get_block_bodies_with_priority(req, priority));
114    }
115
116    /// Process block response.
117    /// Returns an error if the response is invalid.
118    fn on_block_response(&mut self, response: WithPeerId<Vec<B::Body>>) -> DownloadResult<()>
119    where
120        B::Body: InMemorySize,
121    {
122        let (peer_id, bodies) = response.split();
123        let request_len = self.last_request_len.unwrap_or_default();
124        let response_len = bodies.len();
125
126        tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
127
128        // Increment total downloaded metric
129        self.metrics.total_downloaded.increment(response_len as u64);
130
131        // TODO: Malicious peers often return a single block even if it does not exceed the soft
132        // response limit (2MB). This could be penalized by checking if this block and the
133        // next one exceed the soft response limit, if not then peer either does not have the next
134        // block or deliberately sent a single block.
135        if bodies.is_empty() {
136            return Err(DownloadError::EmptyResponse)
137        }
138
139        if response_len > request_len {
140            return Err(DownloadError::TooManyBodies(GotExpected {
141                got: response_len,
142                expected: request_len,
143            }))
144        }
145
146        // Buffer block responses
147        self.try_buffer_blocks(bodies)?;
148
149        // Submit next request if any
150        if let Some(req) = self.next_request() {
151            self.submit_request(req, Priority::High);
152        } else {
153            self.fut = None;
154        }
155
156        Ok(())
157    }
158
159    /// Attempt to buffer body responses. Returns an error if body response fails validation.
160    /// Every body preceding the failed one will be buffered.
161    ///
162    /// This method removes headers from the internal collection.
163    /// If the response fails validation, then the header will be put back.
164    fn try_buffer_blocks(&mut self, bodies: Vec<C::Body>) -> DownloadResult<()>
165    where
166        C::Body: InMemorySize,
167    {
168        let bodies_len = bodies.len();
169        let mut bodies = bodies.into_iter().peekable();
170
171        let mut total_size = 0;
172        while bodies.peek().is_some() {
173            let next_header = match self.pending_headers.pop_front() {
174                Some(header) => header,
175                None => return Ok(()), // no more headers
176            };
177
178            if next_header.is_empty() {
179                self.buffer.push(BlockResponse::Empty(next_header));
180            } else {
181                let next_body = bodies.next().unwrap();
182
183                // increment full block body metric
184                total_size += next_body.size();
185
186                let block = SealedBlock::from_sealed_parts(next_header, next_body);
187
188                if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
189                    // Body is invalid, put the header back and return an error
190                    let hash = block.hash();
191                    let number = block.number();
192                    self.pending_headers.push_front(block.into_sealed_header());
193                    return Err(DownloadError::BodyValidation {
194                        hash,
195                        number,
196                        error: Box::new(error),
197                    })
198                }
199
200                self.buffer.push(BlockResponse::Full(block));
201            }
202        }
203
204        // Increment per-response metric
205        self.response_metrics.response_size_bytes.set(total_size as f64);
206        self.response_metrics.response_length.set(bodies_len as f64);
207
208        Ok(())
209    }
210}
211
212impl<B, C> Future for BodiesRequestFuture<B, C>
213where
214    B: Block + 'static,
215    C: BodiesClient<Body = B::Body> + 'static,
216{
217    type Output = DownloadResult<Vec<BlockResponse<B>>>;
218
219    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
220        let this = self.get_mut();
221
222        loop {
223            if this.pending_headers.is_empty() {
224                return Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
225            }
226
227            // Check if there is a pending requests. It might not exist if all
228            // headers are empty and there is nothing to download.
229            if let Some(fut) = this.fut.as_mut() {
230                match ready!(fut.poll_unpin(cx)) {
231                    Ok(response) => {
232                        let peer_id = response.peer_id();
233                        if let Err(error) = this.on_block_response(response) {
234                            this.on_error(error, Some(peer_id));
235                        }
236                    }
237                    Err(error) => {
238                        if error.is_channel_closed() {
239                            return Poll::Ready(Err(error.into()))
240                        }
241
242                        this.on_error(error.into(), None);
243                    }
244                }
245            }
246
247            // Buffer any empty headers
248            while this.pending_headers.front().is_some_and(|h| h.is_empty()) {
249                let header = this.pending_headers.pop_front().unwrap();
250                this.buffer.push(BlockResponse::Empty(header));
251            }
252        }
253    }
254}
255
256#[cfg(test)]
257mod tests {
258    use super::*;
259    use crate::{
260        bodies::test_utils::zip_blocks,
261        test_utils::{generate_bodies, TestBodiesClient},
262    };
263    use reth_consensus::test_utils::TestConsensus;
264    use reth_ethereum_primitives::Block;
265    use reth_testing_utils::{generators, generators::random_header_range};
266
267    /// Check if future returns empty bodies without dispatching any requests.
268    #[tokio::test]
269    async fn request_returns_empty_bodies() {
270        let mut rng = generators::rng();
271        let headers = random_header_range(&mut rng, 0..20, B256::ZERO);
272
273        let client = Arc::new(TestBodiesClient::default());
274        let fut = BodiesRequestFuture::<Block, _>::new(
275            client.clone(),
276            Arc::new(TestConsensus::default()),
277            BodyDownloaderMetrics::default(),
278        )
279        .with_headers(headers.clone());
280
281        assert_eq!(
282            fut.await.unwrap(),
283            headers.into_iter().map(BlockResponse::Empty).collect::<Vec<_>>()
284        );
285        assert_eq!(client.times_requested(), 0);
286    }
287
288    /// Check that the request future
289    #[tokio::test]
290    async fn request_submits_until_fulfilled() {
291        // Generate some random blocks
292        let (headers, mut bodies) = generate_bodies(0..=19);
293
294        let batch_size = 2;
295        let client = Arc::new(
296            TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
297        );
298        let fut = BodiesRequestFuture::<Block, _>::new(
299            client.clone(),
300            Arc::new(TestConsensus::default()),
301            BodyDownloaderMetrics::default(),
302        )
303        .with_headers(headers.clone());
304
305        assert_eq!(fut.await.unwrap(), zip_blocks(headers.iter(), &mut bodies));
306        assert_eq!(
307            client.times_requested(),
308            // div_ceild
309            (headers.into_iter().filter(|h| !h.is_empty()).count() as u64).div_ceil(2)
310        );
311    }
312}