reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    bodies::client::{BodiesClient, SingleBodyRequest},
4    error::PeerRequestResult,
5    headers::client::{HeadersClient, SingleHeaderRequest},
6    BlockClient,
7};
8use alloy_consensus::BlockHeader;
9use alloy_primitives::{Sealable, B256};
10use reth_consensus::{Consensus, ConsensusError};
11use reth_eth_wire_types::HeadersDirection;
12use reth_network_peers::WithPeerId;
13use reth_primitives_traits::{SealedBlock, SealedHeader};
14use std::{
15    cmp::Reverse,
16    collections::{HashMap, VecDeque},
17    fmt::Debug,
18    future::Future,
19    hash::Hash,
20    pin::Pin,
21    sync::Arc,
22    task::{ready, Context, Poll},
23};
24use tracing::debug;
25
/// A Client that can fetch full blocks from the network.
///
/// Bundles a [`BlockClient`] with a [`Consensus`] implementation; both are handed to the futures
/// created by [`Self::get_full_block`] and [`Self::get_full_block_range`].
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client>
where
    Client: BlockClient,
{
    /// The underlying client used to request headers and bodies.
    client: Client,
    /// The consensus implementation passed on to the download futures for validating responses.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
}
35
36impl<Client> FullBlockClient<Client>
37where
38    Client: BlockClient,
39{
40    /// Creates a new instance of `FullBlockClient`.
41    pub fn new(
42        client: Client,
43        consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
44    ) -> Self {
45        Self { client, consensus }
46    }
47
48    /// Returns a client with Test consensus
49    #[cfg(any(test, feature = "test-utils"))]
50    pub fn test_client(client: Client) -> Self {
51        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
52    }
53}
54
55impl<Client> FullBlockClient<Client>
56where
57    Client: BlockClient,
58{
59    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
60    ///
61    /// Note: this future is cancel safe
62    ///
63    /// Caution: This does no validation of body (transactions) response but guarantees that the
64    /// [`SealedHeader`] matches the requested hash.
65    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
66        let client = self.client.clone();
67        FetchFullBlockFuture {
68            hash,
69            consensus: self.consensus.clone(),
70            request: FullBlockRequest {
71                header: Some(client.get_header(hash.into())),
72                body: Some(client.get_block_body(hash)),
73            },
74            client,
75            header: None,
76            body: None,
77        }
78    }
79
80    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
81    ///
82    /// Note: this future is cancel safe
83    ///
84    /// Caution: This does no validation of body (transactions) responses but guarantees that
85    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
86    /// bodies received matches the requested limit.
87    ///
88    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
89    pub fn get_full_block_range(
90        &self,
91        hash: B256,
92        count: u64,
93    ) -> FetchFullBlockRangeFuture<Client> {
94        let client = self.client.clone();
95        FetchFullBlockRangeFuture {
96            start_hash: hash,
97            count,
98            request: FullBlockRangeRequest {
99                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
100                bodies: None,
101            },
102            client,
103            headers: None,
104            pending_headers: VecDeque::new(),
105            bodies: HashMap::default(),
106            consensus: Arc::clone(&self.consensus),
107        }
108    }
109}
110
/// A future that downloads a full block from the network.
///
/// This will attempt to fetch both the header and body for the given block hash at the same time.
/// When both requests succeed, the future will yield the full block.
///
/// A failed response (request error, a header with a different hash, or a body that does not
/// validate against the header) causes the corresponding request to be sent again.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to send header and body requests and to report bad peers.
    client: Client,
    /// The consensus instance used to validate a body against its header.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The hash of the requested block.
    hash: B256,
    /// The header and body requests that are currently in flight.
    request: FullBlockRequest<Client>,
    /// The header, set once a response whose hash equals `hash` has been received.
    header: Option<SealedHeader<Client::Header>>,
    /// The body, set once received; it may still be awaiting validation against the header.
    body: Option<BodyResponse<Client::Body>>,
}
127
128impl<Client> FetchFullBlockFuture<Client>
129where
130    Client: BlockClient<Header: BlockHeader>,
131{
132    /// Returns the hash of the block being requested.
133    pub const fn hash(&self) -> &B256 {
134        &self.hash
135    }
136
137    /// If the header request is already complete, this returns the block number
138    pub fn block_number(&self) -> Option<u64> {
139        self.header.as_ref().map(|h| h.number())
140    }
141
142    /// Returns the [`SealedBlock`] if the request is complete and valid.
143    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
144        if self.header.is_none() || self.body.is_none() {
145            return None
146        }
147
148        let header = self.header.take().unwrap();
149        let resp = self.body.take().unwrap();
150        match resp {
151            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
152            BodyResponse::PendingValidation(resp) => {
153                // ensure the block is valid, else retry
154                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
155                {
156                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
157                    self.client.report_bad_message(resp.peer_id());
158                    self.header = Some(header);
159                    self.request.body = Some(self.client.get_block_body(self.hash));
160                    return None
161                }
162                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
163            }
164        }
165    }
166
167    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
168        if let Some(ref header) = self.header {
169            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
170                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
171                self.client.report_bad_message(resp.peer_id());
172                return
173            }
174            self.body = Some(BodyResponse::Validated(resp.into_data()));
175            return
176        }
177        self.body = Some(BodyResponse::PendingValidation(resp));
178    }
179}
180
181impl<Client> Future for FetchFullBlockFuture<Client>
182where
183    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
184{
185    type Output = SealedBlock<Client::Block>;
186
187    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
188        let this = self.get_mut();
189
190        // preemptive yield point
191        let mut budget = 4;
192
193        loop {
194            match ready!(this.request.poll(cx)) {
195                ResponseResult::Header(res) => {
196                    match res {
197                        Ok(maybe_header) => {
198                            let (peer, maybe_header) =
199                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
200                            if let Some(header) = maybe_header {
201                                if header.hash() == this.hash {
202                                    this.header = Some(header);
203                                } else {
204                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
205                                    // received a different header than requested
206                                    this.client.report_bad_message(peer)
207                                }
208                            }
209                        }
210                        Err(err) => {
211                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
212                        }
213                    }
214
215                    if this.header.is_none() {
216                        // received bad response
217                        this.request.header = Some(this.client.get_header(this.hash.into()));
218                    }
219                }
220                ResponseResult::Body(res) => {
221                    match res {
222                        Ok(maybe_body) => {
223                            if let Some(body) = maybe_body.transpose() {
224                                this.on_block_response(body);
225                            }
226                        }
227                        Err(err) => {
228                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
229                        }
230                    }
231                    if this.body.is_none() {
232                        // received bad response
233                        this.request.body = Some(this.client.get_block_body(this.hash));
234                    }
235                }
236            }
237
238            if let Some(res) = this.take_block() {
239                return Poll::Ready(res)
240            }
241
242            // ensure we still have enough budget for another iteration
243            budget -= 1;
244            if budget == 0 {
245                // make sure we're woken up again
246                cx.waker().wake_by_ref();
247                return Poll::Pending
248            }
249        }
250    }
251}
252
253impl<Client> Debug for FetchFullBlockFuture<Client>
254where
255    Client: BlockClient<Header: Debug, Body: Debug>,
256{
257    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258        f.debug_struct("FetchFullBlockFuture")
259            .field("hash", &self.hash)
260            .field("header", &self.header)
261            .field("body", &self.body)
262            .finish()
263    }
264}
265
/// The in-flight header and body requests of a [`FetchFullBlockFuture`].
///
/// A slot is `None` when no request of that kind is currently pending.
struct FullBlockRequest<Client>
where
    Client: BlockClient,
{
    /// The pending request for the single header, if any.
    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
    /// The pending request for the single body, if any.
    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
273
274impl<Client> FullBlockRequest<Client>
275where
276    Client: BlockClient,
277{
278    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
279        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
280            if let Poll::Ready(res) = fut.poll(cx) {
281                self.header = None;
282                return Poll::Ready(ResponseResult::Header(res))
283            }
284        }
285
286        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
287            if let Poll::Ready(res) = fut.poll(cx) {
288                self.body = None;
289                return Poll::Ready(ResponseResult::Body(res))
290            }
291        }
292
293        Poll::Pending
294    }
295}
296
/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
/// future.
enum ResponseResult<H, B> {
    /// The outcome of the header request.
    Header(PeerRequestResult<Option<H>>),
    /// The outcome of the body request.
    Body(PeerRequestResult<Option<B>>),
}
303
/// The response of a body request.
///
/// Tracks whether the body has already been checked against its header.
#[derive(Debug)]
enum BodyResponse<B> {
    /// Already validated against transaction root of header
    Validated(B),
    /// Still needs to be validated against header; the peer id is kept so the sender can be
    /// reported if validation fails.
    PendingValidation(WithPeerId<B>),
}
/// A future that downloads a range of full blocks from the network.
///
/// This first fetches the headers for the given range using the inner `Client`. Once the request
/// is complete, it will fetch the bodies for the headers it received.
///
/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
/// yield the full block range.
///
/// The full block range will be returned with falling block numbers, i.e. in descending order.
///
/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
/// hash array used to request them.
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to fetch headers and bodies.
    client: Client,
    /// The consensus instance used to validate the blocks.
    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
    /// The block hash to start fetching from (inclusive).
    start_hash: B256,
    /// How many blocks to fetch: `len([start_hash, ..]) == count`
    count: u64,
    /// Requests for headers and bodies that are in progress.
    request: FullBlockRangeRequest<Client>,
    /// Fetched headers, sorted by descending block number. `None` until an accepted headers
    /// response has been received.
    headers: Option<Vec<SealedHeader<Client::Header>>>,
    /// The next headers to request bodies for. This is drained as responses are received.
    pending_headers: VecDeque<SealedHeader<Client::Header>>,
    /// The bodies that have been received so far, keyed by the header they were matched with.
    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
}
347
348impl<Client> FetchFullBlockRangeFuture<Client>
349where
350    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
351{
352    /// Returns the block hashes for the given range, if they are available.
353    pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
354        self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
355    }
356
357    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
358    fn is_bodies_complete(&self) -> bool {
359        self.bodies.len() == self.count as usize
360    }
361
362    /// Inserts a block body, matching it with the `next_header`.
363    ///
364    /// Note: this assumes the response matches the next header in the queue.
365    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
366        if let Some(header) = self.pending_headers.pop_front() {
367            self.bodies.insert(header, body_response);
368        }
369    }
370
371    /// Inserts multiple block bodies.
372    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
373        for body in bodies {
374            self.insert_body(body);
375        }
376    }
377
378    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
379    /// in the `root_map`.
380    fn remaining_bodies_hashes(&self) -> Vec<B256> {
381        self.pending_headers.iter().map(|h| h.hash()).collect()
382    }
383
384    /// Returns the [`SealedBlock`]s if the request is complete and valid.
385    ///
386    /// The request is complete if the number of blocks requested is equal to the number of blocks
387    /// received. The request is valid if the returned bodies match the roots in the headers.
388    ///
389    /// These are returned in falling order starting with the requested `hash`, i.e. with
390    /// descending block numbers.
391    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
392        if !self.is_bodies_complete() {
393            // not done with bodies yet
394            return None
395        }
396
397        let headers = self.headers.take()?;
398        let mut needs_retry = false;
399        let mut valid_responses = Vec::new();
400
401        for header in &headers {
402            if let Some(body_resp) = self.bodies.remove(header) {
403                // validate body w.r.t. the hashes in the header, only inserting into the response
404                let body = match body_resp {
405                    BodyResponse::Validated(body) => body,
406                    BodyResponse::PendingValidation(resp) => {
407                        // ensure the block is valid, else retry
408                        if let Err(err) =
409                            self.consensus.validate_body_against_header(resp.data(), header)
410                        {
411                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
412                            self.client.report_bad_message(resp.peer_id());
413
414                            // get body that doesn't match, put back into vecdeque, and retry it
415                            self.pending_headers.push_back(header.clone());
416                            needs_retry = true;
417                            continue
418                        }
419
420                        resp.into_data()
421                    }
422                };
423
424                valid_responses
425                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
426            }
427        }
428
429        if needs_retry {
430            // put response hashes back into bodies map since we aren't returning them as a
431            // response
432            for block in valid_responses {
433                let (header, body) = block.split_sealed_header_body();
434                self.bodies.insert(header, BodyResponse::Validated(body));
435            }
436
437            // put headers back since they were `take`n before
438            self.headers = Some(headers);
439
440            // create response for failing bodies
441            let hashes = self.remaining_bodies_hashes();
442            self.request.bodies = Some(self.client.get_block_bodies(hashes));
443            return None
444        }
445
446        Some(valid_responses)
447    }
448
449    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
450        let (peer, mut headers_falling) =
451            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
452
453        // fill in the response if it's the correct length
454        if headers_falling.len() == self.count as usize {
455            // sort headers from highest to lowest block number
456            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
457
458            // check the starting hash
459            if headers_falling[0].hash() == self.start_hash {
460                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
461                // check if the downloaded headers are valid
462                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
463                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
464                    self.client.report_bad_message(peer);
465                }
466
467                // get the bodies request so it can be polled later
468                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
469
470                // populate the pending headers
471                self.pending_headers = headers_falling.clone().into();
472
473                // set the actual request if it hasn't been started yet
474                if !self.has_bodies_request_started() {
475                    // request the bodies for the downloaded headers
476                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
477                }
478
479                // set the headers response
480                self.headers = Some(headers_falling);
481            } else {
482                // received a different header than requested
483                self.client.report_bad_message(peer);
484            }
485        }
486    }
487
488    /// Returns whether or not a bodies request has been started, returning false if there is no
489    /// pending request.
490    const fn has_bodies_request_started(&self) -> bool {
491        self.request.bodies.is_some()
492    }
493
494    /// Returns the start hash for the request
495    pub const fn start_hash(&self) -> B256 {
496        self.start_hash
497    }
498
499    /// Returns the block count for the request
500    pub const fn count(&self) -> u64 {
501        self.count
502    }
503}
504
505impl<Client> Future for FetchFullBlockRangeFuture<Client>
506where
507    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
508{
509    type Output = Vec<SealedBlock<Client::Block>>;
510
511    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
512        let this = self.get_mut();
513
514        loop {
515            match ready!(this.request.poll(cx)) {
516                // This branch handles headers responses from peers - it first ensures that the
517                // starting hash and number of headers matches what we requested.
518                //
519                // If these don't match, we penalize the peer and retry the request.
520                // If they do match, we sort the headers by block number and start the request for
521                // the corresponding block bodies.
522                //
523                // The next result that should be yielded by `poll` is the bodies response.
524                RangeResponseResult::Header(res) => {
525                    match res {
526                        Ok(headers) => {
527                            this.on_headers_response(headers);
528                        }
529                        Err(err) => {
530                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
531                        }
532                    }
533
534                    if this.headers.is_none() {
535                        // did not receive a correct response yet, retry
536                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
537                            start: this.start_hash.into(),
538                            limit: this.count,
539                            direction: HeadersDirection::Falling,
540                        }));
541                    }
542                }
543                // This branch handles block body responses from peers - it first inserts the
544                // bodies into the `bodies` map, and then checks if the request is complete.
545                //
546                // If the request is not complete, and we need to request more bodies, we send
547                // a bodies request for the headers we don't yet have bodies for.
548                RangeResponseResult::Body(res) => {
549                    match res {
550                        Ok(bodies_resp) => {
551                            let (peer, new_bodies) = bodies_resp.split();
552
553                            // first insert the received bodies
554                            this.insert_bodies(
555                                new_bodies
556                                    .into_iter()
557                                    .map(|resp| WithPeerId::new(peer, resp))
558                                    .map(BodyResponse::PendingValidation),
559                            );
560
561                            if !this.is_bodies_complete() {
562                                // get remaining hashes so we can send the next request
563                                let req_hashes = this.remaining_bodies_hashes();
564
565                                // set a new request
566                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
567                            }
568                        }
569                        Err(err) => {
570                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
571                        }
572                    }
573                    if this.bodies.is_empty() {
574                        // received bad response, re-request headers
575                        // TODO: convert this into two futures, one which is a headers range
576                        // future, and one which is a bodies range future.
577                        //
578                        // The headers range future should yield the bodies range future.
579                        // The bodies range future should not have an Option<Vec<B256>>, it should
580                        // have a populated Vec<B256> from the successful headers range future.
581                        //
582                        // This is optimal because we can not send a bodies request without
583                        // first completing the headers request. This way we can get rid of the
584                        // following `if let Some`. A bodies request should never be sent before
585                        // the headers request completes, so this should always be `Some` anyways.
586                        let hashes = this.remaining_bodies_hashes();
587                        if !hashes.is_empty() {
588                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
589                        }
590                    }
591                }
592            }
593
594            if let Some(res) = this.take_blocks() {
595                return Poll::Ready(res)
596            }
597        }
598    }
599}
600
/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
/// futures until they return responses. It will return either the header or body result, depending
/// on which future successfully returned.
struct FullBlockRangeRequest<Client>
where
    Client: BlockClient,
{
    /// The pending headers request, or `None` if no headers request is in flight.
    headers: Option<<Client as HeadersClient>::Output>,
    /// The pending bodies request, or `None` if no bodies request is in flight.
    bodies: Option<<Client as BodiesClient>::Output>,
}
611
612impl<Client> FullBlockRangeRequest<Client>
613where
614    Client: BlockClient,
615{
616    fn poll(
617        &mut self,
618        cx: &mut Context<'_>,
619    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
620        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
621            if let Poll::Ready(res) = fut.poll(cx) {
622                self.headers = None;
623                return Poll::Ready(RangeResponseResult::Header(res))
624            }
625        }
626
627        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
628            if let Poll::Ready(res) = fut.poll(cx) {
629                self.bodies = None;
630                return Poll::Ready(RangeResponseResult::Body(res))
631            }
632        }
633
634        Poll::Pending
635    }
636}
637
/// The result of a request for headers or block bodies. This is yielded by the
/// `FullBlockRangeRequest` future.
enum RangeResponseResult<H, B> {
    /// The outcome of the headers request.
    Header(PeerRequestResult<Vec<H>>),
    /// The outcome of the bodies request.
    Body(PeerRequestResult<Vec<B>>),
}
644
#[cfg(test)]
mod tests {
    use reth_ethereum_primitives::BlockBody;

    use super::*;
    use crate::test_utils::TestFullBlockClient;
    use std::ops::Range;

    /// A single block inserted into the test client is returned by `get_full_block`.
    #[tokio::test]
    async fn download_single_full_block() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block(header.hash()).await;
        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
    }

    /// A range request with a count of one returns the single inserted block.
    #[tokio::test]
    async fn download_single_full_block_range() {
        let client = TestFullBlockClient::default();
        let header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        client.insert(header.clone(), body.clone());
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
    }

    /// Inserts headers and returns the last header and block body.
    ///
    /// One header is inserted per element of `range`; each header's parent hash is the hash of
    /// the previous one and its number is incremented by one, starting from the default header.
    fn insert_headers_into_client(
        client: &TestFullBlockClient,
        range: Range<usize>,
    ) -> (SealedHeader, BlockBody) {
        let mut sealed_header: SealedHeader = SealedHeader::default();
        let body = BlockBody::default();
        for _ in range {
            let (mut header, hash) = sealed_header.split();
            // update to the next header
            header.parent_hash = hash;
            header.number += 1;

            sealed_header = SealedHeader::seal_slow(header);

            client.insert(sealed_header.clone(), body.clone());
        }

        (sealed_header, body)
    }

    /// Range requests of 1 and 10 blocks return that many blocks with descending block numbers,
    /// starting at the requested header.
    #[tokio::test]
    async fn download_full_block_range() {
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 10).await;
        assert_eq!(received.len(), 10);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }

    /// A range request for all 50 inserted blocks returns all of them in descending order.
    #[tokio::test]
    async fn download_full_block_range_over_soft_limit() {
        // default soft limit is 20, so we will request 50 blocks
        let client = TestFullBlockClient::default();
        let (header, body) = insert_headers_into_client(&client, 0..50);
        let client = FullBlockClient::test_client(client);

        let received = client.get_full_block_range(header.hash(), 1).await;
        let received = received.first().expect("response should include a block");
        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));

        let received = client.get_full_block_range(header.hash(), 50).await;
        assert_eq!(received.len(), 50);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }

    /// With `set_fail_validation(true)` on the test consensus (and body-against-header failures
    /// switched off), the range future still completes and returns the full range.
    #[tokio::test]
    async fn download_full_block_range_with_invalid_header() {
        let client = TestFullBlockClient::default();
        let range_length: usize = 3;
        let (header, _) = insert_headers_into_client(&client, 0..range_length);

        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
        test_consensus.set_fail_validation(true);
        test_consensus.set_fail_body_against_header(false);
        let client = FullBlockClient::new(client, Arc::new(test_consensus));

        let received = client.get_full_block_range(header.hash(), range_length as u64).await;

        assert_eq!(received.len(), range_length);
        for (i, block) in received.iter().enumerate() {
            let expected_number = header.number - i as u64;
            assert_eq!(block.number, expected_number);
        }
    }
}