1use super::headers::client::HeadersRequest;
2use crate::{
3 bodies::client::{BodiesClient, SingleBodyRequest},
4 error::PeerRequestResult,
5 headers::client::{HeadersClient, SingleHeaderRequest},
6 BlockClient,
7};
8use alloy_consensus::BlockHeader;
9use alloy_primitives::{Sealable, B256};
10use reth_consensus::{Consensus, ConsensusError};
11use reth_eth_wire_types::HeadersDirection;
12use reth_network_peers::WithPeerId;
13use reth_primitives_traits::{SealedBlock, SealedHeader};
14use std::{
15 cmp::Reverse,
16 collections::{HashMap, VecDeque},
17 fmt::Debug,
18 future::Future,
19 hash::Hash,
20 pin::Pin,
21 sync::Arc,
22 task::{ready, Context, Poll},
23};
24use tracing::debug;
25
26#[derive(Debug, Clone)]
28pub struct FullBlockClient<Client>
29where
30 Client: BlockClient,
31{
32 client: Client,
33 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
34}
35
36impl<Client> FullBlockClient<Client>
37where
38 Client: BlockClient,
39{
40 pub fn new(
42 client: Client,
43 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
44 ) -> Self {
45 Self { client, consensus }
46 }
47
48 #[cfg(any(test, feature = "test-utils"))]
50 pub fn test_client(client: Client) -> Self {
51 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
52 }
53}
54
55impl<Client> FullBlockClient<Client>
56where
57 Client: BlockClient,
58{
59 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
66 let client = self.client.clone();
67 FetchFullBlockFuture {
68 hash,
69 consensus: self.consensus.clone(),
70 request: FullBlockRequest {
71 header: Some(client.get_header(hash.into())),
72 body: Some(client.get_block_body(hash)),
73 },
74 client,
75 header: None,
76 body: None,
77 }
78 }
79
80 pub fn get_full_block_range(
90 &self,
91 hash: B256,
92 count: u64,
93 ) -> FetchFullBlockRangeFuture<Client> {
94 let client = self.client.clone();
95 FetchFullBlockRangeFuture {
96 start_hash: hash,
97 count,
98 request: FullBlockRangeRequest {
99 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
100 bodies: None,
101 },
102 client,
103 headers: None,
104 pending_headers: VecDeque::new(),
105 bodies: HashMap::default(),
106 consensus: Arc::clone(&self.consensus),
107 }
108 }
109}
110
111#[must_use = "futures do nothing unless polled"]
116pub struct FetchFullBlockFuture<Client>
117where
118 Client: BlockClient,
119{
120 client: Client,
121 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
122 hash: B256,
123 request: FullBlockRequest<Client>,
124 header: Option<SealedHeader<Client::Header>>,
125 body: Option<BodyResponse<Client::Body>>,
126}
127
128impl<Client> FetchFullBlockFuture<Client>
129where
130 Client: BlockClient<Header: BlockHeader>,
131{
132 pub const fn hash(&self) -> &B256 {
134 &self.hash
135 }
136
137 pub fn block_number(&self) -> Option<u64> {
139 self.header.as_ref().map(|h| h.number())
140 }
141
142 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
144 if self.header.is_none() || self.body.is_none() {
145 return None
146 }
147
148 let header = self.header.take().unwrap();
149 let resp = self.body.take().unwrap();
150 match resp {
151 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
152 BodyResponse::PendingValidation(resp) => {
153 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
155 {
156 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
157 self.client.report_bad_message(resp.peer_id());
158 self.header = Some(header);
159 self.request.body = Some(self.client.get_block_body(self.hash));
160 return None
161 }
162 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
163 }
164 }
165 }
166
167 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
168 if let Some(ref header) = self.header {
169 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
170 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
171 self.client.report_bad_message(resp.peer_id());
172 return
173 }
174 self.body = Some(BodyResponse::Validated(resp.into_data()));
175 return
176 }
177 self.body = Some(BodyResponse::PendingValidation(resp));
178 }
179}
180
181impl<Client> Future for FetchFullBlockFuture<Client>
182where
183 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
184{
185 type Output = SealedBlock<Client::Block>;
186
187 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188 let this = self.get_mut();
189
190 let mut budget = 4;
192
193 loop {
194 match ready!(this.request.poll(cx)) {
195 ResponseResult::Header(res) => {
196 match res {
197 Ok(maybe_header) => {
198 let (peer, maybe_header) =
199 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
200 if let Some(header) = maybe_header {
201 if header.hash() == this.hash {
202 this.header = Some(header);
203 } else {
204 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
205 this.client.report_bad_message(peer)
207 }
208 }
209 }
210 Err(err) => {
211 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
212 }
213 }
214
215 if this.header.is_none() {
216 this.request.header = Some(this.client.get_header(this.hash.into()));
218 }
219 }
220 ResponseResult::Body(res) => {
221 match res {
222 Ok(maybe_body) => {
223 if let Some(body) = maybe_body.transpose() {
224 this.on_block_response(body);
225 }
226 }
227 Err(err) => {
228 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
229 }
230 }
231 if this.body.is_none() {
232 this.request.body = Some(this.client.get_block_body(this.hash));
234 }
235 }
236 }
237
238 if let Some(res) = this.take_block() {
239 return Poll::Ready(res)
240 }
241
242 budget -= 1;
244 if budget == 0 {
245 cx.waker().wake_by_ref();
247 return Poll::Pending
248 }
249 }
250 }
251}
252
253impl<Client> Debug for FetchFullBlockFuture<Client>
254where
255 Client: BlockClient<Header: Debug, Body: Debug>,
256{
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("FetchFullBlockFuture")
259 .field("hash", &self.hash)
260 .field("header", &self.header)
261 .field("body", &self.body)
262 .finish()
263 }
264}
265
266struct FullBlockRequest<Client>
267where
268 Client: BlockClient,
269{
270 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
271 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
272}
273
274impl<Client> FullBlockRequest<Client>
275where
276 Client: BlockClient,
277{
278 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
279 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
280 if let Poll::Ready(res) = fut.poll(cx) {
281 self.header = None;
282 return Poll::Ready(ResponseResult::Header(res))
283 }
284 }
285
286 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
287 if let Poll::Ready(res) = fut.poll(cx) {
288 self.body = None;
289 return Poll::Ready(ResponseResult::Body(res))
290 }
291 }
292
293 Poll::Pending
294 }
295}
296
297enum ResponseResult<H, B> {
300 Header(PeerRequestResult<Option<H>>),
301 Body(PeerRequestResult<Option<B>>),
302}
303
304#[derive(Debug)]
306enum BodyResponse<B> {
307 Validated(B),
309 PendingValidation(WithPeerId<B>),
311}
312#[must_use = "futures do nothing unless polled"]
325#[expect(missing_debug_implementations)]
326pub struct FetchFullBlockRangeFuture<Client>
327where
328 Client: BlockClient,
329{
330 client: Client,
332 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
334 start_hash: B256,
336 count: u64,
338 request: FullBlockRangeRequest<Client>,
340 headers: Option<Vec<SealedHeader<Client::Header>>>,
342 pending_headers: VecDeque<SealedHeader<Client::Header>>,
344 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
346}
347
348impl<Client> FetchFullBlockRangeFuture<Client>
349where
350 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
351{
352 pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
354 self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
355 }
356
357 fn is_bodies_complete(&self) -> bool {
359 self.bodies.len() == self.count as usize
360 }
361
362 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
366 if let Some(header) = self.pending_headers.pop_front() {
367 self.bodies.insert(header, body_response);
368 }
369 }
370
371 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
373 for body in bodies {
374 self.insert_body(body);
375 }
376 }
377
378 fn remaining_bodies_hashes(&self) -> Vec<B256> {
381 self.pending_headers.iter().map(|h| h.hash()).collect()
382 }
383
384 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
392 if !self.is_bodies_complete() {
393 return None
395 }
396
397 let headers = self.headers.take()?;
398 let mut needs_retry = false;
399 let mut valid_responses = Vec::new();
400
401 for header in &headers {
402 if let Some(body_resp) = self.bodies.remove(header) {
403 let body = match body_resp {
405 BodyResponse::Validated(body) => body,
406 BodyResponse::PendingValidation(resp) => {
407 if let Err(err) =
409 self.consensus.validate_body_against_header(resp.data(), header)
410 {
411 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
412 self.client.report_bad_message(resp.peer_id());
413
414 self.pending_headers.push_back(header.clone());
416 needs_retry = true;
417 continue
418 }
419
420 resp.into_data()
421 }
422 };
423
424 valid_responses
425 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
426 }
427 }
428
429 if needs_retry {
430 for block in valid_responses {
433 let (header, body) = block.split_sealed_header_body();
434 self.bodies.insert(header, BodyResponse::Validated(body));
435 }
436
437 self.headers = Some(headers);
439
440 let hashes = self.remaining_bodies_hashes();
442 self.request.bodies = Some(self.client.get_block_bodies(hashes));
443 return None
444 }
445
446 Some(valid_responses)
447 }
448
449 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
450 let (peer, mut headers_falling) =
451 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
452
453 if headers_falling.len() == self.count as usize {
455 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
457
458 if headers_falling[0].hash() == self.start_hash {
460 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
461 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
463 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
464 self.client.report_bad_message(peer);
465 }
466
467 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
469
470 self.pending_headers = headers_falling.clone().into();
472
473 if !self.has_bodies_request_started() {
475 self.request.bodies = Some(self.client.get_block_bodies(hashes));
477 }
478
479 self.headers = Some(headers_falling);
481 } else {
482 self.client.report_bad_message(peer);
484 }
485 }
486 }
487
488 const fn has_bodies_request_started(&self) -> bool {
491 self.request.bodies.is_some()
492 }
493
494 pub const fn start_hash(&self) -> B256 {
496 self.start_hash
497 }
498
499 pub const fn count(&self) -> u64 {
501 self.count
502 }
503}
504
505impl<Client> Future for FetchFullBlockRangeFuture<Client>
506where
507 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
508{
509 type Output = Vec<SealedBlock<Client::Block>>;
510
511 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
512 let this = self.get_mut();
513
514 loop {
515 match ready!(this.request.poll(cx)) {
516 RangeResponseResult::Header(res) => {
525 match res {
526 Ok(headers) => {
527 this.on_headers_response(headers);
528 }
529 Err(err) => {
530 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
531 }
532 }
533
534 if this.headers.is_none() {
535 this.request.headers = Some(this.client.get_headers(HeadersRequest {
537 start: this.start_hash.into(),
538 limit: this.count,
539 direction: HeadersDirection::Falling,
540 }));
541 }
542 }
543 RangeResponseResult::Body(res) => {
549 match res {
550 Ok(bodies_resp) => {
551 let (peer, new_bodies) = bodies_resp.split();
552
553 this.insert_bodies(
555 new_bodies
556 .into_iter()
557 .map(|resp| WithPeerId::new(peer, resp))
558 .map(BodyResponse::PendingValidation),
559 );
560
561 if !this.is_bodies_complete() {
562 let req_hashes = this.remaining_bodies_hashes();
564
565 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
567 }
568 }
569 Err(err) => {
570 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
571 }
572 }
573 if this.bodies.is_empty() {
574 let hashes = this.remaining_bodies_hashes();
587 if !hashes.is_empty() {
588 this.request.bodies = Some(this.client.get_block_bodies(hashes));
589 }
590 }
591 }
592 }
593
594 if let Some(res) = this.take_blocks() {
595 return Poll::Ready(res)
596 }
597 }
598 }
599}
600
601struct FullBlockRangeRequest<Client>
605where
606 Client: BlockClient,
607{
608 headers: Option<<Client as HeadersClient>::Output>,
609 bodies: Option<<Client as BodiesClient>::Output>,
610}
611
612impl<Client> FullBlockRangeRequest<Client>
613where
614 Client: BlockClient,
615{
616 fn poll(
617 &mut self,
618 cx: &mut Context<'_>,
619 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
620 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
621 if let Poll::Ready(res) = fut.poll(cx) {
622 self.headers = None;
623 return Poll::Ready(RangeResponseResult::Header(res))
624 }
625 }
626
627 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
628 if let Poll::Ready(res) = fut.poll(cx) {
629 self.bodies = None;
630 return Poll::Ready(RangeResponseResult::Body(res))
631 }
632 }
633
634 Poll::Pending
635 }
636}
637
638enum RangeResponseResult<H, B> {
641 Header(PeerRequestResult<Vec<H>>),
642 Body(PeerRequestResult<Vec<B>>),
643}
644
#[cfg(test)]
mod tests {
    use reth_ethereum_primitives::BlockBody;

    use super::*;
    use crate::test_utils::TestFullBlockClient;
    use std::ops::Range;

    /// A single stored block is returned by the single-block future.
    #[tokio::test]
    async fn download_single_full_block() {
        let inner = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        inner.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(inner);

        let received = client.get_full_block(header.hash()).await;
        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
    }

    /// A range of length one yields exactly the stored block.
    #[tokio::test]
    async fn download_single_full_block_range() {
        let inner = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        inner.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header, body));
    }

    /// Inserts one block per element of `range` into the client, each a child of the previous
    /// one, and returns the last inserted header together with the (shared) body.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        range: Range<usize>,
    ) -> (SealedHeader, BlockBody) {
        let mut tip: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        for _ in range {
            let (mut child, parent_hash) = tip.split();
            // turn the previous header into its own child
            child.parent_hash = parent_hash;
            child.number += 1;

            tip = SealedHeader::seal_slow(child);

            client.insert(tip.clone(), body.clone());
        }

        (tip, body)
    }

    /// Ranges are returned in falling order starting at the requested hash.
    #[tokio::test]
    async fn download_full_block_range() {
        let inner = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&inner, 0..50);
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header.clone(), body));

        let blocks = client.get_full_block_range(header.hash(), 10).await;
        assert_eq!(blocks.len(), 10);
        for (offset, block) in (0u64..).zip(&blocks) {
            assert_eq!(block.number, header.number - offset);
        }
    }

    /// A range covering all 50 stored blocks is returned completely and in falling order.
    #[tokio::test]
    async fn download_full_block_range_over_soft_limit() {
        let inner = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&inner, 0..50);
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header.clone(), body));

        let blocks = client.get_full_block_range(header.hash(), 50).await;
        assert_eq!(blocks.len(), 50);
        for (offset, block) in (0u64..).zip(&blocks) {
            assert_eq!(block.number, header.number - offset);
        }
    }

    /// The range still resolves when the consensus is configured to fail validation while body
    /// validation against the header succeeds.
    #[tokio::test]
    async fn download_full_block_range_with_invalid_header() {
        let inner = TestFullBlockClient::default();
        let range_length: usize = 3;
        let (header, _) = insert_headers_into_client(&inner, 0..range_length);

        let consensus = reth_consensus::test_utils::TestConsensus::default();
        consensus.set_fail_validation(true);
        consensus.set_fail_body_against_header(false);
        let client = FullBlockClient::new(inner, Arc::new(consensus));

        let blocks = client.get_full_block_range(header.hash(), range_length as u64).await;

        assert_eq!(blocks.len(), range_length);
        for (offset, block) in (0u64..).zip(&blocks) {
            assert_eq!(block.number, header.number - offset);
        }
    }
}