1use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
2use alloy_consensus::BlockHeader;
3use alloy_primitives::B256;
4use futures::{Future, FutureExt};
5use reth_consensus::{Consensus, ConsensusError};
6use reth_network_p2p::{
7 bodies::{client::BodiesClient, response::BlockResponse},
8 error::{DownloadError, DownloadResult},
9 priority::Priority,
10};
11use reth_network_peers::{PeerId, WithPeerId};
12use reth_primitives::{BlockBody, GotExpected, SealedBlock, SealedHeader};
13use reth_primitives_traits::{Block, InMemorySize};
14use std::{
15 collections::VecDeque,
16 mem,
17 pin::Pin,
18 sync::Arc,
19 task::{ready, Context, Poll},
20};
21
22pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
42 client: Arc<C>,
43 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
44 metrics: BodyDownloaderMetrics,
45 response_metrics: ResponseMetrics,
48 pending_headers: VecDeque<SealedHeader<B::Header>>,
50 buffer: Vec<BlockResponse<B>>,
52 fut: Option<C::Output>,
53 last_request_len: Option<usize>,
55}
56
57impl<B, C> BodiesRequestFuture<B, C>
58where
59 B: Block,
60 C: BodiesClient<Body = B::Body> + 'static,
61{
62 pub(crate) fn new(
64 client: Arc<C>,
65 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
66 metrics: BodyDownloaderMetrics,
67 ) -> Self {
68 Self {
69 client,
70 consensus,
71 metrics,
72 response_metrics: Default::default(),
73 pending_headers: Default::default(),
74 buffer: Default::default(),
75 last_request_len: None,
76 fut: None,
77 }
78 }
79
80 pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<B::Header>>) -> Self {
81 self.buffer.reserve_exact(headers.len());
82 self.pending_headers = VecDeque::from(headers);
83 if let Some(req) = self.next_request() {
86 self.submit_request(req, Priority::Normal);
87 }
88 self
89 }
90
91 fn on_error(&mut self, error: DownloadError, peer_id: Option<PeerId>) {
92 self.metrics.increment_errors(&error);
93 tracing::debug!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
94 if let Some(peer_id) = peer_id {
95 self.client.report_bad_message(peer_id);
96 }
97 self.submit_request(
98 self.next_request().expect("existing hashes to resubmit"),
99 Priority::High,
100 );
101 }
102
103 fn next_request(&self) -> Option<Vec<B256>> {
105 let mut hashes =
106 self.pending_headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
107 hashes.peek().is_some().then(|| hashes.collect())
108 }
109
110 fn submit_request(&mut self, req: Vec<B256>, priority: Priority) {
112 tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
113 let client = Arc::clone(&self.client);
114 self.last_request_len = Some(req.len());
115 self.fut = Some(client.get_block_bodies_with_priority(req, priority));
116 }
117
118 fn on_block_response(&mut self, response: WithPeerId<Vec<B::Body>>) -> DownloadResult<()>
121 where
122 B::Body: InMemorySize,
123 {
124 let (peer_id, bodies) = response.split();
125 let request_len = self.last_request_len.unwrap_or_default();
126 let response_len = bodies.len();
127
128 tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
129
130 self.metrics.total_downloaded.increment(response_len as u64);
132
133 if bodies.is_empty() {
138 return Err(DownloadError::EmptyResponse)
139 }
140
141 if response_len > request_len {
142 return Err(DownloadError::TooManyBodies(GotExpected {
143 got: response_len,
144 expected: request_len,
145 }))
146 }
147
148 self.try_buffer_blocks(bodies)?;
150
151 if let Some(req) = self.next_request() {
153 self.submit_request(req, Priority::High);
154 } else {
155 self.fut = None;
156 }
157
158 Ok(())
159 }
160
161 fn try_buffer_blocks(&mut self, bodies: Vec<C::Body>) -> DownloadResult<()>
167 where
168 C::Body: InMemorySize,
169 {
170 let bodies_capacity = bodies.capacity();
171 let bodies_len = bodies.len();
172 let mut bodies = bodies.into_iter().peekable();
173
174 let mut total_size = bodies_capacity * mem::size_of::<BlockBody>();
175 while bodies.peek().is_some() {
176 let next_header = match self.pending_headers.pop_front() {
177 Some(header) => header,
178 None => return Ok(()), };
180
181 if next_header.is_empty() {
182 total_size += mem::size_of::<C::Body>();
184 self.buffer.push(BlockResponse::Empty(next_header));
185 } else {
186 let next_body = bodies.next().unwrap();
187
188 total_size += next_body.size();
190
191 let block = SealedBlock::from_sealed_parts(next_header, next_body);
192
193 if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
194 let hash = block.hash();
196 let number = block.number();
197 self.pending_headers.push_front(block.into_sealed_header());
198 return Err(DownloadError::BodyValidation {
199 hash,
200 number,
201 error: Box::new(error),
202 })
203 }
204
205 self.buffer.push(BlockResponse::Full(block));
206 }
207 }
208
209 self.response_metrics.response_size_bytes.set(total_size as f64);
211 self.response_metrics.response_length.set(bodies_len as f64);
212
213 Ok(())
214 }
215}
216
217impl<B, C> Future for BodiesRequestFuture<B, C>
218where
219 B: Block + 'static,
220 C: BodiesClient<Body = B::Body> + 'static,
221{
222 type Output = DownloadResult<Vec<BlockResponse<B>>>;
223
224 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
225 let this = self.get_mut();
226
227 loop {
228 if this.pending_headers.is_empty() {
229 return Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
230 }
231
232 if let Some(fut) = this.fut.as_mut() {
235 match ready!(fut.poll_unpin(cx)) {
236 Ok(response) => {
237 let peer_id = response.peer_id();
238 if let Err(error) = this.on_block_response(response) {
239 this.on_error(error, Some(peer_id));
240 }
241 }
242 Err(error) => {
243 if error.is_channel_closed() {
244 return Poll::Ready(Err(error.into()))
245 }
246
247 this.on_error(error.into(), None);
248 }
249 }
250 }
251
252 while this.pending_headers.front().is_some_and(|h| h.is_empty()) {
254 let header = this.pending_headers.pop_front().unwrap();
255 this.buffer.push(BlockResponse::Empty(header));
256 }
257 }
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        bodies::test_utils::zip_blocks,
        test_utils::{generate_bodies, TestBodiesClient},
    };
    use reth_consensus::test_utils::TestConsensus;
    use reth_testing_utils::{generators, generators::random_header_range};

    /// The future must resolve to one `BlockResponse::Empty` per header and must not issue a
    /// single request to the client.
    #[tokio::test]
    async fn request_returns_empty_bodies() {
        let mut rng = generators::rng();
        let headers = random_header_range(&mut rng, 0..20, B256::ZERO);

        let client = Arc::new(TestBodiesClient::default());
        let request = BodiesRequestFuture::<reth_primitives::Block, _>::new(
            Arc::clone(&client),
            Arc::new(TestConsensus::default()),
            BodyDownloaderMetrics::default(),
        )
        .with_headers(headers.clone());

        let downloaded = request.await.unwrap();
        let expected: Vec<BlockResponse<reth_primitives::Block>> =
            headers.into_iter().map(BlockResponse::Empty).collect();

        assert_eq!(downloaded, expected);
        assert_eq!(client.times_requested(), 0);
    }

    /// With the client limited to a small batch size, the future must keep submitting requests
    /// until every non-empty header has received its body.
    #[tokio::test]
    async fn request_submits_until_fulfilled() {
        let (headers, mut bodies) = generate_bodies(0..=19);

        let batch_size = 2;
        let client = Arc::new(
            TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
        );
        let request = BodiesRequestFuture::<reth_primitives::Block, _>::new(
            Arc::clone(&client),
            Arc::new(TestConsensus::default()),
            BodyDownloaderMetrics::default(),
        )
        .with_headers(headers.clone());

        let downloaded = request.await.unwrap();
        let expected = zip_blocks(headers.iter(), &mut bodies);
        assert_eq!(downloaded, expected);

        // One request per full or partial batch of non-empty headers.
        let non_empty = headers.into_iter().filter(|h| !h.is_empty()).count() as u64;
        assert_eq!(client.times_requested(), non_empty.div_ceil(batch_size as u64));
    }
}