Skip to main content

reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4    bodies::client::{BodiesClient, SingleBodyRequest},
5    download::DownloadClient,
6    error::PeerRequestResult,
7    headers::client::{HeadersClient, SingleHeaderRequest},
8    priority::Priority,
9    BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::{keccak256, Bytes, Sealable, Sealed, B256};
13use core::marker::PhantomData;
14use futures::FutureExt;
15use reth_consensus::Consensus;
16use reth_eth_wire_types::{
17    BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
18};
19use reth_network_peers::{PeerId, WithPeerId};
20use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
21use std::{
22    cmp::Reverse,
23    collections::{HashMap, VecDeque},
24    fmt::Debug,
25    hash::Hash,
26    ops::RangeInclusive,
27    pin::Pin,
28    sync::Arc,
29    task::{ready, Context, Poll},
30};
31use tracing::{debug, trace};
32
33/// A sealed block with associated data.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct SealedBlockWith<B: Block, T = Option<Sealed<Bytes>>> {
36    block: SealedBlock<B>,
37    data: T,
38}
39
40/// Raw block access-list RLP bytes sealed by `header.block_access_list_hash`.
41pub type SealedBlockAccessList = Sealed<Bytes>;
42
43/// A sealed block with optional validated block access-list data.
44pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<SealedBlockAccessList>>;
45
46impl<B: Block, T> SealedBlockWith<B, T> {
47    /// Creates a sealed block with associated data.
48    pub const fn new(block: SealedBlock<B>, data: T) -> Self {
49        Self { block, data }
50    }
51
52    /// Returns the sealed block.
53    pub const fn block(&self) -> &SealedBlock<B> {
54        &self.block
55    }
56
57    /// Returns the associated data.
58    pub const fn data(&self) -> &T {
59        &self.data
60    }
61
62    /// Consumes the wrapper and returns its parts.
63    pub fn into_parts(self) -> (SealedBlock<B>, T) {
64        (self.block, self.data)
65    }
66}
67
68impl<B: Block> SealedBlockWithAccessList<B> {
69    /// Creates a full block response without block access-list data.
70    pub const fn from_block(block: SealedBlock<B>) -> Self {
71        Self::new(block, None)
72    }
73
74    /// Returns the optional raw block access-list data, sealed by its hash.
75    pub const fn access_list(&self) -> Option<&SealedBlockAccessList> {
76        self.data.as_ref()
77    }
78
79    /// Returns the optional raw block access-list data, sealed by its hash.
80    pub const fn access_lists(&self) -> Option<&SealedBlockAccessList> {
81        self.data.as_ref()
82    }
83}
84
85impl<B: Block> From<SealedBlock<B>> for SealedBlockWith<B> {
86    fn from(block: SealedBlock<B>) -> Self {
87        Self::new(block, None)
88    }
89}
90
91/// A Client that can fetch full blocks from the network.
92#[derive(Debug, Clone)]
93pub struct FullBlockClient<Client>
94where
95    Client: BlockClient,
96{
97    client: Client,
98    consensus: Arc<dyn Consensus<Client::Block>>,
99}
100
101impl<Client> FullBlockClient<Client>
102where
103    Client: BlockClient,
104{
105    /// Creates a new instance of `FullBlockClient`.
106    pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
107        Self { client, consensus }
108    }
109
110    /// Returns a client with Test consensus
111    #[cfg(any(test, feature = "test-utils"))]
112    pub fn test_client(client: Client) -> Self {
113        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
114    }
115}
116
117impl<Client> FullBlockClient<Client>
118where
119    Client: BlockClient,
120{
121    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
122    ///
123    /// Note: this future is cancel safe
124    ///
125    /// Caution: This does no validation of body (transactions) response but guarantees that the
126    /// [`SealedHeader`] matches the requested hash.
127    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
128        FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
129    }
130
131    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
132    ///
133    /// Note: this future is cancel safe.
134    ///
135    /// Caution: This does no validation of body (transactions) responses but guarantees that
136    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
137    /// bodies received matches the requested limit.
138    ///
139    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
140    pub fn get_full_block_range(
141        &self,
142        hash: B256,
143        count: u64,
144    ) -> FetchFullBlockRangeFuture<Client> {
145        let client = self.client.clone();
146        FetchFullBlockRangeFuture {
147            start_hash: hash,
148            count,
149            request: FullBlockRangeRequest {
150                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
151                bodies: None,
152            },
153            client,
154            headers: None,
155            pending_headers: VecDeque::new(),
156            bodies: HashMap::default(),
157            consensus: Arc::clone(&self.consensus),
158        }
159    }
160}
161
162impl<Client> FullBlockClient<Client>
163where
164    Client: BlockClient + BlockAccessListsClient,
165{
166    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
167    /// for the given hash.
168    ///
169    /// Note: this future is cancel safe
170    ///
171    /// Caution: This does no validation of body (transactions) response but guarantees that the
172    /// [`SealedHeader`] matches the requested hash.
173    pub fn get_full_block_with_access_lists(
174        &self,
175        hash: B256,
176    ) -> FetchFullBlockWithBalFuture<Client> {
177        self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
178    }
179
180    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
181    /// for the given hash using the requested BAL availability policy.
182    ///
183    /// Note: this future is cancel safe
184    ///
185    /// Caution: This does no validation of body (transactions) response but guarantees that the
186    /// [`SealedHeader`] matches the requested hash.
187    pub fn get_full_block_with_access_lists_with_requirement(
188        &self,
189        hash: B256,
190        requirement: BalRequirement,
191    ) -> FetchFullBlockWithBalFuture<Client> {
192        let client = self.client.clone();
193        FetchFullBlockWithBalFuture {
194            block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
195            block_result: None,
196            bal_request_state: BalRequestState::Pending(
197                client.get_block_access_lists_with_requirement(vec![hash], requirement),
198            ),
199        }
200    }
201
202    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
203    /// the given hash and count.
204    ///
205    /// The block range is always the primary result. Access lists are requested after the block
206    /// range is downloaded. Each block contains access-list data when the optional BAL response
207    /// included the corresponding entry.
208    pub fn get_full_block_range_with_optional_access_lists(
209        &self,
210        hash: B256,
211        count: u64,
212    ) -> FetchFullBlockRangeWithBalFuture<Client> {
213        self.get_full_block_range_with_optional_access_lists_with_requirement(
214            hash,
215            count,
216            BalRequirement::default(),
217        )
218    }
219
220    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
221    /// the given hash and count using the requested BAL availability policy.
222    ///
223    /// The block range is always the primary result. Access lists are requested after the block
224    /// range is downloaded. Each block contains access-list data when the optional BAL response
225    /// included the corresponding entry.
226    pub fn get_full_block_range_with_optional_access_lists_with_requirement(
227        &self,
228        hash: B256,
229        count: u64,
230        requirement: BalRequirement,
231    ) -> FetchFullBlockRangeWithBalFuture<Client> {
232        let client = self.client.clone();
233        FetchFullBlockRangeWithBalFuture {
234            blocks: self.get_full_block_range(hash, count),
235            client,
236            block_result: None,
237            access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
238        }
239    }
240}
241
242/// A future that downloads a full block from the network.
243///
244/// This will attempt to fetch both the header and body for the given block hash at the same time.
245/// When both requests succeed, the future will yield the full block.
246#[must_use = "futures do nothing unless polled"]
247pub struct FetchFullBlockFuture<Client>
248where
249    Client: BlockClient,
250{
251    client: Client,
252    consensus: Arc<dyn Consensus<Client::Block>>,
253    hash: B256,
254    request: FullBlockRequest<Client>,
255    header: Option<SealedHeader<Client::Header>>,
256    body: Option<BodyResponse<Client::Body>>,
257}
258
259impl<Client> FetchFullBlockFuture<Client>
260where
261    Client: BlockClient,
262{
263    fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
264        Self {
265            hash,
266            consensus,
267            request: FullBlockRequest {
268                header: Some(client.get_header(hash.into())),
269                body: Some(client.get_block_body(hash)),
270            },
271            client,
272            header: None,
273            body: None,
274        }
275    }
276
277    /// Returns the hash of the block being requested.
278    pub const fn hash(&self) -> &B256 {
279        &self.hash
280    }
281
282    /// If the header request is already complete, this returns the block number
283    pub fn block_number(&self) -> Option<u64> {
284        self.header.as_ref().map(|h| h.number())
285    }
286
287    /// Returns the [`SealedBlock`] if the request is complete and valid.
288    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
289        if self.header.is_none() || self.body.is_none() {
290            return None
291        }
292
293        let header = self.header.take().unwrap();
294        let resp = self.body.take().unwrap();
295        match resp {
296            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
297            BodyResponse::PendingValidation(resp) => {
298                // ensure the block is valid, else retry
299                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
300                {
301                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
302                    self.client.report_bad_message(resp.peer_id());
303                    self.header = Some(header);
304                    self.request.body = Some(self.client.get_block_body(self.hash));
305                    return None
306                }
307                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
308            }
309        }
310    }
311
312    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
313        if let Some(ref header) = self.header {
314            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
315                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
316                self.client.report_bad_message(resp.peer_id());
317                return
318            }
319            self.body = Some(BodyResponse::Validated(resp.into_data()));
320            return
321        }
322        self.body = Some(BodyResponse::PendingValidation(resp));
323    }
324}
325
326impl<Client> Future for FetchFullBlockFuture<Client>
327where
328    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
329{
330    type Output = SealedBlock<Client::Block>;
331
332    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
333        let this = self.get_mut();
334
335        // preemptive yield point
336        let mut budget = 4;
337
338        loop {
339            match ready!(this.request.poll(cx)) {
340                ResponseResult::Header(res) => {
341                    match res {
342                        Ok(maybe_header) => {
343                            let (peer, maybe_header) =
344                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
345                            if let Some(header) = maybe_header {
346                                if header.hash() == this.hash {
347                                    this.header = Some(header);
348                                } else {
349                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
350                                    // received a different header than requested
351                                    this.client.report_bad_message(peer)
352                                }
353                            }
354                        }
355                        Err(err) => {
356                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
357                        }
358                    }
359
360                    if this.header.is_none() {
361                        // received bad response
362                        this.request.header = Some(this.client.get_header(this.hash.into()));
363                    }
364                }
365                ResponseResult::Body(res) => {
366                    match res {
367                        Ok(maybe_body) => {
368                            if let Some(body) = maybe_body.transpose() {
369                                this.on_block_response(body);
370                            }
371                        }
372                        Err(err) => {
373                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
374                        }
375                    }
376                    if this.body.is_none() {
377                        // received bad response
378                        this.request.body = Some(this.client.get_block_body(this.hash));
379                    }
380                }
381            }
382
383            if let Some(res) = this.take_block() {
384                return Poll::Ready(res)
385            }
386
387            // ensure we still have enough budget for another iteration
388            budget -= 1;
389            if budget == 0 {
390                // make sure we're woken up again
391                cx.waker().wake_by_ref();
392                return Poll::Pending
393            }
394        }
395    }
396}
397
398/// A future that downloads a full block and optionally its block access lists from the network.
399///
400/// This composes the existing full block downloader with a block access list request so the
401/// header/body logic stays centralized. Missing access lists do not block returning the full block.
402#[must_use = "futures do nothing unless polled"]
403pub struct FetchFullBlockWithBalFuture<Client>
404where
405    Client: BlockClient + BlockAccessListsClient,
406{
407    block: FetchFullBlockFuture<Client>,
408    block_result: Option<SealedBlock<Client::Block>>,
409    bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
410}
411
412impl<Client> FetchFullBlockWithBalFuture<Client>
413where
414    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
415{
416    /// Returns the hash of the block being requested.
417    pub const fn hash(&self) -> &B256 {
418        self.block.hash()
419    }
420}
421
422impl<Client> FetchFullBlockWithBalFuture<Client>
423where
424    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
425{
426    /// If the header request is already complete, this returns the block number.
427    pub fn block_number(&self) -> Option<u64> {
428        self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
429    }
430
431    /// Polls the optional BAL request once and records its final best-effort result.
432    ///
433    /// Exactly one returned BAL entry is preserved. Empty responses, malformed responses, and
434    /// request errors resolve to `None` instead of being retried, so a BAL failure cannot block
435    /// returning the downloaded block.
436    fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
437        let res = match &mut self.bal_request_state {
438            BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
439            BalRequestState::Ready(_) => return Poll::Ready(()),
440        };
441
442        match res {
443            Ok(bal) => {
444                let (peer, access_lists) = bal.split();
445                match access_lists.0.len() {
446                    0 => self.bal_request_state = BalRequestState::Ready(None),
447                    1 => {
448                        let access_list = access_lists.0.into_iter().next().expect("len checked");
449                        self.bal_request_state =
450                            BalRequestState::Ready(Some(WithPeerId::new(peer, access_list)));
451                    }
452                    received => {
453                        debug!(
454                            target: "downloaders",
455                            hash = ?self.block.hash(),
456                            expected = 1,
457                            received,
458                            "Received wrong access list response",
459                        );
460                        self.block.client.report_bad_message(peer);
461                        self.bal_request_state = BalRequestState::Ready(None);
462                    }
463                }
464            }
465            Err(err) => {
466                debug!(
467                    target: "downloaders",
468                    %err,
469                    hash = ?self.block.hash(),
470                    "Access list download failed",
471                );
472                self.bal_request_state = BalRequestState::Ready(None);
473            }
474        }
475
476        Poll::Ready(())
477    }
478
479    /// Returns the block once the block download and optional BAL lookup have both completed.
480    ///
481    /// The BAL lookup must be ready even when it resolved to `None`, which prevents a fast block
482    /// response from racing a still-pending BAL response and incorrectly dropping available BAL
483    /// data.
484    fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
485        let BalRequestState::Ready(access_list) = &mut self.bal_request_state else { return None };
486        let block = self.block_result.take()?;
487        let access_list = access_list.take().and_then(|access_list| {
488            match seal_block_access_list_for_block(&block, access_list) {
489                Ok(access_list) => access_list,
490                Err(peer) => {
491                    self.block.client.report_bad_message(peer);
492                    None
493                }
494            }
495        });
496        Some(SealedBlockWith::new(block, access_list))
497    }
498}
499
500impl<Client> Future for FetchFullBlockWithBalFuture<Client>
501where
502    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
503{
504    type Output = SealedBlockWithAccessList<Client::Block>;
505
506    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
507        let this = self.get_mut();
508
509        if this.block_result.is_none() &&
510            let Poll::Ready(block) = this.block.poll_unpin(cx)
511        {
512            this.block_result = Some(block);
513        }
514
515        ready!(this.poll_bal_request(cx));
516
517        if let Some(res) = this.take_block_and_access_lists() {
518            return Poll::Ready(res)
519        }
520
521        Poll::Pending
522    }
523}
524
525impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
526where
527    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
528{
529    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530        f.debug_struct("FetchFullBlockWithBalFuture")
531            .field("hash", &self.block.hash())
532            .field("block_ready", &self.block_result.is_some())
533            .field("bal_request_ready", &self.bal_request_state.is_ready())
534            .finish()
535    }
536}
537
538/// Tracks the BAL request and its completed result.
539enum BalRequestState<Req> {
540    Pending(Req),
541    Ready(Option<WithPeerId<Option<Bytes>>>),
542}
543
544impl<Req> BalRequestState<Req> {
545    const fn is_ready(&self) -> bool {
546        matches!(self, Self::Ready(_))
547    }
548}
549
550/// A future that downloads a range of full blocks and optionally their block access lists from the
551/// network.
552///
553/// This composes the existing full block range downloader with a follow-up optional block
554/// access-list request, so callers that only need blocks can keep using
555/// [`FetchFullBlockRangeFuture`].
556#[must_use = "futures do nothing unless polled"]
557#[expect(missing_debug_implementations)]
558pub struct FetchFullBlockRangeWithBalFuture<Client>
559where
560    Client: BlockClient + BlockAccessListsClient,
561{
562    blocks: FetchFullBlockRangeFuture<Client>,
563    client: Client,
564    block_result: Option<Vec<SealedBlock<Client::Block>>>,
565    access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
566}
567
568impl<Client> FetchFullBlockRangeWithBalFuture<Client>
569where
570    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
571        + BlockAccessListsClient,
572{
573    fn start_access_lists_request_if_possible(&mut self) {
574        let requirement = match &self.access_lists {
575            OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
576            OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
577                return
578            }
579        };
580
581        // BALs are requested by block hash, so wait until the block range is fully assembled.
582        let Some(blocks) = self.block_result.as_ref() else { return };
583        let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
584        self.access_lists = OptionalBlockAccessListsState::Pending(
585            self.client.get_block_access_lists_with_requirement(hashes, requirement),
586        );
587    }
588
589    /// Starts and polls the optional BAL request once, if it is ready to make progress.
590    fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
591        self.start_access_lists_request_if_possible();
592
593        let poll = match &mut self.access_lists {
594            OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
595            OptionalBlockAccessListsState::WaitingForBlocks { .. } |
596            OptionalBlockAccessListsState::Ready(_) => return,
597        };
598
599        match poll {
600            Poll::Pending => {}
601            Poll::Ready(Ok(access_lists)) => {
602                self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
603            }
604            Poll::Ready(Err(err)) => {
605                debug!(
606                    target: "downloaders",
607                    %err,
608                    start_hash = ?self.blocks.start_hash(),
609                    "Access list range download failed",
610                );
611
612                // Optional BAL lookup is best-effort: missing eth/71 support or request failures
613                // should not block returning the downloaded block range.
614                self.access_lists = OptionalBlockAccessListsState::Ready(None);
615            }
616        }
617    }
618
619    /// Returns the block range once blocks and the optional BAL lookup are both complete.
620    fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
621        let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
622            return None
623        };
624
625        let blocks = self.block_result.take()?;
626
627        Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
628    }
629}
630
631impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
632where
633    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
634        + BlockAccessListsClient
635        + 'static,
636{
637    type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
638
639    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
640        let this = self.get_mut();
641
642        // Complete the normal block range first, then issue a separate BAL hash-list request.
643        if this.block_result.is_none() &&
644            let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
645        {
646            this.block_result = Some(blocks);
647        }
648
649        this.poll_access_lists(cx);
650
651        if let Some(response) = this.take_response() {
652            return Poll::Ready(response)
653        }
654
655        Poll::Pending
656    }
657}
658
659/// Tracks an optional BAL range request and its completed result.
660enum OptionalBlockAccessListsState<Req> {
661    /// The block hashes needed for `GetBlockAccessLists` are not known yet.
662    WaitingForBlocks {
663        /// The BAL availability policy for the eventual request.
664        requirement: BalRequirement,
665    },
666    /// A `GetBlockAccessLists` request is in flight.
667    Pending(Req),
668    /// `None` means the block range is available but optional BAL data is not.
669    Ready(Option<WithPeerId<BlockAccessLists>>),
670}
671
672impl<Client> Debug for FetchFullBlockFuture<Client>
673where
674    Client: BlockClient<Header: Debug, Body: Debug>,
675{
676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677        f.debug_struct("FetchFullBlockFuture")
678            .field("hash", &self.hash)
679            .field("header", &self.header)
680            .field("body", &self.body)
681            .finish()
682    }
683}
684
685struct FullBlockRequest<Client>
686where
687    Client: BlockClient,
688{
689    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
690    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
691}
692
693impl<Client> FullBlockRequest<Client>
694where
695    Client: BlockClient,
696{
697    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
698        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
699            let Poll::Ready(res) = fut.poll(cx)
700        {
701            self.header = None;
702            return Poll::Ready(ResponseResult::Header(res))
703        }
704
705        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
706            let Poll::Ready(res) = fut.poll(cx)
707        {
708            self.body = None;
709            return Poll::Ready(ResponseResult::Body(res))
710        }
711
712        Poll::Pending
713    }
714}
715
716/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
717/// future.
718enum ResponseResult<H, B> {
719    Header(PeerRequestResult<Option<H>>),
720    Body(PeerRequestResult<Option<B>>),
721}
722
723/// The response of a body request.
724#[derive(Debug)]
725enum BodyResponse<B> {
726    /// Already validated against transaction root of header
727    Validated(B),
728    /// Still needs to be validated against header
729    PendingValidation(WithPeerId<B>),
730}
731/// A future that downloads a range of full blocks from the network.
732///
733/// This first fetches the headers for the given range using the inner `Client`. Once the request
734/// is complete, it will fetch the bodies for the headers it received.
735///
736/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
737/// yield the full block range.
738///
739/// The full block range will be returned with falling block numbers, i.e. in descending order.
740///
741/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
742/// hash array used to request them.
743#[must_use = "futures do nothing unless polled"]
744#[expect(missing_debug_implementations)]
745pub struct FetchFullBlockRangeFuture<Client>
746where
747    Client: BlockClient,
748{
749    /// The client used to fetch headers and bodies.
750    client: Client,
751    /// The consensus instance used to validate the blocks.
752    consensus: Arc<dyn Consensus<Client::Block>>,
753    /// The block hash to start fetching from (inclusive).
754    start_hash: B256,
755    /// How many blocks to fetch: `len([start_hash, ..]) == count`
756    count: u64,
757    /// Requests for headers and bodies that are in progress.
758    request: FullBlockRangeRequest<Client>,
759    /// Fetched headers.
760    headers: Option<Vec<SealedHeader<Client::Header>>>,
761    /// The next headers to request bodies for. This is drained as responses are received.
762    pending_headers: VecDeque<SealedHeader<Client::Header>>,
763    /// The bodies that have been received so far.
764    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
765}
766
767impl<Client> FetchFullBlockRangeFuture<Client>
768where
769    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
770{
771    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
772    fn is_bodies_complete(&self) -> bool {
773        self.bodies.len() == self.count as usize
774    }
775
776    /// Inserts a block body, matching it with the `next_header`.
777    ///
778    /// Note: this assumes the response matches the next header in the queue.
779    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
780        if let Some(header) = self.pending_headers.pop_front() {
781            self.bodies.insert(header, body_response);
782        }
783    }
784
785    /// Inserts multiple block bodies.
786    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
787        for body in bodies {
788            self.insert_body(body);
789        }
790    }
791
792    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
793    /// in the `root_map`.
794    fn remaining_bodies_hashes(&self) -> Vec<B256> {
795        self.pending_headers.iter().map(|h| h.hash()).collect()
796    }
797
798    /// Returns the [`SealedBlock`]s if the request is complete and valid.
799    ///
800    /// The request is complete if the number of blocks requested is equal to the number of blocks
801    /// received. The request is valid if the returned bodies match the roots in the headers.
802    ///
803    /// These are returned in falling order starting with the requested `hash`, i.e. with
804    /// descending block numbers.
805    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
806        if !self.is_bodies_complete() {
807            // not done with bodies yet
808            return None
809        }
810
811        let headers = self.headers.take()?;
812        let mut needs_retry = false;
813        let mut valid_responses = Vec::new();
814
815        for header in &headers {
816            if let Some(body_resp) = self.bodies.remove(header) {
817                // validate body w.r.t. the hashes in the header, only inserting into the response
818                let body = match body_resp {
819                    BodyResponse::Validated(body) => body,
820                    BodyResponse::PendingValidation(resp) => {
821                        // ensure the block is valid, else retry
822                        if let Err(err) =
823                            self.consensus.validate_body_against_header(resp.data(), header)
824                        {
825                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
826                            self.client.report_bad_message(resp.peer_id());
827
828                            // get body that doesn't match, put back into vecdeque, and retry it
829                            self.pending_headers.push_back(header.clone());
830                            needs_retry = true;
831                            continue
832                        }
833
834                        resp.into_data()
835                    }
836                };
837
838                valid_responses
839                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
840            }
841        }
842
843        if needs_retry {
844            // put response hashes back into bodies map since we aren't returning them as a
845            // response
846            for block in valid_responses {
847                let (header, body) = block.split_sealed_header_body();
848                self.bodies.insert(header, BodyResponse::Validated(body));
849            }
850
851            // put headers back since they were `take`n before
852            self.headers = Some(headers);
853
854            // create response for failing bodies
855            let hashes = self.remaining_bodies_hashes();
856            self.request.bodies = Some(self.client.get_block_bodies(hashes));
857            return None
858        }
859
860        Some(valid_responses)
861    }
862
863    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
864        let (peer, mut headers_falling) =
865            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
866
867        // fill in the response if it's the correct length
868        if headers_falling.len() == self.count as usize {
869            // sort headers from highest to lowest block number
870            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
871
872            // check the starting hash
873            if headers_falling[0].hash() == self.start_hash {
874                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
875                // check if the downloaded headers are valid
876                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
877                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
878                    self.client.report_bad_message(peer);
879                }
880
881                // get the bodies request so it can be polled later
882                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
883
884                // populate the pending headers
885                self.pending_headers = headers_falling.clone().into();
886
887                // set the actual request if it hasn't been started yet
888                if !self.has_bodies_request_started() {
889                    // request the bodies for the downloaded headers
890                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
891                }
892
893                // set the headers response
894                self.headers = Some(headers_falling);
895            } else {
896                // received a different header than requested
897                self.client.report_bad_message(peer);
898            }
899        }
900    }
901
902    /// Returns whether or not a bodies request has been started, returning false if there is no
903    /// pending request.
904    const fn has_bodies_request_started(&self) -> bool {
905        self.request.bodies.is_some()
906    }
907
908    /// Returns the start hash for the request
909    pub const fn start_hash(&self) -> B256 {
910        self.start_hash
911    }
912
913    /// Returns the block count for the request
914    pub const fn count(&self) -> u64 {
915        self.count
916    }
917}
918
919impl<Client> Future for FetchFullBlockRangeFuture<Client>
920where
921    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
922{
923    type Output = Vec<SealedBlock<Client::Block>>;
924
925    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
926        let this = self.get_mut();
927
928        loop {
929            match ready!(this.request.poll(cx)) {
930                // This branch handles headers responses from peers - it first ensures that the
931                // starting hash and number of headers matches what we requested.
932                //
933                // If these don't match, we penalize the peer and retry the request.
934                // If they do match, we sort the headers by block number and start the request for
935                // the corresponding block bodies.
936                //
937                // The next result that should be yielded by `poll` is the bodies response.
938                RangeResponseResult::Header(res) => {
939                    match res {
940                        Ok(headers) => {
941                            this.on_headers_response(headers);
942                        }
943                        Err(err) => {
944                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
945                        }
946                    }
947
948                    if this.headers.is_none() {
949                        // did not receive a correct response yet, retry
950                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
951                            start: this.start_hash.into(),
952                            limit: this.count,
953                            direction: HeadersDirection::Falling,
954                        }));
955                    }
956                }
957                // This branch handles block body responses from peers - it first inserts the
958                // bodies into the `bodies` map, and then checks if the request is complete.
959                //
960                // If the request is not complete, and we need to request more bodies, we send
961                // a bodies request for the headers we don't yet have bodies for.
962                RangeResponseResult::Body(res) => {
963                    match res {
964                        Ok(bodies_resp) => {
965                            let (peer, new_bodies) = bodies_resp.split();
966
967                            // first insert the received bodies
968                            this.insert_bodies(
969                                new_bodies
970                                    .into_iter()
971                                    .map(|resp| WithPeerId::new(peer, resp))
972                                    .map(BodyResponse::PendingValidation),
973                            );
974
975                            if !this.is_bodies_complete() {
976                                // get remaining hashes so we can send the next request
977                                let req_hashes = this.remaining_bodies_hashes();
978
979                                // set a new request
980                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
981                            }
982                        }
983                        Err(err) => {
984                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
985                        }
986                    }
987                    if this.request.bodies.is_none() && !this.is_bodies_complete() {
988                        // no pending bodies request (e.g., request error), retry remaining bodies
989                        // TODO: convert this into two futures, one which is a headers range
990                        // future, and one which is a bodies range future.
991                        //
992                        // The headers range future should yield the bodies range future.
993                        // The bodies range future should not have an Option<Vec<B256>>, it should
994                        // have a populated Vec<B256> from the successful headers range future.
995                        //
996                        // This is optimal because we can not send a bodies request without
997                        // first completing the headers request. This way we can get rid of the
998                        // following `if let Some`. A bodies request should never be sent before
999                        // the headers request completes, so this should always be `Some` anyways.
1000                        let hashes = this.remaining_bodies_hashes();
1001                        if !hashes.is_empty() {
1002                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
1003                        }
1004                    }
1005                }
1006            }
1007
1008            if let Some(res) = this.take_blocks() {
1009                return Poll::Ready(res)
1010            }
1011        }
1012    }
1013}
1014
1015/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
1016/// futures until they return responses. It will return either the header or body result, depending
1017/// on which future successfully returned.
1018struct FullBlockRangeRequest<Client>
1019where
1020    Client: BlockClient,
1021{
1022    headers: Option<<Client as HeadersClient>::Output>,
1023    bodies: Option<<Client as BodiesClient>::Output>,
1024}
1025
1026impl<Client> FullBlockRangeRequest<Client>
1027where
1028    Client: BlockClient,
1029{
1030    fn poll(
1031        &mut self,
1032        cx: &mut Context<'_>,
1033    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
1034        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
1035            let Poll::Ready(res) = fut.poll(cx)
1036        {
1037            self.headers = None;
1038            return Poll::Ready(RangeResponseResult::Header(res))
1039        }
1040
1041        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
1042            let Poll::Ready(res) = fut.poll(cx)
1043        {
1044            self.bodies = None;
1045            return Poll::Ready(RangeResponseResult::Body(res))
1046        }
1047
1048        Poll::Pending
1049    }
1050}
1051
1052/// The result of a request for headers or block bodies. This is what
1053/// `FullBlockRangeRequest::poll` yields once one of its inner futures completes.
1054enum RangeResponseResult<H, B> {
1055    Header(PeerRequestResult<Vec<H>>), // result of the headers request
1056    Body(PeerRequestResult<Vec<B>>), // result of the bodies request
1057}
1058
1059/// A headers+bodies client implementation that does nothing.
///
/// `Net` selects the network primitives whose header, body and block types the client trait
/// implementations expose. It defaults to [`EthNetworkPrimitives`] and is only carried as
/// [`PhantomData`], so the struct stores no data.
1060#[derive(Debug, Clone)]
1061#[non_exhaustive]
1062pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1063
1064/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
1065impl<Net> DownloadClient for NoopFullBlockClient<Net>
1066where
1067    Net: Debug + Send + Sync,
1068{
1069    /// Reports a bad message received from a peer.
1070    ///
1071    /// # Arguments
1072    ///
1073    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
1074    ///   implementation).
1075    fn report_bad_message(&self, _peer_id: PeerId) {}
1076
1077    /// Retrieves the number of connected peers.
1078    ///
1079    /// # Returns
1080    ///
1081    /// The number of connected peers, which is always zero in this implementation.
1082    fn num_connected_peers(&self) -> usize {
1083        0
1084    }
1085}
1086
1087/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
1088impl<Net> BodiesClient for NoopFullBlockClient<Net>
1089where
1090    Net: NetworkPrimitives,
1091{
1092    type Body = Net::BlockBody;
1093    /// Defines the output type of the function.
1094    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1095
1096    /// Retrieves block bodies based on provided hashes and priority.
1097    ///
1098    /// # Arguments
1099    ///
1100    /// * `_hashes` - A vector of block hashes (unused in this implementation).
1101    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
1102    ///
1103    /// # Returns
1104    ///
1105    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
1106    fn get_block_bodies_with_priority_and_range_hint(
1107        &self,
1108        _hashes: Vec<B256>,
1109        _priority: Priority,
1110        _range_hint: Option<RangeInclusive<u64>>,
1111    ) -> Self::Output {
1112        // Create a future that immediately returns an empty vector of block bodies and a random
1113        // PeerId.
1114        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1115    }
1116}
1117
1118impl<Net> HeadersClient for NoopFullBlockClient<Net>
1119where
1120    Net: NetworkPrimitives,
1121{
1122    type Header = Net::BlockHeader;
1123    /// The output type representing a future containing a peer request result with a vector of
1124    /// headers.
1125    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1126
1127    /// Retrieves headers with a specified priority level.
1128    ///
1129    /// This implementation does nothing and returns an empty vector of headers.
1130    ///
1131    /// # Arguments
1132    ///
1133    /// * `_request` - A request for headers (unused in this implementation).
1134    /// * `_priority` - The priority level for the headers request (unused in this implementation).
1135    ///
1136    /// # Returns
1137    ///
1138    /// Always returns a ready future with an empty vector of headers wrapped in a
1139    /// `PeerRequestResult`.
1140    fn get_headers_with_priority(
1141        &self,
1142        _request: HeadersRequest,
1143        _priority: Priority,
1144    ) -> Self::Output {
1145        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1146    }
1147}
1148
1149impl<Net> BlockClient for NoopFullBlockClient<Net>
1150where
1151    Net: NetworkPrimitives,
1152{
1153    type Block = Net::Block;
1154}
1155
1156impl<Net> Default for NoopFullBlockClient<Net> {
1157    fn default() -> Self {
1158        Self(PhantomData::<Net>)
1159    }
1160}
1161
1162/// Validates one raw block access-list entry against the block's access-list hash.
1163///
1164/// Returns `Ok(Some(_))` for a matching entry, `Ok(None)` when the block has no access-list hash
1165/// or the peer returned an unavailable entry, and `Err(peer)` for a hash mismatch that should be
1166/// reported as a bad message.
1167fn seal_block_access_list_for_block<B: Block>(
1168    block: &SealedBlock<B>,
1169    access_list: WithPeerId<Option<Bytes>>,
1170) -> Result<Option<Sealed<Bytes>>, PeerId> {
1171    let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1172
1173    let (peer, access_list) = access_list.split();
1174    let Some(access_list) = access_list else { return Ok(None) };
1175    let computed = keccak256(access_list.as_ref());
1176    if computed == expected {
1177        return Ok(Some(Sealed::new_unchecked(access_list, expected)))
1178    }
1179
1180    debug!(
1181        target: "downloaders",
1182        block_hash = ?block.hash(),
1183        ?computed,
1184        ?expected,
1185        "Received block access list with wrong hash",
1186    );
1187    Err(peer)
1188}
1189
1190/// Wraps a block range with validated block access-list entries.
1191///
1192/// Short responses are treated as a valid prefix and the remaining blocks receive `None`.
1193/// Responses longer than the requested block range return the full block range without access-list
1194/// data. Non-empty hash mismatches stop accepting access-list data after reporting the peer, so any
1195/// already validated prefix is preserved.
1196fn seal_blocks_with_access_lists<Client>(
1197    client: &Client,
1198    blocks: Vec<SealedBlock<Client::Block>>,
1199    access_lists: Option<WithPeerId<BlockAccessLists>>,
1200) -> Vec<SealedBlockWithAccessList<Client::Block>>
1201where
1202    Client: BlockClient,
1203{
1204    let Some(access_lists) = access_lists else {
1205        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1206    };
1207
1208    let (peer, access_lists) = access_lists.split();
1209    let expected = blocks.len();
1210    let received = access_lists.0.len();
1211
1212    if received > expected {
1213        trace!(
1214            target: "downloaders",
1215            expected,
1216            received,
1217            "Ignoring overlong access list range response",
1218        );
1219        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1220    }
1221
1222    let mut access_lists = access_lists.0.into_iter();
1223    let mut blocks = blocks.into_iter();
1224    let mut response = Vec::with_capacity(expected);
1225
1226    for block in blocks.by_ref() {
1227        let Some(access_list) = access_lists.next() else {
1228            // Short BAL responses are valid; the current block and all remaining blocks are
1229            // returned without access-list data below.
1230            response.push(SealedBlockWith::from_block(block));
1231            break
1232        };
1233
1234        match seal_block_access_list_for_block(&block, WithPeerId::new(peer, access_list)) {
1235            Ok(access_list) => response.push(SealedBlockWith::new(block, access_list)),
1236            Err(peer) => {
1237                // A hash mismatch means this BAL entry is not for the current block. Stop matching
1238                // later positional entries and return the rest of the range without BAL data.
1239                client.report_bad_message(peer);
1240                response.push(SealedBlockWith::from_block(block));
1241                break
1242            }
1243        }
1244    }
1245
1246    response.extend(blocks.map(SealedBlockWith::from_block));
1247    response
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252    use reth_ethereum_primitives::BlockBody;
1253
1254    use super::*;
1255    use crate::{error::RequestError, test_utils::TestFullBlockClient};
1256    use alloy_consensus::Header;
1257    use alloy_primitives::{keccak256, map::B256Map, Bytes};
1258    use parking_lot::Mutex;
1259    use std::{
1260        ops::Range,
1261        sync::{
1262            atomic::{AtomicBool, AtomicUsize, Ordering},
1263            Arc,
1264        },
1265    };
1266
1267    const EMPTY_LIST_CODE: u8 = 0xc0; // RLP encoding of an empty list; used as a minimal access-list payload
1268    use tokio::time::{timeout, Duration};
1269
1270    fn sealed_access_list(access_list: Bytes) -> Sealed<Bytes> {
1271        let hash = keccak256(access_list.as_ref());
1272        Sealed::new_unchecked(access_list, hash)
1273    }
1274
1275    fn sealed_header_with_access_list_hash(access_list: &Bytes) -> SealedHeader {
1276        let header = Header {
1277            block_access_list_hash: Some(keccak256(access_list.as_ref())),
1278            ..Default::default()
1279        };
1280        SealedHeader::seal_slow(header)
1281    }
1282
1283    fn range_access_lists<B: Block>(
1284        blocks: &[SealedBlockWithAccessList<B>],
1285    ) -> Vec<Option<Sealed<Bytes>>> {
1286        blocks.iter().map(|block| block.access_list().cloned()).collect()
1287    }
1288
1289    #[tokio::test]
1290    async fn download_single_full_block() {
1291        let client = TestFullBlockClient::default();
1292        let header: SealedHeader = SealedHeader::default();
1293        let body = BlockBody::default();
1294        client.insert(header.clone(), body.clone());
1295        let client = FullBlockClient::test_client(client);
1296
1297        let received = client.get_full_block(header.hash()).await;
1298        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1299    }
1300
1301    #[tokio::test]
1302    async fn download_single_full_block_range() {
1303        let client = TestFullBlockClient::default();
1304        let header: SealedHeader = SealedHeader::default();
1305        let body = BlockBody::default();
1306        client.insert(header.clone(), body.clone());
1307        let client = FullBlockClient::test_client(client);
1308
1309        let received = client.get_full_block_range(header.hash(), 1).await;
1310        let received = received.first().expect("response should include a block");
1311        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1312    }
1313
1314    #[tokio::test]
1315    async fn download_single_full_block_with_access_lists() {
1316        let client = FullBlockWithAccessListsClient::default();
1317        let body = BlockBody::default();
1318        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1319        let header = sealed_header_with_access_list_hash(&access_list);
1320        client.insert(header.clone(), body.clone(), access_list.clone());
1321
1322        let request_count = Arc::clone(&client.access_list_requests);
1323        let client = FullBlockClient::test_client(client);
1324
1325        let received = client.get_full_block_with_access_lists(header.hash()).await;
1326        let expected_access_list = sealed_access_list(access_list);
1327
1328        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1329        assert_eq!(received.access_list(), Some(&expected_access_list));
1330        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1331    }
1332
1333    #[tokio::test]
1334    async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1335        let client = FullBlockWithAccessListsClient::default();
1336        let body = BlockBody::default();
1337        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1338        let header = sealed_header_with_access_list_hash(&access_list);
1339        client.insert(header.clone(), body.clone(), access_list.clone());
1340
1341        let requirement = Arc::clone(&client.last_access_list_requirement);
1342        let client = FullBlockClient::test_client(client);
1343
1344        let received = client
1345            .get_full_block_with_access_lists_with_requirement(
1346                header.hash(),
1347                BalRequirement::Mandatory,
1348            )
1349            .await;
1350
1351        let expected_access_list = sealed_access_list(access_list);
1352        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1353        assert_eq!(received.access_list(), Some(&expected_access_list));
1354        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1355    }
1356
1357    #[tokio::test]
1358    async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1359        let client = FullBlockWithAccessListsClient::default();
1360        client.set_access_list_pending_polls(1);
1361
1362        let body = BlockBody::default();
1363        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1364        let header = sealed_header_with_access_list_hash(&access_list);
1365        client.insert(header.clone(), body.clone(), access_list.clone());
1366
1367        let request_count = Arc::clone(&client.access_list_requests);
1368        let client = FullBlockClient::test_client(client);
1369
1370        let received =
1371            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1372                .await
1373                .expect("access list request should complete");
1374
1375        let expected_access_list = sealed_access_list(access_list);
1376        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1377        assert_eq!(received.access_list(), Some(&expected_access_list));
1378        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1379    }
1380
1381    #[tokio::test]
1382    async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1383        let client = FullBlockWithAccessListsClient::default();
1384        let body = BlockBody::default();
1385        let expected_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1386        let wrong_access_list = Bytes::from_static(&[0xc1, 0x01]);
1387        let header = sealed_header_with_access_list_hash(&expected_access_list);
1388        client.insert(header.clone(), body.clone(), wrong_access_list);
1389
1390        let bad_messages = Arc::clone(&client.bad_messages);
1391        let client = FullBlockClient::test_client(client);
1392
1393        let received =
1394            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1395                .await
1396                .expect("block request should complete without access lists");
1397
1398        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1399        assert!(received.access_list().is_none());
1400        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1401    }
1402
1403    #[tokio::test]
1404    async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1405        let client = FullBlockWithAccessListsClient::default();
1406        let body = BlockBody::default();
1407        let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1408        let header = sealed_header_with_access_list_hash(&expected_access_list);
1409        client.inner.insert(header.clone(), body.clone());
1410
1411        let bad_messages = Arc::clone(&client.bad_messages);
1412        let client = FullBlockClient::test_client(client);
1413
1414        let received =
1415            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1416                .await
1417                .expect("block request should complete without access lists");
1418
1419        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1420        assert!(received.access_list().is_none());
1421        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1422    }
1423
1424    #[tokio::test]
1425    async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1426        let client = FullBlockWithAccessListsClient::default();
1427        let body = BlockBody::default();
1428        let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1429        let wrong_empty_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1430        let header = sealed_header_with_access_list_hash(&expected_access_list);
1431        client.insert(header.clone(), body.clone(), wrong_empty_access_list);
1432
1433        let bad_messages = Arc::clone(&client.bad_messages);
1434        let client = FullBlockClient::test_client(client);
1435
1436        let received =
1437            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1438                .await
1439                .expect("block request should complete without access lists");
1440
1441        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1442        assert!(received.access_list().is_none());
1443        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1444    }
1445
1446    #[tokio::test]
1447    async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1448        let client = FullBlockWithAccessListsClient::default();
1449        client.empty_first_response.store(true, Ordering::SeqCst);
1450
1451        let body = BlockBody::default();
1452        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1453        let header = sealed_header_with_access_list_hash(&access_list);
1454        client.insert(header.clone(), body.clone(), access_list.clone());
1455
1456        let request_count = Arc::clone(&client.access_list_requests);
1457        let bad_messages = Arc::clone(&client.bad_messages);
1458        let client = FullBlockClient::test_client(client);
1459
1460        let received =
1461            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1462                .await
1463                .expect("block request should complete without access lists");
1464
1465        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1466        assert!(received.access_list().is_none());
1467        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1468        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1469    }
1470
1471    #[tokio::test]
1472    async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1473        let client = FullBlockWithAccessListsClient::default();
1474        client.set_access_lists_unsupported(true);
1475
1476        let body = BlockBody::default();
1477        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1478        let header = sealed_header_with_access_list_hash(&access_list);
1479        client.insert(header.clone(), body.clone(), access_list);
1480
1481        let request_count = Arc::clone(&client.access_list_requests);
1482        let requirement = Arc::clone(&client.last_access_list_requirement);
1483        let client = FullBlockClient::test_client(client);
1484
1485        let received =
1486            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1487                .await
1488                .expect("block request should complete without access lists");
1489
1490        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1491        assert!(received.access_list().is_none());
1492        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1493        assert_eq!(
1494            *requirement.lock(),
1495            Some(BalRequirement::Optional),
1496            "single block BAL lookup should be best-effort"
1497        );
1498    }
1499
1500    /// Inserts headers and returns the last header and block body.
1501    fn insert_headers_into_client(
1502        client: &TestFullBlockClient,
1503        range: Range<usize>,
1504    ) -> (SealedHeader, BlockBody) {
1505        let mut sealed_header: SealedHeader = SealedHeader::default();
1506        let body = BlockBody::default();
1507        for _ in range {
1508            let (mut header, hash) = sealed_header.split();
1509            // update to the next header
1510            header.parent_hash = hash;
1511            header.number += 1;
1512
1513            sealed_header = SealedHeader::seal_slow(header);
1514
1515            client.insert(sealed_header.clone(), body.clone());
1516        }
1517
1518        (sealed_header, body)
1519    }
1520
1521    #[derive(Clone, Debug)]
    /// Test client that wraps [`TestFullBlockClient`] and additionally stores raw access lists
    /// keyed by block hash, together with shared settings and counters that the tests read back
    /// through cloned `Arc` handles.
    ///
    /// NOTE(review): the code that consumes most of these fields is outside this excerpt; the
    /// per-field notes below are inferred from field names and test usage, so confirm them.
1522    struct FullBlockWithAccessListsClient {
1523        inner: TestFullBlockClient, // header/body store that `insert` forwards to
1524        access_lists: Arc<Mutex<B256Map<Bytes>>>, // raw access list per block hash, filled by `insert`
1525        access_list_requests: Arc<AtomicUsize>, // presumably counts access-list requests; tests assert on it
1526        access_list_soft_limit: Arc<AtomicUsize>, // defaults to `usize::MAX`
1527        access_list_pending_polls: Arc<AtomicUsize>, // presumably polls to stay pending before answering
1528        extra_access_list_entries: Arc<AtomicUsize>, // presumably surplus entries appended to a response
1529        unsupported_access_lists: Arc<AtomicBool>, // presumably simulates a peer without access-list support
1530        last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>, // tests read the requirement back from here
1531        bad_messages: Arc<AtomicUsize>, // presumably counts `report_bad_message` calls; tests assert on it
1532        empty_first_response: Arc<AtomicBool>, // presumably makes the first access-list response empty
1533    }
1534
1535    impl Default for FullBlockWithAccessListsClient {
1536        fn default() -> Self {
1537            Self {
1538                inner: TestFullBlockClient::default(),
1539                access_lists: Arc::new(Mutex::new(B256Map::default())),
1540                access_list_requests: Arc::new(AtomicUsize::new(0)),
1541                access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1542                access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1543                extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1544                unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1545                last_access_list_requirement: Arc::new(Mutex::new(None)),
1546                bad_messages: Arc::new(AtomicUsize::new(0)),
1547                empty_first_response: Arc::new(AtomicBool::new(false)),
1548            }
1549        }
1550    }
1551
1552    impl FullBlockWithAccessListsClient {
1553        fn insert(&self, header: SealedHeader, body: BlockBody, access_list: Bytes) {
1554            self.inner.insert(header.clone(), body);
1555            self.access_lists.lock().insert(header.hash(), access_list);
1556        }
1557
1558        fn set_access_list_soft_limit(&self, limit: usize) {
1559            self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1560        }
1561
1562        fn set_access_list_pending_polls(&self, polls: usize) {
1563            self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1564        }
1565
1566        fn set_extra_access_list_entries(&self, count: usize) {
1567            self.extra_access_list_entries.store(count, Ordering::SeqCst);
1568        }
1569
1570        fn set_access_lists_unsupported(&self, unsupported: bool) {
1571            self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1572        }
1573    }
1574
1575    /// Inserts headers with block access lists and returns the last header and block body.
1576    fn insert_headers_with_access_lists_into_client(
1577        client: &FullBlockWithAccessListsClient,
1578        range: Range<usize>,
1579    ) -> (SealedHeader, BlockBody) {
1580        let mut sealed_header: SealedHeader = SealedHeader::default();
1581        let body = BlockBody::default();
1582        for block_idx in range {
1583            let (mut header, hash) = sealed_header.split();
1584            header.parent_hash = hash;
1585            header.number += 1;
1586            let access_list = Bytes::from(vec![0xc1, block_idx as u8]);
1587            header.block_access_list_hash = Some(keccak256(access_list.as_ref()));
1588
1589            sealed_header = SealedHeader::seal_slow(header);
1590
1591            client.insert(sealed_header.clone(), body.clone(), access_list);
1592        }
1593
1594        (sealed_header, body)
1595    }
1596
1597    impl DownloadClient for FullBlockWithAccessListsClient {
1598        fn report_bad_message(&self, peer_id: PeerId) {
1599            self.bad_messages.fetch_add(1, Ordering::SeqCst);
1600            self.inner.report_bad_message(peer_id);
1601        }
1602
1603        fn num_connected_peers(&self) -> usize {
1604            self.inner.num_connected_peers()
1605        }
1606    }
1607
1608    impl HeadersClient for FullBlockWithAccessListsClient {
1609        type Header = <TestFullBlockClient as HeadersClient>::Header;
1610        type Output = <TestFullBlockClient as HeadersClient>::Output;
1611
1612        fn get_headers_with_priority(
1613            &self,
1614            request: HeadersRequest,
1615            priority: Priority,
1616        ) -> Self::Output {
1617            self.inner.get_headers_with_priority(request, priority)
1618        }
1619    }
1620
1621    impl BodiesClient for FullBlockWithAccessListsClient {
1622        type Body = <TestFullBlockClient as BodiesClient>::Body;
1623        type Output = <TestFullBlockClient as BodiesClient>::Output;
1624
1625        fn get_block_bodies_with_priority_and_range_hint(
1626            &self,
1627            hashes: Vec<B256>,
1628            priority: Priority,
1629            range_hint: Option<RangeInclusive<u64>>,
1630        ) -> Self::Output {
1631            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1632        }
1633    }
1634
1635    struct MaybePendingAccessLists {
1636        response: Option<PeerRequestResult<BlockAccessLists>>,
1637        pending_polls: usize,
1638    }
1639
1640    impl MaybePendingAccessLists {
1641        const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1642            Self { response: Some(response), pending_polls }
1643        }
1644    }
1645
1646    impl std::future::Future for MaybePendingAccessLists {
1647        type Output = PeerRequestResult<BlockAccessLists>;
1648
1649        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1650            if self.pending_polls > 0 {
1651                self.pending_polls -= 1;
1652                cx.waker().wake_by_ref();
1653                return Poll::Pending
1654            }
1655
1656            Poll::Ready(self.response.take().expect("future polled after completion"))
1657        }
1658    }
1659
1660    impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1661        type Output = MaybePendingAccessLists;
1662
1663        fn get_block_access_lists_with_priority_and_requirement(
1664            &self,
1665            hashes: Vec<B256>,
1666            _priority: Priority,
1667            requirement: BalRequirement,
1668        ) -> Self::Output {
1669            self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1670            *self.last_access_list_requirement.lock() = Some(requirement);
1671            let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1672
1673            if self.unsupported_access_lists.load(Ordering::SeqCst) {
1674                return MaybePendingAccessLists::new(
1675                    Err(RequestError::UnsupportedCapability),
1676                    pending_polls,
1677                )
1678            }
1679
1680            if self.empty_first_response.swap(false, Ordering::SeqCst) {
1681                return MaybePendingAccessLists::new(
1682                    Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1683                    pending_polls,
1684                )
1685            }
1686
1687            let mut access_lists: Vec<_> = hashes
1688                .into_iter()
1689                .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1690                .map(|hash| self.access_lists.lock().get(&hash).cloned())
1691                .collect();
1692            for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1693                access_lists.push(None);
1694            }
1695
1696            MaybePendingAccessLists::new(
1697                Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1698                pending_polls,
1699            )
1700        }
1701    }
1702
1703    impl BlockClient for FullBlockWithAccessListsClient {
1704        type Block = reth_ethereum_primitives::Block;
1705    }
1706
1707    #[derive(Clone, Debug)]
1708    struct FailingBodiesClient {
1709        inner: TestFullBlockClient,
1710        fail_on: usize,
1711        body_requests: Arc<AtomicUsize>,
1712    }
1713
1714    impl FailingBodiesClient {
1715        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1716            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1717        }
1718    }
1719
1720    impl DownloadClient for FailingBodiesClient {
1721        fn report_bad_message(&self, peer_id: PeerId) {
1722            self.inner.report_bad_message(peer_id);
1723        }
1724
1725        fn num_connected_peers(&self) -> usize {
1726            self.inner.num_connected_peers()
1727        }
1728    }
1729
1730    impl HeadersClient for FailingBodiesClient {
1731        type Header = <TestFullBlockClient as HeadersClient>::Header;
1732        type Output = <TestFullBlockClient as HeadersClient>::Output;
1733
1734        fn get_headers_with_priority(
1735            &self,
1736            request: HeadersRequest,
1737            priority: Priority,
1738        ) -> Self::Output {
1739            self.inner.get_headers_with_priority(request, priority)
1740        }
1741    }
1742
1743    impl BodiesClient for FailingBodiesClient {
1744        type Body = <TestFullBlockClient as BodiesClient>::Body;
1745        type Output = <TestFullBlockClient as BodiesClient>::Output;
1746
1747        fn get_block_bodies_with_priority_and_range_hint(
1748            &self,
1749            hashes: Vec<B256>,
1750            priority: Priority,
1751            range_hint: Option<RangeInclusive<u64>>,
1752        ) -> Self::Output {
1753            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1754            if attempt == self.fail_on {
1755                return futures::future::ready(Err(RequestError::Timeout))
1756            }
1757
1758            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1759        }
1760    }
1761
1762    impl BlockClient for FailingBodiesClient {
1763        type Block = reth_ethereum_primitives::Block;
1764    }
1765
1766    #[tokio::test]
1767    async fn download_full_block_range() {
1768        let client = TestFullBlockClient::default();
1769        let (header, body) = insert_headers_into_client(&client, 0..50);
1770        let client = FullBlockClient::test_client(client);
1771
1772        let received = client.get_full_block_range(header.hash(), 1).await;
1773        let received = received.first().expect("response should include a block");
1774        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1775
1776        let received = client.get_full_block_range(header.hash(), 10).await;
1777        assert_eq!(received.len(), 10);
1778        for (i, block) in received.iter().enumerate() {
1779            let expected_number = header.number - i as u64;
1780            assert_eq!(block.number, expected_number);
1781        }
1782    }
1783
1784    #[tokio::test]
1785    async fn download_full_block_range_over_soft_limit() {
1786        // default soft limit is 20, so we will request 50 blocks
1787        let client = TestFullBlockClient::default();
1788        let (header, body) = insert_headers_into_client(&client, 0..50);
1789        let client = FullBlockClient::test_client(client);
1790
1791        let received = client.get_full_block_range(header.hash(), 1).await;
1792        let received = received.first().expect("response should include a block");
1793        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1794
1795        let received = client.get_full_block_range(header.hash(), 50).await;
1796        assert_eq!(received.len(), 50);
1797        for (i, block) in received.iter().enumerate() {
1798            let expected_number = header.number - i as u64;
1799            assert_eq!(block.number, expected_number);
1800        }
1801    }
1802
1803    #[tokio::test]
1804    async fn download_full_block_range_retries_after_body_error() {
1805        let mut client = TestFullBlockClient::default();
1806        client.set_soft_limit(2);
1807        let (header, _) = insert_headers_into_client(&client, 0..3);
1808
1809        let client = FailingBodiesClient::new(client, 1);
1810        let body_requests = Arc::clone(&client.body_requests);
1811        let client = FullBlockClient::test_client(client);
1812
1813        let received =
1814            timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1815                .await
1816                .expect("body request retry should complete");
1817
1818        assert_eq!(received.len(), 3);
1819        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1820    }
1821
1822    #[tokio::test]
1823    async fn download_full_block_range_with_access_lists() {
1824        let client = FullBlockWithAccessListsClient::default();
1825        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1826
1827        let access_lists = Arc::clone(&client.access_lists);
1828        let request_count = Arc::clone(&client.access_list_requests);
1829        let requirement = Arc::clone(&client.last_access_list_requirement);
1830        let client = FullBlockClient::test_client(client);
1831
1832        let response = timeout(
1833            Duration::from_secs(1),
1834            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1835        )
1836        .await
1837        .expect("range request should complete");
1838
1839        let blocks = response;
1840        assert_eq!(blocks.len(), 3);
1841        let expected = {
1842            let access_lists = access_lists.lock();
1843            blocks
1844                .iter()
1845                .map(|block| {
1846                    let access_list = access_lists
1847                        .get(&block.block().hash())
1848                        .cloned()
1849                        .expect("access list exists");
1850                    Some(sealed_access_list(access_list))
1851                })
1852                .collect::<Vec<_>>()
1853        };
1854        assert_eq!(range_access_lists(&blocks), expected);
1855        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1856        assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1857    }
1858
1859    #[tokio::test]
1860    async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1861        let client = FullBlockWithAccessListsClient::default();
1862        client.empty_first_response.store(true, Ordering::SeqCst);
1863        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1864
1865        let request_count = Arc::clone(&client.access_list_requests);
1866        let client = FullBlockClient::test_client(client);
1867
1868        let response = timeout(
1869            Duration::from_secs(1),
1870            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1871        )
1872        .await
1873        .expect("range request should complete without access lists");
1874
1875        let blocks = response;
1876        assert_eq!(blocks.len(), 3);
1877        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1878        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1879    }
1880
1881    #[tokio::test]
1882    async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1883        let client = FullBlockWithAccessListsClient::default();
1884        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1885
1886        let requirement = Arc::clone(&client.last_access_list_requirement);
1887        let client = FullBlockClient::test_client(client);
1888
1889        let blocks = timeout(
1890            Duration::from_secs(1),
1891            client.get_full_block_range_with_optional_access_lists_with_requirement(
1892                header.hash(),
1893                3,
1894                BalRequirement::Mandatory,
1895            ),
1896        )
1897        .await
1898        .expect("range request should complete");
1899
1900        assert_eq!(blocks.len(), 3);
1901        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1902    }
1903
1904    #[tokio::test]
1905    async fn download_full_block_range_with_access_lists_preserves_short_response() {
1906        let client = FullBlockWithAccessListsClient::default();
1907        client.set_access_list_soft_limit(2);
1908        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1909
1910        let access_lists = Arc::clone(&client.access_lists);
1911        let request_count = Arc::clone(&client.access_list_requests);
1912        let client = FullBlockClient::test_client(client);
1913
1914        let blocks = timeout(
1915            Duration::from_secs(1),
1916            client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1917        )
1918        .await
1919        .expect("range request should complete without access lists");
1920
1921        assert_eq!(blocks.len(), 5);
1922        let expected = {
1923            let access_lists = access_lists.lock();
1924            blocks
1925                .iter()
1926                .enumerate()
1927                .map(|(idx, block)| {
1928                    if idx >= 2 {
1929                        return None
1930                    }
1931
1932                    let access_list = access_lists
1933                        .get(&block.block().hash())
1934                        .cloned()
1935                        .expect("access list exists");
1936                    Some(sealed_access_list(access_list))
1937                })
1938                .collect::<Vec<_>>()
1939        };
1940        assert_eq!(range_access_lists(&blocks), expected);
1941        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1942    }
1943
1944    #[tokio::test]
1945    async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1946        let client = FullBlockWithAccessListsClient::default();
1947        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1948        client.access_lists.lock().remove(&header.hash());
1949
1950        let access_lists = Arc::clone(&client.access_lists);
1951        let bad_messages = Arc::clone(&client.bad_messages);
1952        let client = FullBlockClient::test_client(client);
1953
1954        let blocks = timeout(
1955            Duration::from_secs(1),
1956            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1957        )
1958        .await
1959        .expect("range request should complete");
1960
1961        assert_eq!(blocks.len(), 3);
1962        let expected = {
1963            let access_lists = access_lists.lock();
1964            blocks
1965                .iter()
1966                .map(|block| {
1967                    if block.block().hash() == header.hash() {
1968                        return None
1969                    }
1970
1971                    let access_list = access_lists
1972                        .get(&block.block().hash())
1973                        .cloned()
1974                        .expect("access list exists");
1975                    Some(sealed_access_list(access_list))
1976                })
1977                .collect::<Vec<_>>()
1978        };
1979        assert_eq!(range_access_lists(&blocks), expected);
1980        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1981    }
1982
1983    #[tokio::test]
1984    async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1985        let client = FullBlockWithAccessListsClient::default();
1986        client.set_access_lists_unsupported(true);
1987        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1988
1989        let request_count = Arc::clone(&client.access_list_requests);
1990        let client = FullBlockClient::test_client(client);
1991
1992        let blocks = timeout(
1993            Duration::from_secs(1),
1994            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1995        )
1996        .await
1997        .expect("range request should complete without access lists");
1998
1999        assert_eq!(blocks.len(), 3);
2000        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2001        assert_eq!(request_count.load(Ordering::SeqCst), 1);
2002    }
2003
2004    #[tokio::test]
2005    async fn download_full_block_range_with_access_lists_ignores_long_response() {
2006        let client = FullBlockWithAccessListsClient::default();
2007        client.set_extra_access_list_entries(1);
2008        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2009
2010        let request_count = Arc::clone(&client.access_list_requests);
2011        let bad_messages = Arc::clone(&client.bad_messages);
2012        let client = FullBlockClient::test_client(client);
2013
2014        let blocks = timeout(
2015            Duration::from_secs(1),
2016            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2017        )
2018        .await
2019        .expect("range request should complete without access lists");
2020
2021        assert_eq!(blocks.len(), 3);
2022        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2023        assert_eq!(request_count.load(Ordering::SeqCst), 1);
2024        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
2025    }
2026
2027    #[tokio::test]
2028    async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
2029        let client = FullBlockWithAccessListsClient::default();
2030        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2031        client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
2032
2033        let bad_messages = Arc::clone(&client.bad_messages);
2034        let client = FullBlockClient::test_client(client);
2035
2036        let blocks = timeout(
2037            Duration::from_secs(1),
2038            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2039        )
2040        .await
2041        .expect("range request should complete without access lists");
2042
2043        assert_eq!(blocks.len(), 3);
2044        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2045        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2046    }
2047
2048    #[tokio::test]
2049    async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2050        let client = FullBlockWithAccessListsClient::default();
2051        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2052        let first_access_list =
2053            client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2054        let second_hash = header.parent_hash;
2055        client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2056
2057        let bad_messages = Arc::clone(&client.bad_messages);
2058        let client = FullBlockClient::test_client(client);
2059
2060        let blocks = timeout(
2061            Duration::from_secs(1),
2062            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2063        )
2064        .await
2065        .expect("range request should complete without unvalidated access lists");
2066
2067        assert_eq!(blocks.len(), 3);
2068        assert_eq!(blocks[1].block().hash(), second_hash);
2069        assert_eq!(
2070            range_access_lists(&blocks),
2071            vec![Some(sealed_access_list(first_access_list)), None, None]
2072        );
2073        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2074    }
2075
2076    #[tokio::test]
2077    async fn download_full_block_range_with_invalid_header() {
2078        let client = TestFullBlockClient::default();
2079        let range_length: usize = 3;
2080        let (header, _) = insert_headers_into_client(&client, 0..range_length);
2081
2082        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2083        test_consensus.set_fail_validation(true);
2084        test_consensus.set_fail_body_against_header(false);
2085        let client = FullBlockClient::new(client, Arc::new(test_consensus));
2086
2087        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2088
2089        assert_eq!(received.len(), range_length);
2090        for (i, block) in received.iter().enumerate() {
2091            let expected_number = header.number - i as u64;
2092            assert_eq!(block.number, expected_number);
2093        }
2094    }
2095}