1use crate::metrics::{BodyDownloaderMetrics, ResponseMetrics};
2use alloy_consensus::BlockHeader;
3use alloy_primitives::B256;
4use futures::{Future, FutureExt};
5use reth_consensus::{Consensus, ConsensusError};
6use reth_network_p2p::{
7 bodies::{client::BodiesClient, response::BlockResponse},
8 error::{DownloadError, DownloadResult},
9 priority::Priority,
10};
11use reth_network_peers::{PeerId, WithPeerId};
12use reth_primitives_traits::{Block, GotExpected, InMemorySize, SealedBlock, SealedHeader};
13use std::{
14 collections::VecDeque,
15 mem,
16 pin::Pin,
17 sync::Arc,
18 task::{ready, Context, Poll},
19};
20
21pub(crate) struct BodiesRequestFuture<B: Block, C: BodiesClient<Body = B::Body>> {
41 client: Arc<C>,
42 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
43 metrics: BodyDownloaderMetrics,
44 response_metrics: ResponseMetrics,
47 pending_headers: VecDeque<SealedHeader<B::Header>>,
49 buffer: Vec<BlockResponse<B>>,
51 fut: Option<C::Output>,
52 last_request_len: Option<usize>,
54}
55
56impl<B, C> BodiesRequestFuture<B, C>
57where
58 B: Block,
59 C: BodiesClient<Body = B::Body> + 'static,
60{
61 pub(crate) fn new(
63 client: Arc<C>,
64 consensus: Arc<dyn Consensus<B, Error = ConsensusError>>,
65 metrics: BodyDownloaderMetrics,
66 ) -> Self {
67 Self {
68 client,
69 consensus,
70 metrics,
71 response_metrics: Default::default(),
72 pending_headers: Default::default(),
73 buffer: Default::default(),
74 last_request_len: None,
75 fut: None,
76 }
77 }
78
79 pub(crate) fn with_headers(mut self, headers: Vec<SealedHeader<B::Header>>) -> Self {
80 self.buffer.reserve_exact(headers.len());
81 self.pending_headers = VecDeque::from(headers);
82 if let Some(req) = self.next_request() {
85 self.submit_request(req, Priority::Normal);
86 }
87 self
88 }
89
90 fn on_error(&mut self, error: DownloadError, peer_id: Option<PeerId>) {
91 self.metrics.increment_errors(&error);
92 tracing::debug!(target: "downloaders::bodies", ?peer_id, %error, "Error requesting bodies");
93 if let Some(peer_id) = peer_id {
94 self.client.report_bad_message(peer_id);
95 }
96 self.submit_request(
97 self.next_request().expect("existing hashes to resubmit"),
98 Priority::High,
99 );
100 }
101
102 fn next_request(&self) -> Option<Vec<B256>> {
104 let mut hashes =
105 self.pending_headers.iter().filter(|h| !h.is_empty()).map(|h| h.hash()).peekable();
106 hashes.peek().is_some().then(|| hashes.collect())
107 }
108
109 fn submit_request(&mut self, req: Vec<B256>, priority: Priority) {
111 tracing::trace!(target: "downloaders::bodies", request_len = req.len(), "Requesting bodies");
112 let client = Arc::clone(&self.client);
113 self.last_request_len = Some(req.len());
114 self.fut = Some(client.get_block_bodies_with_priority(req, priority));
115 }
116
117 fn on_block_response(&mut self, response: WithPeerId<Vec<B::Body>>) -> DownloadResult<()>
120 where
121 B::Body: InMemorySize,
122 {
123 let (peer_id, bodies) = response.split();
124 let request_len = self.last_request_len.unwrap_or_default();
125 let response_len = bodies.len();
126
127 tracing::trace!(target: "downloaders::bodies", request_len, response_len, ?peer_id, "Received bodies");
128
129 self.metrics.total_downloaded.increment(response_len as u64);
131
132 if bodies.is_empty() {
137 return Err(DownloadError::EmptyResponse)
138 }
139
140 if response_len > request_len {
141 return Err(DownloadError::TooManyBodies(GotExpected {
142 got: response_len,
143 expected: request_len,
144 }))
145 }
146
147 self.try_buffer_blocks(bodies)?;
149
150 if let Some(req) = self.next_request() {
152 self.submit_request(req, Priority::High);
153 } else {
154 self.fut = None;
155 }
156
157 Ok(())
158 }
159
160 fn try_buffer_blocks(&mut self, bodies: Vec<C::Body>) -> DownloadResult<()>
166 where
167 C::Body: InMemorySize,
168 {
169 let bodies_capacity = bodies.capacity();
170 let bodies_len = bodies.len();
171 let mut bodies = bodies.into_iter().peekable();
172
173 let mut total_size = bodies_capacity * mem::size_of::<C::Body>();
174 while bodies.peek().is_some() {
175 let next_header = match self.pending_headers.pop_front() {
176 Some(header) => header,
177 None => return Ok(()), };
179
180 if next_header.is_empty() {
181 total_size += mem::size_of::<C::Body>();
183 self.buffer.push(BlockResponse::Empty(next_header));
184 } else {
185 let next_body = bodies.next().unwrap();
186
187 total_size += next_body.size();
189
190 let block = SealedBlock::from_sealed_parts(next_header, next_body);
191
192 if let Err(error) = self.consensus.validate_block_pre_execution(&block) {
193 let hash = block.hash();
195 let number = block.number();
196 self.pending_headers.push_front(block.into_sealed_header());
197 return Err(DownloadError::BodyValidation {
198 hash,
199 number,
200 error: Box::new(error),
201 })
202 }
203
204 self.buffer.push(BlockResponse::Full(block));
205 }
206 }
207
208 self.response_metrics.response_size_bytes.set(total_size as f64);
210 self.response_metrics.response_length.set(bodies_len as f64);
211
212 Ok(())
213 }
214}
215
216impl<B, C> Future for BodiesRequestFuture<B, C>
217where
218 B: Block + 'static,
219 C: BodiesClient<Body = B::Body> + 'static,
220{
221 type Output = DownloadResult<Vec<BlockResponse<B>>>;
222
223 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
224 let this = self.get_mut();
225
226 loop {
227 if this.pending_headers.is_empty() {
228 return Poll::Ready(Ok(std::mem::take(&mut this.buffer)))
229 }
230
231 if let Some(fut) = this.fut.as_mut() {
234 match ready!(fut.poll_unpin(cx)) {
235 Ok(response) => {
236 let peer_id = response.peer_id();
237 if let Err(error) = this.on_block_response(response) {
238 this.on_error(error, Some(peer_id));
239 }
240 }
241 Err(error) => {
242 if error.is_channel_closed() {
243 return Poll::Ready(Err(error.into()))
244 }
245
246 this.on_error(error.into(), None);
247 }
248 }
249 }
250
251 while this.pending_headers.front().is_some_and(|h| h.is_empty()) {
253 let header = this.pending_headers.pop_front().unwrap();
254 this.buffer.push(BlockResponse::Empty(header));
255 }
256 }
257 }
258}
259
260#[cfg(test)]
261mod tests {
262 use super::*;
263 use crate::{
264 bodies::test_utils::zip_blocks,
265 test_utils::{generate_bodies, TestBodiesClient},
266 };
267 use reth_consensus::test_utils::TestConsensus;
268 use reth_ethereum_primitives::Block;
269 use reth_testing_utils::{generators, generators::random_header_range};
270
271 #[tokio::test]
273 async fn request_returns_empty_bodies() {
274 let mut rng = generators::rng();
275 let headers = random_header_range(&mut rng, 0..20, B256::ZERO);
276
277 let client = Arc::new(TestBodiesClient::default());
278 let fut = BodiesRequestFuture::<Block, _>::new(
279 client.clone(),
280 Arc::new(TestConsensus::default()),
281 BodyDownloaderMetrics::default(),
282 )
283 .with_headers(headers.clone());
284
285 assert_eq!(
286 fut.await.unwrap(),
287 headers.into_iter().map(BlockResponse::Empty).collect::<Vec<_>>()
288 );
289 assert_eq!(client.times_requested(), 0);
290 }
291
292 #[tokio::test]
294 async fn request_submits_until_fulfilled() {
295 let (headers, mut bodies) = generate_bodies(0..=19);
297
298 let batch_size = 2;
299 let client = Arc::new(
300 TestBodiesClient::default().with_bodies(bodies.clone()).with_max_batch_size(batch_size),
301 );
302 let fut = BodiesRequestFuture::<Block, _>::new(
303 client.clone(),
304 Arc::new(TestConsensus::default()),
305 BodyDownloaderMetrics::default(),
306 )
307 .with_headers(headers.clone());
308
309 assert_eq!(fut.await.unwrap(), zip_blocks(headers.iter(), &mut bodies));
310 assert_eq!(
311 client.times_requested(),
312 (headers.into_iter().filter(|h| !h.is_empty()).count() as u64).div_ceil(2)
314 );
315 }
316}