Skip to main content

reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    bodies::client::{BodiesClient, SingleBodyRequest},
4    download::DownloadClient,
5    error::PeerRequestResult,
6    headers::client::{HeadersClient, SingleHeaderRequest},
7    priority::Priority,
8    BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::Consensus;
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18    cmp::Reverse,
19    collections::{HashMap, VecDeque},
20    fmt::Debug,
21    future::Future,
22    hash::Hash,
23    ops::RangeInclusive,
24    pin::Pin,
25    sync::Arc,
26    task::{ready, Context, Poll},
27};
28use tracing::debug;
29
30/// A Client that can fetch full blocks from the network.
31#[derive(Debug, Clone)]
32pub struct FullBlockClient<Client>
33where
34    Client: BlockClient,
35{
36    client: Client,
37    consensus: Arc<dyn Consensus<Client::Block>>,
38}
39
40impl<Client> FullBlockClient<Client>
41where
42    Client: BlockClient,
43{
44    /// Creates a new instance of `FullBlockClient`.
45    pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
46        Self { client, consensus }
47    }
48
49    /// Returns a client with Test consensus
50    #[cfg(any(test, feature = "test-utils"))]
51    pub fn test_client(client: Client) -> Self {
52        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
53    }
54}
55
56impl<Client> FullBlockClient<Client>
57where
58    Client: BlockClient,
59{
60    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
61    ///
62    /// Note: this future is cancel safe
63    ///
64    /// Caution: This does no validation of body (transactions) response but guarantees that the
65    /// [`SealedHeader`] matches the requested hash.
66    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
67        let client = self.client.clone();
68        FetchFullBlockFuture {
69            hash,
70            consensus: self.consensus.clone(),
71            request: FullBlockRequest {
72                header: Some(client.get_header(hash.into())),
73                body: Some(client.get_block_body(hash)),
74            },
75            client,
76            header: None,
77            body: None,
78        }
79    }
80
81    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
82    ///
83    /// Note: this future is cancel safe
84    ///
85    /// Caution: This does no validation of body (transactions) responses but guarantees that
86    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
87    /// bodies received matches the requested limit.
88    ///
89    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
90    pub fn get_full_block_range(
91        &self,
92        hash: B256,
93        count: u64,
94    ) -> FetchFullBlockRangeFuture<Client> {
95        let client = self.client.clone();
96        FetchFullBlockRangeFuture {
97            start_hash: hash,
98            count,
99            request: FullBlockRangeRequest {
100                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
101                bodies: None,
102            },
103            client,
104            headers: None,
105            pending_headers: VecDeque::new(),
106            bodies: HashMap::default(),
107            consensus: Arc::clone(&self.consensus),
108        }
109    }
110}
111
112/// A future that downloads a full block from the network.
113///
114/// This will attempt to fetch both the header and body for the given block hash at the same time.
115/// When both requests succeed, the future will yield the full block.
116#[must_use = "futures do nothing unless polled"]
117pub struct FetchFullBlockFuture<Client>
118where
119    Client: BlockClient,
120{
121    client: Client,
122    consensus: Arc<dyn Consensus<Client::Block>>,
123    hash: B256,
124    request: FullBlockRequest<Client>,
125    header: Option<SealedHeader<Client::Header>>,
126    body: Option<BodyResponse<Client::Body>>,
127}
128
129impl<Client> FetchFullBlockFuture<Client>
130where
131    Client: BlockClient<Header: BlockHeader>,
132{
133    /// Returns the hash of the block being requested.
134    pub const fn hash(&self) -> &B256 {
135        &self.hash
136    }
137
138    /// If the header request is already complete, this returns the block number
139    pub fn block_number(&self) -> Option<u64> {
140        self.header.as_ref().map(|h| h.number())
141    }
142
143    /// Returns the [`SealedBlock`] if the request is complete and valid.
144    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
145        if self.header.is_none() || self.body.is_none() {
146            return None
147        }
148
149        let header = self.header.take().unwrap();
150        let resp = self.body.take().unwrap();
151        match resp {
152            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
153            BodyResponse::PendingValidation(resp) => {
154                // ensure the block is valid, else retry
155                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
156                {
157                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
158                    self.client.report_bad_message(resp.peer_id());
159                    self.header = Some(header);
160                    self.request.body = Some(self.client.get_block_body(self.hash));
161                    return None
162                }
163                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
164            }
165        }
166    }
167
168    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
169        if let Some(ref header) = self.header {
170            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
171                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
172                self.client.report_bad_message(resp.peer_id());
173                return
174            }
175            self.body = Some(BodyResponse::Validated(resp.into_data()));
176            return
177        }
178        self.body = Some(BodyResponse::PendingValidation(resp));
179    }
180}
181
182impl<Client> Future for FetchFullBlockFuture<Client>
183where
184    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
185{
186    type Output = SealedBlock<Client::Block>;
187
188    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
189        let this = self.get_mut();
190
191        // preemptive yield point
192        let mut budget = 4;
193
194        loop {
195            match ready!(this.request.poll(cx)) {
196                ResponseResult::Header(res) => {
197                    match res {
198                        Ok(maybe_header) => {
199                            let (peer, maybe_header) =
200                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
201                            if let Some(header) = maybe_header {
202                                if header.hash() == this.hash {
203                                    this.header = Some(header);
204                                } else {
205                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
206                                    // received a different header than requested
207                                    this.client.report_bad_message(peer)
208                                }
209                            }
210                        }
211                        Err(err) => {
212                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
213                        }
214                    }
215
216                    if this.header.is_none() {
217                        // received bad response
218                        this.request.header = Some(this.client.get_header(this.hash.into()));
219                    }
220                }
221                ResponseResult::Body(res) => {
222                    match res {
223                        Ok(maybe_body) => {
224                            if let Some(body) = maybe_body.transpose() {
225                                this.on_block_response(body);
226                            }
227                        }
228                        Err(err) => {
229                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
230                        }
231                    }
232                    if this.body.is_none() {
233                        // received bad response
234                        this.request.body = Some(this.client.get_block_body(this.hash));
235                    }
236                }
237            }
238
239            if let Some(res) = this.take_block() {
240                return Poll::Ready(res)
241            }
242
243            // ensure we still have enough budget for another iteration
244            budget -= 1;
245            if budget == 0 {
246                // make sure we're woken up again
247                cx.waker().wake_by_ref();
248                return Poll::Pending
249            }
250        }
251    }
252}
253
254impl<Client> Debug for FetchFullBlockFuture<Client>
255where
256    Client: BlockClient<Header: Debug, Body: Debug>,
257{
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        f.debug_struct("FetchFullBlockFuture")
260            .field("hash", &self.hash)
261            .field("header", &self.header)
262            .field("body", &self.body)
263            .finish()
264    }
265}
266
267struct FullBlockRequest<Client>
268where
269    Client: BlockClient,
270{
271    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
272    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
273}
274
275impl<Client> FullBlockRequest<Client>
276where
277    Client: BlockClient,
278{
279    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
280        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
281            let Poll::Ready(res) = fut.poll(cx)
282        {
283            self.header = None;
284            return Poll::Ready(ResponseResult::Header(res))
285        }
286
287        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
288            let Poll::Ready(res) = fut.poll(cx)
289        {
290            self.body = None;
291            return Poll::Ready(ResponseResult::Body(res))
292        }
293
294        Poll::Pending
295    }
296}
297
/// The result of a request for a single header or a single body, as returned by
/// `FullBlockRequest::poll`.
enum ResponseResult<H, B> {
    /// Outcome of the header request; `Ok(None)` means the peer returned no header.
    Header(PeerRequestResult<Option<H>>),
    /// Outcome of the body request; `Ok(None)` means the peer returned no body.
    Body(PeerRequestResult<Option<B>>),
}
304
/// A received block body together with its validation state.
#[derive(Debug)]
enum BodyResponse<B> {
    /// The body has already been validated against its header.
    Validated(B),
    /// The body still needs to be validated against its header. The peer id is kept so the
    /// sender can be reported if validation fails.
    PendingValidation(WithPeerId<B>),
}
313/// A future that downloads a range of full blocks from the network.
314///
315/// This first fetches the headers for the given range using the inner `Client`. Once the request
316/// is complete, it will fetch the bodies for the headers it received.
317///
318/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
319/// yield the full block range.
320///
321/// The full block range will be returned with falling block numbers, i.e. in descending order.
322///
323/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
324/// hash array used to request them.
325#[must_use = "futures do nothing unless polled"]
326#[expect(missing_debug_implementations)]
327pub struct FetchFullBlockRangeFuture<Client>
328where
329    Client: BlockClient,
330{
331    /// The client used to fetch headers and bodies.
332    client: Client,
333    /// The consensus instance used to validate the blocks.
334    consensus: Arc<dyn Consensus<Client::Block>>,
335    /// The block hash to start fetching from (inclusive).
336    start_hash: B256,
337    /// How many blocks to fetch: `len([start_hash, ..]) == count`
338    count: u64,
339    /// Requests for headers and bodies that are in progress.
340    request: FullBlockRangeRequest<Client>,
341    /// Fetched headers.
342    headers: Option<Vec<SealedHeader<Client::Header>>>,
343    /// The next headers to request bodies for. This is drained as responses are received.
344    pending_headers: VecDeque<SealedHeader<Client::Header>>,
345    /// The bodies that have been received so far.
346    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
347}
348
349impl<Client> FetchFullBlockRangeFuture<Client>
350where
351    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
352{
353    /// Returns the block hashes for the given range, if they are available.
354    pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
355        self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
356    }
357
358    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
359    fn is_bodies_complete(&self) -> bool {
360        self.bodies.len() == self.count as usize
361    }
362
363    /// Inserts a block body, matching it with the `next_header`.
364    ///
365    /// Note: this assumes the response matches the next header in the queue.
366    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
367        if let Some(header) = self.pending_headers.pop_front() {
368            self.bodies.insert(header, body_response);
369        }
370    }
371
372    /// Inserts multiple block bodies.
373    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
374        for body in bodies {
375            self.insert_body(body);
376        }
377    }
378
379    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
380    /// in the `root_map`.
381    fn remaining_bodies_hashes(&self) -> Vec<B256> {
382        self.pending_headers.iter().map(|h| h.hash()).collect()
383    }
384
385    /// Returns the [`SealedBlock`]s if the request is complete and valid.
386    ///
387    /// The request is complete if the number of blocks requested is equal to the number of blocks
388    /// received. The request is valid if the returned bodies match the roots in the headers.
389    ///
390    /// These are returned in falling order starting with the requested `hash`, i.e. with
391    /// descending block numbers.
392    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
393        if !self.is_bodies_complete() {
394            // not done with bodies yet
395            return None
396        }
397
398        let headers = self.headers.take()?;
399        let mut needs_retry = false;
400        let mut valid_responses = Vec::new();
401
402        for header in &headers {
403            if let Some(body_resp) = self.bodies.remove(header) {
404                // validate body w.r.t. the hashes in the header, only inserting into the response
405                let body = match body_resp {
406                    BodyResponse::Validated(body) => body,
407                    BodyResponse::PendingValidation(resp) => {
408                        // ensure the block is valid, else retry
409                        if let Err(err) =
410                            self.consensus.validate_body_against_header(resp.data(), header)
411                        {
412                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
413                            self.client.report_bad_message(resp.peer_id());
414
415                            // get body that doesn't match, put back into vecdeque, and retry it
416                            self.pending_headers.push_back(header.clone());
417                            needs_retry = true;
418                            continue
419                        }
420
421                        resp.into_data()
422                    }
423                };
424
425                valid_responses
426                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
427            }
428        }
429
430        if needs_retry {
431            // put response hashes back into bodies map since we aren't returning them as a
432            // response
433            for block in valid_responses {
434                let (header, body) = block.split_sealed_header_body();
435                self.bodies.insert(header, BodyResponse::Validated(body));
436            }
437
438            // put headers back since they were `take`n before
439            self.headers = Some(headers);
440
441            // create response for failing bodies
442            let hashes = self.remaining_bodies_hashes();
443            self.request.bodies = Some(self.client.get_block_bodies(hashes));
444            return None
445        }
446
447        Some(valid_responses)
448    }
449
450    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
451        let (peer, mut headers_falling) =
452            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
453
454        // fill in the response if it's the correct length
455        if headers_falling.len() == self.count as usize {
456            // sort headers from highest to lowest block number
457            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
458
459            // check the starting hash
460            if headers_falling[0].hash() == self.start_hash {
461                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
462                // check if the downloaded headers are valid
463                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
464                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
465                    self.client.report_bad_message(peer);
466                }
467
468                // get the bodies request so it can be polled later
469                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
470
471                // populate the pending headers
472                self.pending_headers = headers_falling.clone().into();
473
474                // set the actual request if it hasn't been started yet
475                if !self.has_bodies_request_started() {
476                    // request the bodies for the downloaded headers
477                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
478                }
479
480                // set the headers response
481                self.headers = Some(headers_falling);
482            } else {
483                // received a different header than requested
484                self.client.report_bad_message(peer);
485            }
486        }
487    }
488
489    /// Returns whether or not a bodies request has been started, returning false if there is no
490    /// pending request.
491    const fn has_bodies_request_started(&self) -> bool {
492        self.request.bodies.is_some()
493    }
494
495    /// Returns the start hash for the request
496    pub const fn start_hash(&self) -> B256 {
497        self.start_hash
498    }
499
500    /// Returns the block count for the request
501    pub const fn count(&self) -> u64 {
502        self.count
503    }
504}
505
506impl<Client> Future for FetchFullBlockRangeFuture<Client>
507where
508    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
509{
510    type Output = Vec<SealedBlock<Client::Block>>;
511
512    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
513        let this = self.get_mut();
514
515        loop {
516            match ready!(this.request.poll(cx)) {
517                // This branch handles headers responses from peers - it first ensures that the
518                // starting hash and number of headers matches what we requested.
519                //
520                // If these don't match, we penalize the peer and retry the request.
521                // If they do match, we sort the headers by block number and start the request for
522                // the corresponding block bodies.
523                //
524                // The next result that should be yielded by `poll` is the bodies response.
525                RangeResponseResult::Header(res) => {
526                    match res {
527                        Ok(headers) => {
528                            this.on_headers_response(headers);
529                        }
530                        Err(err) => {
531                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
532                        }
533                    }
534
535                    if this.headers.is_none() {
536                        // did not receive a correct response yet, retry
537                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
538                            start: this.start_hash.into(),
539                            limit: this.count,
540                            direction: HeadersDirection::Falling,
541                        }));
542                    }
543                }
544                // This branch handles block body responses from peers - it first inserts the
545                // bodies into the `bodies` map, and then checks if the request is complete.
546                //
547                // If the request is not complete, and we need to request more bodies, we send
548                // a bodies request for the headers we don't yet have bodies for.
549                RangeResponseResult::Body(res) => {
550                    match res {
551                        Ok(bodies_resp) => {
552                            let (peer, new_bodies) = bodies_resp.split();
553
554                            // first insert the received bodies
555                            this.insert_bodies(
556                                new_bodies
557                                    .into_iter()
558                                    .map(|resp| WithPeerId::new(peer, resp))
559                                    .map(BodyResponse::PendingValidation),
560                            );
561
562                            if !this.is_bodies_complete() {
563                                // get remaining hashes so we can send the next request
564                                let req_hashes = this.remaining_bodies_hashes();
565
566                                // set a new request
567                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
568                            }
569                        }
570                        Err(err) => {
571                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
572                        }
573                    }
574                    if this.request.bodies.is_none() && !this.is_bodies_complete() {
575                        // no pending bodies request (e.g., request error), retry remaining bodies
576                        // TODO: convert this into two futures, one which is a headers range
577                        // future, and one which is a bodies range future.
578                        //
579                        // The headers range future should yield the bodies range future.
580                        // The bodies range future should not have an Option<Vec<B256>>, it should
581                        // have a populated Vec<B256> from the successful headers range future.
582                        //
583                        // This is optimal because we can not send a bodies request without
584                        // first completing the headers request. This way we can get rid of the
585                        // following `if let Some`. A bodies request should never be sent before
586                        // the headers request completes, so this should always be `Some` anyways.
587                        let hashes = this.remaining_bodies_hashes();
588                        if !hashes.is_empty() {
589                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
590                        }
591                    }
592                }
593            }
594
595            if let Some(res) = this.take_blocks() {
596                return Poll::Ready(res)
597            }
598        }
599    }
600}
601
602/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
603/// futures until they return responses. It will return either the header or body result, depending
604/// on which future successfully returned.
605struct FullBlockRangeRequest<Client>
606where
607    Client: BlockClient,
608{
609    headers: Option<<Client as HeadersClient>::Output>,
610    bodies: Option<<Client as BodiesClient>::Output>,
611}
612
613impl<Client> FullBlockRangeRequest<Client>
614where
615    Client: BlockClient,
616{
617    fn poll(
618        &mut self,
619        cx: &mut Context<'_>,
620    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
621        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
622            let Poll::Ready(res) = fut.poll(cx)
623        {
624            self.headers = None;
625            return Poll::Ready(RangeResponseResult::Header(res))
626        }
627
628        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
629            let Poll::Ready(res) = fut.poll(cx)
630        {
631            self.bodies = None;
632            return Poll::Ready(RangeResponseResult::Body(res))
633        }
634
635        Poll::Pending
636    }
637}
638
/// The result of a request for headers or block bodies, as returned by
/// `FullBlockRangeRequest::poll`.
enum RangeResponseResult<H, B> {
    /// Outcome of the headers request.
    Header(PeerRequestResult<Vec<H>>),
    /// Outcome of a bodies request.
    Body(PeerRequestResult<Vec<B>>),
}
645
/// A headers+bodies client implementation that does nothing.
///
/// Every headers or bodies request resolves immediately with an empty response. `Net` selects
/// the network primitives and defaults to [`EthNetworkPrimitives`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
650
651/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
652impl<Net> DownloadClient for NoopFullBlockClient<Net>
653where
654    Net: Debug + Send + Sync,
655{
656    /// Reports a bad message received from a peer.
657    ///
658    /// # Arguments
659    ///
660    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
661    ///   implementation).
662    fn report_bad_message(&self, _peer_id: PeerId) {}
663
664    /// Retrieves the number of connected peers.
665    ///
666    /// # Returns
667    ///
668    /// The number of connected peers, which is always zero in this implementation.
669    fn num_connected_peers(&self) -> usize {
670        0
671    }
672}
673
674/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
675impl<Net> BodiesClient for NoopFullBlockClient<Net>
676where
677    Net: NetworkPrimitives,
678{
679    type Body = Net::BlockBody;
680    /// Defines the output type of the function.
681    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
682
683    /// Retrieves block bodies based on provided hashes and priority.
684    ///
685    /// # Arguments
686    ///
687    /// * `_hashes` - A vector of block hashes (unused in this implementation).
688    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
689    ///
690    /// # Returns
691    ///
692    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
693    fn get_block_bodies_with_priority_and_range_hint(
694        &self,
695        _hashes: Vec<B256>,
696        _priority: Priority,
697        _range_hint: Option<RangeInclusive<u64>>,
698    ) -> Self::Output {
699        // Create a future that immediately returns an empty vector of block bodies and a random
700        // PeerId.
701        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
702    }
703}
704
705impl<Net> HeadersClient for NoopFullBlockClient<Net>
706where
707    Net: NetworkPrimitives,
708{
709    type Header = Net::BlockHeader;
710    /// The output type representing a future containing a peer request result with a vector of
711    /// headers.
712    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
713
714    /// Retrieves headers with a specified priority level.
715    ///
716    /// This implementation does nothing and returns an empty vector of headers.
717    ///
718    /// # Arguments
719    ///
720    /// * `_request` - A request for headers (unused in this implementation).
721    /// * `_priority` - The priority level for the headers request (unused in this implementation).
722    ///
723    /// # Returns
724    ///
725    /// Always returns a ready future with an empty vector of headers wrapped in a
726    /// `PeerRequestResult`.
727    fn get_headers_with_priority(
728        &self,
729        _request: HeadersRequest,
730        _priority: Priority,
731    ) -> Self::Output {
732        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
733    }
734}
735
736impl<Net> BlockClient for NoopFullBlockClient<Net>
737where
738    Net: NetworkPrimitives,
739{
740    type Block = Net::Block;
741}
742
743impl<Net> Default for NoopFullBlockClient<Net> {
744    fn default() -> Self {
745        Self(PhantomData::<Net>)
746    }
747}
748
749#[cfg(test)]
750mod tests {
751    use reth_ethereum_primitives::BlockBody;
752
753    use super::*;
754    use crate::{error::RequestError, test_utils::TestFullBlockClient};
755    use std::{
756        ops::Range,
757        sync::atomic::{AtomicUsize, Ordering},
758    };
759    use tokio::time::{timeout, Duration};
760
761    #[tokio::test]
762    async fn download_single_full_block() {
763        let client = TestFullBlockClient::default();
764        let header: SealedHeader = SealedHeader::default();
765        let body = BlockBody::default();
766        client.insert(header.clone(), body.clone());
767        let client = FullBlockClient::test_client(client);
768
769        let received = client.get_full_block(header.hash()).await;
770        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
771    }
772
773    #[tokio::test]
774    async fn download_single_full_block_range() {
775        let client = TestFullBlockClient::default();
776        let header: SealedHeader = SealedHeader::default();
777        let body = BlockBody::default();
778        client.insert(header.clone(), body.clone());
779        let client = FullBlockClient::test_client(client);
780
781        let received = client.get_full_block_range(header.hash(), 1).await;
782        let received = received.first().expect("response should include a block");
783        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
784    }
785
786    /// Inserts headers and returns the last header and block body.
787    fn insert_headers_into_client(
788        client: &TestFullBlockClient,
789        range: Range<usize>,
790    ) -> (SealedHeader, BlockBody) {
791        let mut sealed_header: SealedHeader = SealedHeader::default();
792        let body = BlockBody::default();
793        for _ in range {
794            let (mut header, hash) = sealed_header.split();
795            // update to the next header
796            header.parent_hash = hash;
797            header.number += 1;
798
799            sealed_header = SealedHeader::seal_slow(header);
800
801            client.insert(sealed_header.clone(), body.clone());
802        }
803
804        (sealed_header, body)
805    }
806
807    #[derive(Clone, Debug)]
808    struct FailingBodiesClient {
809        inner: TestFullBlockClient,
810        fail_on: usize,
811        body_requests: Arc<AtomicUsize>,
812    }
813
814    impl FailingBodiesClient {
815        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
816            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
817        }
818    }
819
820    impl DownloadClient for FailingBodiesClient {
821        fn report_bad_message(&self, peer_id: PeerId) {
822            self.inner.report_bad_message(peer_id);
823        }
824
825        fn num_connected_peers(&self) -> usize {
826            self.inner.num_connected_peers()
827        }
828    }
829
830    impl HeadersClient for FailingBodiesClient {
831        type Header = <TestFullBlockClient as HeadersClient>::Header;
832        type Output = <TestFullBlockClient as HeadersClient>::Output;
833
834        fn get_headers_with_priority(
835            &self,
836            request: HeadersRequest,
837            priority: Priority,
838        ) -> Self::Output {
839            self.inner.get_headers_with_priority(request, priority)
840        }
841    }
842
843    impl BodiesClient for FailingBodiesClient {
844        type Body = <TestFullBlockClient as BodiesClient>::Body;
845        type Output = <TestFullBlockClient as BodiesClient>::Output;
846
847        fn get_block_bodies_with_priority_and_range_hint(
848            &self,
849            hashes: Vec<B256>,
850            priority: Priority,
851            range_hint: Option<RangeInclusive<u64>>,
852        ) -> Self::Output {
853            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
854            if attempt == self.fail_on {
855                return futures::future::ready(Err(RequestError::Timeout))
856            }
857
858            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
859        }
860    }
861
862    impl BlockClient for FailingBodiesClient {
863        type Block = reth_ethereum_primitives::Block;
864    }
865
866    #[tokio::test]
867    async fn download_full_block_range() {
868        let client = TestFullBlockClient::default();
869        let (header, body) = insert_headers_into_client(&client, 0..50);
870        let client = FullBlockClient::test_client(client);
871
872        let received = client.get_full_block_range(header.hash(), 1).await;
873        let received = received.first().expect("response should include a block");
874        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
875
876        let received = client.get_full_block_range(header.hash(), 10).await;
877        assert_eq!(received.len(), 10);
878        for (i, block) in received.iter().enumerate() {
879            let expected_number = header.number - i as u64;
880            assert_eq!(block.number, expected_number);
881        }
882    }
883
884    #[tokio::test]
885    async fn download_full_block_range_over_soft_limit() {
886        // default soft limit is 20, so we will request 50 blocks
887        let client = TestFullBlockClient::default();
888        let (header, body) = insert_headers_into_client(&client, 0..50);
889        let client = FullBlockClient::test_client(client);
890
891        let received = client.get_full_block_range(header.hash(), 1).await;
892        let received = received.first().expect("response should include a block");
893        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
894
895        let received = client.get_full_block_range(header.hash(), 50).await;
896        assert_eq!(received.len(), 50);
897        for (i, block) in received.iter().enumerate() {
898            let expected_number = header.number - i as u64;
899            assert_eq!(block.number, expected_number);
900        }
901    }
902
903    #[tokio::test]
904    async fn download_full_block_range_retries_after_body_error() {
905        let mut client = TestFullBlockClient::default();
906        client.set_soft_limit(2);
907        let (header, _) = insert_headers_into_client(&client, 0..3);
908
909        let client = FailingBodiesClient::new(client, 1);
910        let body_requests = Arc::clone(&client.body_requests);
911        let client = FullBlockClient::test_client(client);
912
913        let received =
914            timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
915                .await
916                .expect("body request retry should complete");
917
918        assert_eq!(received.len(), 3);
919        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
920    }
921
922    #[tokio::test]
923    async fn download_full_block_range_with_invalid_header() {
924        let client = TestFullBlockClient::default();
925        let range_length: usize = 3;
926        let (header, _) = insert_headers_into_client(&client, 0..range_length);
927
928        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
929        test_consensus.set_fail_validation(true);
930        test_consensus.set_fail_body_against_header(false);
931        let client = FullBlockClient::new(client, Arc::new(test_consensus));
932
933        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
934
935        assert_eq!(received.len(), range_length);
936        for (i, block) in received.iter().enumerate() {
937            let expected_number = header.number - i as u64;
938            assert_eq!(block.number, expected_number);
939        }
940    }
941}