1use super::headers::client::HeadersRequest;
2use crate::{
3 bodies::client::{BodiesClient, SingleBodyRequest},
4 download::DownloadClient,
5 error::PeerRequestResult,
6 headers::client::{HeadersClient, SingleHeaderRequest},
7 priority::Priority,
8 BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::{Consensus, ConsensusError};
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18 cmp::Reverse,
19 collections::{HashMap, VecDeque},
20 fmt::Debug,
21 future::Future,
22 hash::Hash,
23 ops::RangeInclusive,
24 pin::Pin,
25 sync::Arc,
26 task::{ready, Context, Poll},
27};
28use tracing::debug;
29
/// A client that fetches full blocks, i.e. a header together with its body.
///
/// It pairs an underlying [`BlockClient`] with a [`Consensus`] implementation. Both are handed to
/// the download futures created by [`FullBlockClient::get_full_block`] and
/// [`FullBlockClient::get_full_block_range`].
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client>
where
    Client: BlockClient,
{
    /// The client used to request headers and bodies.
    client: Client,
    /// The consensus implementation the download futures use to validate responses.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
}
39
40impl<Client> FullBlockClient<Client>
41where
42 Client: BlockClient,
43{
44 pub fn new(
46 client: Client,
47 consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
48 ) -> Self {
49 Self { client, consensus }
50 }
51
52 #[cfg(any(test, feature = "test-utils"))]
54 pub fn test_client(client: Client) -> Self {
55 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
56 }
57}
58
59impl<Client> FullBlockClient<Client>
60where
61 Client: BlockClient,
62{
63 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
70 let client = self.client.clone();
71 FetchFullBlockFuture {
72 hash,
73 consensus: self.consensus.clone(),
74 request: FullBlockRequest {
75 header: Some(client.get_header(hash.into())),
76 body: Some(client.get_block_body(hash)),
77 },
78 client,
79 header: None,
80 body: None,
81 }
82 }
83
84 pub fn get_full_block_range(
94 &self,
95 hash: B256,
96 count: u64,
97 ) -> FetchFullBlockRangeFuture<Client> {
98 let client = self.client.clone();
99 FetchFullBlockRangeFuture {
100 start_hash: hash,
101 count,
102 request: FullBlockRangeRequest {
103 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
104 bodies: None,
105 },
106 client,
107 headers: None,
108 pending_headers: VecDeque::new(),
109 bodies: HashMap::default(),
110 consensus: Arc::clone(&self.consensus),
111 }
112 }
113}
114
/// A future that downloads a single full block.
///
/// It tracks one header request and one body request for `hash`. It resolves to a
/// [`SealedBlock`] once a header matching `hash` and a body that validates against that header
/// have both been received. Failed or rejected responses are requested again.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to issue (and re-issue) requests and to report misbehaving peers.
    client: Client,
    /// The consensus implementation used to validate a body against its header.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The hash of the requested block.
    hash: B256,
    /// The in-flight header and body requests.
    request: FullBlockRequest<Client>,
    /// The accepted header, once received.
    header: Option<SealedHeader<Client::Header>>,
    /// The received body, once available (validated or still awaiting validation).
    body: Option<BodyResponse<Client::Body>>,
}
131
132impl<Client> FetchFullBlockFuture<Client>
133where
134 Client: BlockClient<Header: BlockHeader>,
135{
136 pub const fn hash(&self) -> &B256 {
138 &self.hash
139 }
140
141 pub fn block_number(&self) -> Option<u64> {
143 self.header.as_ref().map(|h| h.number())
144 }
145
146 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
148 if self.header.is_none() || self.body.is_none() {
149 return None
150 }
151
152 let header = self.header.take().unwrap();
153 let resp = self.body.take().unwrap();
154 match resp {
155 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
156 BodyResponse::PendingValidation(resp) => {
157 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
159 {
160 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
161 self.client.report_bad_message(resp.peer_id());
162 self.header = Some(header);
163 self.request.body = Some(self.client.get_block_body(self.hash));
164 return None
165 }
166 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
167 }
168 }
169 }
170
171 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
172 if let Some(ref header) = self.header {
173 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
174 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
175 self.client.report_bad_message(resp.peer_id());
176 return
177 }
178 self.body = Some(BodyResponse::Validated(resp.into_data()));
179 return
180 }
181 self.body = Some(BodyResponse::PendingValidation(resp));
182 }
183}
184
185impl<Client> Future for FetchFullBlockFuture<Client>
186where
187 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
188{
189 type Output = SealedBlock<Client::Block>;
190
191 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
192 let this = self.get_mut();
193
194 let mut budget = 4;
196
197 loop {
198 match ready!(this.request.poll(cx)) {
199 ResponseResult::Header(res) => {
200 match res {
201 Ok(maybe_header) => {
202 let (peer, maybe_header) =
203 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
204 if let Some(header) = maybe_header {
205 if header.hash() == this.hash {
206 this.header = Some(header);
207 } else {
208 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
209 this.client.report_bad_message(peer)
211 }
212 }
213 }
214 Err(err) => {
215 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
216 }
217 }
218
219 if this.header.is_none() {
220 this.request.header = Some(this.client.get_header(this.hash.into()));
222 }
223 }
224 ResponseResult::Body(res) => {
225 match res {
226 Ok(maybe_body) => {
227 if let Some(body) = maybe_body.transpose() {
228 this.on_block_response(body);
229 }
230 }
231 Err(err) => {
232 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
233 }
234 }
235 if this.body.is_none() {
236 this.request.body = Some(this.client.get_block_body(this.hash));
238 }
239 }
240 }
241
242 if let Some(res) = this.take_block() {
243 return Poll::Ready(res)
244 }
245
246 budget -= 1;
248 if budget == 0 {
249 cx.waker().wake_by_ref();
251 return Poll::Pending
252 }
253 }
254 }
255}
256
257impl<Client> Debug for FetchFullBlockFuture<Client>
258where
259 Client: BlockClient<Header: Debug, Body: Debug>,
260{
261 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262 f.debug_struct("FetchFullBlockFuture")
263 .field("hash", &self.hash)
264 .field("header", &self.header)
265 .field("body", &self.body)
266 .finish()
267 }
268}
269
/// The in-flight requests of a [`FetchFullBlockFuture`].
///
/// A slot is `None` when no request of that kind is currently outstanding.
struct FullBlockRequest<Client>
where
    Client: BlockClient,
{
    /// The pending request for the header, if any.
    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
    /// The pending request for the body, if any.
    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
277
278impl<Client> FullBlockRequest<Client>
279where
280 Client: BlockClient,
281{
282 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
283 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
284 if let Poll::Ready(res) = fut.poll(cx) {
285 self.header = None;
286 return Poll::Ready(ResponseResult::Header(res))
287 }
288 }
289
290 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
291 if let Poll::Ready(res) = fut.poll(cx) {
292 self.body = None;
293 return Poll::Ready(ResponseResult::Body(res))
294 }
295 }
296
297 Poll::Pending
298 }
299}
300
/// The result of a finished single-block request, tagged by the kind of request that produced it.
enum ResponseResult<H, B> {
    /// The result of the header request.
    Header(PeerRequestResult<Option<H>>),
    /// The result of the body request.
    Body(PeerRequestResult<Option<B>>),
}
307
/// A received block body, tagged with whether it has been validated against its header yet.
#[derive(Debug)]
enum BodyResponse<B> {
    /// A body that has already been validated against its header.
    Validated(B),
    /// A body that still needs to be validated against its header. The peer that sent it is
    /// kept so that it can be reported if validation fails.
    PendingValidation(WithPeerId<B>),
}
/// A future that downloads a range of full blocks.
///
/// It first requests `count` headers in falling direction starting at `start_hash`, then requests
/// the bodies for the accepted headers. It resolves to the assembled [`SealedBlock`]s once a body
/// has been received and validated for every header.
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to issue (and re-issue) requests and to report misbehaving peers.
    client: Client,
    /// The consensus implementation used to validate headers and bodies.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The hash of the block the range starts at.
    start_hash: B256,
    /// The number of blocks requested.
    count: u64,
    /// The in-flight headers and bodies requests.
    request: FullBlockRangeRequest<Client>,
    /// The accepted headers, sorted by descending block number. `None` until a header response
    /// has been accepted.
    headers: Option<Vec<SealedHeader<Client::Header>>>,
    /// The headers for which no body has been stored yet, in the order their bodies are expected.
    pending_headers: VecDeque<SealedHeader<Client::Header>>,
    /// The bodies received so far, keyed by the header they were matched with.
    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
}
351
352impl<Client> FetchFullBlockRangeFuture<Client>
353where
354 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
355{
356 pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
358 self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
359 }
360
361 fn is_bodies_complete(&self) -> bool {
363 self.bodies.len() == self.count as usize
364 }
365
366 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
370 if let Some(header) = self.pending_headers.pop_front() {
371 self.bodies.insert(header, body_response);
372 }
373 }
374
375 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
377 for body in bodies {
378 self.insert_body(body);
379 }
380 }
381
382 fn remaining_bodies_hashes(&self) -> Vec<B256> {
385 self.pending_headers.iter().map(|h| h.hash()).collect()
386 }
387
388 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
396 if !self.is_bodies_complete() {
397 return None
399 }
400
401 let headers = self.headers.take()?;
402 let mut needs_retry = false;
403 let mut valid_responses = Vec::new();
404
405 for header in &headers {
406 if let Some(body_resp) = self.bodies.remove(header) {
407 let body = match body_resp {
409 BodyResponse::Validated(body) => body,
410 BodyResponse::PendingValidation(resp) => {
411 if let Err(err) =
413 self.consensus.validate_body_against_header(resp.data(), header)
414 {
415 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
416 self.client.report_bad_message(resp.peer_id());
417
418 self.pending_headers.push_back(header.clone());
420 needs_retry = true;
421 continue
422 }
423
424 resp.into_data()
425 }
426 };
427
428 valid_responses
429 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
430 }
431 }
432
433 if needs_retry {
434 for block in valid_responses {
437 let (header, body) = block.split_sealed_header_body();
438 self.bodies.insert(header, BodyResponse::Validated(body));
439 }
440
441 self.headers = Some(headers);
443
444 let hashes = self.remaining_bodies_hashes();
446 self.request.bodies = Some(self.client.get_block_bodies(hashes));
447 return None
448 }
449
450 Some(valid_responses)
451 }
452
453 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
454 let (peer, mut headers_falling) =
455 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
456
457 if headers_falling.len() == self.count as usize {
459 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
461
462 if headers_falling[0].hash() == self.start_hash {
464 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
465 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
467 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
468 self.client.report_bad_message(peer);
469 }
470
471 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
473
474 self.pending_headers = headers_falling.clone().into();
476
477 if !self.has_bodies_request_started() {
479 self.request.bodies = Some(self.client.get_block_bodies(hashes));
481 }
482
483 self.headers = Some(headers_falling);
485 } else {
486 self.client.report_bad_message(peer);
488 }
489 }
490 }
491
492 const fn has_bodies_request_started(&self) -> bool {
495 self.request.bodies.is_some()
496 }
497
498 pub const fn start_hash(&self) -> B256 {
500 self.start_hash
501 }
502
503 pub const fn count(&self) -> u64 {
505 self.count
506 }
507}
508
509impl<Client> Future for FetchFullBlockRangeFuture<Client>
510where
511 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
512{
513 type Output = Vec<SealedBlock<Client::Block>>;
514
515 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
516 let this = self.get_mut();
517
518 loop {
519 match ready!(this.request.poll(cx)) {
520 RangeResponseResult::Header(res) => {
529 match res {
530 Ok(headers) => {
531 this.on_headers_response(headers);
532 }
533 Err(err) => {
534 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
535 }
536 }
537
538 if this.headers.is_none() {
539 this.request.headers = Some(this.client.get_headers(HeadersRequest {
541 start: this.start_hash.into(),
542 limit: this.count,
543 direction: HeadersDirection::Falling,
544 }));
545 }
546 }
547 RangeResponseResult::Body(res) => {
553 match res {
554 Ok(bodies_resp) => {
555 let (peer, new_bodies) = bodies_resp.split();
556
557 this.insert_bodies(
559 new_bodies
560 .into_iter()
561 .map(|resp| WithPeerId::new(peer, resp))
562 .map(BodyResponse::PendingValidation),
563 );
564
565 if !this.is_bodies_complete() {
566 let req_hashes = this.remaining_bodies_hashes();
568
569 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
571 }
572 }
573 Err(err) => {
574 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
575 }
576 }
577 if this.bodies.is_empty() {
578 let hashes = this.remaining_bodies_hashes();
591 if !hashes.is_empty() {
592 this.request.bodies = Some(this.client.get_block_bodies(hashes));
593 }
594 }
595 }
596 }
597
598 if let Some(res) = this.take_blocks() {
599 return Poll::Ready(res)
600 }
601 }
602 }
603}
604
/// The in-flight requests of a [`FetchFullBlockRangeFuture`].
///
/// A slot is `None` when no request of that kind is currently outstanding.
struct FullBlockRangeRequest<Client>
where
    Client: BlockClient,
{
    /// The pending headers request, if any.
    headers: Option<<Client as HeadersClient>::Output>,
    /// The pending bodies request, if any.
    bodies: Option<<Client as BodiesClient>::Output>,
}
615
616impl<Client> FullBlockRangeRequest<Client>
617where
618 Client: BlockClient,
619{
620 fn poll(
621 &mut self,
622 cx: &mut Context<'_>,
623 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
624 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
625 if let Poll::Ready(res) = fut.poll(cx) {
626 self.headers = None;
627 return Poll::Ready(RangeResponseResult::Header(res))
628 }
629 }
630
631 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
632 if let Poll::Ready(res) = fut.poll(cx) {
633 self.bodies = None;
634 return Poll::Ready(RangeResponseResult::Body(res))
635 }
636 }
637
638 Poll::Pending
639 }
640}
641
/// The result of a finished range request, tagged by the kind of request that produced it.
enum RangeResponseResult<H, B> {
    /// The result of the headers request.
    Header(PeerRequestResult<Vec<H>>),
    /// The result of the bodies request.
    Body(PeerRequestResult<Vec<B>>),
}
648
/// A [`BlockClient`] that holds no data: it answers every headers and bodies request with an
/// empty response and reports zero connected peers.
///
/// `Net` only selects the header, body and block types; no value of it is stored.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
653
/// Implements [`DownloadClient`] for [`NoopFullBlockClient`].
impl<Net> DownloadClient for NoopFullBlockClient<Net>
where
    Net: Debug + Send + Sync,
{
    /// Does nothing; the peer id is ignored.
    fn report_bad_message(&self, _peer_id: PeerId) {}

    /// Always returns `0`.
    fn num_connected_peers(&self) -> usize {
        0
    }
}
676
677impl<Net> BodiesClient for NoopFullBlockClient<Net>
679where
680 Net: NetworkPrimitives,
681{
682 type Body = Net::BlockBody;
683 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
685
686 fn get_block_bodies_with_priority_and_range_hint(
697 &self,
698 _hashes: Vec<B256>,
699 _priority: Priority,
700 _range_hint: Option<RangeInclusive<u64>>,
701 ) -> Self::Output {
702 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
705 }
706}
707
708impl<Net> HeadersClient for NoopFullBlockClient<Net>
709where
710 Net: NetworkPrimitives,
711{
712 type Header = Net::BlockHeader;
713 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
716
717 fn get_headers_with_priority(
731 &self,
732 _request: HeadersRequest,
733 _priority: Priority,
734 ) -> Self::Output {
735 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
736 }
737}
738
/// Implements [`BlockClient`] for [`NoopFullBlockClient`], using the block type of `Net`.
impl<Net> BlockClient for NoopFullBlockClient<Net>
where
    Net: NetworkPrimitives,
{
    type Block = Net::Block;
}
745
746impl<Net> Default for NoopFullBlockClient<Net> {
747 fn default() -> Self {
748 Self(PhantomData::<Net>)
749 }
750}
751
#[cfg(test)]
mod tests {
    use reth_ethereum_primitives::BlockBody;

    use super::*;
    use crate::test_utils::TestFullBlockClient;
    use std::ops::Range;

    /// A single stored block can be downloaded by its hash.
    #[tokio::test]
    async fn download_single_full_block() {
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();

        let inner = TestFullBlockClient::default();
        inner.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(inner);

        let received = client.get_full_block(header.hash()).await;
        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
    }

    /// A range request of length one yields exactly the stored block.
    #[tokio::test]
    async fn download_single_full_block_range() {
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();

        let inner = TestFullBlockClient::default();
        inner.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header, body));
    }

    /// Inserts one block per element of `range` into the client, each header being the child of
    /// the previous one (starting from the default header), and returns the last header together
    /// with the body shared by all inserted blocks.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        range: Range<usize>,
    ) -> (SealedHeader, BlockBody) {
        let body = BlockBody::default();
        let start: SealedHeader = SealedHeader::default();

        let tip = range.fold(start, |parent, _| {
            let (mut header, parent_hash) = parent.split();
            // link the new header to its parent
            header.parent_hash = parent_hash;
            header.number += 1;

            let child = SealedHeader::seal_slow(header);
            client.insert(child.clone(), body.clone());
            child
        });

        (tip, body)
    }

    /// Ranges of length 1 and 10 are returned with descending block numbers.
    #[tokio::test]
    async fn download_full_block_range() {
        let inner = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&inner, 0..50);
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header.clone(), body));

        let blocks = client.get_full_block_range(header.hash(), 10).await;
        assert_eq!(blocks.len(), 10);
        let expected_numbers = (0..blocks.len() as u64).map(|offset| header.number - offset);
        for (block, expected_number) in blocks.iter().zip(expected_numbers) {
            assert_eq!(block.number, expected_number);
        }
    }

    /// A range of 50 blocks is returned completely and with descending block numbers.
    #[tokio::test]
    async fn download_full_block_range_over_soft_limit() {
        let inner = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&inner, 0..50);
        let client = FullBlockClient::test_client(inner);

        let blocks = client.get_full_block_range(header.hash(), 1).await;
        let first = blocks.first().expect("response should include a block");
        assert_eq!(*first, SealedBlock::from_sealed_parts(header.clone(), body));

        let blocks = client.get_full_block_range(header.hash(), 50).await;
        assert_eq!(blocks.len(), 50);
        let expected_numbers = (0..blocks.len() as u64).map(|offset| header.number - offset);
        for (block, expected_number) in blocks.iter().zip(expected_numbers) {
            assert_eq!(block.number, expected_number);
        }
    }

    /// The range is still returned when the test consensus is configured with
    /// `set_fail_validation(true)` while body-against-header validation keeps passing.
    #[tokio::test]
    async fn download_full_block_range_with_invalid_header() {
        let inner = TestFullBlockClient::default();
        let range_length: usize = 3;
        let (header, _) = insert_headers_into_client(&inner, 0..range_length);

        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
        test_consensus.set_fail_validation(true);
        test_consensus.set_fail_body_against_header(false);
        let client = FullBlockClient::new(inner, Arc::new(test_consensus));

        let blocks = client.get_full_block_range(header.hash(), range_length as u64).await;

        assert_eq!(blocks.len(), range_length);
        let expected_numbers = (0..blocks.len() as u64).map(|offset| header.number - offset);
        for (block, expected_number) in blocks.iter().zip(expected_numbers) {
            assert_eq!(block.number, expected_number);
        }
    }
}