reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    bodies::client::{BodiesClient, SingleBodyRequest},
4    download::DownloadClient,
5    error::PeerRequestResult,
6    headers::client::{HeadersClient, SingleHeaderRequest},
7    priority::Priority,
8    BlockClient,
9};
10use alloy_consensus::BlockHeader;
11use alloy_primitives::{Sealable, B256};
12use core::marker::PhantomData;
13use reth_consensus::{Consensus, ConsensusError};
14use reth_eth_wire_types::{EthNetworkPrimitives, HeadersDirection, NetworkPrimitives};
15use reth_network_peers::{PeerId, WithPeerId};
16use reth_primitives_traits::{SealedBlock, SealedHeader};
17use std::{
18    cmp::Reverse,
19    collections::{HashMap, VecDeque},
20    fmt::Debug,
21    future::Future,
22    hash::Hash,
23    ops::RangeInclusive,
24    pin::Pin,
25    sync::Arc,
26    task::{ready, Context, Poll},
27};
28use tracing::debug;
29
30/// A Client that can fetch full blocks from the network.
31#[derive(Debug, Clone)]
32pub struct FullBlockClient<Client>
33where
34    Client: BlockClient,
35{
36    client: Client,
37    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
38}
39
40impl<Client> FullBlockClient<Client>
41where
42    Client: BlockClient,
43{
44    /// Creates a new instance of `FullBlockClient`.
45    pub fn new(
46        client: Client,
47        consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
48    ) -> Self {
49        Self { client, consensus }
50    }
51
52    /// Returns a client with Test consensus
53    #[cfg(any(test, feature = "test-utils"))]
54    pub fn test_client(client: Client) -> Self {
55        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
56    }
57}
58
59impl<Client> FullBlockClient<Client>
60where
61    Client: BlockClient,
62{
63    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
64    ///
65    /// Note: this future is cancel safe
66    ///
67    /// Caution: This does no validation of body (transactions) response but guarantees that the
68    /// [`SealedHeader`] matches the requested hash.
69    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
70        let client = self.client.clone();
71        FetchFullBlockFuture {
72            hash,
73            consensus: self.consensus.clone(),
74            request: FullBlockRequest {
75                header: Some(client.get_header(hash.into())),
76                body: Some(client.get_block_body(hash)),
77            },
78            client,
79            header: None,
80            body: None,
81        }
82    }
83
84    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
85    ///
86    /// Note: this future is cancel safe
87    ///
88    /// Caution: This does no validation of body (transactions) responses but guarantees that
89    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
90    /// bodies received matches the requested limit.
91    ///
92    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
93    pub fn get_full_block_range(
94        &self,
95        hash: B256,
96        count: u64,
97    ) -> FetchFullBlockRangeFuture<Client> {
98        let client = self.client.clone();
99        FetchFullBlockRangeFuture {
100            start_hash: hash,
101            count,
102            request: FullBlockRangeRequest {
103                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
104                bodies: None,
105            },
106            client,
107            headers: None,
108            pending_headers: VecDeque::new(),
109            bodies: HashMap::default(),
110            consensus: Arc::clone(&self.consensus),
111        }
112    }
113}
114
115/// A future that downloads a full block from the network.
116///
117/// This will attempt to fetch both the header and body for the given block hash at the same time.
118/// When both requests succeed, the future will yield the full block.
119#[must_use = "futures do nothing unless polled"]
120pub struct FetchFullBlockFuture<Client>
121where
122    Client: BlockClient,
123{
124    client: Client,
125    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
126    hash: B256,
127    request: FullBlockRequest<Client>,
128    header: Option<SealedHeader<Client::Header>>,
129    body: Option<BodyResponse<Client::Body>>,
130}
131
132impl<Client> FetchFullBlockFuture<Client>
133where
134    Client: BlockClient<Header: BlockHeader>,
135{
136    /// Returns the hash of the block being requested.
137    pub const fn hash(&self) -> &B256 {
138        &self.hash
139    }
140
141    /// If the header request is already complete, this returns the block number
142    pub fn block_number(&self) -> Option<u64> {
143        self.header.as_ref().map(|h| h.number())
144    }
145
146    /// Returns the [`SealedBlock`] if the request is complete and valid.
147    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
148        if self.header.is_none() || self.body.is_none() {
149            return None
150        }
151
152        let header = self.header.take().unwrap();
153        let resp = self.body.take().unwrap();
154        match resp {
155            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
156            BodyResponse::PendingValidation(resp) => {
157                // ensure the block is valid, else retry
158                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
159                {
160                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
161                    self.client.report_bad_message(resp.peer_id());
162                    self.header = Some(header);
163                    self.request.body = Some(self.client.get_block_body(self.hash));
164                    return None
165                }
166                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
167            }
168        }
169    }
170
171    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
172        if let Some(ref header) = self.header {
173            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
174                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
175                self.client.report_bad_message(resp.peer_id());
176                return
177            }
178            self.body = Some(BodyResponse::Validated(resp.into_data()));
179            return
180        }
181        self.body = Some(BodyResponse::PendingValidation(resp));
182    }
183}
184
185impl<Client> Future for FetchFullBlockFuture<Client>
186where
187    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
188{
189    type Output = SealedBlock<Client::Block>;
190
191    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
192        let this = self.get_mut();
193
194        // preemptive yield point
195        let mut budget = 4;
196
197        loop {
198            match ready!(this.request.poll(cx)) {
199                ResponseResult::Header(res) => {
200                    match res {
201                        Ok(maybe_header) => {
202                            let (peer, maybe_header) =
203                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
204                            if let Some(header) = maybe_header {
205                                if header.hash() == this.hash {
206                                    this.header = Some(header);
207                                } else {
208                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
209                                    // received a different header than requested
210                                    this.client.report_bad_message(peer)
211                                }
212                            }
213                        }
214                        Err(err) => {
215                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
216                        }
217                    }
218
219                    if this.header.is_none() {
220                        // received bad response
221                        this.request.header = Some(this.client.get_header(this.hash.into()));
222                    }
223                }
224                ResponseResult::Body(res) => {
225                    match res {
226                        Ok(maybe_body) => {
227                            if let Some(body) = maybe_body.transpose() {
228                                this.on_block_response(body);
229                            }
230                        }
231                        Err(err) => {
232                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
233                        }
234                    }
235                    if this.body.is_none() {
236                        // received bad response
237                        this.request.body = Some(this.client.get_block_body(this.hash));
238                    }
239                }
240            }
241
242            if let Some(res) = this.take_block() {
243                return Poll::Ready(res)
244            }
245
246            // ensure we still have enough budget for another iteration
247            budget -= 1;
248            if budget == 0 {
249                // make sure we're woken up again
250                cx.waker().wake_by_ref();
251                return Poll::Pending
252            }
253        }
254    }
255}
256
257impl<Client> Debug for FetchFullBlockFuture<Client>
258where
259    Client: BlockClient<Header: Debug, Body: Debug>,
260{
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        f.debug_struct("FetchFullBlockFuture")
263            .field("hash", &self.hash)
264            .field("header", &self.header)
265            .field("body", &self.body)
266            .finish()
267    }
268}
269
270struct FullBlockRequest<Client>
271where
272    Client: BlockClient,
273{
274    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
275    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
276}
277
278impl<Client> FullBlockRequest<Client>
279where
280    Client: BlockClient,
281{
282    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
283        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() {
284            if let Poll::Ready(res) = fut.poll(cx) {
285                self.header = None;
286                return Poll::Ready(ResponseResult::Header(res))
287            }
288        }
289
290        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() {
291            if let Poll::Ready(res) = fut.poll(cx) {
292                self.body = None;
293                return Poll::Ready(ResponseResult::Body(res))
294            }
295        }
296
297        Poll::Pending
298    }
299}
300
301/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
302/// future.
303enum ResponseResult<H, B> {
304    Header(PeerRequestResult<Option<H>>),
305    Body(PeerRequestResult<Option<B>>),
306}
307
308/// The response of a body request.
309#[derive(Debug)]
310enum BodyResponse<B> {
311    /// Already validated against transaction root of header
312    Validated(B),
313    /// Still needs to be validated against header
314    PendingValidation(WithPeerId<B>),
315}
316/// A future that downloads a range of full blocks from the network.
317///
318/// This first fetches the headers for the given range using the inner `Client`. Once the request
319/// is complete, it will fetch the bodies for the headers it received.
320///
321/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
322/// yield the full block range.
323///
324/// The full block range will be returned with falling block numbers, i.e. in descending order.
325///
326/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
327/// hash array used to request them.
328#[must_use = "futures do nothing unless polled"]
329#[expect(missing_debug_implementations)]
330pub struct FetchFullBlockRangeFuture<Client>
331where
332    Client: BlockClient,
333{
334    /// The client used to fetch headers and bodies.
335    client: Client,
336    /// The consensus instance used to validate the blocks.
337    consensus: Arc<dyn Consensus<Client::Block, Error = ConsensusError>>,
338    /// The block hash to start fetching from (inclusive).
339    start_hash: B256,
340    /// How many blocks to fetch: `len([start_hash, ..]) == count`
341    count: u64,
342    /// Requests for headers and bodies that are in progress.
343    request: FullBlockRangeRequest<Client>,
344    /// Fetched headers.
345    headers: Option<Vec<SealedHeader<Client::Header>>>,
346    /// The next headers to request bodies for. This is drained as responses are received.
347    pending_headers: VecDeque<SealedHeader<Client::Header>>,
348    /// The bodies that have been received so far.
349    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
350}
351
352impl<Client> FetchFullBlockRangeFuture<Client>
353where
354    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
355{
356    /// Returns the block hashes for the given range, if they are available.
357    pub fn range_block_hashes(&self) -> Option<Vec<B256>> {
358        self.headers.as_ref().map(|h| h.iter().map(|h| h.hash()).collect())
359    }
360
361    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
362    fn is_bodies_complete(&self) -> bool {
363        self.bodies.len() == self.count as usize
364    }
365
366    /// Inserts a block body, matching it with the `next_header`.
367    ///
368    /// Note: this assumes the response matches the next header in the queue.
369    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
370        if let Some(header) = self.pending_headers.pop_front() {
371            self.bodies.insert(header, body_response);
372        }
373    }
374
375    /// Inserts multiple block bodies.
376    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
377        for body in bodies {
378            self.insert_body(body);
379        }
380    }
381
382    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
383    /// in the `root_map`.
384    fn remaining_bodies_hashes(&self) -> Vec<B256> {
385        self.pending_headers.iter().map(|h| h.hash()).collect()
386    }
387
388    /// Returns the [`SealedBlock`]s if the request is complete and valid.
389    ///
390    /// The request is complete if the number of blocks requested is equal to the number of blocks
391    /// received. The request is valid if the returned bodies match the roots in the headers.
392    ///
393    /// These are returned in falling order starting with the requested `hash`, i.e. with
394    /// descending block numbers.
395    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
396        if !self.is_bodies_complete() {
397            // not done with bodies yet
398            return None
399        }
400
401        let headers = self.headers.take()?;
402        let mut needs_retry = false;
403        let mut valid_responses = Vec::new();
404
405        for header in &headers {
406            if let Some(body_resp) = self.bodies.remove(header) {
407                // validate body w.r.t. the hashes in the header, only inserting into the response
408                let body = match body_resp {
409                    BodyResponse::Validated(body) => body,
410                    BodyResponse::PendingValidation(resp) => {
411                        // ensure the block is valid, else retry
412                        if let Err(err) =
413                            self.consensus.validate_body_against_header(resp.data(), header)
414                        {
415                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
416                            self.client.report_bad_message(resp.peer_id());
417
418                            // get body that doesn't match, put back into vecdeque, and retry it
419                            self.pending_headers.push_back(header.clone());
420                            needs_retry = true;
421                            continue
422                        }
423
424                        resp.into_data()
425                    }
426                };
427
428                valid_responses
429                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
430            }
431        }
432
433        if needs_retry {
434            // put response hashes back into bodies map since we aren't returning them as a
435            // response
436            for block in valid_responses {
437                let (header, body) = block.split_sealed_header_body();
438                self.bodies.insert(header, BodyResponse::Validated(body));
439            }
440
441            // put headers back since they were `take`n before
442            self.headers = Some(headers);
443
444            // create response for failing bodies
445            let hashes = self.remaining_bodies_hashes();
446            self.request.bodies = Some(self.client.get_block_bodies(hashes));
447            return None
448        }
449
450        Some(valid_responses)
451    }
452
453    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
454        let (peer, mut headers_falling) =
455            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
456
457        // fill in the response if it's the correct length
458        if headers_falling.len() == self.count as usize {
459            // sort headers from highest to lowest block number
460            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
461
462            // check the starting hash
463            if headers_falling[0].hash() == self.start_hash {
464                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
465                // check if the downloaded headers are valid
466                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
467                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
468                    self.client.report_bad_message(peer);
469                }
470
471                // get the bodies request so it can be polled later
472                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
473
474                // populate the pending headers
475                self.pending_headers = headers_falling.clone().into();
476
477                // set the actual request if it hasn't been started yet
478                if !self.has_bodies_request_started() {
479                    // request the bodies for the downloaded headers
480                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
481                }
482
483                // set the headers response
484                self.headers = Some(headers_falling);
485            } else {
486                // received a different header than requested
487                self.client.report_bad_message(peer);
488            }
489        }
490    }
491
492    /// Returns whether or not a bodies request has been started, returning false if there is no
493    /// pending request.
494    const fn has_bodies_request_started(&self) -> bool {
495        self.request.bodies.is_some()
496    }
497
498    /// Returns the start hash for the request
499    pub const fn start_hash(&self) -> B256 {
500        self.start_hash
501    }
502
503    /// Returns the block count for the request
504    pub const fn count(&self) -> u64 {
505        self.count
506    }
507}
508
509impl<Client> Future for FetchFullBlockRangeFuture<Client>
510where
511    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
512{
513    type Output = Vec<SealedBlock<Client::Block>>;
514
515    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
516        let this = self.get_mut();
517
518        loop {
519            match ready!(this.request.poll(cx)) {
520                // This branch handles headers responses from peers - it first ensures that the
521                // starting hash and number of headers matches what we requested.
522                //
523                // If these don't match, we penalize the peer and retry the request.
524                // If they do match, we sort the headers by block number and start the request for
525                // the corresponding block bodies.
526                //
527                // The next result that should be yielded by `poll` is the bodies response.
528                RangeResponseResult::Header(res) => {
529                    match res {
530                        Ok(headers) => {
531                            this.on_headers_response(headers);
532                        }
533                        Err(err) => {
534                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
535                        }
536                    }
537
538                    if this.headers.is_none() {
539                        // did not receive a correct response yet, retry
540                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
541                            start: this.start_hash.into(),
542                            limit: this.count,
543                            direction: HeadersDirection::Falling,
544                        }));
545                    }
546                }
547                // This branch handles block body responses from peers - it first inserts the
548                // bodies into the `bodies` map, and then checks if the request is complete.
549                //
550                // If the request is not complete, and we need to request more bodies, we send
551                // a bodies request for the headers we don't yet have bodies for.
552                RangeResponseResult::Body(res) => {
553                    match res {
554                        Ok(bodies_resp) => {
555                            let (peer, new_bodies) = bodies_resp.split();
556
557                            // first insert the received bodies
558                            this.insert_bodies(
559                                new_bodies
560                                    .into_iter()
561                                    .map(|resp| WithPeerId::new(peer, resp))
562                                    .map(BodyResponse::PendingValidation),
563                            );
564
565                            if !this.is_bodies_complete() {
566                                // get remaining hashes so we can send the next request
567                                let req_hashes = this.remaining_bodies_hashes();
568
569                                // set a new request
570                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
571                            }
572                        }
573                        Err(err) => {
574                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
575                        }
576                    }
577                    if this.bodies.is_empty() {
578                        // received bad response, re-request headers
579                        // TODO: convert this into two futures, one which is a headers range
580                        // future, and one which is a bodies range future.
581                        //
582                        // The headers range future should yield the bodies range future.
583                        // The bodies range future should not have an Option<Vec<B256>>, it should
584                        // have a populated Vec<B256> from the successful headers range future.
585                        //
586                        // This is optimal because we can not send a bodies request without
587                        // first completing the headers request. This way we can get rid of the
588                        // following `if let Some`. A bodies request should never be sent before
589                        // the headers request completes, so this should always be `Some` anyways.
590                        let hashes = this.remaining_bodies_hashes();
591                        if !hashes.is_empty() {
592                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
593                        }
594                    }
595                }
596            }
597
598            if let Some(res) = this.take_blocks() {
599                return Poll::Ready(res)
600            }
601        }
602    }
603}
604
605/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
606/// futures until they return responses. It will return either the header or body result, depending
607/// on which future successfully returned.
608struct FullBlockRangeRequest<Client>
609where
610    Client: BlockClient,
611{
612    headers: Option<<Client as HeadersClient>::Output>,
613    bodies: Option<<Client as BodiesClient>::Output>,
614}
615
616impl<Client> FullBlockRangeRequest<Client>
617where
618    Client: BlockClient,
619{
620    fn poll(
621        &mut self,
622        cx: &mut Context<'_>,
623    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
624        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() {
625            if let Poll::Ready(res) = fut.poll(cx) {
626                self.headers = None;
627                return Poll::Ready(RangeResponseResult::Header(res))
628            }
629        }
630
631        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() {
632            if let Poll::Ready(res) = fut.poll(cx) {
633                self.bodies = None;
634                return Poll::Ready(RangeResponseResult::Body(res))
635            }
636        }
637
638        Poll::Pending
639    }
640}
641
642// The result of a request for headers or block bodies. This is yielded by the
643// `FullBlockRangeRequest` future.
644enum RangeResponseResult<H, B> {
645    Header(PeerRequestResult<Vec<H>>),
646    Body(PeerRequestResult<Vec<B>>),
647}
648
649/// A headers+bodies client implementation that does nothing.
650#[derive(Debug, Clone)]
651#[non_exhaustive]
652pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
653
654/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
655impl<Net> DownloadClient for NoopFullBlockClient<Net>
656where
657    Net: Debug + Send + Sync,
658{
659    /// Reports a bad message received from a peer.
660    ///
661    /// # Arguments
662    ///
663    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
664    ///   implementation).
665    fn report_bad_message(&self, _peer_id: PeerId) {}
666
667    /// Retrieves the number of connected peers.
668    ///
669    /// # Returns
670    ///
671    /// The number of connected peers, which is always zero in this implementation.
672    fn num_connected_peers(&self) -> usize {
673        0
674    }
675}
676
677/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
678impl<Net> BodiesClient for NoopFullBlockClient<Net>
679where
680    Net: NetworkPrimitives,
681{
682    type Body = Net::BlockBody;
683    /// Defines the output type of the function.
684    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
685
686    /// Retrieves block bodies based on provided hashes and priority.
687    ///
688    /// # Arguments
689    ///
690    /// * `_hashes` - A vector of block hashes (unused in this implementation).
691    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
692    ///
693    /// # Returns
694    ///
695    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
696    fn get_block_bodies_with_priority_and_range_hint(
697        &self,
698        _hashes: Vec<B256>,
699        _priority: Priority,
700        _range_hint: Option<RangeInclusive<u64>>,
701    ) -> Self::Output {
702        // Create a future that immediately returns an empty vector of block bodies and a random
703        // PeerId.
704        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
705    }
706}
707
708impl<Net> HeadersClient for NoopFullBlockClient<Net>
709where
710    Net: NetworkPrimitives,
711{
712    type Header = Net::BlockHeader;
713    /// The output type representing a future containing a peer request result with a vector of
714    /// headers.
715    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
716
717    /// Retrieves headers with a specified priority level.
718    ///
719    /// This implementation does nothing and returns an empty vector of headers.
720    ///
721    /// # Arguments
722    ///
723    /// * `_request` - A request for headers (unused in this implementation).
724    /// * `_priority` - The priority level for the headers request (unused in this implementation).
725    ///
726    /// # Returns
727    ///
728    /// Always returns a ready future with an empty vector of headers wrapped in a
729    /// `PeerRequestResult`.
730    fn get_headers_with_priority(
731        &self,
732        _request: HeadersRequest,
733        _priority: Priority,
734    ) -> Self::Output {
735        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
736    }
737}
738
739impl<Net> BlockClient for NoopFullBlockClient<Net>
740where
741    Net: NetworkPrimitives,
742{
743    type Block = Net::Block;
744}
745
746impl<Net> Default for NoopFullBlockClient<Net> {
747    fn default() -> Self {
748        Self(PhantomData::<Net>)
749    }
750}
751
752#[cfg(test)]
753mod tests {
754    use reth_ethereum_primitives::BlockBody;
755
756    use super::*;
757    use crate::test_utils::TestFullBlockClient;
758    use std::ops::Range;
759
760    #[tokio::test]
761    async fn download_single_full_block() {
762        let client = TestFullBlockClient::default();
763        let header: SealedHeader = SealedHeader::default();
764        let body = BlockBody::default();
765        client.insert(header.clone(), body.clone());
766        let client = FullBlockClient::test_client(client);
767
768        let received = client.get_full_block(header.hash()).await;
769        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
770    }
771
772    #[tokio::test]
773    async fn download_single_full_block_range() {
774        let client = TestFullBlockClient::default();
775        let header: SealedHeader = SealedHeader::default();
776        let body = BlockBody::default();
777        client.insert(header.clone(), body.clone());
778        let client = FullBlockClient::test_client(client);
779
780        let received = client.get_full_block_range(header.hash(), 1).await;
781        let received = received.first().expect("response should include a block");
782        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
783    }
784
785    /// Inserts headers and returns the last header and block body.
786    fn insert_headers_into_client(
787        client: &TestFullBlockClient,
788        range: Range<usize>,
789    ) -> (SealedHeader, BlockBody) {
790        let mut sealed_header: SealedHeader = SealedHeader::default();
791        let body = BlockBody::default();
792        for _ in range {
793            let (mut header, hash) = sealed_header.split();
794            // update to the next header
795            header.parent_hash = hash;
796            header.number += 1;
797
798            sealed_header = SealedHeader::seal_slow(header);
799
800            client.insert(sealed_header.clone(), body.clone());
801        }
802
803        (sealed_header, body)
804    }
805
806    #[tokio::test]
807    async fn download_full_block_range() {
808        let client = TestFullBlockClient::default();
809        let (header, body) = insert_headers_into_client(&client, 0..50);
810        let client = FullBlockClient::test_client(client);
811
812        let received = client.get_full_block_range(header.hash(), 1).await;
813        let received = received.first().expect("response should include a block");
814        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
815
816        let received = client.get_full_block_range(header.hash(), 10).await;
817        assert_eq!(received.len(), 10);
818        for (i, block) in received.iter().enumerate() {
819            let expected_number = header.number - i as u64;
820            assert_eq!(block.number, expected_number);
821        }
822    }
823
824    #[tokio::test]
825    async fn download_full_block_range_over_soft_limit() {
826        // default soft limit is 20, so we will request 50 blocks
827        let client = TestFullBlockClient::default();
828        let (header, body) = insert_headers_into_client(&client, 0..50);
829        let client = FullBlockClient::test_client(client);
830
831        let received = client.get_full_block_range(header.hash(), 1).await;
832        let received = received.first().expect("response should include a block");
833        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
834
835        let received = client.get_full_block_range(header.hash(), 50).await;
836        assert_eq!(received.len(), 50);
837        for (i, block) in received.iter().enumerate() {
838            let expected_number = header.number - i as u64;
839            assert_eq!(block.number, expected_number);
840        }
841    }
842
843    #[tokio::test]
844    async fn download_full_block_range_with_invalid_header() {
845        let client = TestFullBlockClient::default();
846        let range_length: usize = 3;
847        let (header, _) = insert_headers_into_client(&client, 0..range_length);
848
849        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
850        test_consensus.set_fail_validation(true);
851        test_consensus.set_fail_body_against_header(false);
852        let client = FullBlockClient::new(client, Arc::new(test_consensus));
853
854        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
855
856        assert_eq!(received.len(), range_length);
857        for (i, block) in received.iter().enumerate() {
858            let expected_number = header.number - i as u64;
859            assert_eq!(block.number, expected_number);
860        }
861    }
862}