reth_downloaders/bodies/
request.rs

1use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
2use alloy_consensus::BlockHeader;
3use alloy_primitives::B256;
4use futures::{Future, FutureExt};
5use reth_consensus::{Consensus, ConsensusError};
6use reth_network_p2p::{
7    bodies::{client::BodiesClient, response::BlockResponse},
8    error::{DownloadError, DownloadResult},
9    priority::Priority,
10};
11use reth_network_peers::{PeerId, WithPeerId};
12use reth_primitives::{BlockBody, GotExpected, SealedBlock, SealedHeader};
13use reth_primitives_traits::{Block, InMemorySize};
14use std::{
15    collections::VecDeque,
16    mem,
17    pin::Pin,
18    sync::Arc,
19    task::{ready, Context, Poll},
20};
21
22/// Body request implemented as a [Future].
23///
24/// The future will poll the underlying request until fulfilled.
25/// If the response arrived with insufficient number of bodies, the future
26/// will issue another request until all bodies are collected.
27///
28/// It then proceeds to verify the downloaded bodies. In case of a validation error,
29/// the future will start over.
30///
31/// The future will filter out any empty headers (see [`alloy_consensus::Header::is_empty`]) from
32/// the request. If [`BodiesRequestFuture`] was initialized with all empty headers, no request will
33/// be dispatched and they will be immediately returned upon polling.
34///
35/// NB: This assumes that peers respond with bodies in the order that they were requested.
36/// This is a reasonable assumption to make as that's [what Geth
37/// does](https://github.com/ethereum/go-ethereum/blob/f53ff0ff4a68ffc56004ab1d5cc244bcb64d3277/les/server_requests.go#L245).
38/// All errors regarding the response cause the peer to get penalized, meaning that adversaries
39/// that try to give us bodies that do not match the requested order are going to be penalized
40/// and eventually disconnected.
41pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
42    client: Arc<C>,
43    consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
44    metrics: BodyDownloaderMetrics,
45    /// Metrics for individual responses. This can be used to observe how the size (in bytes) of
46    /// responses change while bodies are being downloaded.
47    response_metrics: ResponseMetrics,
48    // Headers to download. The collection is shrunk as responses are buffered.
49    pending_headers: VecDeque<SealedHeader<B::Header>>,
50    /// Internal buffer for all blocks
51    buffer: Vec<BlockResponse<B>>,
52    fut: Option<C::Output>,
53    /// Tracks how many bodies we requested in the last request.
54    last_request_len: Option<usize>,
55}
56
57impl<B, C> BodiesRequestFuture<B, C>
58where
59    B: Block,
60    C: BodiesClient<Body = B::Body> + 'static,
61{
62    /// Returns an empty future. Use [`BodiesRequestFuture::with_headers`] to set the request.
63    pub(crate) fn new(
64        client: Arc<C>,
65        consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
66        metrics: BodyDownloaderMetrics,
67    ) -> Self {
68        Self {
69            client,
70            consensus,
71            metrics,
72            response_metrics: Default::default(),
73            pending_headers: Default::default(),
74            buffer: Default::default(),
75            last_request_len: None,
76            fut: None,
77        }
78    }
79
80    pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<B::Header>>) -> Self {
81        self.buffer.reserve_exact(headers.len());
82        self.pending_headers = VecDeque::from(headers);
83        // Submit the request only if there are any headers to download.
84        // Otherwise, the future will immediately be resolved.
85        if let Some(req) = self.next_request() {
86            self.submit_request(req, Priority::Normal);
87        }
88        self
89    }
90
91    fn on_error(&mut self, error: DownloadError, peer_id: Option<PeerId>) {
92        self.metrics.increment_errors(&error);
93        tracing::debug!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
94        if let Some(peer_id) = peer_id {
95            self.client.report_bad_message(peer_id);
96        }
97        self.submit_request(
98            self.next_request().expect("existing hashes to resubmit"),
99            Priority::High,
100        );
101    }
102
103    /// Retrieve header hashes for the next request.
104    fn next_request(&self) -> Option<Vec<B256>> {
105        let mut hashes =
106            self.pending_headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
107        hashes.peek().is_some().then(|| hashes.collect())
108    }
109
110    /// Submit the request with the given priority.
111    fn submit_request(&mut self, req: Vec<B256>, priority: Priority) {
112        tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
113        let client = Arc::clone(&self.client);
114        self.last_request_len = Some(req.len());
115        self.fut = Some(client.get_block_bodies_with_priority(req, priority));
116    }
117
118    /// Process block response.
119    /// Returns an error if the response is invalid.
120    fn on_block_response(&mut self, response: WithPeerId<Vec<B::Body>>) -> DownloadResult<()>
121    where
122        B::Body: InMemorySize,
123    {
124        let (peer_id, bodies) = response.split();
125        let request_len = self.last_request_len.unwrap_or_default();
126        let response_len = bodies.len();
127
128        tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
129
130        // Increment total downloaded metric
131        self.metrics.total_downloaded.increment(response_len as u64);
132
133        // TODO: Malicious peers often return a single block even if it does not exceed the soft
134        // response limit (2MB). This could be penalized by checking if this block and the
135        // next one exceed the soft response limit, if not then peer either does not have the next
136        // block or deliberately sent a single block.
137        if bodies.is_empty() {
138            return Err(DownloadError::EmptyResponse)
139        }
140
141        if response_len > request_len {
142            return Err(DownloadError::TooManyBodies(GotExpected {
143                got: response_len,
144                expected: request_len,
145            }))
146        }
147
148        // Buffer block responses
149        self.try_buffer_blocks(bodies)?;
150
151        // Submit next request if any
152        if let Some(req) = self.next_request() {
153            self.submit_request(req, Priority::High);
154        } else {
155            self.fut = None;
156        }
157
158        Ok(())
159    }
160
161    /// Attempt to buffer body responses. Returns an error if body response fails validation.
162    /// Every body preceding the failed one will be buffered.
163    ///
164    /// This method removes headers from the internal collection.
165    /// If the response fails validation, then the header will be put back.
166    fn try_buffer_blocks(&mut self, bodies: Vec<C::Body>) -> DownloadResult<()>
167    where
168        C::Body: InMemorySize,
169    {
170        let bodies_capacity = bodies.capacity();
171        let bodies_len = bodies.len();
172        let mut bodies = bodies.into_iter().peekable();
173
174        let mut total_size = bodies_capacity * mem::size_of::<BlockBody>();
175        while bodies.peek().is_some() {
176            let next_header = match self.pending_headers.pop_front() {
177                Some(header) => header,
178                None => return Ok(()), // no more headers
179            };
180
181            if next_header.is_empty() {
182                // increment empty block body metric
183                total_size += mem::size_of::<C::Body>();
184                self.buffer.push(BlockResponse::Empty(next_header));
185            } else {
186                let next_body = bodies.next().unwrap();
187
188                // increment full block body metric
189                total_size += next_body.size();
190
191                let block = SealedBlock::from_sealed_parts(next_header, next_body);
192
193                if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
194                    // Body is invalid, put the header back and return an error
195                    let hash = block.hash();
196                    let number = block.number();
197                    self.pending_headers.push_front(block.into_sealed_header());
198                    return Err(DownloadError::BodyValidation {
199                        hash,
200                        number,
201                        error: Box::new(error),
202                    })
203                }
204
205                self.buffer.push(BlockResponse::Full(block));
206            }
207        }
208
209        // Increment per-response metric
210        self.response_metrics.response_size_bytes.set(total_size as f64);
211        self.response_metrics.response_length.set(bodies_len as f64);
212
213        Ok(())
214    }
215}
216
217impl<B, C> Future for BodiesRequestFuture<B, C>
218where
219    B: Block + 'static,
220    C: BodiesClient<Body = B::Body> + 'static,
221{
222    type Output = DownloadResult<Vec<BlockResponse<B>>>;
223
224    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
225        let this = self.get_mut();
226
227        loop {
228            if this.pending_headers.is_empty() {
229                return Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
230            }
231
232            // Check if there is a pending requests. It might not exist if all
233            // headers are empty and there is nothing to download.
234            if let Some(fut) = this.fut.as_mut() {
235                match ready!(fut.poll_unpin(cx)) {
236                    Ok(response) => {
237                        let peer_id = response.peer_id();
238                        if let Err(error) = this.on_block_response(response) {
239                            this.on_error(error, Some(peer_id));
240                        }
241                    }
242                    Err(error) => {
243                        if error.is_channel_closed() {
244                            return Poll::Ready(Err(error.into()))
245                        }
246
247                        this.on_error(error.into(), None);
248                    }
249                }
250            }
251
252            // Buffer any empty headers
253            while this.pending_headers.front().is_some_and(|h| h.is_empty()) {
254                let header = this.pending_headers.pop_front().unwrap();
255                this.buffer.push(BlockResponse::Empty(header));
256            }
257        }
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::test_utils::zip_blocks,
        test_utils::{generate_bodies, TestBodiesClient},
    };
    use reth_consensus::test_utils::TestConsensus;
    use reth_testing_utils::{generators, generators::random_header_range};

    /// Check if future returns empty bodies without dispatching any requests.
    #[tokio::test]
    async fn request_returns_empty_bodies() {
        let mut rng = generators::rng();
        let headers = random_header_range(&mut rng, 0..20, B256::ZERO);

        let client = Arc::new(TestBodiesClient::default());
        let fut = BodiesRequestFuture::<reth_primitives::Block, _>::new(
            client.clone(),
            Arc::new(TestConsensus::default()),
            BodyDownloaderMetrics::default(),
        )
        .with_headers(headers.clone());

        assert_eq!(
            fut.await.unwrap(),
            headers.into_iter().map(BlockResponse::Empty).collect::<Vec<_>>()
        );
        assert_eq!(client.times_requested(), 0);
    }

    /// Check that the request future keeps submitting requests until all bodies are
    /// downloaded when the client returns at most `batch_size` bodies per response.
    #[tokio::test]
    async fn request_submits_until_fulfilled() {
        // Generate some random blocks
        let (headers, mut bodies) = generate_bodies(0..=19);

        let batch_size = 2;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
        );
        let fut = BodiesRequestFuture::<reth_primitives::Block, _>::new(
            client.clone(),
            Arc::new(TestConsensus::default()),
            BodyDownloaderMetrics::default(),
        )
        .with_headers(headers.clone());

        assert_eq!(fut.await.unwrap(), zip_blocks(headers.iter(), &mut bodies));

        // Only non-empty headers are requested, `batch_size` bodies at a time, so the expected
        // number of requests is the non-empty header count divided by the batch size, rounded
        // up.
        let non_empty_headers = headers.into_iter().filter(|h| !h.is_empty()).count() as u64;
        assert_eq!(client.times_requested(), non_empty_headers.div_ceil(batch_size as u64));
    }
}