1use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
2use alloy_consensus::BlockHeader;
3use alloy_primitives::B256;
4use futures::{Future, FutureExt};
5use reth_consensus::{Consensus, ConsensusError};
6use reth_network_p2p::{
7 bodies::{client::BodiesClient, response::BlockResponse},
8 error::{DownloadError, DownloadResult},
9 priority::Priority,
10};
11use reth_network_peers::{PeerId, WithPeerId};
12use reth_primitives_traits::{Block, GotExpected, InMemorySize, SealedBlock, SealedHeader};
13use std::{
14 collections::VecDeque,
15 pin::Pin,
16 sync::Arc,
17 task::{ready, Context, Poll},
18};
19
20pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
40 client: Arc<C>,
41 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
42 metrics: BodyDownloaderMetrics,
43 response_metrics: ResponseMetrics,
46 pending_headers: VecDeque<SealedHeader<B::Header>>,
48 buffer: Vec<BlockResponse<B>>,
50 fut: Option<C::Output>,
51 last_request_len: Option<usize>,
53}
54
55impl<B, C> BodiesRequestFuture<B, C>
56where
57 B: Block,
58 C: BodiesClient<Body = B::Body> + 'static,
59{
60 pub(crate) fn new(
62 client: Arc<C>,
63 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
64 metrics: BodyDownloaderMetrics,
65 ) -> Self {
66 Self {
67 client,
68 consensus,
69 metrics,
70 response_metrics: Default::default(),
71 pending_headers: Default::default(),
72 buffer: Default::default(),
73 last_request_len: None,
74 fut: None,
75 }
76 }
77
78 pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<B::Header>>) -> Self {
79 self.buffer.reserve_exact(headers.len());
80 self.pending_headers = VecDeque::from(headers);
81 if let Some(req) = self.next_request() {
84 self.submit_request(req, Priority::Normal);
85 }
86 self
87 }
88
89 fn on_error(&mut self, error: DownloadError, peer_id: Option<PeerId>) {
90 self.metrics.increment_errors(&error);
91 tracing::debug!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
92 if let Some(peer_id) = peer_id {
93 self.client.report_bad_message(peer_id);
94 }
95 self.submit_request(
96 self.next_request().expect("existing hashes to resubmit"),
97 Priority::High,
98 );
99 }
100
101 fn next_request(&self) -> Option<Vec<B256>> {
103 let mut hashes =
104 self.pending_headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
105 hashes.peek().is_some().then(|| hashes.collect())
106 }
107
108 fn submit_request(&mut self, req: Vec<B256>, priority: Priority) {
110 tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
111 let client = Arc::clone(&self.client);
112 self.last_request_len = Some(req.len());
113 self.fut = Some(client.get_block_bodies_with_priority(req, priority));
114 }
115
116 fn on_block_response(&mut self, response: WithPeerId<Vec<B::Body>>) -> DownloadResult<()>
119 where
120 B::Body: InMemorySize,
121 {
122 let (peer_id, bodies) = response.split();
123 let request_len = self.last_request_len.unwrap_or_default();
124 let response_len = bodies.len();
125
126 tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
127
128 self.metrics.total_downloaded.increment(response_len as u64);
130
131 if bodies.is_empty() {
136 return Err(DownloadError::EmptyResponse)
137 }
138
139 if response_len > request_len {
140 return Err(DownloadError::TooManyBodies(GotExpected {
141 got: response_len,
142 expected: request_len,
143 }))
144 }
145
146 self.try_buffer_blocks(bodies)?;
148
149 if let Some(req) = self.next_request() {
151 self.submit_request(req, Priority::High);
152 } else {
153 self.fut = None;
154 }
155
156 Ok(())
157 }
158
159 fn try_buffer_blocks(&mut self, bodies: Vec<C::Body>) -> DownloadResult<()>
165 where
166 C::Body: InMemorySize,
167 {
168 let bodies_len = bodies.len();
169 let mut bodies = bodies.into_iter().peekable();
170
171 let mut total_size = 0;
172 while bodies.peek().is_some() {
173 let next_header = match self.pending_headers.pop_front() {
174 Some(header) => header,
175 None => return Ok(()), };
177
178 if next_header.is_empty() {
179 self.buffer.push(BlockResponse::Empty(next_header));
180 } else {
181 let next_body = bodies.next().unwrap();
182
183 total_size += next_body.size();
185
186 let block = SealedBlock::from_sealed_parts(next_header, next_body);
187
188 if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
189 let hash = block.hash();
191 let number = block.number();
192 self.pending_headers.push_front(block.into_sealed_header());
193 return Err(DownloadError::BodyValidation {
194 hash,
195 number,
196 error: Box::new(error),
197 })
198 }
199
200 self.buffer.push(BlockResponse::Full(block));
201 }
202 }
203
204 self.response_metrics.response_size_bytes.set(total_size as f64);
206 self.response_metrics.response_length.set(bodies_len as f64);
207
208 Ok(())
209 }
210}
211
212impl<B, C> Future for BodiesRequestFuture<B, C>
213where
214 B: Block + 'static,
215 C: BodiesClient<Body = B::Body> + 'static,
216{
217 type Output = DownloadResult<Vec<BlockResponse<B>>>;
218
219 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
220 let this = self.get_mut();
221
222 loop {
223 if this.pending_headers.is_empty() {
224 return Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
225 }
226
227 if let Some(fut) = this.fut.as_mut() {
230 match ready!(fut.poll_unpin(cx)) {
231 Ok(response) => {
232 let peer_id = response.peer_id();
233 if let Err(error) = this.on_block_response(response) {
234 this.on_error(error, Some(peer_id));
235 }
236 }
237 Err(error) => {
238 if error.is_channel_closed() {
239 return Poll::Ready(Err(error.into()))
240 }
241
242 this.on_error(error.into(), None);
243 }
244 }
245 }
246
247 while this.pending_headers.front().is_some_and(|h| h.is_empty()) {
249 let header = this.pending_headers.pop_front().unwrap();
250 this.buffer.push(BlockResponse::Empty(header));
251 }
252 }
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use crate::{
260 bodies::test_utils::zip_blocks,
261 test_utils::{generate_bodies, TestBodiesClient},
262 };
263 use reth_consensus::test_utils::TestConsensus;
264 use reth_ethereum_primitives::Block;
265 use reth_testing_utils::{generators, generators::random_header_range};
266
267 #[tokio::test]
269 async fn request_returns_empty_bodies() {
270 let mut rng = generators::rng();
271 let headers = random_header_range(&mut rng, 0..20, B256::ZERO);
272
273 let client = Arc::new(TestBodiesClient::default());
274 let fut = BodiesRequestFuture::<Block, _>::new(
275 client.clone(),
276 Arc::new(TestConsensus::default()),
277 BodyDownloaderMetrics::default(),
278 )
279 .with_headers(headers.clone());
280
281 assert_eq!(
282 fut.await.unwrap(),
283 headers.into_iter().map(BlockResponse::Empty).collect::<Vec<_>>()
284 );
285 assert_eq!(client.times_requested(), 0);
286 }
287
288 #[tokio::test]
290 async fn request_submits_until_fulfilled() {
291 let (headers, mut bodies) = generate_bodies(0..=19);
293
294 let batch_size = 2;
295 let client = Arc::new(
296 TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
297 );
298 let fut = BodiesRequestFuture::<Block, _>::new(
299 client.clone(),
300 Arc::new(TestConsensus::default()),
301 BodyDownloaderMetrics::default(),
302 )
303 .with_headers(headers.clone());
304
305 assert_eq!(fut.await.unwrap(), zip_blocks(headers.iter(), &mut bodies));
306 assert_eq!(
307 client.times_requested(),
308 (headers.into_iter().filter(|h| !h.is_empty()).count() as u64).div_ceil(2)
310 );
311 }
312}