Skip to main content

reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4    bodies::client::{BodiesClient, SingleBodyRequest},
5    download::DownloadClient,
6    error::PeerRequestResult,
7    headers::client::{HeadersClient, SingleHeaderRequest},
8    priority::Priority,
9    BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_eip7928::bal::RawBal;
13use alloy_primitives::{Bytes, Sealable, B256};
14use core::marker::PhantomData;
15use futures::FutureExt;
16use reth_consensus::Consensus;
17use reth_eth_wire_types::{
18    BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
19};
20use reth_network_peers::{PeerId, WithPeerId};
21use reth_primitives_traits::{Block, SealedBlock, SealedBlockWith, SealedHeader};
22use std::{
23    cmp::Reverse,
24    collections::{HashMap, VecDeque},
25    fmt::Debug,
26    hash::Hash,
27    ops::RangeInclusive,
28    pin::Pin,
29    sync::Arc,
30    task::{ready, Context, Poll},
31};
32use tracing::{debug, trace};
33
/// A sealed block with optional validated raw block access-list data.
///
/// The access-list slot is `None` when no access list was obtained for the block.
pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<RawBal>>;
36
/// A Client that can fetch full blocks from the network.
///
/// Pairs a [`BlockClient`] with the [`Consensus`] implementation that is cloned into every
/// download future created by this type.
#[derive(Debug, Clone)]
pub struct FullBlockClient<Client>
where
    Client: BlockClient,
{
    /// The underlying client used to issue the network requests.
    client: Client,
    /// The consensus implementation handed to the download futures for response validation.
    consensus: Arc<dyn Consensus<Client::Block>>,
}
46
47impl<Client> FullBlockClient<Client>
48where
49    Client: BlockClient,
50{
51    /// Creates a new instance of `FullBlockClient`.
52    pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
53        Self { client, consensus }
54    }
55
56    /// Returns a client with Test consensus
57    #[cfg(any(test, feature = "test-utils"))]
58    pub fn test_client(client: Client) -> Self {
59        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
60    }
61}
62
63impl<Client> FullBlockClient<Client>
64where
65    Client: BlockClient,
66{
67    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
68    ///
69    /// Note: this future is cancel safe
70    ///
71    /// Caution: This does no validation of body (transactions) response but guarantees that the
72    /// [`SealedHeader`] matches the requested hash.
73    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
74        FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
75    }
76
77    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
78    ///
79    /// Note: this future is cancel safe.
80    ///
81    /// Caution: This does no validation of body (transactions) responses but guarantees that
82    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
83    /// bodies received matches the requested limit.
84    ///
85    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
86    pub fn get_full_block_range(
87        &self,
88        hash: B256,
89        count: u64,
90    ) -> FetchFullBlockRangeFuture<Client> {
91        let client = self.client.clone();
92        FetchFullBlockRangeFuture {
93            start_hash: hash,
94            count,
95            request: FullBlockRangeRequest {
96                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
97                bodies: None,
98            },
99            client,
100            headers: None,
101            pending_headers: VecDeque::new(),
102            bodies: HashMap::default(),
103            consensus: Arc::clone(&self.consensus),
104        }
105    }
106}
107
108impl<Client> FullBlockClient<Client>
109where
110    Client: BlockClient + BlockAccessListsClient,
111{
112    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
113    /// for the given hash.
114    ///
115    /// Note: this future is cancel safe
116    ///
117    /// Caution: This does no validation of body (transactions) response but guarantees that the
118    /// [`SealedHeader`] matches the requested hash.
119    pub fn get_full_block_with_access_lists(
120        &self,
121        hash: B256,
122    ) -> FetchFullBlockWithBalFuture<Client> {
123        self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
124    }
125
126    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
127    /// for the given hash using the requested BAL availability policy.
128    ///
129    /// Note: this future is cancel safe
130    ///
131    /// Caution: This does no validation of body (transactions) response but guarantees that the
132    /// [`SealedHeader`] matches the requested hash.
133    pub fn get_full_block_with_access_lists_with_requirement(
134        &self,
135        hash: B256,
136        requirement: BalRequirement,
137    ) -> FetchFullBlockWithBalFuture<Client> {
138        let client = self.client.clone();
139        FetchFullBlockWithBalFuture {
140            block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
141            block_result: None,
142            bal_request_state: BalRequestState::Pending(
143                client.get_block_access_lists_with_requirement(vec![hash], requirement),
144            ),
145        }
146    }
147
148    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
149    /// the given hash and count.
150    ///
151    /// The block range is always the primary result. Access lists are requested after the block
152    /// range is downloaded. Each block contains access-list data when the optional BAL response
153    /// included the corresponding entry.
154    pub fn get_full_block_range_with_optional_access_lists(
155        &self,
156        hash: B256,
157        count: u64,
158    ) -> FetchFullBlockRangeWithBalFuture<Client> {
159        self.get_full_block_range_with_optional_access_lists_with_requirement(
160            hash,
161            count,
162            BalRequirement::default(),
163        )
164    }
165
166    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
167    /// the given hash and count using the requested BAL availability policy.
168    ///
169    /// The block range is always the primary result. Access lists are requested after the block
170    /// range is downloaded. Each block contains access-list data when the optional BAL response
171    /// included the corresponding entry.
172    pub fn get_full_block_range_with_optional_access_lists_with_requirement(
173        &self,
174        hash: B256,
175        count: u64,
176        requirement: BalRequirement,
177    ) -> FetchFullBlockRangeWithBalFuture<Client> {
178        let client = self.client.clone();
179        FetchFullBlockRangeWithBalFuture {
180            blocks: self.get_full_block_range(hash, count),
181            client,
182            block_result: None,
183            access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
184        }
185    }
186}
187
/// A future that downloads a full block from the network.
///
/// This will attempt to fetch both the header and body for the given block hash at the same time.
/// When both requests succeed, the future will yield the full block.
///
/// Failed or mismatching responses are re-requested, so the future only resolves once a header
/// with the requested hash and a body that validates against it have both been received.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to issue (and re-issue) header and body requests.
    client: Client,
    /// The consensus implementation used to validate a body against its header.
    consensus: Arc<dyn Consensus<Client::Block>>,
    /// The hash of the block being requested.
    hash: B256,
    /// The header and body requests that are currently in flight.
    request: FullBlockRequest<Client>,
    /// The received header, once one matching `hash` has arrived.
    header: Option<SealedHeader<Client::Header>>,
    /// The received body, either already validated or still awaiting the header.
    body: Option<BodyResponse<Client::Body>>,
}
204
205impl<Client> FetchFullBlockFuture<Client>
206where
207    Client: BlockClient,
208{
209    fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
210        Self {
211            hash,
212            consensus,
213            request: FullBlockRequest {
214                header: Some(client.get_header(hash.into())),
215                body: Some(client.get_block_body(hash)),
216            },
217            client,
218            header: None,
219            body: None,
220        }
221    }
222
223    /// Returns the hash of the block being requested.
224    pub const fn hash(&self) -> &B256 {
225        &self.hash
226    }
227
228    /// If the header request is already complete, this returns the block number
229    pub fn block_number(&self) -> Option<u64> {
230        self.header.as_ref().map(|h| h.number())
231    }
232
233    /// Returns the [`SealedBlock`] if the request is complete and valid.
234    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
235        if self.header.is_none() || self.body.is_none() {
236            return None
237        }
238
239        let header = self.header.take().unwrap();
240        let resp = self.body.take().unwrap();
241        match resp {
242            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
243            BodyResponse::PendingValidation(resp) => {
244                // ensure the block is valid, else retry
245                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
246                {
247                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
248                    self.client.report_bad_message(resp.peer_id());
249                    self.header = Some(header);
250                    self.request.body = Some(self.client.get_block_body(self.hash));
251                    return None
252                }
253                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
254            }
255        }
256    }
257
258    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
259        if let Some(ref header) = self.header {
260            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
261                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
262                self.client.report_bad_message(resp.peer_id());
263                return
264            }
265            self.body = Some(BodyResponse::Validated(resp.into_data()));
266            return
267        }
268        self.body = Some(BodyResponse::PendingValidation(resp));
269    }
270}
271
272impl<Client> Future for FetchFullBlockFuture<Client>
273where
274    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
275{
276    type Output = SealedBlock<Client::Block>;
277
278    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
279        let this = self.get_mut();
280
281        // preemptive yield point
282        let mut budget = 4;
283
284        loop {
285            match ready!(this.request.poll(cx)) {
286                ResponseResult::Header(res) => {
287                    match res {
288                        Ok(maybe_header) => {
289                            let (peer, maybe_header) =
290                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
291                            if let Some(header) = maybe_header {
292                                if header.hash() == this.hash {
293                                    this.header = Some(header);
294                                } else {
295                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
296                                    // received a different header than requested
297                                    this.client.report_bad_message(peer)
298                                }
299                            }
300                        }
301                        Err(err) => {
302                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
303                        }
304                    }
305
306                    if this.header.is_none() {
307                        // received bad response
308                        this.request.header = Some(this.client.get_header(this.hash.into()));
309                    }
310                }
311                ResponseResult::Body(res) => {
312                    match res {
313                        Ok(maybe_body) => {
314                            if let Some(body) = maybe_body.transpose() {
315                                this.on_block_response(body);
316                            }
317                        }
318                        Err(err) => {
319                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
320                        }
321                    }
322                    if this.body.is_none() {
323                        // received bad response
324                        this.request.body = Some(this.client.get_block_body(this.hash));
325                    }
326                }
327            }
328
329            if let Some(res) = this.take_block() {
330                return Poll::Ready(res)
331            }
332
333            // ensure we still have enough budget for another iteration
334            budget -= 1;
335            if budget == 0 {
336                // make sure we're woken up again
337                cx.waker().wake_by_ref();
338                return Poll::Pending
339            }
340        }
341    }
342}
343
/// A future that downloads a full block and optionally its block access lists from the network.
///
/// This composes the existing full block downloader with a block access list request so the
/// header/body logic stays centralized. Missing access lists do not block returning the full block.
#[must_use = "futures do nothing unless polled"]
pub struct FetchFullBlockWithBalFuture<Client>
where
    Client: BlockClient + BlockAccessListsClient,
{
    /// The inner header/body download for the requested block.
    block: FetchFullBlockFuture<Client>,
    /// The downloaded block, held here until the access-list request has resolved as well.
    block_result: Option<SealedBlock<Client::Block>>,
    /// The access-list request, or its recorded outcome once it has resolved.
    bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
}
357
358impl<Client> FetchFullBlockWithBalFuture<Client>
359where
360    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
361{
362    /// Returns the hash of the block being requested.
363    pub const fn hash(&self) -> &B256 {
364        self.block.hash()
365    }
366}
367
368impl<Client> FetchFullBlockWithBalFuture<Client>
369where
370    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
371{
372    /// If the header request is already complete, this returns the block number.
373    pub fn block_number(&self) -> Option<u64> {
374        self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
375    }
376
377    /// Polls the optional BAL request once and records its final best-effort result.
378    ///
379    /// Exactly one returned BAL entry is preserved. Empty responses, malformed responses, and
380    /// request errors resolve to `None` instead of being retried, so a BAL failure cannot block
381    /// returning the downloaded block.
382    fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
383        let res = match &mut self.bal_request_state {
384            BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
385            BalRequestState::Ready(_) => return Poll::Ready(()),
386        };
387
388        match res {
389            Ok(bal) => {
390                let (peer, access_lists) = bal.split();
391                match access_lists.0.len() {
392                    0 => self.bal_request_state = BalRequestState::Ready(None),
393                    1 => {
394                        let bal = access_lists.0.into_iter().next().expect("len checked");
395                        self.bal_request_state =
396                            BalRequestState::Ready(Some(WithPeerId::new(peer, bal)));
397                    }
398                    received => {
399                        debug!(
400                            target: "downloaders",
401                            hash = ?self.block.hash(),
402                            expected = 1,
403                            received,
404                            "Received wrong access list response",
405                        );
406                        self.block.client.report_bad_message(peer);
407                        self.bal_request_state = BalRequestState::Ready(None);
408                    }
409                }
410            }
411            Err(err) => {
412                debug!(
413                    target: "downloaders",
414                    %err,
415                    hash = ?self.block.hash(),
416                    "Access list download failed",
417                );
418                self.bal_request_state = BalRequestState::Ready(None);
419            }
420        }
421
422        Poll::Ready(())
423    }
424
425    /// Returns the block once the block download and optional BAL lookup have both completed.
426    ///
427    /// The BAL lookup must be ready even when it resolved to `None`, which prevents a fast block
428    /// response from racing a still-pending BAL response and incorrectly dropping available BAL
429    /// data.
430    fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
431        let BalRequestState::Ready(bal) = &mut self.bal_request_state else { return None };
432        let block = self.block_result.take()?;
433        let raw_bal =
434            bal.take().and_then(|bal| match seal_block_access_list_for_block(&block, bal) {
435                Ok(raw_bal) => raw_bal,
436                Err(peer) => {
437                    self.block.client.report_bad_message(peer);
438                    None
439                }
440            });
441        Some(SealedBlockWith::new(block, raw_bal))
442    }
443}
444
445impl<Client> Future for FetchFullBlockWithBalFuture<Client>
446where
447    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
448{
449    type Output = SealedBlockWithAccessList<Client::Block>;
450
451    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
452        let this = self.get_mut();
453
454        if this.block_result.is_none() &&
455            let Poll::Ready(block) = this.block.poll_unpin(cx)
456        {
457            this.block_result = Some(block);
458        }
459
460        ready!(this.poll_bal_request(cx));
461
462        if let Some(res) = this.take_block_and_access_lists() {
463            return Poll::Ready(res)
464        }
465
466        Poll::Pending
467    }
468}
469
470impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
471where
472    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
473{
474    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
475        f.debug_struct("FetchFullBlockWithBalFuture")
476            .field("hash", &self.block.hash())
477            .field("block_ready", &self.block_result.is_some())
478            .field("bal_request_ready", &self.bal_request_state.is_ready())
479            .finish()
480    }
481}
482
/// Tracks the BAL request and its completed result.
enum BalRequestState<Req> {
    /// The request is still in flight.
    Pending(Req),
    /// The request has resolved. `None` means no usable access list was obtained; `Some` holds
    /// the single returned entry together with the peer that sent it.
    Ready(Option<WithPeerId<Option<Bytes>>>),
}
488
489impl<Req> BalRequestState<Req> {
490    const fn is_ready(&self) -> bool {
491        matches!(self, Self::Ready(_))
492    }
493}
494
/// A future that downloads a range of full blocks and optionally their block access lists from the
/// network.
///
/// This composes the existing full block range downloader with a follow-up optional block
/// access-list request, so callers that only need blocks can keep using
/// [`FetchFullBlockRangeFuture`].
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeWithBalFuture<Client>
where
    Client: BlockClient + BlockAccessListsClient,
{
    /// The inner block range download.
    blocks: FetchFullBlockRangeFuture<Client>,
    /// The client used to issue the follow-up access-list request.
    client: Client,
    /// The downloaded block range, held here until the access-list lookup has resolved.
    block_result: Option<Vec<SealedBlock<Client::Block>>>,
    /// The state of the optional access-list lookup.
    access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
}
512
513impl<Client> FetchFullBlockRangeWithBalFuture<Client>
514where
515    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
516        + BlockAccessListsClient,
517{
518    /// Returns the block hash the requested range starts at (inclusive).
519    pub const fn start_hash(&self) -> B256 {
520        self.blocks.start_hash()
521    }
522
523    /// Returns the number of requested blocks.
524    pub const fn count(&self) -> u64 {
525        self.blocks.count()
526    }
527
528    fn start_access_lists_request_if_possible(&mut self) {
529        let requirement = match &self.access_lists {
530            OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
531            OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
532                return
533            }
534        };
535
536        // BALs are requested by block hash, so wait until the block range is fully assembled.
537        let Some(blocks) = self.block_result.as_ref() else { return };
538        let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
539        self.access_lists = OptionalBlockAccessListsState::Pending(
540            self.client.get_block_access_lists_with_requirement(hashes, requirement),
541        );
542    }
543
544    /// Starts and polls the optional BAL request once, if it is ready to make progress.
545    fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
546        self.start_access_lists_request_if_possible();
547
548        let poll = match &mut self.access_lists {
549            OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
550            OptionalBlockAccessListsState::WaitingForBlocks { .. } |
551            OptionalBlockAccessListsState::Ready(_) => return,
552        };
553
554        match poll {
555            Poll::Pending => {}
556            Poll::Ready(Ok(access_lists)) => {
557                self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
558            }
559            Poll::Ready(Err(err)) => {
560                debug!(
561                    target: "downloaders",
562                    %err,
563                    start_hash = ?self.blocks.start_hash(),
564                    "Access list range download failed",
565                );
566
567                // Optional BAL lookup is best-effort: missing eth/71 support or request failures
568                // should not block returning the downloaded block range.
569                self.access_lists = OptionalBlockAccessListsState::Ready(None);
570            }
571        }
572    }
573
574    /// Returns the block range once blocks and the optional BAL lookup are both complete.
575    fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
576        let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
577            return None
578        };
579
580        let blocks = self.block_result.take()?;
581
582        Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
583    }
584}
585
586impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
587where
588    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
589        + BlockAccessListsClient
590        + 'static,
591{
592    type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
593
594    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
595        let this = self.get_mut();
596
597        // Complete the normal block range first, then issue a separate BAL hash-list request.
598        if this.block_result.is_none() &&
599            let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
600        {
601            this.block_result = Some(blocks);
602        }
603
604        this.poll_access_lists(cx);
605
606        if let Some(response) = this.take_response() {
607            return Poll::Ready(response)
608        }
609
610        Poll::Pending
611    }
612}
613
/// Tracks an optional BAL range request and its completed result.
///
/// The state only moves forward: `WaitingForBlocks` -> `Pending` -> `Ready`.
enum OptionalBlockAccessListsState<Req> {
    /// The block hashes needed for `GetBlockAccessLists` are not known yet.
    WaitingForBlocks {
        /// The BAL availability policy for the eventual request.
        requirement: BalRequirement,
    },
    /// A `GetBlockAccessLists` request is in flight.
    Pending(Req),
    /// The lookup has resolved. `None` means the block range is available but optional BAL data
    /// is not.
    Ready(Option<WithPeerId<BlockAccessLists>>),
}
626
627impl<Client> Debug for FetchFullBlockFuture<Client>
628where
629    Client: BlockClient<Header: Debug, Body: Debug>,
630{
631    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
632        f.debug_struct("FetchFullBlockFuture")
633            .field("hash", &self.hash)
634            .field("header", &self.header)
635            .field("body", &self.body)
636            .finish()
637    }
638}
639
/// The in-flight header and body requests of a [`FetchFullBlockFuture`].
///
/// A slot is `Some` while its request is outstanding and is reset to `None` once that request
/// has produced a response.
struct FullBlockRequest<Client>
where
    Client: BlockClient,
{
    /// The outstanding header request, if any.
    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
    /// The outstanding body request, if any.
    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
}
647
648impl<Client> FullBlockRequest<Client>
649where
650    Client: BlockClient,
651{
652    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
653        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
654            let Poll::Ready(res) = fut.poll(cx)
655        {
656            self.header = None;
657            return Poll::Ready(ResponseResult::Header(res))
658        }
659
660        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
661            let Poll::Ready(res) = fut.poll(cx)
662        {
663            self.body = None;
664            return Poll::Ready(ResponseResult::Body(res))
665        }
666
667        Poll::Pending
668    }
669}
670
/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
/// future.
enum ResponseResult<H, B> {
    /// The outcome of the header request.
    Header(PeerRequestResult<Option<H>>),
    /// The outcome of the body request.
    Body(PeerRequestResult<Option<B>>),
}
677
/// The response of a body request.
#[derive(Debug)]
enum BodyResponse<B> {
    /// A body that has already been validated against its header.
    Validated(B),
    /// A body that still needs to be validated against its header. The sending peer is kept so
    /// it can be reported if validation fails.
    PendingValidation(WithPeerId<B>),
}
/// A future that downloads a range of full blocks from the network.
///
/// This first fetches the headers for the given range using the inner `Client`. Once the request
/// is complete, it will fetch the bodies for the headers it received.
///
/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
/// yield the full block range.
///
/// The full block range will be returned with falling block numbers, i.e. in descending order.
///
/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
/// hash array used to request them.
#[must_use = "futures do nothing unless polled"]
#[expect(missing_debug_implementations)]
pub struct FetchFullBlockRangeFuture<Client>
where
    Client: BlockClient,
{
    /// The client used to fetch headers and bodies.
    client: Client,
    /// The consensus instance used to validate the blocks.
    consensus: Arc<dyn Consensus<Client::Block>>,
    /// The block hash to start fetching from (inclusive).
    start_hash: B256,
    /// How many blocks to fetch: `len([start_hash, ..]) == count`
    count: u64,
    /// Requests for headers and bodies that are in progress.
    request: FullBlockRangeRequest<Client>,
    /// Fetched headers.
    headers: Option<Vec<SealedHeader<Client::Header>>>,
    /// The next headers to request bodies for. This is drained from the front as body responses
    /// are received, and headers whose body failed validation are pushed back for a retry.
    pending_headers: VecDeque<SealedHeader<Client::Header>>,
    /// The bodies that have been received so far, keyed by the header they were matched with.
    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
}
721
722impl<Client> FetchFullBlockRangeFuture<Client>
723where
724    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
725{
726    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
727    fn is_bodies_complete(&self) -> bool {
728        self.bodies.len() == self.count as usize
729    }
730
731    /// Inserts a block body, matching it with the `next_header`.
732    ///
733    /// Note: this assumes the response matches the next header in the queue.
734    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
735        if let Some(header) = self.pending_headers.pop_front() {
736            self.bodies.insert(header, body_response);
737        }
738    }
739
740    /// Inserts multiple block bodies.
741    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
742        for body in bodies {
743            self.insert_body(body);
744        }
745    }
746
747    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
748    /// in the `root_map`.
749    fn remaining_bodies_hashes(&self) -> Vec<B256> {
750        self.pending_headers.iter().map(|h| h.hash()).collect()
751    }
752
753    /// Returns the [`SealedBlock`]s if the request is complete and valid.
754    ///
755    /// The request is complete if the number of blocks requested is equal to the number of blocks
756    /// received. The request is valid if the returned bodies match the roots in the headers.
757    ///
758    /// These are returned in falling order starting with the requested `hash`, i.e. with
759    /// descending block numbers.
760    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
761        if !self.is_bodies_complete() {
762            // not done with bodies yet
763            return None
764        }
765
766        let headers = self.headers.take()?;
767        let mut needs_retry = false;
768        let mut valid_responses = Vec::new();
769
770        for header in &headers {
771            if let Some(body_resp) = self.bodies.remove(header) {
772                // validate body w.r.t. the hashes in the header, only inserting into the response
773                let body = match body_resp {
774                    BodyResponse::Validated(body) => body,
775                    BodyResponse::PendingValidation(resp) => {
776                        // ensure the block is valid, else retry
777                        if let Err(err) =
778                            self.consensus.validate_body_against_header(resp.data(), header)
779                        {
780                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
781                            self.client.report_bad_message(resp.peer_id());
782
783                            // get body that doesn't match, put back into vecdeque, and retry it
784                            self.pending_headers.push_back(header.clone());
785                            needs_retry = true;
786                            continue
787                        }
788
789                        resp.into_data()
790                    }
791                };
792
793                valid_responses
794                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
795            }
796        }
797
798        if needs_retry {
799            // put response hashes back into bodies map since we aren't returning them as a
800            // response
801            for block in valid_responses {
802                let (header, body) = block.split_sealed_header_body();
803                self.bodies.insert(header, BodyResponse::Validated(body));
804            }
805
806            // put headers back since they were `take`n before
807            self.headers = Some(headers);
808
809            // create response for failing bodies
810            let hashes = self.remaining_bodies_hashes();
811            self.request.bodies = Some(self.client.get_block_bodies(hashes));
812            return None
813        }
814
815        Some(valid_responses)
816    }
817
818    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
819        let (peer, mut headers_falling) =
820            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
821
822        // fill in the response if it's the correct length
823        if headers_falling.len() == self.count as usize {
824            // sort headers from highest to lowest block number
825            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
826
827            // check the starting hash
828            if headers_falling[0].hash() == self.start_hash {
829                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
830                // check if the downloaded headers are valid
831                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
832                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
833                    self.client.report_bad_message(peer);
834                }
835
836                // get the bodies request so it can be polled later
837                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
838
839                // populate the pending headers
840                self.pending_headers = headers_falling.clone().into();
841
842                // set the actual request if it hasn't been started yet
843                if !self.has_bodies_request_started() {
844                    // request the bodies for the downloaded headers
845                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
846                }
847
848                // set the headers response
849                self.headers = Some(headers_falling);
850            } else {
851                // received a different header than requested
852                self.client.report_bad_message(peer);
853            }
854        }
855    }
856
857    /// Returns whether or not a bodies request has been started, returning false if there is no
858    /// pending request.
859    const fn has_bodies_request_started(&self) -> bool {
860        self.request.bodies.is_some()
861    }
862
863    /// Returns the start hash for the request
864    pub const fn start_hash(&self) -> B256 {
865        self.start_hash
866    }
867
868    /// Returns the block count for the request
869    pub const fn count(&self) -> u64 {
870        self.count
871    }
872}
873
874impl<Client> Future for FetchFullBlockRangeFuture<Client>
875where
876    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
877{
878    type Output = Vec<SealedBlock<Client::Block>>;
879
880    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
881        let this = self.get_mut();
882
883        loop {
884            match ready!(this.request.poll(cx)) {
885                // This branch handles headers responses from peers - it first ensures that the
886                // starting hash and number of headers matches what we requested.
887                //
888                // If these don't match, we penalize the peer and retry the request.
889                // If they do match, we sort the headers by block number and start the request for
890                // the corresponding block bodies.
891                //
892                // The next result that should be yielded by `poll` is the bodies response.
893                RangeResponseResult::Header(res) => {
894                    match res {
895                        Ok(headers) => {
896                            this.on_headers_response(headers);
897                        }
898                        Err(err) => {
899                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
900                        }
901                    }
902
903                    if this.headers.is_none() {
904                        // did not receive a correct response yet, retry
905                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
906                            start: this.start_hash.into(),
907                            limit: this.count,
908                            direction: HeadersDirection::Falling,
909                        }));
910                    }
911                }
912                // This branch handles block body responses from peers - it first inserts the
913                // bodies into the `bodies` map, and then checks if the request is complete.
914                //
915                // If the request is not complete, and we need to request more bodies, we send
916                // a bodies request for the headers we don't yet have bodies for.
917                RangeResponseResult::Body(res) => {
918                    match res {
919                        Ok(bodies_resp) => {
920                            let (peer, new_bodies) = bodies_resp.split();
921
922                            // first insert the received bodies
923                            this.insert_bodies(
924                                new_bodies
925                                    .into_iter()
926                                    .map(|resp| WithPeerId::new(peer, resp))
927                                    .map(BodyResponse::PendingValidation),
928                            );
929
930                            if !this.is_bodies_complete() {
931                                // get remaining hashes so we can send the next request
932                                let req_hashes = this.remaining_bodies_hashes();
933
934                                // set a new request
935                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
936                            }
937                        }
938                        Err(err) => {
939                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
940                        }
941                    }
942                    if this.request.bodies.is_none() && !this.is_bodies_complete() {
943                        // no pending bodies request (e.g., request error), retry remaining bodies
944                        // TODO: convert this into two futures, one which is a headers range
945                        // future, and one which is a bodies range future.
946                        //
947                        // The headers range future should yield the bodies range future.
948                        // The bodies range future should not have an Option<Vec<B256>>, it should
949                        // have a populated Vec<B256> from the successful headers range future.
950                        //
951                        // This is optimal because we can not send a bodies request without
952                        // first completing the headers request. This way we can get rid of the
953                        // following `if let Some`. A bodies request should never be sent before
954                        // the headers request completes, so this should always be `Some` anyways.
955                        let hashes = this.remaining_bodies_hashes();
956                        if !hashes.is_empty() {
957                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
958                        }
959                    }
960                }
961            }
962
963            if let Some(res) = this.take_blocks() {
964                return Poll::Ready(res)
965            }
966        }
967    }
968}
969
/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
/// futures until they return responses. It will return either the header or body result, depending
/// on which future successfully returned.
///
/// The headers future is polled before the bodies future, and a future that has produced its
/// result is cleared.
struct FullBlockRangeRequest<Client>
where
    Client: BlockClient,
{
    /// The in-flight headers request, `None` when no headers request is pending.
    headers: Option<<Client as HeadersClient>::Output>,
    /// The in-flight bodies request, `None` when no bodies request is pending.
    bodies: Option<<Client as BodiesClient>::Output>,
}
980
981impl<Client> FullBlockRangeRequest<Client>
982where
983    Client: BlockClient,
984{
985    fn poll(
986        &mut self,
987        cx: &mut Context<'_>,
988    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
989        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
990            let Poll::Ready(res) = fut.poll(cx)
991        {
992            self.headers = None;
993            return Poll::Ready(RangeResponseResult::Header(res))
994        }
995
996        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
997            let Poll::Ready(res) = fut.poll(cx)
998        {
999            self.bodies = None;
1000            return Poll::Ready(RangeResponseResult::Body(res))
1001        }
1002
1003        Poll::Pending
1004    }
1005}
1006
/// The result of a request for headers or block bodies. This is yielded by the
/// `FullBlockRangeRequest` future.
enum RangeResponseResult<H, B> {
    /// The outcome of a headers request.
    Header(PeerRequestResult<Vec<H>>),
    /// The outcome of a block bodies request.
    Body(PeerRequestResult<Vec<B>>),
}
1013
/// A headers+bodies client implementation that does nothing.
///
/// The `Net` type parameter is only carried as [`PhantomData`]; the trait implementations use it
/// to pick the header, body and block types.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1018
/// [`DownloadClient`] implementation for [`NoopFullBlockClient`]: peer reports are discarded and
/// the connected peer count is always zero.
impl<Net> DownloadClient for NoopFullBlockClient<Net>
where
    Net: Debug + Send + Sync,
{
    /// Reports a bad message received from a peer.
    ///
    /// The body is empty, so the report is dropped.
    ///
    /// # Arguments
    ///
    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
    ///   implementation).
    fn report_bad_message(&self, _peer_id: PeerId) {}

    /// Retrieves the number of connected peers.
    ///
    /// # Returns
    ///
    /// The number of connected peers, which is always zero in this implementation.
    fn num_connected_peers(&self) -> usize {
        0
    }
}
1041
1042/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
1043impl<Net> BodiesClient for NoopFullBlockClient<Net>
1044where
1045    Net: NetworkPrimitives,
1046{
1047    type Body = Net::BlockBody;
1048    /// Defines the output type of the function.
1049    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1050
1051    /// Retrieves block bodies based on provided hashes and priority.
1052    ///
1053    /// # Arguments
1054    ///
1055    /// * `_hashes` - A vector of block hashes (unused in this implementation).
1056    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
1057    ///
1058    /// # Returns
1059    ///
1060    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
1061    fn get_block_bodies_with_priority_and_range_hint(
1062        &self,
1063        _hashes: Vec<B256>,
1064        _priority: Priority,
1065        _range_hint: Option<RangeInclusive<u64>>,
1066    ) -> Self::Output {
1067        // Create a future that immediately returns an empty vector of block bodies and a random
1068        // PeerId.
1069        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1070    }
1071}
1072
1073impl<Net> HeadersClient for NoopFullBlockClient<Net>
1074where
1075    Net: NetworkPrimitives,
1076{
1077    type Header = Net::BlockHeader;
1078    /// The output type representing a future containing a peer request result with a vector of
1079    /// headers.
1080    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1081
1082    /// Retrieves headers with a specified priority level.
1083    ///
1084    /// This implementation does nothing and returns an empty vector of headers.
1085    ///
1086    /// # Arguments
1087    ///
1088    /// * `_request` - A request for headers (unused in this implementation).
1089    /// * `_priority` - The priority level for the headers request (unused in this implementation).
1090    ///
1091    /// # Returns
1092    ///
1093    /// Always returns a ready future with an empty vector of headers wrapped in a
1094    /// `PeerRequestResult`.
1095    fn get_headers_with_priority(
1096        &self,
1097        _request: HeadersRequest,
1098        _priority: Priority,
1099    ) -> Self::Output {
1100        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1101    }
1102}
1103
/// [`BlockClient`] implementation for [`NoopFullBlockClient`].
impl<Net> BlockClient for NoopFullBlockClient<Net>
where
    Net: NetworkPrimitives,
{
    /// The block type is taken from the network primitives.
    type Block = Net::Block;
}
1110
1111impl<Net> BlockAccessListsClient for NoopFullBlockClient<Net>
1112where
1113    Net: NetworkPrimitives,
1114{
1115    type Output = futures::future::Ready<PeerRequestResult<BlockAccessLists>>;
1116
1117    fn get_block_access_lists_with_priority_and_requirement(
1118        &self,
1119        _hashes: Vec<B256>,
1120        _priority: Priority,
1121        _requirement: BalRequirement,
1122    ) -> Self::Output {
1123        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), BlockAccessLists::default())))
1124    }
1125}
1126
1127impl<Net> Default for NoopFullBlockClient<Net> {
1128    fn default() -> Self {
1129        Self(PhantomData::<Net>)
1130    }
1131}
1132
1133/// Validates one raw block access-list entry against the block's access-list hash.
1134///
1135/// Returns `Ok(Some(_))` for a matching entry, `Ok(None)` when the block has no access-list hash
1136/// or the peer returned an unavailable entry, and `Err(peer)` for a hash mismatch that should be
1137/// reported as a bad message.
1138fn seal_block_access_list_for_block<B: Block>(
1139    block: &SealedBlock<B>,
1140    bal: WithPeerId<Option<Bytes>>,
1141) -> Result<Option<RawBal>, PeerId> {
1142    let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1143
1144    let (peer, bal) = bal.split();
1145    let Some(bal) = bal else { return Ok(None) };
1146    let raw_bal = RawBal::new(bal);
1147    let computed = raw_bal.hash();
1148    if computed == expected {
1149        return Ok(Some(raw_bal))
1150    }
1151
1152    debug!(
1153        target: "downloaders",
1154        block_hash = ?block.hash(),
1155        ?computed,
1156        ?expected,
1157        "Received block access list with wrong hash",
1158    );
1159    Err(peer)
1160}
1161
1162/// Wraps a block range with validated block access-list entries.
1163///
1164/// Short responses are treated as a valid prefix and the remaining blocks receive `None`.
1165/// Responses longer than the requested block range return the full block range without access-list
1166/// data. Non-empty hash mismatches stop accepting access-list data after reporting the peer, so any
1167/// already validated prefix is preserved.
1168fn seal_blocks_with_access_lists<Client>(
1169    client: &Client,
1170    blocks: Vec<SealedBlock<Client::Block>>,
1171    access_lists: Option<WithPeerId<BlockAccessLists>>,
1172) -> Vec<SealedBlockWithAccessList<Client::Block>>
1173where
1174    Client: BlockClient,
1175{
1176    let Some(access_lists) = access_lists else {
1177        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1178    };
1179
1180    let (peer, access_lists) = access_lists.split();
1181    let expected = blocks.len();
1182    let received = access_lists.0.len();
1183
1184    if received > expected {
1185        trace!(
1186            target: "downloaders",
1187            expected,
1188            received,
1189            "Ignoring overlong access list range response",
1190        );
1191        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1192    }
1193
1194    let mut access_lists = access_lists.0.into_iter();
1195    let mut blocks = blocks.into_iter();
1196    let mut response = Vec::with_capacity(expected);
1197
1198    for block in blocks.by_ref() {
1199        let Some(bal) = access_lists.next() else {
1200            // Short BAL responses are valid; the current block and all remaining blocks are
1201            // returned without access-list data below.
1202            response.push(SealedBlockWith::from_block(block));
1203            break
1204        };
1205
1206        match seal_block_access_list_for_block(&block, WithPeerId::new(peer, bal)) {
1207            Ok(raw_bal) => response.push(SealedBlockWith::new(block, raw_bal)),
1208            Err(peer) => {
1209                // A hash mismatch means this BAL entry is not for the current block. Stop matching
1210                // later positional entries and return the rest of the range without BAL data.
1211                client.report_bad_message(peer);
1212                response.push(SealedBlockWith::from_block(block));
1213                break
1214            }
1215        }
1216    }
1217
1218    response.extend(blocks.map(SealedBlockWith::from_block));
1219    response
1220}
1221
1222#[cfg(test)]
1223mod tests {
1224    use reth_ethereum_primitives::BlockBody;
1225
1226    use super::*;
1227    use crate::{error::RequestError, test_utils::TestFullBlockClient};
1228    use alloy_consensus::Header;
1229    use alloy_primitives::{keccak256, map::B256Map, Bytes};
1230    use parking_lot::Mutex;
1231    use std::{
1232        ops::Range,
1233        sync::{
1234            atomic::{AtomicBool, AtomicUsize, Ordering},
1235            Arc,
1236        },
1237    };
1238
    /// Single byte used by the tests as the smallest access-list payload.
    // NOTE(review): presumably the RLP encoding of an empty list; confirm.
    const EMPTY_LIST_CODE: u8 = 0xc0;
1240    use tokio::time::{timeout, Duration};
1241
1242    fn sealed_header_with_access_list_hash(bal: &Bytes) -> SealedHeader {
1243        let header =
1244            Header { block_access_list_hash: Some(keccak256(bal.as_ref())), ..Default::default() };
1245        SealedHeader::seal_slow(header)
1246    }
1247
1248    fn range_access_lists<B: Block>(
1249        blocks: &[SealedBlockWithAccessList<B>],
1250    ) -> Vec<Option<RawBal>> {
1251        blocks.iter().map(|block| block.data().clone()).collect()
1252    }
1253
1254    #[tokio::test]
1255    async fn download_single_full_block() {
1256        let client = TestFullBlockClient::default();
1257        let header: SealedHeader = SealedHeader::default();
1258        let body = BlockBody::default();
1259        client.insert(header.clone(), body.clone());
1260        let client = FullBlockClient::test_client(client);
1261
1262        let received = client.get_full_block(header.hash()).await;
1263        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1264    }
1265
1266    #[tokio::test]
1267    async fn download_single_full_block_range() {
1268        let client = TestFullBlockClient::default();
1269        let header: SealedHeader = SealedHeader::default();
1270        let body = BlockBody::default();
1271        client.insert(header.clone(), body.clone());
1272        let client = FullBlockClient::test_client(client);
1273
1274        let received = client.get_full_block_range(header.hash(), 1).await;
1275        let received = received.first().expect("response should include a block");
1276        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1277    }
1278
1279    #[tokio::test]
1280    async fn download_single_full_block_with_access_lists() {
1281        let client = FullBlockWithAccessListsClient::default();
1282        let body = BlockBody::default();
1283        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1284        let header = sealed_header_with_access_list_hash(&bal);
1285        client.insert(header.clone(), body.clone(), bal.clone());
1286
1287        let request_count = Arc::clone(&client.access_list_requests);
1288        let client = FullBlockClient::test_client(client);
1289
1290        let received = client.get_full_block_with_access_lists(header.hash()).await;
1291        let expected_raw_bal = RawBal::from(bal);
1292
1293        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1294        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1295        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1296    }
1297
1298    #[tokio::test]
1299    async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1300        let client = FullBlockWithAccessListsClient::default();
1301        let body = BlockBody::default();
1302        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1303        let header = sealed_header_with_access_list_hash(&bal);
1304        client.insert(header.clone(), body.clone(), bal.clone());
1305
1306        let requirement = Arc::clone(&client.last_access_list_requirement);
1307        let client = FullBlockClient::test_client(client);
1308
1309        let received = client
1310            .get_full_block_with_access_lists_with_requirement(
1311                header.hash(),
1312                BalRequirement::Mandatory,
1313            )
1314            .await;
1315
1316        let expected_raw_bal = RawBal::from(bal);
1317        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1318        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1319        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1320    }
1321
1322    #[tokio::test]
1323    async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1324        let client = FullBlockWithAccessListsClient::default();
1325        client.set_access_list_pending_polls(1);
1326
1327        let body = BlockBody::default();
1328        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1329        let header = sealed_header_with_access_list_hash(&bal);
1330        client.insert(header.clone(), body.clone(), bal.clone());
1331
1332        let request_count = Arc::clone(&client.access_list_requests);
1333        let client = FullBlockClient::test_client(client);
1334
1335        let received =
1336            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1337                .await
1338                .expect("access list request should complete");
1339
1340        let expected_raw_bal = RawBal::from(bal);
1341        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1342        assert_eq!(received.data().as_ref(), Some(&expected_raw_bal));
1343        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1344    }
1345
1346    #[tokio::test]
1347    async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1348        let client = FullBlockWithAccessListsClient::default();
1349        let body = BlockBody::default();
1350        let expected_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1351        let wrong_bal = Bytes::from_static(&[0xc1, 0x01]);
1352        let header = sealed_header_with_access_list_hash(&expected_bal);
1353        client.insert(header.clone(), body.clone(), wrong_bal);
1354
1355        let bad_messages = Arc::clone(&client.bad_messages);
1356        let client = FullBlockClient::test_client(client);
1357
1358        let received =
1359            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1360                .await
1361                .expect("block request should complete without access lists");
1362
1363        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1364        assert!(received.data().is_none());
1365        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1366    }
1367
1368    #[tokio::test]
1369    async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1370        let client = FullBlockWithAccessListsClient::default();
1371        let body = BlockBody::default();
1372        let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1373        let header = sealed_header_with_access_list_hash(&expected_bal);
1374        client.inner.insert(header.clone(), body.clone());
1375
1376        let bad_messages = Arc::clone(&client.bad_messages);
1377        let client = FullBlockClient::test_client(client);
1378
1379        let received =
1380            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1381                .await
1382                .expect("block request should complete without access lists");
1383
1384        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1385        assert!(received.data().is_none());
1386        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1387    }
1388
1389    #[tokio::test]
1390    async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1391        let client = FullBlockWithAccessListsClient::default();
1392        let body = BlockBody::default();
1393        let expected_bal = Bytes::from_static(&[0xc1, 0x01]);
1394        let wrong_empty_bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1395        let header = sealed_header_with_access_list_hash(&expected_bal);
1396        client.insert(header.clone(), body.clone(), wrong_empty_bal);
1397
1398        let bad_messages = Arc::clone(&client.bad_messages);
1399        let client = FullBlockClient::test_client(client);
1400
1401        let received =
1402            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1403                .await
1404                .expect("block request should complete without access lists");
1405
1406        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1407        assert!(received.data().is_none());
1408        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1409    }
1410
1411    #[tokio::test]
1412    async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1413        let client = FullBlockWithAccessListsClient::default();
1414        client.empty_first_response.store(true, Ordering::SeqCst);
1415
1416        let body = BlockBody::default();
1417        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1418        let header = sealed_header_with_access_list_hash(&bal);
1419        client.insert(header.clone(), body.clone(), bal.clone());
1420
1421        let request_count = Arc::clone(&client.access_list_requests);
1422        let bad_messages = Arc::clone(&client.bad_messages);
1423        let client = FullBlockClient::test_client(client);
1424
1425        let received =
1426            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1427                .await
1428                .expect("block request should complete without access lists");
1429
1430        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1431        assert!(received.data().is_none());
1432        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1433        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1434    }
1435
1436    #[tokio::test]
1437    async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1438        let client = FullBlockWithAccessListsClient::default();
1439        client.set_access_lists_unsupported(true);
1440
1441        let body = BlockBody::default();
1442        let bal = Bytes::from_static(&[EMPTY_LIST_CODE]);
1443        let header = sealed_header_with_access_list_hash(&bal);
1444        client.insert(header.clone(), body.clone(), bal);
1445
1446        let request_count = Arc::clone(&client.access_list_requests);
1447        let requirement = Arc::clone(&client.last_access_list_requirement);
1448        let client = FullBlockClient::test_client(client);
1449
1450        let received =
1451            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1452                .await
1453                .expect("block request should complete without access lists");
1454
1455        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1456        assert!(received.data().is_none());
1457        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1458        assert_eq!(
1459            *requirement.lock(),
1460            Some(BalRequirement::Optional),
1461            "single block BAL lookup should be best-effort"
1462        );
1463    }
1464
1465    /// Inserts headers and returns the last header and block body.
1466    fn insert_headers_into_client(
1467        client: &TestFullBlockClient,
1468        range: Range<usize>,
1469    ) -> (SealedHeader, BlockBody) {
1470        let mut sealed_header: SealedHeader = SealedHeader::default();
1471        let body = BlockBody::default();
1472        for _ in range {
1473            let (mut header, hash) = sealed_header.split();
1474            // update to the next header
1475            header.parent_hash = hash;
1476            header.number += 1;
1477
1478            sealed_header = SealedHeader::seal_slow(header);
1479
1480            client.insert(sealed_header.clone(), body.clone());
1481        }
1482
1483        (sealed_header, body)
1484    }
1485
    /// Test client that wraps a [`TestFullBlockClient`] and additionally keeps block access
    /// lists plus counters and switches that the tests read or set.
    #[derive(Clone, Debug)]
    struct FullBlockWithAccessListsClient {
        /// The wrapped headers and bodies test client.
        inner: TestFullBlockClient,
        /// Raw access-list bytes keyed by block hash.
        access_lists: Arc<Mutex<B256Map<Bytes>>>,
        // presumably counts the access-list requests received; tests assert on it
        access_list_requests: Arc<AtomicUsize>,
        // NOTE(review): looks like a cap on entries per response; defaults to `usize::MAX`
        access_list_soft_limit: Arc<AtomicUsize>,
        // presumably the number of times a request stays pending before it resolves
        access_list_pending_polls: Arc<AtomicUsize>,
        // presumably the number of surplus entries appended to a response
        extra_access_list_entries: Arc<AtomicUsize>,
        // presumably makes the client behave as if access lists were not supported
        unsupported_access_lists: Arc<AtomicBool>,
        // presumably the requirement passed with the most recent access-list request
        last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
        /// Number of calls to `report_bad_message`.
        bad_messages: Arc<AtomicUsize>,
        // presumably makes the first access-list response empty
        empty_first_response: Arc<AtomicBool>,
    }
1499
1500    impl Default for FullBlockWithAccessListsClient {
1501        fn default() -> Self {
1502            Self {
1503                inner: TestFullBlockClient::default(),
1504                access_lists: Arc::new(Mutex::new(B256Map::default())),
1505                access_list_requests: Arc::new(AtomicUsize::new(0)),
1506                access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1507                access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1508                extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1509                unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1510                last_access_list_requirement: Arc::new(Mutex::new(None)),
1511                bad_messages: Arc::new(AtomicUsize::new(0)),
1512                empty_first_response: Arc::new(AtomicBool::new(false)),
1513            }
1514        }
1515    }
1516
1517    impl FullBlockWithAccessListsClient {
1518        fn insert(&self, header: SealedHeader, body: BlockBody, bal: Bytes) {
1519            self.inner.insert(header.clone(), body);
1520            self.access_lists.lock().insert(header.hash(), bal);
1521        }
1522
1523        fn set_access_list_soft_limit(&self, limit: usize) {
1524            self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1525        }
1526
1527        fn set_access_list_pending_polls(&self, polls: usize) {
1528            self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1529        }
1530
1531        fn set_extra_access_list_entries(&self, count: usize) {
1532            self.extra_access_list_entries.store(count, Ordering::SeqCst);
1533        }
1534
1535        fn set_access_lists_unsupported(&self, unsupported: bool) {
1536            self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1537        }
1538    }
1539
1540    /// Inserts headers with block access lists and returns the last header and block body.
1541    fn insert_headers_with_access_lists_into_client(
1542        client: &FullBlockWithAccessListsClient,
1543        range: Range<usize>,
1544    ) -> (SealedHeader, BlockBody) {
1545        let mut sealed_header: SealedHeader = SealedHeader::default();
1546        let body = BlockBody::default();
1547        for block_idx in range {
1548            let (mut header, hash) = sealed_header.split();
1549            header.parent_hash = hash;
1550            header.number += 1;
1551            let bal = Bytes::from(vec![0xc1, block_idx as u8]);
1552            header.block_access_list_hash = Some(keccak256(bal.as_ref()));
1553
1554            sealed_header = SealedHeader::seal_slow(header);
1555
1556            client.insert(sealed_header.clone(), body.clone(), bal);
1557        }
1558
1559        (sealed_header, body)
1560    }
1561
1562    impl DownloadClient for FullBlockWithAccessListsClient {
1563        fn report_bad_message(&self, peer_id: PeerId) {
1564            self.bad_messages.fetch_add(1, Ordering::SeqCst);
1565            self.inner.report_bad_message(peer_id);
1566        }
1567
1568        fn num_connected_peers(&self) -> usize {
1569            self.inner.num_connected_peers()
1570        }
1571    }
1572
1573    impl HeadersClient for FullBlockWithAccessListsClient {
1574        type Header = <TestFullBlockClient as HeadersClient>::Header;
1575        type Output = <TestFullBlockClient as HeadersClient>::Output;
1576
1577        fn get_headers_with_priority(
1578            &self,
1579            request: HeadersRequest,
1580            priority: Priority,
1581        ) -> Self::Output {
1582            self.inner.get_headers_with_priority(request, priority)
1583        }
1584    }
1585
1586    impl BodiesClient for FullBlockWithAccessListsClient {
1587        type Body = <TestFullBlockClient as BodiesClient>::Body;
1588        type Output = <TestFullBlockClient as BodiesClient>::Output;
1589
1590        fn get_block_bodies_with_priority_and_range_hint(
1591            &self,
1592            hashes: Vec<B256>,
1593            priority: Priority,
1594            range_hint: Option<RangeInclusive<u64>>,
1595        ) -> Self::Output {
1596            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1597        }
1598    }
1599
1600    struct MaybePendingAccessLists {
1601        response: Option<PeerRequestResult<BlockAccessLists>>,
1602        pending_polls: usize,
1603    }
1604
1605    impl MaybePendingAccessLists {
1606        const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1607            Self { response: Some(response), pending_polls }
1608        }
1609    }
1610
1611    impl std::future::Future for MaybePendingAccessLists {
1612        type Output = PeerRequestResult<BlockAccessLists>;
1613
1614        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1615            if self.pending_polls > 0 {
1616                self.pending_polls -= 1;
1617                cx.waker().wake_by_ref();
1618                return Poll::Pending
1619            }
1620
1621            Poll::Ready(self.response.take().expect("future polled after completion"))
1622        }
1623    }
1624
1625    impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1626        type Output = MaybePendingAccessLists;
1627
1628        fn get_block_access_lists_with_priority_and_requirement(
1629            &self,
1630            hashes: Vec<B256>,
1631            _priority: Priority,
1632            requirement: BalRequirement,
1633        ) -> Self::Output {
1634            self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1635            *self.last_access_list_requirement.lock() = Some(requirement);
1636            let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1637
1638            if self.unsupported_access_lists.load(Ordering::SeqCst) {
1639                return MaybePendingAccessLists::new(
1640                    Err(RequestError::UnsupportedCapability),
1641                    pending_polls,
1642                )
1643            }
1644
1645            if self.empty_first_response.swap(false, Ordering::SeqCst) {
1646                return MaybePendingAccessLists::new(
1647                    Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1648                    pending_polls,
1649                )
1650            }
1651
1652            let mut access_lists: Vec<_> = hashes
1653                .into_iter()
1654                .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1655                .map(|hash| self.access_lists.lock().get(&hash).cloned())
1656                .collect();
1657            for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1658                access_lists.push(None);
1659            }
1660
1661            MaybePendingAccessLists::new(
1662                Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1663                pending_polls,
1664            )
1665        }
1666    }
1667
1668    impl BlockClient for FullBlockWithAccessListsClient {
1669        type Block = reth_ethereum_primitives::Block;
1670    }
1671
1672    #[derive(Clone, Debug)]
1673    struct FailingBodiesClient {
1674        inner: TestFullBlockClient,
1675        fail_on: usize,
1676        body_requests: Arc<AtomicUsize>,
1677    }
1678
1679    impl FailingBodiesClient {
1680        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1681            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1682        }
1683    }
1684
1685    impl DownloadClient for FailingBodiesClient {
1686        fn report_bad_message(&self, peer_id: PeerId) {
1687            self.inner.report_bad_message(peer_id);
1688        }
1689
1690        fn num_connected_peers(&self) -> usize {
1691            self.inner.num_connected_peers()
1692        }
1693    }
1694
1695    impl HeadersClient for FailingBodiesClient {
1696        type Header = <TestFullBlockClient as HeadersClient>::Header;
1697        type Output = <TestFullBlockClient as HeadersClient>::Output;
1698
1699        fn get_headers_with_priority(
1700            &self,
1701            request: HeadersRequest,
1702            priority: Priority,
1703        ) -> Self::Output {
1704            self.inner.get_headers_with_priority(request, priority)
1705        }
1706    }
1707
1708    impl BodiesClient for FailingBodiesClient {
1709        type Body = <TestFullBlockClient as BodiesClient>::Body;
1710        type Output = <TestFullBlockClient as BodiesClient>::Output;
1711
1712        fn get_block_bodies_with_priority_and_range_hint(
1713            &self,
1714            hashes: Vec<B256>,
1715            priority: Priority,
1716            range_hint: Option<RangeInclusive<u64>>,
1717        ) -> Self::Output {
1718            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1719            if attempt == self.fail_on {
1720                return futures::future::ready(Err(RequestError::Timeout))
1721            }
1722
1723            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1724        }
1725    }
1726
1727    impl BlockClient for FailingBodiesClient {
1728        type Block = reth_ethereum_primitives::Block;
1729    }
1730
1731    #[tokio::test]
1732    async fn download_full_block_range() {
1733        let client = TestFullBlockClient::default();
1734        let (header, body) = insert_headers_into_client(&client, 0..50);
1735        let client = FullBlockClient::test_client(client);
1736
1737        let received = client.get_full_block_range(header.hash(), 1).await;
1738        let received = received.first().expect("response should include a block");
1739        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1740
1741        let received = client.get_full_block_range(header.hash(), 10).await;
1742        assert_eq!(received.len(), 10);
1743        for (i, block) in received.iter().enumerate() {
1744            let expected_number = header.number - i as u64;
1745            assert_eq!(block.number, expected_number);
1746        }
1747    }
1748
1749    #[tokio::test]
1750    async fn download_full_block_range_over_soft_limit() {
1751        // default soft limit is 20, so we will request 50 blocks
1752        let client = TestFullBlockClient::default();
1753        let (header, body) = insert_headers_into_client(&client, 0..50);
1754        let client = FullBlockClient::test_client(client);
1755
1756        let received = client.get_full_block_range(header.hash(), 1).await;
1757        let received = received.first().expect("response should include a block");
1758        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1759
1760        let received = client.get_full_block_range(header.hash(), 50).await;
1761        assert_eq!(received.len(), 50);
1762        for (i, block) in received.iter().enumerate() {
1763            let expected_number = header.number - i as u64;
1764            assert_eq!(block.number, expected_number);
1765        }
1766    }
1767
1768    #[tokio::test]
1769    async fn download_full_block_range_retries_after_body_error() {
1770        let mut client = TestFullBlockClient::default();
1771        client.set_soft_limit(2);
1772        let (header, _) = insert_headers_into_client(&client, 0..3);
1773
1774        let client = FailingBodiesClient::new(client, 1);
1775        let body_requests = Arc::clone(&client.body_requests);
1776        let client = FullBlockClient::test_client(client);
1777
1778        let received =
1779            timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1780                .await
1781                .expect("body request retry should complete");
1782
1783        assert_eq!(received.len(), 3);
1784        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1785    }
1786
1787    #[tokio::test]
1788    async fn download_full_block_range_with_access_lists() {
1789        let client = FullBlockWithAccessListsClient::default();
1790        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1791
1792        let access_lists = Arc::clone(&client.access_lists);
1793        let request_count = Arc::clone(&client.access_list_requests);
1794        let requirement = Arc::clone(&client.last_access_list_requirement);
1795        let client = FullBlockClient::test_client(client);
1796
1797        let response = timeout(
1798            Duration::from_secs(1),
1799            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1800        )
1801        .await
1802        .expect("range request should complete");
1803
1804        let blocks = response;
1805        assert_eq!(blocks.len(), 3);
1806        let expected = {
1807            let bals = access_lists.lock();
1808            blocks
1809                .iter()
1810                .map(|block| {
1811                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1812                    Some(RawBal::from(bal))
1813                })
1814                .collect::<Vec<_>>()
1815        };
1816        assert_eq!(range_access_lists(&blocks), expected);
1817        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1818        assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1819    }
1820
1821    #[tokio::test]
1822    async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1823        let client = FullBlockWithAccessListsClient::default();
1824        client.empty_first_response.store(true, Ordering::SeqCst);
1825        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1826
1827        let request_count = Arc::clone(&client.access_list_requests);
1828        let client = FullBlockClient::test_client(client);
1829
1830        let response = timeout(
1831            Duration::from_secs(1),
1832            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1833        )
1834        .await
1835        .expect("range request should complete without access lists");
1836
1837        let blocks = response;
1838        assert_eq!(blocks.len(), 3);
1839        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1840        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1841    }
1842
1843    #[tokio::test]
1844    async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1845        let client = FullBlockWithAccessListsClient::default();
1846        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1847
1848        let requirement = Arc::clone(&client.last_access_list_requirement);
1849        let client = FullBlockClient::test_client(client);
1850
1851        let blocks = timeout(
1852            Duration::from_secs(1),
1853            client.get_full_block_range_with_optional_access_lists_with_requirement(
1854                header.hash(),
1855                3,
1856                BalRequirement::Mandatory,
1857            ),
1858        )
1859        .await
1860        .expect("range request should complete");
1861
1862        assert_eq!(blocks.len(), 3);
1863        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1864    }
1865
1866    #[tokio::test]
1867    async fn download_full_block_range_with_access_lists_preserves_short_response() {
1868        let client = FullBlockWithAccessListsClient::default();
1869        client.set_access_list_soft_limit(2);
1870        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1871
1872        let access_lists = Arc::clone(&client.access_lists);
1873        let request_count = Arc::clone(&client.access_list_requests);
1874        let client = FullBlockClient::test_client(client);
1875
1876        let blocks = timeout(
1877            Duration::from_secs(1),
1878            client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1879        )
1880        .await
1881        .expect("range request should complete without access lists");
1882
1883        assert_eq!(blocks.len(), 5);
1884        let expected = {
1885            let bals = access_lists.lock();
1886            blocks
1887                .iter()
1888                .enumerate()
1889                .map(|(idx, block)| {
1890                    if idx >= 2 {
1891                        return None
1892                    }
1893
1894                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1895                    Some(RawBal::from(bal))
1896                })
1897                .collect::<Vec<_>>()
1898        };
1899        assert_eq!(range_access_lists(&blocks), expected);
1900        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1901    }
1902
1903    #[tokio::test]
1904    async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1905        let client = FullBlockWithAccessListsClient::default();
1906        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1907        client.access_lists.lock().remove(&header.hash());
1908
1909        let access_lists = Arc::clone(&client.access_lists);
1910        let bad_messages = Arc::clone(&client.bad_messages);
1911        let client = FullBlockClient::test_client(client);
1912
1913        let blocks = timeout(
1914            Duration::from_secs(1),
1915            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1916        )
1917        .await
1918        .expect("range request should complete");
1919
1920        assert_eq!(blocks.len(), 3);
1921        let expected = {
1922            let bals = access_lists.lock();
1923            blocks
1924                .iter()
1925                .map(|block| {
1926                    if block.block().hash() == header.hash() {
1927                        return None
1928                    }
1929
1930                    let bal = bals.get(&block.block().hash()).cloned().expect("access list exists");
1931                    Some(RawBal::from(bal))
1932                })
1933                .collect::<Vec<_>>()
1934        };
1935        assert_eq!(range_access_lists(&blocks), expected);
1936        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1937    }
1938
1939    #[tokio::test]
1940    async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1941        let client = FullBlockWithAccessListsClient::default();
1942        client.set_access_lists_unsupported(true);
1943        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1944
1945        let request_count = Arc::clone(&client.access_list_requests);
1946        let client = FullBlockClient::test_client(client);
1947
1948        let blocks = timeout(
1949            Duration::from_secs(1),
1950            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1951        )
1952        .await
1953        .expect("range request should complete without access lists");
1954
1955        assert_eq!(blocks.len(), 3);
1956        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1957        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1958    }
1959
1960    #[tokio::test]
1961    async fn download_full_block_range_with_access_lists_ignores_long_response() {
1962        let client = FullBlockWithAccessListsClient::default();
1963        client.set_extra_access_list_entries(1);
1964        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1965
1966        let request_count = Arc::clone(&client.access_list_requests);
1967        let bad_messages = Arc::clone(&client.bad_messages);
1968        let client = FullBlockClient::test_client(client);
1969
1970        let blocks = timeout(
1971            Duration::from_secs(1),
1972            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1973        )
1974        .await
1975        .expect("range request should complete without access lists");
1976
1977        assert_eq!(blocks.len(), 3);
1978        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1979        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1980        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1981    }
1982
1983    #[tokio::test]
1984    async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
1985        let client = FullBlockWithAccessListsClient::default();
1986        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1987        client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
1988
1989        let bad_messages = Arc::clone(&client.bad_messages);
1990        let client = FullBlockClient::test_client(client);
1991
1992        let blocks = timeout(
1993            Duration::from_secs(1),
1994            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1995        )
1996        .await
1997        .expect("range request should complete without access lists");
1998
1999        assert_eq!(blocks.len(), 3);
2000        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2001        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2002    }
2003
2004    #[tokio::test]
2005    async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2006        let client = FullBlockWithAccessListsClient::default();
2007        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2008        let first_bal =
2009            client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2010        let second_hash = header.parent_hash;
2011        client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2012
2013        let bad_messages = Arc::clone(&client.bad_messages);
2014        let client = FullBlockClient::test_client(client);
2015
2016        let blocks = timeout(
2017            Duration::from_secs(1),
2018            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2019        )
2020        .await
2021        .expect("range request should complete without unvalidated access lists");
2022
2023        assert_eq!(blocks.len(), 3);
2024        assert_eq!(blocks[1].block().hash(), second_hash);
2025        assert_eq!(range_access_lists(&blocks), vec![Some(RawBal::from(first_bal)), None, None]);
2026        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2027    }
2028
2029    #[tokio::test]
2030    async fn download_full_block_range_with_invalid_header() {
2031        let client = TestFullBlockClient::default();
2032        let range_length: usize = 3;
2033        let (header, _) = insert_headers_into_client(&client, 0..range_length);
2034
2035        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2036        test_consensus.set_fail_validation(true);
2037        test_consensus.set_fail_body_against_header(false);
2038        let client = FullBlockClient::new(client, Arc::new(test_consensus));
2039
2040        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2041
2042        assert_eq!(received.len(), range_length);
2043        for (i, block) in received.iter().enumerate() {
2044            let expected_number = header.number - i as u64;
2045            assert_eq!(block.number, expected_number);
2046        }
2047    }
2048}