1use super::headers::client::HeadersRequest;
2use crate::{
3 bodies::client::{BodiesClient, SingleBodyRequest},
4 download::DownloadClient,
5 error::PeerRequestResult,
6 headers::client::{HeadersClient, SingleHeaderRequest},
7 priority::Priority,
8 BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::Consensus;
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18 cmp::Reverse,
19 collections::{HashMap, VecDeque},
20 fmt::Debug,
21 future::Future,
22 hash::Hash,
23 ops::RangeInclusive,
24 pin::Pin,
25 sync::Arc,
26 task::{ready, Context, Poll},
27};
28use tracing::debug;
29
30#[derive(Debug, Clone)]
32pub struct FullBlockClient<Client>
33where
34 Client: BlockClient,
35{
36 client: Client,
37 consensus: Arc<dyn Consensus<Client::Block>>,
38}
39
40impl<Client> FullBlockClient<Client>
41where
42 Client: BlockClient,
43{
44 pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
46 Self { client, consensus }
47 }
48
49 #[cfg(any(test, feature = "test-utils"))]
51 pub fn test_client(client: Client) -> Self {
52 Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
53 }
54}
55
56impl<Client> FullBlockClient<Client>
57where
58 Client: BlockClient,
59{
60 pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
67 let client = self.client.clone();
68 FetchFullBlockFuture {
69 hash,
70 consensus: self.consensus.clone(),
71 request: FullBlockRequest {
72 header: Some(client.get_header(hash.into())),
73 body: Some(client.get_block_body(hash)),
74 },
75 client,
76 header: None,
77 body: None,
78 }
79 }
80
81 pub fn get_full_block_range(
91 &self,
92 hash: B256,
93 count: u64,
94 ) -> FetchFullBlockRangeFuture<Client> {
95 let client = self.client.clone();
96 FetchFullBlockRangeFuture {
97 start_hash: hash,
98 count,
99 request: FullBlockRangeRequest {
100 headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
101 bodies: None,
102 },
103 client,
104 headers: None,
105 pending_headers: VecDeque::new(),
106 bodies: HashMap::default(),
107 consensus: Arc::clone(&self.consensus),
108 }
109 }
110}
111
112#[must_use = "futures do nothing unless polled"]
117pub struct FetchFullBlockFuture<Client>
118where
119 Client: BlockClient,
120{
121 client: Client,
122 consensus: Arc<dyn Consensus<Client::Block>>,
123 hash: B256,
124 request: FullBlockRequest<Client>,
125 header: Option<SealedHeader<Client::Header>>,
126 body: Option<BodyResponse<Client::Body>>,
127}
128
129impl<Client> FetchFullBlockFuture<Client>
130where
131 Client: BlockClient<Header: BlockHeader>,
132{
133 pub const fn hash(&self) -> &B256 {
135 &self.hash
136 }
137
138 pub fn block_number(&self) -> Option<u64> {
140 self.header.as_ref().map(|h| h.number())
141 }
142
143 fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
145 if self.header.is_none() || self.body.is_none() {
146 return None
147 }
148
149 let header = self.header.take().unwrap();
150 let resp = self.body.take().unwrap();
151 match resp {
152 BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
153 BodyResponse::PendingValidation(resp) => {
154 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
156 {
157 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
158 self.client.report_bad_message(resp.peer_id());
159 self.header = Some(header);
160 self.request.body = Some(self.client.get_block_body(self.hash));
161 return None
162 }
163 Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
164 }
165 }
166 }
167
168 fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
169 if let Some(ref header) = self.header {
170 if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
171 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
172 self.client.report_bad_message(resp.peer_id());
173 return
174 }
175 self.body = Some(BodyResponse::Validated(resp.into_data()));
176 return
177 }
178 self.body = Some(BodyResponse::PendingValidation(resp));
179 }
180}
181
182impl<Client> Future for FetchFullBlockFuture<Client>
183where
184 Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
185{
186 type Output = SealedBlock<Client::Block>;
187
188 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189 let this = self.get_mut();
190
191 let mut budget = 4;
193
194 loop {
195 match ready!(this.request.poll(cx)) {
196 ResponseResult::Header(res) => {
197 match res {
198 Ok(maybe_header) => {
199 let (peer, maybe_header) =
200 maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
201 if let Some(header) = maybe_header {
202 if header.hash() == this.hash {
203 this.header = Some(header);
204 } else {
205 debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
206 this.client.report_bad_message(peer)
208 }
209 }
210 }
211 Err(err) => {
212 debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
213 }
214 }
215
216 if this.header.is_none() {
217 this.request.header = Some(this.client.get_header(this.hash.into()));
219 }
220 }
221 ResponseResult::Body(res) => {
222 match res {
223 Ok(maybe_body) => {
224 if let Some(body) = maybe_body.transpose() {
225 this.on_block_response(body);
226 }
227 }
228 Err(err) => {
229 debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
230 }
231 }
232 if this.body.is_none() {
233 this.request.body = Some(this.client.get_block_body(this.hash));
235 }
236 }
237 }
238
239 if let Some(res) = this.take_block() {
240 return Poll::Ready(res)
241 }
242
243 budget -= 1;
245 if budget == 0 {
246 cx.waker().wake_by_ref();
248 return Poll::Pending
249 }
250 }
251 }
252}
253
254impl<Client> Debug for FetchFullBlockFuture<Client>
255where
256 Client: BlockClient<Header: Debug, Body: Debug>,
257{
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct("FetchFullBlockFuture")
260 .field("hash", &self.hash)
261 .field("header", &self.header)
262 .field("body", &self.body)
263 .finish()
264 }
265}
266
267struct FullBlockRequest<Client>
268where
269 Client: BlockClient,
270{
271 header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
272 body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
273}
274
275impl<Client> FullBlockRequest<Client>
276where
277 Client: BlockClient,
278{
279 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
280 if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
281 let Poll::Ready(res) = fut.poll(cx)
282 {
283 self.header = None;
284 return Poll::Ready(ResponseResult::Header(res))
285 }
286
287 if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
288 let Poll::Ready(res) = fut.poll(cx)
289 {
290 self.body = None;
291 return Poll::Ready(ResponseResult::Body(res))
292 }
293
294 Poll::Pending
295 }
296}
297
298enum ResponseResult<H, B> {
301 Header(PeerRequestResult<Option<H>>),
302 Body(PeerRequestResult<Option<B>>),
303}
304
305#[derive(Debug)]
307enum BodyResponse<B> {
308 Validated(B),
310 PendingValidation(WithPeerId<B>),
312}
313#[must_use = "futures do nothing unless polled"]
326#[expect(missing_debug_implementations)]
327pub struct FetchFullBlockRangeFuture<Client>
328where
329 Client: BlockClient,
330{
331 client: Client,
333 consensus: Arc<dyn Consensus<Client::Block>>,
335 start_hash: B256,
337 count: u64,
339 request: FullBlockRangeRequest<Client>,
341 headers: Option<Vec<SealedHeader<Client::Header>>>,
343 pending_headers: VecDeque<SealedHeader<Client::Header>>,
345 bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
347}
348
349impl<Client> FetchFullBlockRangeFuture<Client>
350where
351 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
352{
353 pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
355 self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
356 }
357
358 fn is_bodies_complete(&self) -> bool {
360 self.bodies.len() == self.count as usize
361 }
362
363 fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
367 if let Some(header) = self.pending_headers.pop_front() {
368 self.bodies.insert(header, body_response);
369 }
370 }
371
372 fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
374 for body in bodies {
375 self.insert_body(body);
376 }
377 }
378
379 fn remaining_bodies_hashes(&self) -> Vec<B256> {
382 self.pending_headers.iter().map(|h| h.hash()).collect()
383 }
384
385 fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
393 if !self.is_bodies_complete() {
394 return None
396 }
397
398 let headers = self.headers.take()?;
399 let mut needs_retry = false;
400 let mut valid_responses = Vec::new();
401
402 for header in &headers {
403 if let Some(body_resp) = self.bodies.remove(header) {
404 let body = match body_resp {
406 BodyResponse::Validated(body) => body,
407 BodyResponse::PendingValidation(resp) => {
408 if let Err(err) =
410 self.consensus.validate_body_against_header(resp.data(), header)
411 {
412 debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
413 self.client.report_bad_message(resp.peer_id());
414
415 self.pending_headers.push_back(header.clone());
417 needs_retry = true;
418 continue
419 }
420
421 resp.into_data()
422 }
423 };
424
425 valid_responses
426 .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
427 }
428 }
429
430 if needs_retry {
431 for block in valid_responses {
434 let (header, body) = block.split_sealed_header_body();
435 self.bodies.insert(header, BodyResponse::Validated(body));
436 }
437
438 self.headers = Some(headers);
440
441 let hashes = self.remaining_bodies_hashes();
443 self.request.bodies = Some(self.client.get_block_bodies(hashes));
444 return None
445 }
446
447 Some(valid_responses)
448 }
449
450 fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
451 let (peer, mut headers_falling) =
452 headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
453
454 if headers_falling.len() == self.count as usize {
456 headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
458
459 if headers_falling[0].hash() == self.start_hash {
461 let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
462 if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
464 debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
465 self.client.report_bad_message(peer);
466 }
467
468 let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
470
471 self.pending_headers = headers_falling.clone().into();
473
474 if !self.has_bodies_request_started() {
476 self.request.bodies = Some(self.client.get_block_bodies(hashes));
478 }
479
480 self.headers = Some(headers_falling);
482 } else {
483 self.client.report_bad_message(peer);
485 }
486 }
487 }
488
489 const fn has_bodies_request_started(&self) -> bool {
492 self.request.bodies.is_some()
493 }
494
495 pub const fn start_hash(&self) -> B256 {
497 self.start_hash
498 }
499
500 pub const fn count(&self) -> u64 {
502 self.count
503 }
504}
505
506impl<Client> Future for FetchFullBlockRangeFuture<Client>
507where
508 Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
509{
510 type Output = Vec<SealedBlock<Client::Block>>;
511
512 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
513 let this = self.get_mut();
514
515 loop {
516 match ready!(this.request.poll(cx)) {
517 RangeResponseResult::Header(res) => {
526 match res {
527 Ok(headers) => {
528 this.on_headers_response(headers);
529 }
530 Err(err) => {
531 debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
532 }
533 }
534
535 if this.headers.is_none() {
536 this.request.headers = Some(this.client.get_headers(HeadersRequest {
538 start: this.start_hash.into(),
539 limit: this.count,
540 direction: HeadersDirection::Falling,
541 }));
542 }
543 }
544 RangeResponseResult::Body(res) => {
550 match res {
551 Ok(bodies_resp) => {
552 let (peer, new_bodies) = bodies_resp.split();
553
554 this.insert_bodies(
556 new_bodies
557 .into_iter()
558 .map(|resp| WithPeerId::new(peer, resp))
559 .map(BodyResponse::PendingValidation),
560 );
561
562 if !this.is_bodies_complete() {
563 let req_hashes = this.remaining_bodies_hashes();
565
566 this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
568 }
569 }
570 Err(err) => {
571 debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
572 }
573 }
574 if this.request.bodies.is_none() && !this.is_bodies_complete() {
575 let hashes = this.remaining_bodies_hashes();
588 if !hashes.is_empty() {
589 this.request.bodies = Some(this.client.get_block_bodies(hashes));
590 }
591 }
592 }
593 }
594
595 if let Some(res) = this.take_blocks() {
596 return Poll::Ready(res)
597 }
598 }
599 }
600}
601
602struct FullBlockRangeRequest<Client>
606where
607 Client: BlockClient,
608{
609 headers: Option<<Client as HeadersClient>::Output>,
610 bodies: Option<<Client as BodiesClient>::Output>,
611}
612
613impl<Client> FullBlockRangeRequest<Client>
614where
615 Client: BlockClient,
616{
617 fn poll(
618 &mut self,
619 cx: &mut Context<'_>,
620 ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
621 if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
622 let Poll::Ready(res) = fut.poll(cx)
623 {
624 self.headers = None;
625 return Poll::Ready(RangeResponseResult::Header(res))
626 }
627
628 if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
629 let Poll::Ready(res) = fut.poll(cx)
630 {
631 self.bodies = None;
632 return Poll::Ready(RangeResponseResult::Body(res))
633 }
634
635 Poll::Pending
636 }
637}
638
639enum RangeResponseResult<H, B> {
642 Header(PeerRequestResult<Vec<H>>),
643 Body(PeerRequestResult<Vec<B>>),
644}
645
646#[derive(Debug, Clone)]
648#[non_exhaustive]
649pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
650
651impl<Net> DownloadClient for NoopFullBlockClient<Net>
653where
654 Net: Debug + Send + Sync,
655{
656 fn report_bad_message(&self, _peer_id: PeerId) {}
663
664 fn num_connected_peers(&self) -> usize {
670 0
671 }
672}
673
674impl<Net> BodiesClient for NoopFullBlockClient<Net>
676where
677 Net: NetworkPrimitives,
678{
679 type Body = Net::BlockBody;
680 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
682
683 fn get_block_bodies_with_priority_and_range_hint(
694 &self,
695 _hashes: Vec<B256>,
696 _priority: Priority,
697 _range_hint: Option<RangeInclusive<u64>>,
698 ) -> Self::Output {
699 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
702 }
703}
704
705impl<Net> HeadersClient for NoopFullBlockClient<Net>
706where
707 Net: NetworkPrimitives,
708{
709 type Header = Net::BlockHeader;
710 type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
713
714 fn get_headers_with_priority(
728 &self,
729 _request: HeadersRequest,
730 _priority: Priority,
731 ) -> Self::Output {
732 futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
733 }
734}
735
736impl<Net> BlockClient for NoopFullBlockClient<Net>
737where
738 Net: NetworkPrimitives,
739{
740 type Block = Net::Block;
741}
742
743impl<Net> Default for NoopFullBlockClient<Net> {
744 fn default() -> Self {
745 Self(PhantomData::<Net>)
746 }
747}
748
#[cfg(test)]
mod tests {
    use reth_ethereum_primitives::BlockBody;

    use super::*;
    use crate::{error::RequestError, test_utils::TestFullBlockClient};
    use std::{
        ops::Range,
        sync::atomic::{AtomicUsize, Ordering},
    };
    use tokio::time::{timeout, Duration};

    /// Asserts that `numbers` counts down by one per item, starting at `tip`.
    fn assert_descending_from(tip: u64, numbers: impl IntoIterator<Item = u64>) {
        for (offset, number) in numbers.into_iter().enumerate() {
            let expected_number = tip - offset as u64;
            assert_eq!(number, expected_number);
        }
    }

    /// Inserts one chained header (with a default body) per element of `range` into the client
    /// and returns the last header together with the shared body.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        range: Range<usize>,
    ) -> (SealedHeader, BlockBody) {
        let body = BlockBody::default();
        let mut tip: SealedHeader = SealedHeader::default();
        for _ in range {
            let (mut next, parent_hash) = tip.split();
            // Chain the new header onto the previous one.
            next.parent_hash = parent_hash;
            next.number += 1;

            tip = SealedHeader::seal_slow(next);

            client.insert(tip.clone(), body.clone());
        }

        (tip, body)
    }

    /// A client that delegates to [`TestFullBlockClient`] but fails one specific bodies request
    /// with a timeout.
    #[derive(Clone, Debug)]
    struct FailingBodiesClient {
        /// The client that serves all other requests.
        inner: TestFullBlockClient,
        /// Zero-based index of the bodies request that fails.
        fail_on: usize,
        /// Number of bodies requests seen so far.
        body_requests: Arc<AtomicUsize>,
    }

    impl FailingBodiesClient {
        /// Wraps `inner`, failing the bodies request with index `fail_on`.
        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
            let body_requests = Arc::new(AtomicUsize::new(0));
            Self { inner, fail_on, body_requests }
        }
    }

    impl DownloadClient for FailingBodiesClient {
        fn report_bad_message(&self, peer_id: PeerId) {
            self.inner.report_bad_message(peer_id);
        }

        fn num_connected_peers(&self) -> usize {
            self.inner.num_connected_peers()
        }
    }

    impl HeadersClient for FailingBodiesClient {
        type Header = <TestFullBlockClient as HeadersClient>::Header;
        type Output = <TestFullBlockClient as HeadersClient>::Output;

        fn get_headers_with_priority(
            &self,
            request: HeadersRequest,
            priority: Priority,
        ) -> Self::Output {
            self.inner.get_headers_with_priority(request, priority)
        }
    }

    impl BodiesClient for FailingBodiesClient {
        type Body = <TestFullBlockClient as BodiesClient>::Body;
        type Output = <TestFullBlockClient as BodiesClient>::Output;

        fn get_block_bodies_with_priority_and_range_hint(
            &self,
            hashes: Vec<B256>,
            priority: Priority,
            range_hint: Option<RangeInclusive<u64>>,
        ) -> Self::Output {
            // Every call counts, including the one that fails.
            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
            if attempt == self.fail_on {
                return futures::future::ready(Err(RequestError::Timeout))
            }

            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
        }
    }

    impl BlockClient for FailingBodiesClient {
        type Block = reth_ethereum_primitives::Block;
    }

    #[tokio::test]
    async fn download_single_full_block() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block(header.hash()).await;
        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
    }

    #[tokio::test]
    async fn download_single_full_block_range() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
    }

    #[tokio::test]
    async fn download_full_block_range() {
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 10).await;
        assert_eq!(received.len(), 10);
        assert_descending_from(header.number, received.iter().map(|block| block.number));
    }

    #[tokio::test]
    async fn download_full_block_range_over_soft_limit() {
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 50).await;
        assert_eq!(received.len(), 50);
        assert_descending_from(header.number, received.iter().map(|block| block.number));
    }

    #[tokio::test]
    async fn download_full_block_range_retries_after_body_error() {
        let mut client = TestFullBlockClient::default();
        client.set_soft_limit(2);
        let (header, _) = insert_headers_into_client(&client, 0..3);

        // The second bodies request (index 1) fails with a timeout.
        let client = FailingBodiesClient::new(client, 1);
        let body_requests = Arc::clone(&client.body_requests);
        let client = FullBlockClient::test_client(client);

        let download = client.get_full_block_range(header.hash(), 3);
        let received = timeout(Duration::from_secs(1), download)
            .await
            .expect("body request retry should complete");

        assert_eq!(received.len(), 3);
        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn download_full_block_range_with_invalid_header() {
        let client = TestFullBlockClient::default();
        let range_length: usize = 3;
        let (header, _) = insert_headers_into_client(&client, 0..range_length);

        // Validation fails in general, but bodies are still accepted against their headers.
        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
        test_consensus.set_fail_validation(true);
        test_consensus.set_fail_body_against_header(false);
        let client = FullBlockClient::new(client, Arc::new(test_consensus));

        let received = client.get_full_block_range(header.hash(), range_length as u64).await;

        assert_eq!(received.len(), range_length);
        assert_descending_from(header.number, received.iter().map(|block| block.number));
    }
}