Skip to main content

reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4    bodies::client::{BodiesClient, SingleBodyRequest},
5    download::DownloadClient,
6    error::PeerRequestResult,
7    headers::client::{HeadersClient, SingleHeaderRequest},
8    priority::Priority,
9    BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_eip7928::bal::RawBal;
13use alloy_primitives::{Bytes, Sealable, B256};
14use core::marker::PhantomData;
15use futures::FutureExt;
16use reth_consensus::Consensus;
17use reth_eth_wire_types::{
18    BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
19};
20use reth_network_peers::{PeerId, WithPeerId};
21use reth_primitives_traits::{Block, SealedBlock, SealedBlockWith, SealedHeader};
22use std::{
23    cmp::Reverse,
24    collections::{HashMap, VecDeque},
25    fmt::Debug,
26    hash::Hash,
27    ops::RangeInclusive,
28    pin::Pin,
29    sync::Arc,
30    task::{ready, Context, Poll},
31};
32use tracing::{debug, trace};
33
/// A sealed block with optional validated raw block access-list data.
///
/// This is [`SealedBlockWith`] specialised to carry an `Option<RawBal>` next to the block `B`.
pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<RawBal>>;
36
/// A Client that can fetch full blocks from the network.
///
/// Pairs a [`BlockClient`] with a [`Consensus`] instance. Both are cloned into every download
/// future this type creates.
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client>
where
    Client: BlockClient,
{
    /// The underlying client used to issue header, body and access-list requests.
    client: Client,
    /// The consensus instance handed to the download futures for validation.
    consensus: Arc<dyn Consensus<Client::Block>>,
}
46
47impl<Client> FullBlockClient<Client>
48where
49    Client: BlockClient,
50{
51    /// Creates a new instance of `FullBlockClient`.
52    pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
53        Self { client, consensus }
54    }
55
56    /// Returns a client with Test consensus
57    #[cfg(any(test, feature = "test-utils"))]
58    pub fn test_client(client: Client) -> Self {
59        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
60    }
61}
62
63impl<Client> FullBlockClient<Client>
64where
65    Client: BlockClient,
66{
67    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
68    ///
69    /// Note: this future is cancel safe
70    ///
71    /// Caution: This does no validation of body (transactions) response but guarantees that the
72    /// [`SealedHeader`] matches the requested hash.
73    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
74        FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
75    }
76
77    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
78    ///
79    /// Note: this future is cancel safe.
80    ///
81    /// Caution: This does no validation of body (transactions) responses but guarantees that
82    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
83    /// bodies received matches the requested limit.
84    ///
85    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
86    pub fn get_full_block_range(
87        &self,
88        hash: B256,
89        count: u64,
90    ) -> FetchFullBlockRangeFuture<Client> {
91        let client = self.client.clone();
92        FetchFullBlockRangeFuture {
93            start_hash: hash,
94            count,
95            request: FullBlockRangeRequest {
96                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
97                bodies: None,
98            },
99            client,
100            headers: None,
101            pending_headers: VecDeque::new(),
102            bodies: HashMap::default(),
103            consensus: Arc::clone(&self.consensus),
104        }
105    }
106}
107
108impl<Client> FullBlockClient<Client>
109where
110    Client: BlockClient + BlockAccessListsClient,
111{
112    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
113    /// for the given hash.
114    ///
115    /// Note: this future is cancel safe
116    ///
117    /// Caution: This does no validation of body (transactions) response but guarantees that the
118    /// [`SealedHeader`] matches the requested hash.
119    pub fn get_full_block_with_access_lists(
120        &self,
121        hash: B256,
122    ) -> FetchFullBlockWithBalFuture<Client> {
123        self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
124    }
125
126    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
127    /// for the given hash using the requested BAL availability policy.
128    ///
129    /// Note: this future is cancel safe
130    ///
131    /// Caution: This does no validation of body (transactions) response but guarantees that the
132    /// [`SealedHeader`] matches the requested hash.
133    pub fn get_full_block_with_access_lists_with_requirement(
134        &self,
135        hash: B256,
136        requirement: BalRequirement,
137    ) -> FetchFullBlockWithBalFuture<Client> {
138        let client = self.client.clone();
139        FetchFullBlockWithBalFuture {
140            block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
141            block_result: None,
142            bal_request_state: BalRequestState::Pending(
143                client.get_block_access_lists_with_requirement(vec![hash], requirement),
144            ),
145        }
146    }
147
148    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
149    /// the given hash and count.
150    ///
151    /// The block range is always the primary result. Access lists are requested after the block
152    /// range is downloaded. Each block contains access-list data when the optional BAL response
153    /// included the corresponding entry.
154    pub fn get_full_block_range_with_optional_access_lists(
155        &self,
156        hash: B256,
157        count: u64,
158    ) -> FetchFullBlockRangeWithBalFuture<Client> {
159        self.get_full_block_range_with_optional_access_lists_with_requirement(
160            hash,
161            count,
162            BalRequirement::default(),
163        )
164    }
165
166    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
167    /// the given hash and count using the requested BAL availability policy.
168    ///
169    /// The block range is always the primary result. Access lists are requested after the block
170    /// range is downloaded. Each block contains access-list data when the optional BAL response
171    /// included the corresponding entry.
172    pub fn get_full_block_range_with_optional_access_lists_with_requirement(
173        &self,
174        hash: B256,
175        count: u64,
176        requirement: BalRequirement,
177    ) -> FetchFullBlockRangeWithBalFuture<Client> {
178        let client = self.client.clone();
179        FetchFullBlockRangeWithBalFuture {
180            blocks: self.get_full_block_range(hash, count),
181            client,
182            block_result: None,
183            access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
184        }
185    }
186}
187
/// A future that downloads a full block from the network.
///
/// This will attempt to fetch both the header and body for the given block hash at the same time.
/// When both requests succeed, the future will yield the full block.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
    Client: BlockClient,
{
    /// Client used to issue (and re-issue) header and body requests and to report bad peers.
    client: Client,
    /// Consensus instance used to validate a received body against the header.
    consensus: Arc<dyn Consensus<Client::Block>>,
    /// Hash of the block being requested.
    hash: B256,
    /// The header and body requests that are currently in flight.
    request: FullBlockRequest<Client>,
    /// The sealed header, set once a response whose hash equals `hash` has been received.
    header: Option<SealedHeader<Client::Header>>,
    /// The received body, either already validated or still awaiting validation.
    body: Option<BodyResponse<Client::Body>>,
}
204
205impl<Client> FetchFullBlockFuture<Client>
206where
207    Client: BlockClient,
208{
209    fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
210        Self {
211            hash,
212            consensus,
213            request: FullBlockRequest {
214                header: Some(client.get_header(hash.into())),
215                body: Some(client.get_block_body(hash)),
216            },
217            client,
218            header: None,
219            body: None,
220        }
221    }
222
223    /// Returns the hash of the block being requested.
224    pub const fn hash(&self) -> &B256 {
225        &self.hash
226    }
227
228    /// If the header request is already complete, this returns the block number
229    pub fn block_number(&self) -> Option<u64> {
230        self.header.as_ref().map(|h| h.number())
231    }
232
233    /// Returns the [`SealedBlock`] if the request is complete and valid.
234    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
235        if self.header.is_none() || self.body.is_none() {
236            return None
237        }
238
239        let header = self.header.take().unwrap();
240        let resp = self.body.take().unwrap();
241        match resp {
242            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
243            BodyResponse::PendingValidation(resp) => {
244                // ensure the block is valid, else retry
245                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
246                {
247                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
248                    self.client.report_bad_message(resp.peer_id());
249                    self.header = Some(header);
250                    self.request.body = Some(self.client.get_block_body(self.hash));
251                    return None
252                }
253                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
254            }
255        }
256    }
257
258    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
259        if let Some(ref header) = self.header {
260            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
261                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
262                self.client.report_bad_message(resp.peer_id());
263                return
264            }
265            self.body = Some(BodyResponse::Validated(resp.into_data()));
266            return
267        }
268        self.body = Some(BodyResponse::PendingValidation(resp));
269    }
270}
271
272impl<Client> Future for FetchFullBlockFuture<Client>
273where
274    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
275{
276    type Output = SealedBlock<Client::Block>;
277
278    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279        let this = self.get_mut();
280
281        // preemptive yield point
282        let mut budget = 4;
283
284        loop {
285            match ready!(this.request.poll(cx)) {
286                ResponseResult::Header(res) => {
287                    match res {
288                        Ok(maybe_header) => {
289                            let (peer, maybe_header) =
290                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
291                            if let Some(header) = maybe_header {
292                                if header.hash() == this.hash {
293                                    this.header = Some(header);
294                                } else {
295                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
296                                    // received a different header than requested
297                                    this.client.report_bad_message(peer)
298                                }
299                            }
300                        }
301                        Err(err) => {
302                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
303                        }
304                    }
305
306                    if this.header.is_none() {
307                        // received bad response
308                        this.request.header = Some(this.client.get_header(this.hash.into()));
309                    }
310                }
311                ResponseResult::Body(res) => {
312                    match res {
313                        Ok(maybe_body) => {
314                            if let Some(body) = maybe_body.transpose() {
315                                this.on_block_response(body);
316                            }
317                        }
318                        Err(err) => {
319                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
320                        }
321                    }
322                    if this.body.is_none() {
323                        // received bad response
324                        this.request.body = Some(this.client.get_block_body(this.hash));
325                    }
326                }
327            }
328
329            if let Some(res) = this.take_block() {
330                return Poll::Ready(res)
331            }
332
333            // ensure we still have enough budget for another iteration
334            budget -= 1;
335            if budget == 0 {
336                // make sure we're woken up again
337                cx.waker().wake_by_ref();
338                return Poll::Pending
339            }
340        }
341    }
342}
343
/// A future that downloads a full block and optionally its block access lists from the network.
///
/// This composes the existing full block downloader with a block access list request so the
/// header/body logic stays centralized. Missing access lists do not block returning the full block.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockWithBalFuture<Client>
where
    Client: BlockClient + BlockAccessListsClient,
{
    /// The inner future that downloads the header and body.
    block: FetchFullBlockFuture<Client>,
    /// The block yielded by `block`, held until the access-list request has resolved.
    block_result: Option<SealedBlock<Client::Block>>,
    /// The access-list request, or its final result once it has completed.
    bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
}
357
358impl<Client> FetchFullBlockWithBalFuture<Client>
359where
360    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
361{
362    /// Returns the hash of the block being requested.
363    pub const fn hash(&self) -> &B256 {
364        self.block.hash()
365    }
366}
367
368impl<Client> FetchFullBlockWithBalFuture<Client>
369where
370    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
371{
372    /// If the header request is already complete, this returns the block number.
373    pub fn block_number(&self) -> Option<u64> {
374        self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
375    }
376
377    /// Polls the optional BAL request once and records its final best-effort result.
378    ///
379    /// Exactly one returned BAL entry is preserved. Empty responses, malformed responses, and
380    /// request errors resolve to `None` instead of being retried, so a BAL failure cannot block
381    /// returning the downloaded block.
382    fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
383        let res = match &mut self.bal_request_state {
384            BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
385            BalRequestState::Ready(_) => return Poll::Ready(()),
386        };
387
388        match res {
389            Ok(bal) => {
390                let (peer, access_lists) = bal.split();
391                match access_lists.0.len() {
392                    0 => self.bal_request_state = BalRequestState::Ready(None),
393                    1 => {
394                        let bal = access_lists.0.into_iter().next().expect("len checked");
395                        self.bal_request_state =
396                            BalRequestState::Ready(Some(WithPeerId::new(peer, bal)));
397                    }
398                    received => {
399                        debug!(
400                            target: "downloaders",
401                            hash = ?self.block.hash(),
402                            expected = 1,
403                            received,
404                            "Received wrong access list response",
405                        );
406                        self.block.client.report_bad_message(peer);
407                        self.bal_request_state = BalRequestState::Ready(None);
408                    }
409                }
410            }
411            Err(err) => {
412                debug!(
413                    target: "downloaders",
414                    %err,
415                    hash = ?self.block.hash(),
416                    "Access list download failed",
417                );
418                self.bal_request_state = BalRequestState::Ready(None);
419            }
420        }
421
422        Poll::Ready(())
423    }
424
425    /// Returns the block once the block download and optional BAL lookup have both completed.
426    ///
427    /// The BAL lookup must be ready even when it resolved to `None`, which prevents a fast block
428    /// response from racing a still-pending BAL response and incorrectly dropping available BAL
429    /// data.
430    fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
431        let BalRequestState::Ready(bal) = &mut self.bal_request_state else { return None };
432        let block = self.block_result.take()?;
433        let raw_bal =
434            bal.take().and_then(|bal| match seal_block_access_list_for_block(&block, bal) {
435                Ok(raw_bal) => raw_bal,
436                Err(peer) => {
437                    self.block.client.report_bad_message(peer);
438                    None
439                }
440            });
441        Some(SealedBlockWith::new(block, raw_bal))
442    }
443}
444
445impl<Client> Future for FetchFullBlockWithBalFuture<Client>
446where
447    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
448{
449    type Output = SealedBlockWithAccessList<Client::Block>;
450
451    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
452        let this = self.get_mut();
453
454        if this.block_result.is_none() &&
455            let Poll::Ready(block) = this.block.poll_unpin(cx)
456        {
457            this.block_result = Some(block);
458        }
459
460        ready!(this.poll_bal_request(cx));
461
462        if let Some(res) = this.take_block_and_access_lists() {
463            return Poll::Ready(res)
464        }
465
466        Poll::Pending
467    }
468}
469
470impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
471where
472    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
473{
474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475        f.debug_struct("FetchFullBlockWithBalFuture")
476            .field("hash", &self.block.hash())
477            .field("block_ready", &self.block_result.is_some())
478            .field("bal_request_ready", &self.bal_request_state.is_ready())
479            .finish()
480    }
481}
482
/// Tracks the BAL request and its completed result.
enum BalRequestState<Req> {
    /// The request is still in flight.
    Pending(Req),
    /// The request has finished. `None` means no usable access-list entry was obtained (empty
    /// response, unexpected number of entries, or a failed request), or that the entry has
    /// already been taken.
    Ready(Option<WithPeerId<Option<Bytes>>>),
}
488
489impl<Req> BalRequestState<Req> {
490    const fn is_ready(&self) -> bool {
491        matches!(self, Self::Ready(_))
492    }
493}
494
/// A future that downloads a range of full blocks and optionally their block access lists from the
/// network.
///
/// This composes the existing full block range downloader with a follow-up optional block
/// access-list request, so callers that only need blocks can keep using
/// [`FetchFullBlockRangeFuture`].
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeWithBalFuture<Client>
where
    Client: BlockClient + BlockAccessListsClient,
{
    /// The inner future that downloads the block range.
    blocks: FetchFullBlockRangeFuture<Client>,
    /// Client used to issue the follow-up access-list request.
    client: Client,
    /// The blocks yielded by `blocks`, held until the access-list lookup has resolved.
    block_result: Option<Vec<SealedBlock<Client::Block>>>,
    /// Progress of the optional access-list lookup.
    access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
}
512
513impl<Client> FetchFullBlockRangeWithBalFuture<Client>
514where
515    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
516        + BlockAccessListsClient,
517{
518    fn start_access_lists_request_if_possible(&mut self) {
519        let requirement = match &self.access_lists {
520            OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
521            OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
522                return
523            }
524        };
525
526        // BALs are requested by block hash, so wait until the block range is fully assembled.
527        let Some(blocks) = self.block_result.as_ref() else { return };
528        let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
529        self.access_lists = OptionalBlockAccessListsState::Pending(
530            self.client.get_block_access_lists_with_requirement(hashes, requirement),
531        );
532    }
533
534    /// Starts and polls the optional BAL request once, if it is ready to make progress.
535    fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
536        self.start_access_lists_request_if_possible();
537
538        let poll = match &mut self.access_lists {
539            OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
540            OptionalBlockAccessListsState::WaitingForBlocks { .. } |
541            OptionalBlockAccessListsState::Ready(_) => return,
542        };
543
544        match poll {
545            Poll::Pending => {}
546            Poll::Ready(Ok(access_lists)) => {
547                self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
548            }
549            Poll::Ready(Err(err)) => {
550                debug!(
551                    target: "downloaders",
552                    %err,
553                    start_hash = ?self.blocks.start_hash(),
554                    "Access list range download failed",
555                );
556
557                // Optional BAL lookup is best-effort: missing eth/71 support or request failures
558                // should not block returning the downloaded block range.
559                self.access_lists = OptionalBlockAccessListsState::Ready(None);
560            }
561        }
562    }
563
564    /// Returns the block range once blocks and the optional BAL lookup are both complete.
565    fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
566        let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
567            return None
568        };
569
570        let blocks = self.block_result.take()?;
571
572        Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
573    }
574}
575
576impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
577where
578    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
579        + BlockAccessListsClient
580        + 'static,
581{
582    type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
583
584    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
585        let this = self.get_mut();
586
587        // Complete the normal block range first, then issue a separate BAL hash-list request.
588        if this.block_result.is_none() &&
589            let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
590        {
591            this.block_result = Some(blocks);
592        }
593
594        this.poll_access_lists(cx);
595
596        if let Some(response) = this.take_response() {
597            return Poll::Ready(response)
598        }
599
600        Poll::Pending
601    }
602}
603
/// Tracks an optional BAL range request and its completed result.
///
/// The state moves from `WaitingForBlocks` to `Pending` once the block range is known, and
/// from `Pending` to `Ready` when the request resolves.
enum OptionalBlockAccessListsState<Req> {
    /// The block hashes needed for `GetBlockAccessLists` are not known yet.
    WaitingForBlocks {
        /// The BAL availability policy for the eventual request.
        requirement: BalRequirement,
    },
    /// A `GetBlockAccessLists` request is in flight.
    Pending(Req),
    /// `None` means the block range is available but optional BAL data is not.
    Ready(Option<WithPeerId<BlockAccessLists>>),
}
616
617impl<Client> Debug for FetchFullBlockFuture<Client>
618where
619    Client: BlockClient<Header: Debug, Body: Debug>,
620{
621    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
622        f.debug_struct("FetchFullBlockFuture")
623            .field("hash", &self.hash)
624            .field("header", &self.header)
625            .field("body", &self.body)
626            .finish()
627    }
628}
629
/// The in-flight header and body requests of a [`FetchFullBlockFuture`].
struct FullBlockRequest<Client>
where
    Client: BlockClient,
{
    /// The pending header request; `None` after it resolved, until it is re-issued.
    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
    /// The pending body request; `None` after it resolved, until it is re-issued.
    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
637
638impl<Client> FullBlockRequest<Client>
639where
640    Client: BlockClient,
641{
642    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
643        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
644            let Poll::Ready(res) = fut.poll(cx)
645        {
646            self.header = None;
647            return Poll::Ready(ResponseResult::Header(res))
648        }
649
650        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
651            let Poll::Ready(res) = fut.poll(cx)
652        {
653            self.body = None;
654            return Poll::Ready(ResponseResult::Body(res))
655        }
656
657        Poll::Pending
658    }
659}
660
/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
/// future.
enum ResponseResult<H, B> {
    /// Outcome of the header request; the inner `Option` is `None` when no header was returned.
    Header(PeerRequestResult<Option<H>>),
    /// Outcome of the body request; the inner `Option` is `None` when no body was returned.
    Body(PeerRequestResult<Option<B>>),
}
667
/// The response of a body request.
#[derive(Debug)]
enum BodyResponse<B> {
    /// Already validated against transaction root of header
    Validated(B),
    /// Still needs to be validated against header. The peer id is kept so the sender can be
    /// reported if validation later fails.
    PendingValidation(WithPeerId<B>),
}
/// A future that downloads a range of full blocks from the network.
///
/// This first fetches the headers for the given range using the inner `Client`. Once the request
/// is complete, it will fetch the bodies for the headers it received.
///
/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
/// yield the full block range.
///
/// The full block range will be returned with falling block numbers, i.e. in descending order.
///
/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
/// hash array used to request them.
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to fetch headers and bodies.
    client: Client,
    /// The consensus instance used to validate the blocks.
    consensus: Arc<dyn Consensus<Client::Block>>,
    /// The block hash to start fetching from (inclusive).
    start_hash: B256,
    /// How many blocks to fetch: `len([start_hash, ..]) == count`
    count: u64,
    /// Requests for headers and bodies that are in progress.
    request: FullBlockRangeRequest<Client>,
    /// Fetched headers.
    headers: Option<Vec<SealedHeader<Client::Header>>>,
    /// The next headers to request bodies for. This is drained as responses are received.
    pending_headers: VecDeque<SealedHeader<Client::Header>>,
    /// The bodies that have been received so far, keyed by the header each one was matched to.
    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
}
711
712impl<Client> FetchFullBlockRangeFuture<Client>
713where
714    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
715{
716    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
717    fn is_bodies_complete(&self) -> bool {
718        self.bodies.len() == self.count as usize
719    }
720
721    /// Inserts a block body, matching it with the `next_header`.
722    ///
723    /// Note: this assumes the response matches the next header in the queue.
724    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
725        if let Some(header) = self.pending_headers.pop_front() {
726            self.bodies.insert(header, body_response);
727        }
728    }
729
730    /// Inserts multiple block bodies.
731    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
732        for body in bodies {
733            self.insert_body(body);
734        }
735    }
736
737    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
738    /// in the `root_map`.
739    fn remaining_bodies_hashes(&self) -> Vec<B256> {
740        self.pending_headers.iter().map(|h| h.hash()).collect()
741    }
742
743    /// Returns the [`SealedBlock`]s if the request is complete and valid.
744    ///
745    /// The request is complete if the number of blocks requested is equal to the number of blocks
746    /// received. The request is valid if the returned bodies match the roots in the headers.
747    ///
748    /// These are returned in falling order starting with the requested `hash`, i.e. with
749    /// descending block numbers.
750    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
751        if !self.is_bodies_complete() {
752            // not done with bodies yet
753            return None
754        }
755
756        let headers = self.headers.take()?;
757        let mut needs_retry = false;
758        let mut valid_responses = Vec::new();
759
760        for header in &headers {
761            if let Some(body_resp) = self.bodies.remove(header) {
762                // validate body w.r.t. the hashes in the header, only inserting into the response
763                let body = match body_resp {
764                    BodyResponse::Validated(body) => body,
765                    BodyResponse::PendingValidation(resp) => {
766                        // ensure the block is valid, else retry
767                        if let Err(err) =
768                            self.consensus.validate_body_against_header(resp.data(), header)
769                        {
770                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
771                            self.client.report_bad_message(resp.peer_id());
772
773                            // get body that doesn't match, put back into vecdeque, and retry it
774                            self.pending_headers.push_back(header.clone());
775                            needs_retry = true;
776                            continue
777                        }
778
779                        resp.into_data()
780                    }
781                };
782
783                valid_responses
784                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
785            }
786        }
787
788        if needs_retry {
789            // put response hashes back into bodies map since we aren't returning them as a
790            // response
791            for block in valid_responses {
792                let (header, body) = block.split_sealed_header_body();
793                self.bodies.insert(header, BodyResponse::Validated(body));
794            }
795
796            // put headers back since they were `take`n before
797            self.headers = Some(headers);
798
799            // create response for failing bodies
800            let hashes = self.remaining_bodies_hashes();
801            self.request.bodies = Some(self.client.get_block_bodies(hashes));
802            return None
803        }
804
805        Some(valid_responses)
806    }
807
808    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
809        let (peer, mut headers_falling) =
810            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
811
812        // fill in the response if it's the correct length
813        if headers_falling.len() == self.count as usize {
814            // sort headers from highest to lowest block number
815            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
816
817            // check the starting hash
818            if headers_falling[0].hash() == self.start_hash {
819                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
820                // check if the downloaded headers are valid
821                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
822                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
823                    self.client.report_bad_message(peer);
824                }
825
826                // get the bodies request so it can be polled later
827                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
828
829                // populate the pending headers
830                self.pending_headers = headers_falling.clone().into();
831
832                // set the actual request if it hasn't been started yet
833                if !self.has_bodies_request_started() {
834                    // request the bodies for the downloaded headers
835                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
836                }
837
838                // set the headers response
839                self.headers = Some(headers_falling);
840            } else {
841                // received a different header than requested
842                self.client.report_bad_message(peer);
843            }
844        }
845    }
846
847    /// Returns whether or not a bodies request has been started, returning false if there is no
848    /// pending request.
849    const fn has_bodies_request_started(&self) -> bool {
850        self.request.bodies.is_some()
851    }
852
853    /// Returns the start hash for the request
854    pub const fn start_hash(&self) -> B256 {
855        self.start_hash
856    }
857
858    /// Returns the block count for the request
859    pub const fn count(&self) -> u64 {
860        self.count
861    }
862}
863
864impl<Client> Future for FetchFullBlockRangeFuture<Client>
865where
866    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
867{
868    type Output = Vec<SealedBlock<Client::Block>>;
869
870    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
871        let this = self.get_mut();
872
873        loop {
874            match ready!(this.request.poll(cx)) {
875                // This branch handles headers responses from peers - it first ensures that the
876                // starting hash and number of headers matches what we requested.
877                //
878                // If these don't match, we penalize the peer and retry the request.
879                // If they do match, we sort the headers by block number and start the request for
880                // the corresponding block bodies.
881                //
882                // The next result that should be yielded by `poll` is the bodies response.
883                RangeResponseResult::Header(res) => {
884                    match res {
885                        Ok(headers) => {
886                            this.on_headers_response(headers);
887                        }
888                        Err(err) => {
889                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
890                        }
891                    }
892
893                    if this.headers.is_none() {
894                        // did not receive a correct response yet, retry
895                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
896                            start: this.start_hash.into(),
897                            limit: this.count,
898                            direction: HeadersDirection::Falling,
899                        }));
900                    }
901                }
902                // This branch handles block body responses from peers - it first inserts the
903                // bodies into the `bodies` map, and then checks if the request is complete.
904                //
905                // If the request is not complete, and we need to request more bodies, we send
906                // a bodies request for the headers we don't yet have bodies for.
907                RangeResponseResult::Body(res) => {
908                    match res {
909                        Ok(bodies_resp) => {
910                            let (peer, new_bodies) = bodies_resp.split();
911
912                            // first insert the received bodies
913                            this.insert_bodies(
914                                new_bodies
915                                    .into_iter()
916                                    .map(|resp| WithPeerId::new(peer, resp))
917                                    .map(BodyResponse::PendingValidation),
918                            );
919
920                            if !this.is_bodies_complete() {
921                                // get remaining hashes so we can send the next request
922                                let req_hashes = this.remaining_bodies_hashes();
923
924                                // set a new request
925                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
926                            }
927                        }
928                        Err(err) => {
929                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
930                        }
931                    }
932                    if this.request.bodies.is_none() && !this.is_bodies_complete() {
933                        // no pending bodies request (e.g., request error), retry remaining bodies
934                        // TODO: convert this into two futures, one which is a headers range
935                        // future, and one which is a bodies range future.
936                        //
937                        // The headers range future should yield the bodies range future.
938                        // The bodies range future should not have an Option<Vec<B256>>, it should
939                        // have a populated Vec<B256> from the successful headers range future.
940                        //
941                        // This is optimal because we can not send a bodies request without
942                        // first completing the headers request. This way we can get rid of the
943                        // following `if let Some`. A bodies request should never be sent before
944                        // the headers request completes, so this should always be `Some` anyways.
945                        let hashes = this.remaining_bodies_hashes();
946                        if !hashes.is_empty() {
947                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
948                        }
949                    }
950                }
951            }
952
953            if let Some(res) = this.take_blocks() {
954                return Poll::Ready(res)
955            }
956        }
957    }
958}
959
960/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
961/// futures until they return responses. It will return either the header or body result, depending
962/// on which future successfully returned.
963struct FullBlockRangeRequest<Client>
964where
965    Client: BlockClient,
966{
967    headers: Option<<Client as HeadersClient>::Output>,
968    bodies: Option<<Client as BodiesClient>::Output>,
969}
970
971impl<Client> FullBlockRangeRequest<Client>
972where
973    Client: BlockClient,
974{
975    fn poll(
976        &mut self,
977        cx: &mut Context<'_>,
978    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
979        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
980            let Poll::Ready(res) = fut.poll(cx)
981        {
982            self.headers = None;
983            return Poll::Ready(RangeResponseResult::Header(res))
984        }
985
986        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
987            let Poll::Ready(res) = fut.poll(cx)
988        {
989            self.bodies = None;
990            return Poll::Ready(RangeResponseResult::Body(res))
991        }
992
993        Poll::Pending
994    }
995}
996
997// The result of a request for headers or block bodies. This is yielded by the
998// `FullBlockRangeRequest` future.
999enum RangeResponseResult<H, B> {
1000    Header(PeerRequestResult<Vec<H>>),
1001    Body(PeerRequestResult<Vec<B>>),
1002}
1003
1004/// A headers+bodies client implementation that does nothing.
1005#[derive(Debug, Clone)]
1006#[non_exhaustive]
1007pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1008
1009/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
1010impl<Net> DownloadClient for NoopFullBlockClient<Net>
1011where
1012    Net: Debug + Send + Sync,
1013{
1014    /// Reports a bad message received from a peer.
1015    ///
1016    /// # Arguments
1017    ///
1018    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
1019    ///   implementation).
1020    fn report_bad_message(&self, _peer_id: PeerId) {}
1021
1022    /// Retrieves the number of connected peers.
1023    ///
1024    /// # Returns
1025    ///
1026    /// The number of connected peers, which is always zero in this implementation.
1027    fn num_connected_peers(&self) -> usize {
1028        0
1029    }
1030}
1031
1032/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
1033impl<Net> BodiesClient for NoopFullBlockClient<Net>
1034where
1035    Net: NetworkPrimitives,
1036{
1037    type Body = Net::BlockBody;
1038    /// Defines the output type of the function.
1039    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1040
1041    /// Retrieves block bodies based on provided hashes and priority.
1042    ///
1043    /// # Arguments
1044    ///
1045    /// * `_hashes` - A vector of block hashes (unused in this implementation).
1046    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
1047    ///
1048    /// # Returns
1049    ///
1050    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
1051    fn get_block_bodies_with_priority_and_range_hint(
1052        &self,
1053        _hashes: Vec<B256>,
1054        _priority: Priority,
1055        _range_hint: Option<RangeInclusive<u64>>,
1056    ) -> Self::Output {
1057        // Create a future that immediately returns an empty vector of block bodies and a random
1058        // PeerId.
1059        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1060    }
1061}
1062
1063impl<Net> HeadersClient for NoopFullBlockClient<Net>
1064where
1065    Net: NetworkPrimitives,
1066{
1067    type Header = Net::BlockHeader;
1068    /// The output type representing a future containing a peer request result with a vector of
1069    /// headers.
1070    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1071
1072    /// Retrieves headers with a specified priority level.
1073    ///
1074    /// This implementation does nothing and returns an empty vector of headers.
1075    ///
1076    /// # Arguments
1077    ///
1078    /// * `_request` - A request for headers (unused in this implementation).
1079    /// * `_priority` - The priority level for the headers request (unused in this implementation).
1080    ///
1081    /// # Returns
1082    ///
1083    /// Always returns a ready future with an empty vector of headers wrapped in a
1084    /// `PeerRequestResult`.
1085    fn get_headers_with_priority(
1086        &self,
1087        _request: HeadersRequest,
1088        _priority: Priority,
1089    ) -> Self::Output {
1090        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1091    }
1092}
1093
1094impl<Net> BlockClient for NoopFullBlockClient<Net>
1095where
1096    Net: NetworkPrimitives,
1097{
1098    type Block = Net::Block;
1099}
1100
1101impl<Net> Default for NoopFullBlockClient<Net> {
1102    fn default() -> Self {
1103        Self(PhantomData::<Net>)
1104    }
1105}
1106
1107/// Validates one raw block access-list entry against the block's access-list hash.
1108///
1109/// Returns `Ok(Some(_))` for a matching entry, `Ok(None)` when the block has no access-list hash
1110/// or the peer returned an unavailable entry, and `Err(peer)` for a hash mismatch that should be
1111/// reported as a bad message.
1112fn seal_block_access_list_for_block<B: Block>(
1113    block: &SealedBlock<B>,
1114    bal: WithPeerId<Option<Bytes>>,
1115) -> Result<Option<RawBal>, PeerId> {
1116    let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1117
1118    let (peer, bal) = bal.split();
1119    let Some(bal) = bal else { return Ok(None) };
1120    let raw_bal = RawBal::new(bal);
1121    let computed = raw_bal.hash();
1122    if computed == expected {
1123        return Ok(Some(raw_bal))
1124    }
1125
1126    debug!(
1127        target: "downloaders",
1128        block_hash = ?block.hash(),
1129        ?computed,
1130        ?expected,
1131        "Received block access list with wrong hash",
1132    );
1133    Err(peer)
1134}
1135
1136/// Wraps a block range with validated block access-list entries.
1137///
1138/// Short responses are treated as a valid prefix and the remaining blocks receive `None`.
1139/// Responses longer than the requested block range return the full block range without access-list
1140/// data. Non-empty hash mismatches stop accepting access-list data after reporting the peer, so any
1141/// already validated prefix is preserved.
1142fn seal_blocks_with_access_lists<Client>(
1143    client: &Client,
1144    blocks: Vec<SealedBlock<Client::Block>>,
1145    access_lists: Option<WithPeerId<BlockAccessLists>>,
1146) -> Vec<SealedBlockWithAccessList<Client::Block>>
1147where
1148    Client: BlockClient,
1149{
1150    let Some(access_lists) = access_lists else {
1151        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1152    };
1153
1154    let (peer, access_lists) = access_lists.split();
1155    let expected = blocks.len();
1156    let received = access_lists.0.len();
1157
1158    if received > expected {
1159        trace!(
1160            target: "downloaders",
1161            expected,
1162            received,
1163            "Ignoring overlong access list range response",
1164        );
1165        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1166    }
1167
1168    let mut access_lists = access_lists.0.into_iter();
1169    let mut blocks = blocks.into_iter();
1170    let mut response = Vec::with_capacity(expected);
1171
1172    for block in blocks.by_ref() {
1173        let Some(bal) = access_lists.next() else {
1174            // Short BAL responses are valid; the current block and all remaining blocks are
1175            // returned without access-list data below.
1176            response.push(SealedBlockWith::from_block(block));
1177            break
1178        };
1179
1180        match seal_block_access_list_for_block(&block, WithPeerId::new(peer, bal)) {
1181            Ok(raw_bal) => response.push(SealedBlockWith::new(block, raw_bal)),
1182            Err(peer) => {
1183                // A hash mismatch means this BAL entry is not for the current block. Stop matching
1184                // later positional entries and return the rest of the range without BAL data.
1185                client.report_bad_message(peer);
1186                response.push(SealedBlockWith::from_block(block));
1187                break
1188            }
1189        }
1190    }
1191
1192    response.extend(blocks.map(SealedBlockWith::from_block));
1193    response
1194}
1195
1196#[cfg(test)]
1197mod tests {
1198    use reth_ethereum_primitives::BlockBody;
1199
1200    use super::*;
1201    use crate::{error::RequestError, test_utils::TestFullBlockClient};
1202    use alloy_consensus::Header;
1203    use alloy_primitives::{keccak256, map::B256Map, Bytes};
1204    use parking_lot::Mutex;
1205    use std::{
1206        ops::Range,
1207        sync::{
1208            atomic::{AtomicBool, AtomicUsize, Ordering},
1209            Arc,
1210        },
1211    };
1212
1213    const EMPTY_LIST_CODE: u8 = 0xc0;
1214    use tokio::time::{timeout, Duration};
1215
1216    fn sealed_header_with_access_list_hash(bal: &Bytes) -> SealedHeader {
1217        let header =
1218            Header { block_access_list_hash: Some(keccak256(bal.as_ref())), ..Default::default() };
1219        SealedHeader::seal_slow(header)
1220    }
1221
1222    fn range_access_lists<B: Block>(
1223        blocks: &[SealedBlockWithAccessList<B>],
1224    ) -> Vec<Option<RawBal>> {
1225        blocks.iter().map(|block| block.data().clone()).collect()
1226    }
1227
1228    #[tokio::test]
1229    async fn download_single_full_block() {
1230        let client = TestFullBlockClient::default();
1231        let header: SealedHeader = SealedHeader::default();
1232        let body = BlockBody::default();
1233        client.insert(header.clone(), body.clone());
1234        let client = FullBlockClient::test_client(client);
1235
1236        let received = client.get_full_block(header.hash()).await;
1237        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1238    }
1239
1240    #[tokio::test]
1241    async fn download_single_full_block_range() {
1242        let client = TestFullBlockClient::default();
1243        let header: SealedHeader = SealedHeader::default();
1244        let body = BlockBody::default();
1245        client.insert(header.clone(), body.clone());
1246        let client = FullBlockClient::test_client(client);
1247
1248        let received = client.get_full_block_range(header.hash(), 1).await;
1249        let received = received.first().expect("response should include a block");
1250        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1251    }
1252
1253    #[tokio::test]
1254    async fn download_single_full_block_with_access_lists() {
1255        let client = FullBlockWithAccessListsClient::default();
1256        let body = BlockBody::default();
1257        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1258        let header = sealed_header_with_access_list_hash(&bal);
1259        client.insert(header.clone(), body.clone(), bal.clone());
1260
1261        let request_count = Arc::clone(&client.access_list_requests);
1262        let client = FullBlockClient::test_client(client);
1263
1264        let received = client.get_full_block_with_access_lists(header.hash()).await;
1265        let expected_raw_bal = RawBal::from(bal);
1266
1267        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1268        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1269        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1270    }
1271
1272    #[tokio::test]
1273    async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1274        let client = FullBlockWithAccessListsClient::default();
1275        let body = BlockBody::default();
1276        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1277        let header = sealed_header_with_access_list_hash(&bal);
1278        client.insert(header.clone(), body.clone(), bal.clone());
1279
1280        let requirement = Arc::clone(&client.last_access_list_requirement);
1281        let client = FullBlockClient::test_client(client);
1282
1283        let received = client
1284            .get_full_block_with_access_lists_with_requirement(
1285                header.hash(),
1286                BalRequirement::Mandatory,
1287            )
1288            .await;
1289
1290        let expected_raw_bal = RawBal::from(bal);
1291        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1292        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1293        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1294    }
1295
1296    #[tokio::test]
1297    async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1298        let client = FullBlockWithAccessListsClient::default();
1299        client.set_access_list_pending_polls(1);
1300
1301        let body = BlockBody::default();
1302        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1303        let header = sealed_header_with_access_list_hash(&bal);
1304        client.insert(header.clone(), body.clone(), bal.clone());
1305
1306        let request_count = Arc::clone(&client.access_list_requests);
1307        let client = FullBlockClient::test_client(client);
1308
1309        let received =
1310            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1311                .await
1312                .expect("access list request should complete");
1313
1314        let expected_raw_bal = RawBal::from(bal);
1315        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1316        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1317        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1318    }
1319
1320    #[tokio::test]
1321    async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1322        let client = FullBlockWithAccessListsClient::default();
1323        let body = BlockBody::default();
1324        let expected_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1325        let wrong_bal = Bytes::from_static(&[0xc1, 0x01]);
1326        let header = sealed_header_with_access_list_hash(&expected_bal);
1327        client.insert(header.clone(), body.clone(), wrong_bal);
1328
1329        let bad_messages = Arc::clone(&client.bad_messages);
1330        let client = FullBlockClient::test_client(client);
1331
1332        let received =
1333            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1334                .await
1335                .expect("block request should complete without access lists");
1336
1337        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1338        assert!(received.data().is_none());
1339        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1340    }
1341
1342    #[tokio::test]
1343    async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1344        let client = FullBlockWithAccessListsClient::default();
1345        let body = BlockBody::default();
1346        let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1347        let header = sealed_header_with_access_list_hash(&expected_bal);
1348        client.inner.insert(header.clone(), body.clone());
1349
1350        let bad_messages = Arc::clone(&client.bad_messages);
1351        let client = FullBlockClient::test_client(client);
1352
1353        let received =
1354            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1355                .await
1356                .expect("block request should complete without access lists");
1357
1358        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1359        assert!(received.data().is_none());
1360        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1361    }
1362
1363    #[tokio::test]
1364    async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1365        let client = FullBlockWithAccessListsClient::default();
1366        let body = BlockBody::default();
1367        let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1368        let wrong_empty_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1369        let header = sealed_header_with_access_list_hash(&expected_bal);
1370        client.insert(header.clone(), body.clone(), wrong_empty_bal);
1371
1372        let bad_messages = Arc::clone(&client.bad_messages);
1373        let client = FullBlockClient::test_client(client);
1374
1375        let received =
1376            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1377                .await
1378                .expect("block request should complete without access lists");
1379
1380        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1381        assert!(received.data().is_none());
1382        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1383    }
1384
1385    #[tokio::test]
1386    async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1387        let client = FullBlockWithAccessListsClient::default();
1388        client.empty_first_response.store(true, Ordering::SeqCst);
1389
1390        let body = BlockBody::default();
1391        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1392        let header = sealed_header_with_access_list_hash(&bal);
1393        client.insert(header.clone(), body.clone(), bal.clone());
1394
1395        let request_count = Arc::clone(&client.access_list_requests);
1396        let bad_messages = Arc::clone(&client.bad_messages);
1397        let client = FullBlockClient::test_client(client);
1398
1399        let received =
1400            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1401                .await
1402                .expect("block request should complete without access lists");
1403
1404        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1405        assert!(received.data().is_none());
1406        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1407        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1408    }
1409
1410    #[tokio::test]
1411    async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1412        let client = FullBlockWithAccessListsClient::default();
1413        client.set_access_lists_unsupported(true);
1414
1415        let body = BlockBody::default();
1416        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1417        let header = sealed_header_with_access_list_hash(&bal);
1418        client.insert(header.clone(), body.clone(), bal);
1419
1420        let request_count = Arc::clone(&client.access_list_requests);
1421        let requirement = Arc::clone(&client.last_access_list_requirement);
1422        let client = FullBlockClient::test_client(client);
1423
1424        let received =
1425            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1426                .await
1427                .expect("block request should complete without access lists");
1428
1429        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1430        assert!(received.data().is_none());
1431        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1432        assert_eq!(
1433            *requirement.lock(),
1434            Some(BalRequirement::Optional),
1435            "single block BAL lookup should be best-effort"
1436        );
1437    }
1438
1439    /// Inserts headers and returns the last header and block body.
1440    fn insert_headers_into_client(
1441        client: &TestFullBlockClient,
1442        range: Range<usize>,
1443    ) -> (SealedHeader, BlockBody) {
1444        let mut sealed_header: SealedHeader = SealedHeader::default();
1445        let body = BlockBody::default();
1446        for _ in range {
1447            let (mut header, hash) = sealed_header.split();
1448            // update to the next header
1449            header.parent_hash = hash;
1450            header.number += 1;
1451
1452            sealed_header = SealedHeader::seal_slow(header);
1453
1454            client.insert(sealed_header.clone(), body.clone());
1455        }
1456
1457        (sealed_header, body)
1458    }
1459
1460    #[derive(Clone, Debug)]
1461    struct FullBlockWithAccessListsClient {
1462        inner: TestFullBlockClient,
1463        access_lists: Arc<Mutex<B256Map<Bytes>>>,
1464        access_list_requests: Arc<AtomicUsize>,
1465        access_list_soft_limit: Arc<AtomicUsize>,
1466        access_list_pending_polls: Arc<AtomicUsize>,
1467        extra_access_list_entries: Arc<AtomicUsize>,
1468        unsupported_access_lists: Arc<AtomicBool>,
1469        last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
1470        bad_messages: Arc<AtomicUsize>,
1471        empty_first_response: Arc<AtomicBool>,
1472    }
1473
1474    impl Default for FullBlockWithAccessListsClient {
1475        fn default() -> Self {
1476            Self {
1477                inner: TestFullBlockClient::default(),
1478                access_lists: Arc::new(Mutex::new(B256Map::default())),
1479                access_list_requests: Arc::new(AtomicUsize::new(0)),
1480                access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1481                access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1482                extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1483                unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1484                last_access_list_requirement: Arc::new(Mutex::new(None)),
1485                bad_messages: Arc::new(AtomicUsize::new(0)),
1486                empty_first_response: Arc::new(AtomicBool::new(false)),
1487            }
1488        }
1489    }
1490
1491    impl FullBlockWithAccessListsClient {
1492        fn insert(&self, header: SealedHeader, body: BlockBody, bal: Bytes) {
1493            self.inner.insert(header.clone(), body);
1494            self.access_lists.lock().insert(header.hash(), bal);
1495        }
1496
1497        fn set_access_list_soft_limit(&self, limit: usize) {
1498            self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1499        }
1500
1501        fn set_access_list_pending_polls(&self, polls: usize) {
1502            self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1503        }
1504
1505        fn set_extra_access_list_entries(&self, count: usize) {
1506            self.extra_access_list_entries.store(count, Ordering::SeqCst);
1507        }
1508
1509        fn set_access_lists_unsupported(&self, unsupported: bool) {
1510            self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1511        }
1512    }
1513
1514    /// Inserts headers with block access lists and returns the last header and block body.
1515    fn insert_headers_with_access_lists_into_client(
1516        client: &FullBlockWithAccessListsClient,
1517        range: Range<usize>,
1518    ) -> (SealedHeader, BlockBody) {
1519        let mut sealed_header: SealedHeader = SealedHeader::default();
1520        let body = BlockBody::default();
1521        for block_idx in range {
1522            let (mut header, hash) = sealed_header.split();
1523            header.parent_hash = hash;
1524            header.number += 1;
1525            let bal = Bytes::from(vec![0xc1, block_idx as u8]);
1526            header.block_access_list_hash = Some(keccak256(bal.as_ref()));
1527
1528            sealed_header = SealedHeader::seal_slow(header);
1529
1530            client.insert(sealed_header.clone(), body.clone(), bal);
1531        }
1532
1533        (sealed_header, body)
1534    }
1535
1536    impl DownloadClient for FullBlockWithAccessListsClient {
1537        fn report_bad_message(&self, peer_id: PeerId) {
1538            self.bad_messages.fetch_add(1, Ordering::SeqCst);
1539            self.inner.report_bad_message(peer_id);
1540        }
1541
1542        fn num_connected_peers(&self) -> usize {
1543            self.inner.num_connected_peers()
1544        }
1545    }
1546
1547    impl HeadersClient for FullBlockWithAccessListsClient {
1548        type Header = <TestFullBlockClient as HeadersClient>::Header;
1549        type Output = <TestFullBlockClient as HeadersClient>::Output;
1550
1551        fn get_headers_with_priority(
1552            &self,
1553            request: HeadersRequest,
1554            priority: Priority,
1555        ) -> Self::Output {
1556            self.inner.get_headers_with_priority(request, priority)
1557        }
1558    }
1559
1560    impl BodiesClient for FullBlockWithAccessListsClient {
1561        type Body = <TestFullBlockClient as BodiesClient>::Body;
1562        type Output = <TestFullBlockClient as BodiesClient>::Output;
1563
1564        fn get_block_bodies_with_priority_and_range_hint(
1565            &self,
1566            hashes: Vec<B256>,
1567            priority: Priority,
1568            range_hint: Option<RangeInclusive<u64>>,
1569        ) -> Self::Output {
1570            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1571        }
1572    }
1573
    /// Future that returns `Poll::Pending` a configurable number of times before yielding a
    /// prepared access-list response.
    struct MaybePendingAccessLists {
        /// Response handed out on completion; taken (left as `None`) once the future resolves.
        response: Option<PeerRequestResult<BlockAccessLists>>,
        /// Remaining number of polls that return `Poll::Pending`.
        pending_polls: usize,
    }
1578
1579    impl MaybePendingAccessLists {
1580        const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1581            Self { response: Some(response), pending_polls }
1582        }
1583    }
1584
1585    impl std::future::Future for MaybePendingAccessLists {
1586        type Output = PeerRequestResult<BlockAccessLists>;
1587
1588        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1589            if self.pending_polls > 0 {
1590                self.pending_polls -= 1;
1591                cx.waker().wake_by_ref();
1592                return Poll::Pending
1593            }
1594
1595            Poll::Ready(self.response.take().expect("future polled after completion"))
1596        }
1597    }
1598
1599    impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1600        type Output = MaybePendingAccessLists;
1601
1602        fn get_block_access_lists_with_priority_and_requirement(
1603            &self,
1604            hashes: Vec<B256>,
1605            _priority: Priority,
1606            requirement: BalRequirement,
1607        ) -> Self::Output {
1608            self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1609            *self.last_access_list_requirement.lock() = Some(requirement);
1610            let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1611
1612            if self.unsupported_access_lists.load(Ordering::SeqCst) {
1613                return MaybePendingAccessLists::new(
1614                    Err(RequestError::UnsupportedCapability),
1615                    pending_polls,
1616                )
1617            }
1618
1619            if self.empty_first_response.swap(false, Ordering::SeqCst) {
1620                return MaybePendingAccessLists::new(
1621                    Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1622                    pending_polls,
1623                )
1624            }
1625
1626            let mut access_lists: Vec<_> = hashes
1627                .into_iter()
1628                .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1629                .map(|hash| self.access_lists.lock().get(&hash).cloned())
1630                .collect();
1631            for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1632                access_lists.push(None);
1633            }
1634
1635            MaybePendingAccessLists::new(
1636                Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1637                pending_polls,
1638            )
1639        }
1640    }
1641
    impl BlockClient for FullBlockWithAccessListsClient {
        /// Block type served by this test client.
        type Block = reth_ethereum_primitives::Block;
    }
1645
    /// Test client that wraps a [`TestFullBlockClient`] and fails one selected body request.
    #[derive(Clone, Debug)]
    struct FailingBodiesClient {
        /// Client that serves all header requests and every body request that is not failed.
        inner: TestFullBlockClient,
        /// Zero-based index of the body request that is answered with an error.
        fail_on: usize,
        /// Number of body requests received so far.
        body_requests: Arc<AtomicUsize>,
    }
1652
1653    impl FailingBodiesClient {
1654        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1655            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1656        }
1657    }
1658
1659    impl DownloadClient for FailingBodiesClient {
1660        fn report_bad_message(&self, peer_id: PeerId) {
1661            self.inner.report_bad_message(peer_id);
1662        }
1663
1664        fn num_connected_peers(&self) -> usize {
1665            self.inner.num_connected_peers()
1666        }
1667    }
1668
1669    impl HeadersClient for FailingBodiesClient {
1670        type Header = <TestFullBlockClient as HeadersClient>::Header;
1671        type Output = <TestFullBlockClient as HeadersClient>::Output;
1672
1673        fn get_headers_with_priority(
1674            &self,
1675            request: HeadersRequest,
1676            priority: Priority,
1677        ) -> Self::Output {
1678            self.inner.get_headers_with_priority(request, priority)
1679        }
1680    }
1681
1682    impl BodiesClient for FailingBodiesClient {
1683        type Body = <TestFullBlockClient as BodiesClient>::Body;
1684        type Output = <TestFullBlockClient as BodiesClient>::Output;
1685
1686        fn get_block_bodies_with_priority_and_range_hint(
1687            &self,
1688            hashes: Vec<B256>,
1689            priority: Priority,
1690            range_hint: Option<RangeInclusive<u64>>,
1691        ) -> Self::Output {
1692            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1693            if attempt == self.fail_on {
1694                return futures::future::ready(Err(RequestError::Timeout))
1695            }
1696
1697            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1698        }
1699    }
1700
    impl BlockClient for FailingBodiesClient {
        /// Block type served by this test client.
        type Block = reth_ethereum_primitives::Block;
    }
1704
1705    #[tokio::test]
1706    async fn download_full_block_range() {
1707        let client = TestFullBlockClient::default();
1708        let (header, body) = insert_headers_into_client(&client, 0..50);
1709        let client = FullBlockClient::test_client(client);
1710
1711        let received = client.get_full_block_range(header.hash(), 1).await;
1712        let received = received.first().expect("response should include a block");
1713        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1714
1715        let received = client.get_full_block_range(header.hash(), 10).await;
1716        assert_eq!(received.len(), 10);
1717        for (i, block) in received.iter().enumerate() {
1718            let expected_number = header.number - i as u64;
1719            assert_eq!(block.number, expected_number);
1720        }
1721    }
1722
1723    #[tokio::test]
1724    async fn download_full_block_range_over_soft_limit() {
1725        // default soft limit is 20, so we will request 50 blocks
1726        let client = TestFullBlockClient::default();
1727        let (header, body) = insert_headers_into_client(&client, 0..50);
1728        let client = FullBlockClient::test_client(client);
1729
1730        let received = client.get_full_block_range(header.hash(), 1).await;
1731        let received = received.first().expect("response should include a block");
1732        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1733
1734        let received = client.get_full_block_range(header.hash(), 50).await;
1735        assert_eq!(received.len(), 50);
1736        for (i, block) in received.iter().enumerate() {
1737            let expected_number = header.number - i as u64;
1738            assert_eq!(block.number, expected_number);
1739        }
1740    }
1741
1742    #[tokio::test]
1743    async fn download_full_block_range_retries_after_body_error() {
1744        let mut client = TestFullBlockClient::default();
1745        client.set_soft_limit(2);
1746        let (header, _) = insert_headers_into_client(&client, 0..3);
1747
1748        let client = FailingBodiesClient::new(client, 1);
1749        let body_requests = Arc::clone(&client.body_requests);
1750        let client = FullBlockClient::test_client(client);
1751
1752        let received =
1753            timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1754                .await
1755                .expect("body request retry should complete");
1756
1757        assert_eq!(received.len(), 3);
1758        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1759    }
1760
1761    #[tokio::test]
1762    async fn download_full_block_range_with_access_lists() {
1763        let client = FullBlockWithAccessListsClient::default();
1764        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1765
1766        let access_lists = Arc::clone(&client.access_lists);
1767        let request_count = Arc::clone(&client.access_list_requests);
1768        let requirement = Arc::clone(&client.last_access_list_requirement);
1769        let client = FullBlockClient::test_client(client);
1770
1771        let response = timeout(
1772            Duration::from_secs(1),
1773            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1774        )
1775        .await
1776        .expect("range request should complete");
1777
1778        let blocks = response;
1779        assert_eq!(blocks.len(), 3);
1780        let expected = {
1781            let bals = access_lists.lock();
1782            blocks
1783                .iter()
1784                .map(|block| {
1785                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1786                    Some(RawBal::from(bal))
1787                })
1788                .collect::<Vec<_>>()
1789        };
1790        assert_eq!(range_access_lists(&blocks), expected);
1791        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1792        assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1793    }
1794
1795    #[tokio::test]
1796    async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1797        let client = FullBlockWithAccessListsClient::default();
1798        client.empty_first_response.store(true, Ordering::SeqCst);
1799        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1800
1801        let request_count = Arc::clone(&client.access_list_requests);
1802        let client = FullBlockClient::test_client(client);
1803
1804        let response = timeout(
1805            Duration::from_secs(1),
1806            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1807        )
1808        .await
1809        .expect("range request should complete without access lists");
1810
1811        let blocks = response;
1812        assert_eq!(blocks.len(), 3);
1813        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1814        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1815    }
1816
1817    #[tokio::test]
1818    async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1819        let client = FullBlockWithAccessListsClient::default();
1820        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1821
1822        let requirement = Arc::clone(&client.last_access_list_requirement);
1823        let client = FullBlockClient::test_client(client);
1824
1825        let blocks = timeout(
1826            Duration::from_secs(1),
1827            client.get_full_block_range_with_optional_access_lists_with_requirement(
1828                header.hash(),
1829                3,
1830                BalRequirement::Mandatory,
1831            ),
1832        )
1833        .await
1834        .expect("range request should complete");
1835
1836        assert_eq!(blocks.len(), 3);
1837        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1838    }
1839
1840    #[tokio::test]
1841    async fn download_full_block_range_with_access_lists_preserves_short_response() {
1842        let client = FullBlockWithAccessListsClient::default();
1843        client.set_access_list_soft_limit(2);
1844        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1845
1846        let access_lists = Arc::clone(&client.access_lists);
1847        let request_count = Arc::clone(&client.access_list_requests);
1848        let client = FullBlockClient::test_client(client);
1849
1850        let blocks = timeout(
1851            Duration::from_secs(1),
1852            client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1853        )
1854        .await
1855        .expect("range request should complete without access lists");
1856
1857        assert_eq!(blocks.len(), 5);
1858        let expected = {
1859            let bals = access_lists.lock();
1860            blocks
1861                .iter()
1862                .enumerate()
1863                .map(|(idx, block)| {
1864                    if idx >= 2 {
1865                        return None
1866                    }
1867
1868                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1869                    Some(RawBal::from(bal))
1870                })
1871                .collect::<Vec<_>>()
1872        };
1873        assert_eq!(range_access_lists(&blocks), expected);
1874        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1875    }
1876
1877    #[tokio::test]
1878    async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1879        let client = FullBlockWithAccessListsClient::default();
1880        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1881        client.access_lists.lock().remove(&header.hash());
1882
1883        let access_lists = Arc::clone(&client.access_lists);
1884        let bad_messages = Arc::clone(&client.bad_messages);
1885        let client = FullBlockClient::test_client(client);
1886
1887        let blocks = timeout(
1888            Duration::from_secs(1),
1889            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1890        )
1891        .await
1892        .expect("range request should complete");
1893
1894        assert_eq!(blocks.len(), 3);
1895        let expected = {
1896            let bals = access_lists.lock();
1897            blocks
1898                .iter()
1899                .map(|block| {
1900                    if block.block().hash() == header.hash() {
1901                        return None
1902                    }
1903
1904                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1905                    Some(RawBal::from(bal))
1906                })
1907                .collect::<Vec<_>>()
1908        };
1909        assert_eq!(range_access_lists(&blocks), expected);
1910        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1911    }
1912
1913    #[tokio::test]
1914    async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1915        let client = FullBlockWithAccessListsClient::default();
1916        client.set_access_lists_unsupported(true);
1917        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1918
1919        let request_count = Arc::clone(&client.access_list_requests);
1920        let client = FullBlockClient::test_client(client);
1921
1922        let blocks = timeout(
1923            Duration::from_secs(1),
1924            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1925        )
1926        .await
1927        .expect("range request should complete without access lists");
1928
1929        assert_eq!(blocks.len(), 3);
1930        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1931        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1932    }
1933
1934    #[tokio::test]
1935    async fn download_full_block_range_with_access_lists_ignores_long_response() {
1936        let client = FullBlockWithAccessListsClient::default();
1937        client.set_extra_access_list_entries(1);
1938        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1939
1940        let request_count = Arc::clone(&client.access_list_requests);
1941        let bad_messages = Arc::clone(&client.bad_messages);
1942        let client = FullBlockClient::test_client(client);
1943
1944        let blocks = timeout(
1945            Duration::from_secs(1),
1946            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1947        )
1948        .await
1949        .expect("range request should complete without access lists");
1950
1951        assert_eq!(blocks.len(), 3);
1952        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1953        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1954        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1955    }
1956
1957    #[tokio::test]
1958    async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
1959        let client = FullBlockWithAccessListsClient::default();
1960        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1961        client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
1962
1963        let bad_messages = Arc::clone(&client.bad_messages);
1964        let client = FullBlockClient::test_client(client);
1965
1966        let blocks = timeout(
1967            Duration::from_secs(1),
1968            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1969        )
1970        .await
1971        .expect("range request should complete without access lists");
1972
1973        assert_eq!(blocks.len(), 3);
1974        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1975        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1976    }
1977
1978    #[tokio::test]
1979    async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
1980        let client = FullBlockWithAccessListsClient::default();
1981        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1982        let first_bal =
1983            client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
1984        let second_hash = header.parent_hash;
1985        client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
1986
1987        let bad_messages = Arc::clone(&client.bad_messages);
1988        let client = FullBlockClient::test_client(client);
1989
1990        let blocks = timeout(
1991            Duration::from_secs(1),
1992            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1993        )
1994        .await
1995        .expect("range request should complete without unvalidated access lists");
1996
1997        assert_eq!(blocks.len(), 3);
1998        assert_eq!(blocks[1].block().hash(), second_hash);
1999        assert_eq!(range_access_lists(&blocks), vec![Some(RawBal::from(first_bal)), None, None]);
2000        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2001    }
2002
2003    #[tokio::test]
2004    async fn download_full_block_range_with_invalid_header() {
2005        let client = TestFullBlockClient::default();
2006        let range_length: usize = 3;
2007        let (header, _) = insert_headers_into_client(&client, 0..range_length);
2008
2009        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2010        test_consensus.set_fail_validation(true);
2011        test_consensus.set_fail_body_against_header(false);
2012        let client = FullBlockClient::new(client, Arc::new(test_consensus));
2013
2014        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2015
2016        assert_eq!(received.len(), range_length);
2017        for (i, block) in received.iter().enumerate() {
2018            let expected_number = header.number - i as u64;
2019            assert_eq!(block.number, expected_number);
2020        }
2021    }
2022}