Skip to main content

reth_network_p2p/
full_block.rs

1use super::headers::client::HeadersRequest;
2use crate::{
3    block_access_lists::client::{BalRequirement, BlockAccessListsClient},
4    bodies::client::{BodiesClient, SingleBodyRequest},
5    download::DownloadClient,
6    error::PeerRequestResult,
7    headers::client::{HeadersClient, SingleHeaderRequest},
8    priority::Priority,
9    BlockClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::{keccak256, Bytes, Sealable, Sealed, B256};
13use core::marker::PhantomData;
14use futures::FutureExt;
15use reth_consensus::Consensus;
16use reth_eth_wire_types::{
17    BlockAccessLists, EthNetworkPrimitives, HeadersDirection, NetworkPrimitives,
18};
19use reth_network_peers::{PeerId, WithPeerId};
20use reth_primitives_traits::{Block, SealedBlock, SealedHeader};
21use std::{
22    cmp::Reverse,
23    collections::{HashMap, VecDeque},
24    fmt::Debug,
25    hash::Hash,
26    ops::RangeInclusive,
27    pin::Pin,
28    sync::Arc,
29    task::{ready, Context, Poll},
30};
31use tracing::{debug, trace};
32
33/// A sealed block with associated data.
34#[derive(Debug, Clone, PartialEq, Eq)]
35pub struct SealedBlockWith<B: Block, T = Option<Sealed<Bytes>>> {
36    block: SealedBlock<B>,
37    data: T,
38}
39
40/// Raw block access-list RLP bytes sealed by `header.block_access_list_hash`.
41pub type SealedBlockAccessList = Sealed<Bytes>;
42
43/// A sealed block with optional validated block access-list data.
44pub type SealedBlockWithAccessList<B> = SealedBlockWith<B, Option<SealedBlockAccessList>>;
45
46impl<B: Block, T> SealedBlockWith<B, T> {
47    /// Creates a sealed block with associated data.
48    pub const fn new(block: SealedBlock<B>, data: T) -> Self {
49        Self { block, data }
50    }
51
52    /// Returns the sealed block.
53    pub const fn block(&self) -> &SealedBlock<B> {
54        &self.block
55    }
56
57    /// Returns the associated data.
58    pub const fn data(&self) -> &T {
59        &self.data
60    }
61
62    /// Consumes the wrapper and returns its parts.
63    pub fn into_parts(self) -> (SealedBlock<B>, T) {
64        (self.block, self.data)
65    }
66}
67
68impl<B: Block> SealedBlockWithAccessList<B> {
69    /// Creates a full block response without block access-list data.
70    pub const fn from_block(block: SealedBlock<B>) -> Self {
71        Self::new(block, None)
72    }
73
74    /// Returns the optional raw block access-list data, sealed by its hash.
75    pub const fn access_list(&self) -> Option<&SealedBlockAccessList> {
76        self.data.as_ref()
77    }
78
79    /// Returns the optional raw block access-list data, sealed by its hash.
80    pub const fn access_lists(&self) -> Option<&SealedBlockAccessList> {
81        self.data.as_ref()
82    }
83}
84
85impl<B: Block> From<SealedBlock<B>> for SealedBlockWith<B> {
86    fn from(block: SealedBlock<B>) -> Self {
87        Self::new(block, None)
88    }
89}
90
91/// A Client that can fetch full blocks from the network.
92#[derive(Debug, Clone)]
93pub struct FullBlockClient<Client>
94where
95    Client: BlockClient,
96{
97    client: Client,
98    consensus: Arc<dyn Consensus<Client::Block>>,
99}
100
101impl<Client> FullBlockClient<Client>
102where
103    Client: BlockClient,
104{
105    /// Creates a new instance of `FullBlockClient`.
106    pub fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>) -> Self {
107        Self { client, consensus }
108    }
109
110    /// Returns a client with Test consensus
111    #[cfg(any(test, feature = "test-utils"))]
112    pub fn test_client(client: Client) -> Self {
113        Self::new(client, Arc::new(reth_consensus::test_utils::TestConsensus::default()))
114    }
115}
116
117impl<Client> FullBlockClient<Client>
118where
119    Client: BlockClient,
120{
121    /// Returns a future that fetches the [`SealedBlock`] for the given hash.
122    ///
123    /// Note: this future is cancel safe
124    ///
125    /// Caution: This does no validation of body (transactions) response but guarantees that the
126    /// [`SealedHeader`] matches the requested hash.
127    pub fn get_full_block(&self, hash: B256) -> FetchFullBlockFuture<Client> {
128        FetchFullBlockFuture::new(self.client.clone(), self.consensus.clone(), hash)
129    }
130
131    /// Returns a future that fetches [`SealedBlock`]s for the given hash and count.
132    ///
133    /// Note: this future is cancel safe.
134    ///
135    /// Caution: This does no validation of body (transactions) responses but guarantees that
136    /// the starting [`SealedHeader`] matches the requested hash, and that the number of headers and
137    /// bodies received matches the requested limit.
138    ///
139    /// The returned future yields bodies in falling order, i.e. with descending block numbers.
140    pub fn get_full_block_range(
141        &self,
142        hash: B256,
143        count: u64,
144    ) -> FetchFullBlockRangeFuture<Client> {
145        let client = self.client.clone();
146        FetchFullBlockRangeFuture {
147            start_hash: hash,
148            count,
149            request: FullBlockRangeRequest {
150                headers: Some(client.get_headers(HeadersRequest::falling(hash.into(), count))),
151                bodies: None,
152            },
153            client,
154            headers: None,
155            pending_headers: VecDeque::new(),
156            bodies: HashMap::default(),
157            consensus: Arc::clone(&self.consensus),
158        }
159    }
160}
161
162impl<Client> FullBlockClient<Client>
163where
164    Client: BlockClient + BlockAccessListsClient,
165{
166    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
167    /// for the given hash.
168    ///
169    /// Note: this future is cancel safe
170    ///
171    /// Caution: This does no validation of body (transactions) response but guarantees that the
172    /// [`SealedHeader`] matches the requested hash.
173    pub fn get_full_block_with_access_lists(
174        &self,
175        hash: B256,
176    ) -> FetchFullBlockWithBalFuture<Client> {
177        self.get_full_block_with_access_lists_with_requirement(hash, BalRequirement::default())
178    }
179
180    /// Returns a future that fetches the [`SealedBlock`] and optionally its block access list
181    /// for the given hash using the requested BAL availability policy.
182    ///
183    /// Note: this future is cancel safe
184    ///
185    /// Caution: This does no validation of body (transactions) response but guarantees that the
186    /// [`SealedHeader`] matches the requested hash.
187    pub fn get_full_block_with_access_lists_with_requirement(
188        &self,
189        hash: B256,
190        requirement: BalRequirement,
191    ) -> FetchFullBlockWithBalFuture<Client> {
192        let client = self.client.clone();
193        FetchFullBlockWithBalFuture {
194            block: FetchFullBlockFuture::new(client.clone(), self.consensus.clone(), hash),
195            block_result: None,
196            bal_request_state: BalRequestState::Pending(
197                client.get_block_access_lists_with_requirement(vec![hash], requirement),
198            ),
199        }
200    }
201
202    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
203    /// the given hash and count.
204    ///
205    /// The block range is always the primary result. Access lists are requested after the block
206    /// range is downloaded. Each block contains access-list data when the optional BAL response
207    /// included the corresponding entry.
208    pub fn get_full_block_range_with_optional_access_lists(
209        &self,
210        hash: B256,
211        count: u64,
212    ) -> FetchFullBlockRangeWithBalFuture<Client> {
213        self.get_full_block_range_with_optional_access_lists_with_requirement(
214            hash,
215            count,
216            BalRequirement::default(),
217        )
218    }
219
220    /// Returns a future that fetches [`SealedBlock`]s and optionally their block access lists for
221    /// the given hash and count using the requested BAL availability policy.
222    ///
223    /// The block range is always the primary result. Access lists are requested after the block
224    /// range is downloaded. Each block contains access-list data when the optional BAL response
225    /// included the corresponding entry.
226    pub fn get_full_block_range_with_optional_access_lists_with_requirement(
227        &self,
228        hash: B256,
229        count: u64,
230        requirement: BalRequirement,
231    ) -> FetchFullBlockRangeWithBalFuture<Client> {
232        let client = self.client.clone();
233        FetchFullBlockRangeWithBalFuture {
234            blocks: self.get_full_block_range(hash, count),
235            client,
236            block_result: None,
237            access_lists: OptionalBlockAccessListsState::WaitingForBlocks { requirement },
238        }
239    }
240}
241
242/// A future that downloads a full block from the network.
243///
244/// This will attempt to fetch both the header and body for the given block hash at the same time.
245/// When both requests succeed, the future will yield the full block.
246#[must_use = "futures do nothing unless polled"]
247pub struct FetchFullBlockFuture<Client>
248where
249    Client: BlockClient,
250{
251    client: Client,
252    consensus: Arc<dyn Consensus<Client::Block>>,
253    hash: B256,
254    request: FullBlockRequest<Client>,
255    header: Option<SealedHeader<Client::Header>>,
256    body: Option<BodyResponse<Client::Body>>,
257}
258
259impl<Client> FetchFullBlockFuture<Client>
260where
261    Client: BlockClient,
262{
263    fn new(client: Client, consensus: Arc<dyn Consensus<Client::Block>>, hash: B256) -> Self {
264        Self {
265            hash,
266            consensus,
267            request: FullBlockRequest {
268                header: Some(client.get_header(hash.into())),
269                body: Some(client.get_block_body(hash)),
270            },
271            client,
272            header: None,
273            body: None,
274        }
275    }
276
277    /// Returns the hash of the block being requested.
278    pub const fn hash(&self) -> &B256 {
279        &self.hash
280    }
281
282    /// If the header request is already complete, this returns the block number
283    pub fn block_number(&self) -> Option<u64> {
284        self.header.as_ref().map(|h| h.number())
285    }
286
287    /// Returns the [`SealedBlock`] if the request is complete and valid.
288    fn take_block(&mut self) -> Option<SealedBlock<Client::Block>> {
289        if self.header.is_none() || self.body.is_none() {
290            return None
291        }
292
293        let header = self.header.take().unwrap();
294        let resp = self.body.take().unwrap();
295        match resp {
296            BodyResponse::Validated(body) => Some(SealedBlock::from_sealed_parts(header, body)),
297            BodyResponse::PendingValidation(resp) => {
298                // ensure the block is valid, else retry
299                if let Err(err) = self.consensus.validate_body_against_header(resp.data(), &header)
300                {
301                    debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
302                    self.client.report_bad_message(resp.peer_id());
303                    self.header = Some(header);
304                    self.request.body = Some(self.client.get_block_body(self.hash));
305                    return None
306                }
307                Some(SealedBlock::from_sealed_parts(header, resp.into_data()))
308            }
309        }
310    }
311
312    fn on_block_response(&mut self, resp: WithPeerId<Client::Body>) {
313        if let Some(ref header) = self.header {
314            if let Err(err) = self.consensus.validate_body_against_header(resp.data(), header) {
315                debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body");
316                self.client.report_bad_message(resp.peer_id());
317                return
318            }
319            self.body = Some(BodyResponse::Validated(resp.into_data()));
320            return
321        }
322        self.body = Some(BodyResponse::PendingValidation(resp));
323    }
324}
325
326impl<Client> Future for FetchFullBlockFuture<Client>
327where
328    Client: BlockClient<Header: BlockHeader + Sealable> + 'static,
329{
330    type Output = SealedBlock<Client::Block>;
331
332    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
333        let this = self.get_mut();
334
335        // preemptive yield point
336        let mut budget = 4;
337
338        loop {
339            match ready!(this.request.poll(cx)) {
340                ResponseResult::Header(res) => {
341                    match res {
342                        Ok(maybe_header) => {
343                            let (peer, maybe_header) =
344                                maybe_header.map(|h| h.map(SealedHeader::seal_slow)).split();
345                            if let Some(header) = maybe_header {
346                                if header.hash() == this.hash {
347                                    this.header = Some(header);
348                                } else {
349                                    debug!(target: "downloaders", expected=?this.hash, received=?header.hash(), "Received wrong header");
350                                    // received a different header than requested
351                                    this.client.report_bad_message(peer)
352                                }
353                            }
354                        }
355                        Err(err) => {
356                            debug!(target: "downloaders", %err, ?this.hash, "Header download failed");
357                        }
358                    }
359
360                    if this.header.is_none() {
361                        // received bad response
362                        this.request.header = Some(this.client.get_header(this.hash.into()));
363                    }
364                }
365                ResponseResult::Body(res) => {
366                    match res {
367                        Ok(maybe_body) => {
368                            if let Some(body) = maybe_body.transpose() {
369                                this.on_block_response(body);
370                            }
371                        }
372                        Err(err) => {
373                            debug!(target: "downloaders", %err, ?this.hash, "Body download failed");
374                        }
375                    }
376                    if this.body.is_none() {
377                        // received bad response
378                        this.request.body = Some(this.client.get_block_body(this.hash));
379                    }
380                }
381            }
382
383            if let Some(res) = this.take_block() {
384                return Poll::Ready(res)
385            }
386
387            // ensure we still have enough budget for another iteration
388            budget -= 1;
389            if budget == 0 {
390                // make sure we're woken up again
391                cx.waker().wake_by_ref();
392                return Poll::Pending
393            }
394        }
395    }
396}
397
398/// A future that downloads a full block and optionally its block access lists from the network.
399///
400/// This composes the existing full block downloader with a block access list request so the
401/// header/body logic stays centralized. Missing access lists do not block returning the full block.
402#[must_use = "futures do nothing unless polled"]
403pub struct FetchFullBlockWithBalFuture<Client>
404where
405    Client: BlockClient + BlockAccessListsClient,
406{
407    block: FetchFullBlockFuture<Client>,
408    block_result: Option<SealedBlock<Client::Block>>,
409    bal_request_state: BalRequestState<<Client as BlockAccessListsClient>::Output>,
410}
411
412impl<Client> FetchFullBlockWithBalFuture<Client>
413where
414    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
415{
416    /// Returns the hash of the block being requested.
417    pub const fn hash(&self) -> &B256 {
418        self.block.hash()
419    }
420}
421
422impl<Client> FetchFullBlockWithBalFuture<Client>
423where
424    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
425{
426    /// If the header request is already complete, this returns the block number.
427    pub fn block_number(&self) -> Option<u64> {
428        self.block_result.as_ref().map(|block| block.number()).or_else(|| self.block.block_number())
429    }
430
431    /// Polls the optional BAL request once and records its final best-effort result.
432    ///
433    /// Exactly one returned BAL entry is preserved. Empty responses, malformed responses, and
434    /// request errors resolve to `None` instead of being retried, so a BAL failure cannot block
435    /// returning the downloaded block.
436    fn poll_bal_request(&mut self, cx: &mut Context<'_>) -> Poll<()> {
437        let res = match &mut self.bal_request_state {
438            BalRequestState::Pending(fut) => ready!(fut.poll_unpin(cx)),
439            BalRequestState::Ready(_) => return Poll::Ready(()),
440        };
441
442        match res {
443            Ok(bal) => {
444                let (peer, access_lists) = bal.split();
445                match access_lists.0.len() {
446                    0 => self.bal_request_state = BalRequestState::Ready(None),
447                    1 => {
448                        let access_list = access_lists.0.into_iter().next().expect("len checked");
449                        self.bal_request_state =
450                            BalRequestState::Ready(Some(WithPeerId::new(peer, access_list)));
451                    }
452                    received => {
453                        debug!(
454                            target: "downloaders",
455                            hash = ?self.block.hash(),
456                            expected = 1,
457                            received,
458                            "Received wrong access list response",
459                        );
460                        self.block.client.report_bad_message(peer);
461                        self.bal_request_state = BalRequestState::Ready(None);
462                    }
463                }
464            }
465            Err(err) => {
466                debug!(
467                    target: "downloaders",
468                    %err,
469                    hash = ?self.block.hash(),
470                    "Access list download failed",
471                );
472                self.bal_request_state = BalRequestState::Ready(None);
473            }
474        }
475
476        Poll::Ready(())
477    }
478
479    /// Returns the block once the block download and optional BAL lookup have both completed.
480    ///
481    /// The BAL lookup must be ready even when it resolved to `None`, which prevents a fast block
482    /// response from racing a still-pending BAL response and incorrectly dropping available BAL
483    /// data.
484    fn take_block_and_access_lists(&mut self) -> Option<SealedBlockWithAccessList<Client::Block>> {
485        let BalRequestState::Ready(access_list) = &mut self.bal_request_state else { return None };
486        let block = self.block_result.take()?;
487        let access_list = access_list.take().and_then(|access_list| {
488            match seal_block_access_list_for_block(&block, access_list) {
489                Ok(access_list) => access_list,
490                Err(peer) => {
491                    self.block.client.report_bad_message(peer);
492                    None
493                }
494            }
495        });
496        Some(SealedBlockWith::new(block, access_list))
497    }
498}
499
500impl<Client> Future for FetchFullBlockWithBalFuture<Client>
501where
502    Client: BlockClient<Header: BlockHeader + Sealable> + BlockAccessListsClient + 'static,
503{
504    type Output = SealedBlockWithAccessList<Client::Block>;
505
506    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
507        let this = self.get_mut();
508
509        if this.block_result.is_none() &&
510            let Poll::Ready(block) = this.block.poll_unpin(cx)
511        {
512            this.block_result = Some(block);
513        }
514
515        ready!(this.poll_bal_request(cx));
516
517        if let Some(res) = this.take_block_and_access_lists() {
518            return Poll::Ready(res)
519        }
520
521        Poll::Pending
522    }
523}
524
525impl<Client> Debug for FetchFullBlockWithBalFuture<Client>
526where
527    Client: BlockClient<Header: BlockHeader> + BlockAccessListsClient,
528{
529    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
530        f.debug_struct("FetchFullBlockWithBalFuture")
531            .field("hash", &self.block.hash())
532            .field("block_ready", &self.block_result.is_some())
533            .field("bal_request_ready", &self.bal_request_state.is_ready())
534            .finish()
535    }
536}
537
538/// Tracks the BAL request and its completed result.
539enum BalRequestState<Req> {
540    Pending(Req),
541    Ready(Option<WithPeerId<Option<Bytes>>>),
542}
543
544impl<Req> BalRequestState<Req> {
545    const fn is_ready(&self) -> bool {
546        matches!(self, Self::Ready(_))
547    }
548}
549
550/// A future that downloads a range of full blocks and optionally their block access lists from the
551/// network.
552///
553/// This composes the existing full block range downloader with a follow-up optional block
554/// access-list request, so callers that only need blocks can keep using
555/// [`FetchFullBlockRangeFuture`].
556#[must_use = "futures do nothing unless polled"]
557#[expect(missing_debug_implementations)]
558pub struct FetchFullBlockRangeWithBalFuture<Client>
559where
560    Client: BlockClient + BlockAccessListsClient,
561{
562    blocks: FetchFullBlockRangeFuture<Client>,
563    client: Client,
564    block_result: Option<Vec<SealedBlock<Client::Block>>>,
565    access_lists: OptionalBlockAccessListsState<<Client as BlockAccessListsClient>::Output>,
566}
567
568impl<Client> FetchFullBlockRangeWithBalFuture<Client>
569where
570    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
571        + BlockAccessListsClient,
572{
573    fn start_access_lists_request_if_possible(&mut self) {
574        let requirement = match &self.access_lists {
575            OptionalBlockAccessListsState::WaitingForBlocks { requirement } => *requirement,
576            OptionalBlockAccessListsState::Pending(_) | OptionalBlockAccessListsState::Ready(_) => {
577                return
578            }
579        };
580
581        // BALs are requested by block hash, so wait until the block range is fully assembled.
582        let Some(blocks) = self.block_result.as_ref() else { return };
583        let hashes = blocks.iter().map(|block| block.hash()).collect::<Vec<_>>();
584        self.access_lists = OptionalBlockAccessListsState::Pending(
585            self.client.get_block_access_lists_with_requirement(hashes, requirement),
586        );
587    }
588
589    /// Starts and polls the optional BAL request once, if it is ready to make progress.
590    fn poll_access_lists(&mut self, cx: &mut Context<'_>) {
591        self.start_access_lists_request_if_possible();
592
593        let poll = match &mut self.access_lists {
594            OptionalBlockAccessListsState::Pending(fut) => fut.poll_unpin(cx),
595            OptionalBlockAccessListsState::WaitingForBlocks { .. } |
596            OptionalBlockAccessListsState::Ready(_) => return,
597        };
598
599        match poll {
600            Poll::Pending => {}
601            Poll::Ready(Ok(access_lists)) => {
602                self.access_lists = OptionalBlockAccessListsState::Ready(Some(access_lists));
603            }
604            Poll::Ready(Err(err)) => {
605                debug!(
606                    target: "downloaders",
607                    %err,
608                    start_hash = ?self.blocks.start_hash(),
609                    "Access list range download failed",
610                );
611
612                // Optional BAL lookup is best-effort: missing eth/71 support or request failures
613                // should not block returning the downloaded block range.
614                self.access_lists = OptionalBlockAccessListsState::Ready(None);
615            }
616        }
617    }
618
619    /// Returns the block range once blocks and the optional BAL lookup are both complete.
620    fn take_response(&mut self) -> Option<Vec<SealedBlockWithAccessList<Client::Block>>> {
621        let OptionalBlockAccessListsState::Ready(access_lists) = &mut self.access_lists else {
622            return None
623        };
624
625        let blocks = self.block_result.take()?;
626
627        Some(seal_blocks_with_access_lists(&self.client, blocks, access_lists.take()))
628    }
629}
630
631impl<Client> Future for FetchFullBlockRangeWithBalFuture<Client>
632where
633    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>
634        + BlockAccessListsClient
635        + 'static,
636{
637    type Output = Vec<SealedBlockWithAccessList<Client::Block>>;
638
639    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
640        let this = self.get_mut();
641
642        // Complete the normal block range first, then issue a separate BAL hash-list request.
643        if this.block_result.is_none() &&
644            let Poll::Ready(blocks) = this.blocks.poll_unpin(cx)
645        {
646            this.block_result = Some(blocks);
647        }
648
649        this.poll_access_lists(cx);
650
651        if let Some(response) = this.take_response() {
652            return Poll::Ready(response)
653        }
654
655        Poll::Pending
656    }
657}
658
659/// Tracks an optional BAL range request and its completed result.
660enum OptionalBlockAccessListsState<Req> {
661    /// The block hashes needed for `GetBlockAccessLists` are not known yet.
662    WaitingForBlocks {
663        /// The BAL availability policy for the eventual request.
664        requirement: BalRequirement,
665    },
666    /// A `GetBlockAccessLists` request is in flight.
667    Pending(Req),
668    /// `None` means the block range is available but optional BAL data is not.
669    Ready(Option<WithPeerId<BlockAccessLists>>),
670}
671
672impl<Client> Debug for FetchFullBlockFuture<Client>
673where
674    Client: BlockClient<Header: Debug, Body: Debug>,
675{
676    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
677        f.debug_struct("FetchFullBlockFuture")
678            .field("hash", &self.hash)
679            .field("header", &self.header)
680            .field("body", &self.body)
681            .finish()
682    }
683}
684
685struct FullBlockRequest<Client>
686where
687    Client: BlockClient,
688{
689    header: Option<SingleHeaderRequest<<Client as HeadersClient>::Output>>,
690    body: Option<SingleBodyRequest<<Client as BodiesClient>::Output>>,
691}
692
693impl<Client> FullBlockRequest<Client>
694where
695    Client: BlockClient,
696{
697    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<ResponseResult<Client::Header, Client::Body>> {
698        if let Some(fut) = Pin::new(&mut self.header).as_pin_mut() &&
699            let Poll::Ready(res) = fut.poll(cx)
700        {
701            self.header = None;
702            return Poll::Ready(ResponseResult::Header(res))
703        }
704
705        if let Some(fut) = Pin::new(&mut self.body).as_pin_mut() &&
706            let Poll::Ready(res) = fut.poll(cx)
707        {
708            self.body = None;
709            return Poll::Ready(ResponseResult::Body(res))
710        }
711
712        Poll::Pending
713    }
714}
715
716/// The result of a request for a single header or body. This is yielded by the `FullBlockRequest`
717/// future.
718enum ResponseResult<H, B> {
719    Header(PeerRequestResult<Option<H>>),
720    Body(PeerRequestResult<Option<B>>),
721}
722
723/// The response of a body request.
724#[derive(Debug)]
725enum BodyResponse<B> {
726    /// Already validated against transaction root of header
727    Validated(B),
728    /// Still needs to be validated against header
729    PendingValidation(WithPeerId<B>),
730}
731/// A future that downloads a range of full blocks from the network.
732///
733/// This first fetches the headers for the given range using the inner `Client`. Once the request
734/// is complete, it will fetch the bodies for the headers it received.
735///
736/// Once the bodies request completes, the [`SealedBlock`]s will be assembled and the future will
737/// yield the full block range.
738///
739/// The full block range will be returned with falling block numbers, i.e. in descending order.
740///
741/// NOTE: this assumes that bodies responses are returned by the client in the same order as the
742/// hash array used to request them.
743#[must_use = "futures do nothing unless polled"]
744#[expect(missing_debug_implementations)]
745pub struct FetchFullBlockRangeFuture<Client>
746where
747    Client: BlockClient,
748{
749    /// The client used to fetch headers and bodies.
750    client: Client,
751    /// The consensus instance used to validate the blocks.
752    consensus: Arc<dyn Consensus<Client::Block>>,
753    /// The block hash to start fetching from (inclusive).
754    start_hash: B256,
755    /// How many blocks to fetch: `len([start_hash, ..]) == count`
756    count: u64,
757    /// Requests for headers and bodies that are in progress.
758    request: FullBlockRangeRequest<Client>,
759    /// Fetched headers.
760    headers: Option<Vec<SealedHeader<Client::Header>>>,
761    /// The next headers to request bodies for. This is drained as responses are received.
762    pending_headers: VecDeque<SealedHeader<Client::Header>>,
763    /// The bodies that have been received so far.
764    bodies: HashMap<SealedHeader<Client::Header>, BodyResponse<Client::Body>>,
765}
766
767impl<Client> FetchFullBlockRangeFuture<Client>
768where
769    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq>,
770{
771    /// Returns whether or not the bodies map is fully populated with requested headers and bodies.
772    fn is_bodies_complete(&self) -> bool {
773        self.bodies.len() == self.count as usize
774    }
775
776    /// Inserts a block body, matching it with the `next_header`.
777    ///
778    /// Note: this assumes the response matches the next header in the queue.
779    fn insert_body(&mut self, body_response: BodyResponse<Client::Body>) {
780        if let Some(header) = self.pending_headers.pop_front() {
781            self.bodies.insert(header, body_response);
782        }
783    }
784
785    /// Inserts multiple block bodies.
786    fn insert_bodies(&mut self, bodies: impl IntoIterator<Item = BodyResponse<Client::Body>>) {
787        for body in bodies {
788            self.insert_body(body);
789        }
790    }
791
792    /// Returns the remaining hashes for the bodies request, based on the headers that still exist
793    /// in the `root_map`.
794    fn remaining_bodies_hashes(&self) -> Vec<B256> {
795        self.pending_headers.iter().map(|h| h.hash()).collect()
796    }
797
798    /// Returns the [`SealedBlock`]s if the request is complete and valid.
799    ///
800    /// The request is complete if the number of blocks requested is equal to the number of blocks
801    /// received. The request is valid if the returned bodies match the roots in the headers.
802    ///
803    /// These are returned in falling order starting with the requested `hash`, i.e. with
804    /// descending block numbers.
805    fn take_blocks(&mut self) -> Option<Vec<SealedBlock<Client::Block>>> {
806        if !self.is_bodies_complete() {
807            // not done with bodies yet
808            return None
809        }
810
811        let headers = self.headers.take()?;
812        let mut needs_retry = false;
813        let mut valid_responses = Vec::new();
814
815        for header in &headers {
816            if let Some(body_resp) = self.bodies.remove(header) {
817                // validate body w.r.t. the hashes in the header, only inserting into the response
818                let body = match body_resp {
819                    BodyResponse::Validated(body) => body,
820                    BodyResponse::PendingValidation(resp) => {
821                        // ensure the block is valid, else retry
822                        if let Err(err) =
823                            self.consensus.validate_body_against_header(resp.data(), header)
824                        {
825                            debug!(target: "downloaders", %err, hash=?header.hash(), "Received wrong body in range response");
826                            self.client.report_bad_message(resp.peer_id());
827
828                            // get body that doesn't match, put back into vecdeque, and retry it
829                            self.pending_headers.push_back(header.clone());
830                            needs_retry = true;
831                            continue
832                        }
833
834                        resp.into_data()
835                    }
836                };
837
838                valid_responses
839                    .push(SealedBlock::<Client::Block>::from_sealed_parts(header.clone(), body));
840            }
841        }
842
843        if needs_retry {
844            // put response hashes back into bodies map since we aren't returning them as a
845            // response
846            for block in valid_responses {
847                let (header, body) = block.split_sealed_header_body();
848                self.bodies.insert(header, BodyResponse::Validated(body));
849            }
850
851            // put headers back since they were `take`n before
852            self.headers = Some(headers);
853
854            // create response for failing bodies
855            let hashes = self.remaining_bodies_hashes();
856            self.request.bodies = Some(self.client.get_block_bodies(hashes));
857            return None
858        }
859
860        Some(valid_responses)
861    }
862
863    fn on_headers_response(&mut self, headers: WithPeerId<Vec<Client::Header>>) {
864        let (peer, mut headers_falling) =
865            headers.map(|h| h.into_iter().map(SealedHeader::seal_slow).collect::<Vec<_>>()).split();
866
867        // fill in the response if it's the correct length
868        if headers_falling.len() == self.count as usize {
869            // sort headers from highest to lowest block number
870            headers_falling.sort_unstable_by_key(|h| Reverse(h.number()));
871
872            // check the starting hash
873            if headers_falling[0].hash() == self.start_hash {
874                let headers_rising = headers_falling.iter().rev().cloned().collect::<Vec<_>>();
875                // check if the downloaded headers are valid
876                if let Err(err) = self.consensus.validate_header_range(&headers_rising) {
877                    debug!(target: "downloaders", %err, ?self.start_hash, "Received bad header response");
878                    self.client.report_bad_message(peer);
879                }
880
881                // get the bodies request so it can be polled later
882                let hashes = headers_falling.iter().map(|h| h.hash()).collect::<Vec<_>>();
883
884                // populate the pending headers
885                self.pending_headers = headers_falling.clone().into();
886
887                // set the actual request if it hasn't been started yet
888                if !self.has_bodies_request_started() {
889                    // request the bodies for the downloaded headers
890                    self.request.bodies = Some(self.client.get_block_bodies(hashes));
891                }
892
893                // set the headers response
894                self.headers = Some(headers_falling);
895            } else {
896                // received a different header than requested
897                self.client.report_bad_message(peer);
898            }
899        }
900    }
901
902    /// Returns whether or not a bodies request has been started, returning false if there is no
903    /// pending request.
904    const fn has_bodies_request_started(&self) -> bool {
905        self.request.bodies.is_some()
906    }
907
908    /// Returns the start hash for the request
909    pub const fn start_hash(&self) -> B256 {
910        self.start_hash
911    }
912
913    /// Returns the block count for the request
914    pub const fn count(&self) -> u64 {
915        self.count
916    }
917}
918
919impl<Client> Future for FetchFullBlockRangeFuture<Client>
920where
921    Client: BlockClient<Header: Debug + BlockHeader + Sealable + Clone + Hash + Eq> + 'static,
922{
923    type Output = Vec<SealedBlock<Client::Block>>;
924
925    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
926        let this = self.get_mut();
927
928        loop {
929            match ready!(this.request.poll(cx)) {
930                // This branch handles headers responses from peers - it first ensures that the
931                // starting hash and number of headers matches what we requested.
932                //
933                // If these don't match, we penalize the peer and retry the request.
934                // If they do match, we sort the headers by block number and start the request for
935                // the corresponding block bodies.
936                //
937                // The next result that should be yielded by `poll` is the bodies response.
938                RangeResponseResult::Header(res) => {
939                    match res {
940                        Ok(headers) => {
941                            this.on_headers_response(headers);
942                        }
943                        Err(err) => {
944                            debug!(target: "downloaders", %err, ?this.start_hash, "Header range download failed");
945                        }
946                    }
947
948                    if this.headers.is_none() {
949                        // did not receive a correct response yet, retry
950                        this.request.headers = Some(this.client.get_headers(HeadersRequest {
951                            start: this.start_hash.into(),
952                            limit: this.count,
953                            direction: HeadersDirection::Falling,
954                        }));
955                    }
956                }
957                // This branch handles block body responses from peers - it first inserts the
958                // bodies into the `bodies` map, and then checks if the request is complete.
959                //
960                // If the request is not complete, and we need to request more bodies, we send
961                // a bodies request for the headers we don't yet have bodies for.
962                RangeResponseResult::Body(res) => {
963                    match res {
964                        Ok(bodies_resp) => {
965                            let (peer, new_bodies) = bodies_resp.split();
966
967                            // first insert the received bodies
968                            this.insert_bodies(
969                                new_bodies
970                                    .into_iter()
971                                    .map(|resp| WithPeerId::new(peer, resp))
972                                    .map(BodyResponse::PendingValidation),
973                            );
974
975                            if !this.is_bodies_complete() {
976                                // get remaining hashes so we can send the next request
977                                let req_hashes = this.remaining_bodies_hashes();
978
979                                // set a new request
980                                this.request.bodies = Some(this.client.get_block_bodies(req_hashes))
981                            }
982                        }
983                        Err(err) => {
984                            debug!(target: "downloaders", %err, ?this.start_hash, "Body range download failed");
985                        }
986                    }
987                    if this.request.bodies.is_none() && !this.is_bodies_complete() {
988                        // no pending bodies request (e.g., request error), retry remaining bodies
989                        // TODO: convert this into two futures, one which is a headers range
990                        // future, and one which is a bodies range future.
991                        //
992                        // The headers range future should yield the bodies range future.
993                        // The bodies range future should not have an Option<Vec<B256>>, it should
994                        // have a populated Vec<B256> from the successful headers range future.
995                        //
996                        // This is optimal because we can not send a bodies request without
997                        // first completing the headers request. This way we can get rid of the
998                        // following `if let Some`. A bodies request should never be sent before
999                        // the headers request completes, so this should always be `Some` anyways.
1000                        let hashes = this.remaining_bodies_hashes();
1001                        if !hashes.is_empty() {
1002                            this.request.bodies = Some(this.client.get_block_bodies(hashes));
1003                        }
1004                    }
1005                }
1006            }
1007
1008            if let Some(res) = this.take_blocks() {
1009                return Poll::Ready(res)
1010            }
1011        }
1012    }
1013}
1014
1015/// A request for a range of full blocks. Polling this will poll the inner headers and bodies
1016/// futures until they return responses. It will return either the header or body result, depending
1017/// on which future successfully returned.
1018struct FullBlockRangeRequest<Client>
1019where
1020    Client: BlockClient,
1021{
1022    headers: Option<<Client as HeadersClient>::Output>,
1023    bodies: Option<<Client as BodiesClient>::Output>,
1024}
1025
1026impl<Client> FullBlockRangeRequest<Client>
1027where
1028    Client: BlockClient,
1029{
1030    fn poll(
1031        &mut self,
1032        cx: &mut Context<'_>,
1033    ) -> Poll<RangeResponseResult<Client::Header, Client::Body>> {
1034        if let Some(fut) = Pin::new(&mut self.headers).as_pin_mut() &&
1035            let Poll::Ready(res) = fut.poll(cx)
1036        {
1037            self.headers = None;
1038            return Poll::Ready(RangeResponseResult::Header(res))
1039        }
1040
1041        if let Some(fut) = Pin::new(&mut self.bodies).as_pin_mut() &&
1042            let Poll::Ready(res) = fut.poll(cx)
1043        {
1044            self.bodies = None;
1045            return Poll::Ready(RangeResponseResult::Body(res))
1046        }
1047
1048        Poll::Pending
1049    }
1050}
1051
1052// The result of a request for headers or block bodies. This is yielded by the
1053// `FullBlockRangeRequest` future.
1054enum RangeResponseResult<H, B> {
1055    Header(PeerRequestResult<Vec<H>>),
1056    Body(PeerRequestResult<Vec<B>>),
1057}
1058
1059/// A headers+bodies client implementation that does nothing.
1060#[derive(Debug, Clone)]
1061#[non_exhaustive]
1062pub struct NoopFullBlockClient<Net = EthNetworkPrimitives>(PhantomData<Net>);
1063
1064/// Implements the `DownloadClient` trait for the `NoopFullBlockClient` struct.
1065impl<Net> DownloadClient for NoopFullBlockClient<Net>
1066where
1067    Net: Debug + Send + Sync,
1068{
1069    /// Reports a bad message received from a peer.
1070    ///
1071    /// # Arguments
1072    ///
1073    /// * `_peer_id` - Identifier for the peer sending the bad message (unused in this
1074    ///   implementation).
1075    fn report_bad_message(&self, _peer_id: PeerId) {}
1076
1077    /// Retrieves the number of connected peers.
1078    ///
1079    /// # Returns
1080    ///
1081    /// The number of connected peers, which is always zero in this implementation.
1082    fn num_connected_peers(&self) -> usize {
1083        0
1084    }
1085}
1086
1087/// Implements the `BodiesClient` trait for the `NoopFullBlockClient` struct.
1088impl<Net> BodiesClient for NoopFullBlockClient<Net>
1089where
1090    Net: NetworkPrimitives,
1091{
1092    type Body = Net::BlockBody;
1093    /// Defines the output type of the function.
1094    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Body>>>;
1095
1096    /// Retrieves block bodies based on provided hashes and priority.
1097    ///
1098    /// # Arguments
1099    ///
1100    /// * `_hashes` - A vector of block hashes (unused in this implementation).
1101    /// * `_priority` - Priority level for block body retrieval (unused in this implementation).
1102    ///
1103    /// # Returns
1104    ///
1105    /// A future containing an empty vector of block bodies and a randomly generated `PeerId`.
1106    fn get_block_bodies_with_priority_and_range_hint(
1107        &self,
1108        _hashes: Vec<B256>,
1109        _priority: Priority,
1110        _range_hint: Option<RangeInclusive<u64>>,
1111    ) -> Self::Output {
1112        // Create a future that immediately returns an empty vector of block bodies and a random
1113        // PeerId.
1114        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1115    }
1116}
1117
1118impl<Net> HeadersClient for NoopFullBlockClient<Net>
1119where
1120    Net: NetworkPrimitives,
1121{
1122    type Header = Net::BlockHeader;
1123    /// The output type representing a future containing a peer request result with a vector of
1124    /// headers.
1125    type Output = futures::future::Ready<PeerRequestResult<Vec<Self::Header>>>;
1126
1127    /// Retrieves headers with a specified priority level.
1128    ///
1129    /// This implementation does nothing and returns an empty vector of headers.
1130    ///
1131    /// # Arguments
1132    ///
1133    /// * `_request` - A request for headers (unused in this implementation).
1134    /// * `_priority` - The priority level for the headers request (unused in this implementation).
1135    ///
1136    /// # Returns
1137    ///
1138    /// Always returns a ready future with an empty vector of headers wrapped in a
1139    /// `PeerRequestResult`.
1140    fn get_headers_with_priority(
1141        &self,
1142        _request: HeadersRequest,
1143        _priority: Priority,
1144    ) -> Self::Output {
1145        futures::future::ready(Ok(WithPeerId::new(PeerId::random(), vec![])))
1146    }
1147}
1148
1149impl<Net> BlockClient for NoopFullBlockClient<Net>
1150where
1151    Net: NetworkPrimitives,
1152{
1153    type Block = Net::Block;
1154}
1155
1156impl<Net> Default for NoopFullBlockClient<Net> {
1157    fn default() -> Self {
1158        Self(PhantomData::<Net>)
1159    }
1160}
1161
1162/// Validates one raw block access-list entry against the block's access-list hash.
1163///
1164/// Returns `Ok(Some(_))` for a matching entry, `Ok(None)` when the block has no access-list hash
1165/// or the peer returned an unavailable entry, and `Err(peer)` for a hash mismatch that should be
1166/// reported as a bad message.
1167fn seal_block_access_list_for_block<B: Block>(
1168    block: &SealedBlock<B>,
1169    access_list: WithPeerId<Option<Bytes>>,
1170) -> Result<Option<Sealed<Bytes>>, PeerId> {
1171    let Some(expected) = block.header().block_access_list_hash() else { return Ok(None) };
1172
1173    let (peer, access_list) = access_list.split();
1174    let Some(access_list) = access_list else { return Ok(None) };
1175    let computed = keccak256(access_list.as_ref());
1176    if computed == expected {
1177        return Ok(Some(Sealed::new_unchecked(access_list, expected)))
1178    }
1179
1180    debug!(
1181        target: "downloaders",
1182        block_hash = ?block.hash(),
1183        ?computed,
1184        ?expected,
1185        "Received block access list with wrong hash",
1186    );
1187    Err(peer)
1188}
1189
1190/// Wraps a block range with validated block access-list entries.
1191///
1192/// Short responses are treated as a valid prefix and the remaining blocks receive `None`.
1193/// Responses longer than the requested block range return the full block range without access-list
1194/// data. Non-empty hash mismatches stop accepting access-list data after reporting the peer, so any
1195/// already validated prefix is preserved.
1196fn seal_blocks_with_access_lists<Client>(
1197    client: &Client,
1198    blocks: Vec<SealedBlock<Client::Block>>,
1199    access_lists: Option<WithPeerId<BlockAccessLists>>,
1200) -> Vec<SealedBlockWithAccessList<Client::Block>>
1201where
1202    Client: BlockClient,
1203{
1204    let Some(access_lists) = access_lists else {
1205        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1206    };
1207
1208    let (peer, access_lists) = access_lists.split();
1209    let expected = blocks.len();
1210    let received = access_lists.0.len();
1211
1212    if received > expected {
1213        trace!(
1214            target: "downloaders",
1215            expected,
1216            received,
1217            "Ignoring overlong access list range response",
1218        );
1219        return blocks.into_iter().map(SealedBlockWith::from_block).collect()
1220    }
1221
1222    let mut access_lists = access_lists.0.into_iter();
1223    let mut blocks = blocks.into_iter();
1224    let mut response = Vec::with_capacity(expected);
1225
1226    for block in blocks.by_ref() {
1227        let Some(access_list) = access_lists.next() else {
1228            // Short BAL responses are valid; the current block and all remaining blocks are
1229            // returned without access-list data below.
1230            response.push(SealedBlockWith::from_block(block));
1231            break
1232        };
1233
1234        match seal_block_access_list_for_block(&block, WithPeerId::new(peer, access_list)) {
1235            Ok(access_list) => response.push(SealedBlockWith::new(block, access_list)),
1236            Err(peer) => {
1237                // A hash mismatch means this BAL entry is not for the current block. Stop matching
1238                // later positional entries and return the rest of the range without BAL data.
1239                client.report_bad_message(peer);
1240                response.push(SealedBlockWith::from_block(block));
1241                break
1242            }
1243        }
1244    }
1245
1246    response.extend(blocks.map(SealedBlockWith::from_block));
1247    response
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252    use reth_ethereum_primitives::BlockBody;
1253
1254    use super::*;
1255    use crate::{error::RequestError, test_utils::TestFullBlockClient};
1256    use alloy_consensus::Header;
1257    use alloy_primitives::{keccak256, Bytes};
1258    use parking_lot::Mutex;
1259    use std::{
1260        collections::HashMap,
1261        ops::Range,
1262        sync::{
1263            atomic::{AtomicBool, AtomicUsize, Ordering},
1264            Arc,
1265        },
1266    };
1267
1268    const EMPTY_LIST_CODE: u8 = 0xc0;
1269    use tokio::time::{timeout, Duration};
1270
1271    fn sealed_access_list(access_list: Bytes) -> Sealed<Bytes> {
1272        let hash = keccak256(access_list.as_ref());
1273        Sealed::new_unchecked(access_list, hash)
1274    }
1275
1276    fn sealed_header_with_access_list_hash(access_list: &Bytes) -> SealedHeader {
1277        let header = Header {
1278            block_access_list_hash: Some(keccak256(access_list.as_ref())),
1279            ..Default::default()
1280        };
1281        SealedHeader::seal_slow(header)
1282    }
1283
1284    fn range_access_lists<B: Block>(
1285        blocks: &[SealedBlockWithAccessList<B>],
1286    ) -> Vec<Option<Sealed<Bytes>>> {
1287        blocks.iter().map(|block| block.access_list().cloned()).collect()
1288    }
1289
1290    #[tokio::test]
1291    async fn download_single_full_block() {
1292        let client = TestFullBlockClient::default();
1293        let header: SealedHeader = SealedHeader::default();
1294        let body = BlockBody::default();
1295        client.insert(header.clone(), body.clone());
1296        let client = FullBlockClient::test_client(client);
1297
1298        let received = client.get_full_block(header.hash()).await;
1299        assert_eq!(received, SealedBlock::from_sealed_parts(header, body));
1300    }
1301
1302    #[tokio::test]
1303    async fn download_single_full_block_range() {
1304        let client = TestFullBlockClient::default();
1305        let header: SealedHeader = SealedHeader::default();
1306        let body = BlockBody::default();
1307        client.insert(header.clone(), body.clone());
1308        let client = FullBlockClient::test_client(client);
1309
1310        let received = client.get_full_block_range(header.hash(), 1).await;
1311        let received = received.first().expect("response should include a block");
1312        assert_eq!(*received, SealedBlock::from_sealed_parts(header, body));
1313    }
1314
1315    #[tokio::test]
1316    async fn download_single_full_block_with_access_lists() {
1317        let client = FullBlockWithAccessListsClient::default();
1318        let body = BlockBody::default();
1319        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1320        let header = sealed_header_with_access_list_hash(&access_list);
1321        client.insert(header.clone(), body.clone(), access_list.clone());
1322
1323        let request_count = Arc::clone(&client.access_list_requests);
1324        let client = FullBlockClient::test_client(client);
1325
1326        let received = client.get_full_block_with_access_lists(header.hash()).await;
1327        let expected_access_list = sealed_access_list(access_list);
1328
1329        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1330        assert_eq!(received.access_list(), Some(&expected_access_list));
1331        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1332    }
1333
1334    #[tokio::test]
1335    async fn download_single_full_block_with_access_lists_uses_requested_requirement() {
1336        let client = FullBlockWithAccessListsClient::default();
1337        let body = BlockBody::default();
1338        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1339        let header = sealed_header_with_access_list_hash(&access_list);
1340        client.insert(header.clone(), body.clone(), access_list.clone());
1341
1342        let requirement = Arc::clone(&client.last_access_list_requirement);
1343        let client = FullBlockClient::test_client(client);
1344
1345        let received = client
1346            .get_full_block_with_access_lists_with_requirement(
1347                header.hash(),
1348                BalRequirement::Mandatory,
1349            )
1350            .await;
1351
1352        let expected_access_list = sealed_access_list(access_list);
1353        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1354        assert_eq!(received.access_list(), Some(&expected_access_list));
1355        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1356    }
1357
1358    #[tokio::test]
1359    async fn download_single_full_block_with_access_lists_waits_for_pending_access_lists() {
1360        let client = FullBlockWithAccessListsClient::default();
1361        client.set_access_list_pending_polls(1);
1362
1363        let body = BlockBody::default();
1364        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1365        let header = sealed_header_with_access_list_hash(&access_list);
1366        client.insert(header.clone(), body.clone(), access_list.clone());
1367
1368        let request_count = Arc::clone(&client.access_list_requests);
1369        let client = FullBlockClient::test_client(client);
1370
1371        let received =
1372            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1373                .await
1374                .expect("access list request should complete");
1375
1376        let expected_access_list = sealed_access_list(access_list);
1377        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1378        assert_eq!(received.access_list(), Some(&expected_access_list));
1379        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1380    }
1381
1382    #[tokio::test]
1383    async fn download_single_full_block_with_access_lists_rejects_wrong_hash() {
1384        let client = FullBlockWithAccessListsClient::default();
1385        let body = BlockBody::default();
1386        let expected_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1387        let wrong_access_list = Bytes::from_static(&[0xc1, 0x01]);
1388        let header = sealed_header_with_access_list_hash(&expected_access_list);
1389        client.insert(header.clone(), body.clone(), wrong_access_list);
1390
1391        let bad_messages = Arc::clone(&client.bad_messages);
1392        let client = FullBlockClient::test_client(client);
1393
1394        let received =
1395            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1396                .await
1397                .expect("block request should complete without access lists");
1398
1399        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1400        assert!(received.access_list().is_none());
1401        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1402    }
1403
1404    #[tokio::test]
1405    async fn download_single_full_block_with_access_lists_treats_none_as_unavailable() {
1406        let client = FullBlockWithAccessListsClient::default();
1407        let body = BlockBody::default();
1408        let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1409        let header = sealed_header_with_access_list_hash(&expected_access_list);
1410        client.inner.insert(header.clone(), body.clone());
1411
1412        let bad_messages = Arc::clone(&client.bad_messages);
1413        let client = FullBlockClient::test_client(client);
1414
1415        let received =
1416            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1417                .await
1418                .expect("block request should complete without access lists");
1419
1420        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1421        assert!(received.access_list().is_none());
1422        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1423    }
1424
1425    #[tokio::test]
1426    async fn download_single_full_block_with_access_lists_rejects_wrong_empty_list() {
1427        let client = FullBlockWithAccessListsClient::default();
1428        let body = BlockBody::default();
1429        let expected_access_list = Bytes::from_static(&[0xc1, 0x01]);
1430        let wrong_empty_access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1431        let header = sealed_header_with_access_list_hash(&expected_access_list);
1432        client.insert(header.clone(), body.clone(), wrong_empty_access_list);
1433
1434        let bad_messages = Arc::clone(&client.bad_messages);
1435        let client = FullBlockClient::test_client(client);
1436
1437        let received =
1438            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1439                .await
1440                .expect("block request should complete without access lists");
1441
1442        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1443        assert!(received.access_list().is_none());
1444        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
1445    }
1446
1447    #[tokio::test]
1448    async fn download_single_full_block_with_access_lists_returns_none_after_empty_response() {
1449        let client = FullBlockWithAccessListsClient::default();
1450        client.empty_first_response.store(true, Ordering::SeqCst);
1451
1452        let body = BlockBody::default();
1453        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1454        let header = sealed_header_with_access_list_hash(&access_list);
1455        client.insert(header.clone(), body.clone(), access_list.clone());
1456
1457        let request_count = Arc::clone(&client.access_list_requests);
1458        let bad_messages = Arc::clone(&client.bad_messages);
1459        let client = FullBlockClient::test_client(client);
1460
1461        let received =
1462            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1463                .await
1464                .expect("block request should complete without access lists");
1465
1466        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1467        assert!(received.access_list().is_none());
1468        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1469        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1470    }
1471
1472    #[tokio::test]
1473    async fn download_single_full_block_with_access_lists_returns_block_when_unavailable() {
1474        let client = FullBlockWithAccessListsClient::default();
1475        client.set_access_lists_unsupported(true);
1476
1477        let body = BlockBody::default();
1478        let access_list = Bytes::from_static(&[EMPTY_LIST_CODE]);
1479        let header = sealed_header_with_access_list_hash(&access_list);
1480        client.insert(header.clone(), body.clone(), access_list);
1481
1482        let request_count = Arc::clone(&client.access_list_requests);
1483        let requirement = Arc::clone(&client.last_access_list_requirement);
1484        let client = FullBlockClient::test_client(client);
1485
1486        let received =
1487            timeout(Duration::from_secs(1), client.get_full_block_with_access_lists(header.hash()))
1488                .await
1489                .expect("block request should complete without access lists");
1490
1491        assert_eq!(received.block(), &SealedBlock::from_sealed_parts(header, body));
1492        assert!(received.access_list().is_none());
1493        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1494        assert_eq!(
1495            *requirement.lock(),
1496            Some(BalRequirement::Optional),
1497            "single block BAL lookup should be best-effort"
1498        );
1499    }
1500
1501    /// Inserts headers and returns the last header and block body.
1502    fn insert_headers_into_client(
1503        client: &TestFullBlockClient,
1504        range: Range<usize>,
1505    ) -> (SealedHeader, BlockBody) {
1506        let mut sealed_header: SealedHeader = SealedHeader::default();
1507        let body = BlockBody::default();
1508        for _ in range {
1509            let (mut header, hash) = sealed_header.split();
1510            // update to the next header
1511            header.parent_hash = hash;
1512            header.number += 1;
1513
1514            sealed_header = SealedHeader::seal_slow(header);
1515
1516            client.insert(sealed_header.clone(), body.clone());
1517        }
1518
1519        (sealed_header, body)
1520    }
1521
1522    #[derive(Clone, Debug)]
1523    struct FullBlockWithAccessListsClient {
1524        inner: TestFullBlockClient,
1525        access_lists: Arc<Mutex<HashMap<B256, Bytes>>>,
1526        access_list_requests: Arc<AtomicUsize>,
1527        access_list_soft_limit: Arc<AtomicUsize>,
1528        access_list_pending_polls: Arc<AtomicUsize>,
1529        extra_access_list_entries: Arc<AtomicUsize>,
1530        unsupported_access_lists: Arc<AtomicBool>,
1531        last_access_list_requirement: Arc<Mutex<Option<BalRequirement>>>,
1532        bad_messages: Arc<AtomicUsize>,
1533        empty_first_response: Arc<AtomicBool>,
1534    }
1535
1536    impl Default for FullBlockWithAccessListsClient {
1537        fn default() -> Self {
1538            Self {
1539                inner: TestFullBlockClient::default(),
1540                access_lists: Arc::new(Mutex::new(HashMap::default())),
1541                access_list_requests: Arc::new(AtomicUsize::new(0)),
1542                access_list_soft_limit: Arc::new(AtomicUsize::new(usize::MAX)),
1543                access_list_pending_polls: Arc::new(AtomicUsize::new(0)),
1544                extra_access_list_entries: Arc::new(AtomicUsize::new(0)),
1545                unsupported_access_lists: Arc::new(AtomicBool::new(false)),
1546                last_access_list_requirement: Arc::new(Mutex::new(None)),
1547                bad_messages: Arc::new(AtomicUsize::new(0)),
1548                empty_first_response: Arc::new(AtomicBool::new(false)),
1549            }
1550        }
1551    }
1552
1553    impl FullBlockWithAccessListsClient {
1554        fn insert(&self, header: SealedHeader, body: BlockBody, access_list: Bytes) {
1555            self.inner.insert(header.clone(), body);
1556            self.access_lists.lock().insert(header.hash(), access_list);
1557        }
1558
1559        fn set_access_list_soft_limit(&self, limit: usize) {
1560            self.access_list_soft_limit.store(limit, Ordering::SeqCst);
1561        }
1562
1563        fn set_access_list_pending_polls(&self, polls: usize) {
1564            self.access_list_pending_polls.store(polls, Ordering::SeqCst);
1565        }
1566
1567        fn set_extra_access_list_entries(&self, count: usize) {
1568            self.extra_access_list_entries.store(count, Ordering::SeqCst);
1569        }
1570
1571        fn set_access_lists_unsupported(&self, unsupported: bool) {
1572            self.unsupported_access_lists.store(unsupported, Ordering::SeqCst);
1573        }
1574    }
1575
1576    /// Inserts headers with block access lists and returns the last header and block body.
1577    fn insert_headers_with_access_lists_into_client(
1578        client: &FullBlockWithAccessListsClient,
1579        range: Range<usize>,
1580    ) -> (SealedHeader, BlockBody) {
1581        let mut sealed_header: SealedHeader = SealedHeader::default();
1582        let body = BlockBody::default();
1583        for block_idx in range {
1584            let (mut header, hash) = sealed_header.split();
1585            header.parent_hash = hash;
1586            header.number += 1;
1587            let access_list = Bytes::from(vec![0xc1, block_idx as u8]);
1588            header.block_access_list_hash = Some(keccak256(access_list.as_ref()));
1589
1590            sealed_header = SealedHeader::seal_slow(header);
1591
1592            client.insert(sealed_header.clone(), body.clone(), access_list);
1593        }
1594
1595        (sealed_header, body)
1596    }
1597
1598    impl DownloadClient for FullBlockWithAccessListsClient {
1599        fn report_bad_message(&self, peer_id: PeerId) {
1600            self.bad_messages.fetch_add(1, Ordering::SeqCst);
1601            self.inner.report_bad_message(peer_id);
1602        }
1603
1604        fn num_connected_peers(&self) -> usize {
1605            self.inner.num_connected_peers()
1606        }
1607    }
1608
1609    impl HeadersClient for FullBlockWithAccessListsClient {
1610        type Header = <TestFullBlockClient as HeadersClient>::Header;
1611        type Output = <TestFullBlockClient as HeadersClient>::Output;
1612
1613        fn get_headers_with_priority(
1614            &self,
1615            request: HeadersRequest,
1616            priority: Priority,
1617        ) -> Self::Output {
1618            self.inner.get_headers_with_priority(request, priority)
1619        }
1620    }
1621
1622    impl BodiesClient for FullBlockWithAccessListsClient {
1623        type Body = <TestFullBlockClient as BodiesClient>::Body;
1624        type Output = <TestFullBlockClient as BodiesClient>::Output;
1625
1626        fn get_block_bodies_with_priority_and_range_hint(
1627            &self,
1628            hashes: Vec<B256>,
1629            priority: Priority,
1630            range_hint: Option<RangeInclusive<u64>>,
1631        ) -> Self::Output {
1632            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1633        }
1634    }
1635
1636    struct MaybePendingAccessLists {
1637        response: Option<PeerRequestResult<BlockAccessLists>>,
1638        pending_polls: usize,
1639    }
1640
1641    impl MaybePendingAccessLists {
1642        const fn new(response: PeerRequestResult<BlockAccessLists>, pending_polls: usize) -> Self {
1643            Self { response: Some(response), pending_polls }
1644        }
1645    }
1646
1647    impl std::future::Future for MaybePendingAccessLists {
1648        type Output = PeerRequestResult<BlockAccessLists>;
1649
1650        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1651            if self.pending_polls > 0 {
1652                self.pending_polls -= 1;
1653                cx.waker().wake_by_ref();
1654                return Poll::Pending
1655            }
1656
1657            Poll::Ready(self.response.take().expect("future polled after completion"))
1658        }
1659    }
1660
1661    impl BlockAccessListsClient for FullBlockWithAccessListsClient {
1662        type Output = MaybePendingAccessLists;
1663
1664        fn get_block_access_lists_with_priority_and_requirement(
1665            &self,
1666            hashes: Vec<B256>,
1667            _priority: Priority,
1668            requirement: BalRequirement,
1669        ) -> Self::Output {
1670            self.access_list_requests.fetch_add(1, Ordering::SeqCst);
1671            *self.last_access_list_requirement.lock() = Some(requirement);
1672            let pending_polls = self.access_list_pending_polls.swap(0, Ordering::SeqCst);
1673
1674            if self.unsupported_access_lists.load(Ordering::SeqCst) {
1675                return MaybePendingAccessLists::new(
1676                    Err(RequestError::UnsupportedCapability),
1677                    pending_polls,
1678                )
1679            }
1680
1681            if self.empty_first_response.swap(false, Ordering::SeqCst) {
1682                return MaybePendingAccessLists::new(
1683                    Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(Vec::new()))),
1684                    pending_polls,
1685                )
1686            }
1687
1688            let mut access_lists: Vec<_> = hashes
1689                .into_iter()
1690                .take(self.access_list_soft_limit.load(Ordering::SeqCst))
1691                .map(|hash| self.access_lists.lock().get(&hash).cloned())
1692                .collect();
1693            for _ in 0..self.extra_access_list_entries.load(Ordering::SeqCst) {
1694                access_lists.push(None);
1695            }
1696
1697            MaybePendingAccessLists::new(
1698                Ok(WithPeerId::new(PeerId::random(), BlockAccessLists(access_lists))),
1699                pending_polls,
1700            )
1701        }
1702    }
1703
1704    impl BlockClient for FullBlockWithAccessListsClient {
1705        type Block = reth_ethereum_primitives::Block;
1706    }
1707
1708    #[derive(Clone, Debug)]
1709    struct FailingBodiesClient {
1710        inner: TestFullBlockClient,
1711        fail_on: usize,
1712        body_requests: Arc<AtomicUsize>,
1713    }
1714
1715    impl FailingBodiesClient {
1716        fn new(inner: TestFullBlockClient, fail_on: usize) -> Self {
1717            Self { inner, fail_on, body_requests: Arc::new(AtomicUsize::new(0)) }
1718        }
1719    }
1720
1721    impl DownloadClient for FailingBodiesClient {
1722        fn report_bad_message(&self, peer_id: PeerId) {
1723            self.inner.report_bad_message(peer_id);
1724        }
1725
1726        fn num_connected_peers(&self) -> usize {
1727            self.inner.num_connected_peers()
1728        }
1729    }
1730
1731    impl HeadersClient for FailingBodiesClient {
1732        type Header = <TestFullBlockClient as HeadersClient>::Header;
1733        type Output = <TestFullBlockClient as HeadersClient>::Output;
1734
1735        fn get_headers_with_priority(
1736            &self,
1737            request: HeadersRequest,
1738            priority: Priority,
1739        ) -> Self::Output {
1740            self.inner.get_headers_with_priority(request, priority)
1741        }
1742    }
1743
1744    impl BodiesClient for FailingBodiesClient {
1745        type Body = <TestFullBlockClient as BodiesClient>::Body;
1746        type Output = <TestFullBlockClient as BodiesClient>::Output;
1747
1748        fn get_block_bodies_with_priority_and_range_hint(
1749            &self,
1750            hashes: Vec<B256>,
1751            priority: Priority,
1752            range_hint: Option<RangeInclusive<u64>>,
1753        ) -> Self::Output {
1754            let attempt = self.body_requests.fetch_add(1, Ordering::SeqCst);
1755            if attempt == self.fail_on {
1756                return futures::future::ready(Err(RequestError::Timeout))
1757            }
1758
1759            self.inner.get_block_bodies_with_priority_and_range_hint(hashes, priority, range_hint)
1760        }
1761    }
1762
1763    impl BlockClient for FailingBodiesClient {
1764        type Block = reth_ethereum_primitives::Block;
1765    }
1766
1767    #[tokio::test]
1768    async fn download_full_block_range() {
1769        let client = TestFullBlockClient::default();
1770        let (header, body) = insert_headers_into_client(&client, 0..50);
1771        let client = FullBlockClient::test_client(client);
1772
1773        let received = client.get_full_block_range(header.hash(), 1).await;
1774        let received = received.first().expect("response should include a block");
1775        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1776
1777        let received = client.get_full_block_range(header.hash(), 10).await;
1778        assert_eq!(received.len(), 10);
1779        for (i, block) in received.iter().enumerate() {
1780            let expected_number = header.number - i as u64;
1781            assert_eq!(block.number, expected_number);
1782        }
1783    }
1784
1785    #[tokio::test]
1786    async fn download_full_block_range_over_soft_limit() {
1787        // default soft limit is 20, so we will request 50 blocks
1788        let client = TestFullBlockClient::default();
1789        let (header, body) = insert_headers_into_client(&client, 0..50);
1790        let client = FullBlockClient::test_client(client);
1791
1792        let received = client.get_full_block_range(header.hash(), 1).await;
1793        let received = received.first().expect("response should include a block");
1794        assert_eq!(*received, SealedBlock::from_sealed_parts(header.clone(), body));
1795
1796        let received = client.get_full_block_range(header.hash(), 50).await;
1797        assert_eq!(received.len(), 50);
1798        for (i, block) in received.iter().enumerate() {
1799            let expected_number = header.number - i as u64;
1800            assert_eq!(block.number, expected_number);
1801        }
1802    }
1803
1804    #[tokio::test]
1805    async fn download_full_block_range_retries_after_body_error() {
1806        let mut client = TestFullBlockClient::default();
1807        client.set_soft_limit(2);
1808        let (header, _) = insert_headers_into_client(&client, 0..3);
1809
1810        let client = FailingBodiesClient::new(client, 1);
1811        let body_requests = Arc::clone(&client.body_requests);
1812        let client = FullBlockClient::test_client(client);
1813
1814        let received =
1815            timeout(Duration::from_secs(1), client.get_full_block_range(header.hash(), 3))
1816                .await
1817                .expect("body request retry should complete");
1818
1819        assert_eq!(received.len(), 3);
1820        assert_eq!(body_requests.load(Ordering::SeqCst), 3);
1821    }
1822
1823    #[tokio::test]
1824    async fn download_full_block_range_with_access_lists() {
1825        let client = FullBlockWithAccessListsClient::default();
1826        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1827
1828        let access_lists = Arc::clone(&client.access_lists);
1829        let request_count = Arc::clone(&client.access_list_requests);
1830        let requirement = Arc::clone(&client.last_access_list_requirement);
1831        let client = FullBlockClient::test_client(client);
1832
1833        let response = timeout(
1834            Duration::from_secs(1),
1835            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1836        )
1837        .await
1838        .expect("range request should complete");
1839
1840        let blocks = response;
1841        assert_eq!(blocks.len(), 3);
1842        let expected = {
1843            let access_lists = access_lists.lock();
1844            blocks
1845                .iter()
1846                .map(|block| {
1847                    let access_list = access_lists
1848                        .get(&block.block().hash())
1849                        .cloned()
1850                        .expect("access list exists");
1851                    Some(sealed_access_list(access_list))
1852                })
1853                .collect::<Vec<_>>()
1854        };
1855        assert_eq!(range_access_lists(&blocks), expected);
1856        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1857        assert_eq!(*requirement.lock(), Some(BalRequirement::Optional));
1858    }
1859
1860    #[tokio::test]
1861    async fn download_full_block_range_with_access_lists_returns_none_for_empty_response() {
1862        let client = FullBlockWithAccessListsClient::default();
1863        client.empty_first_response.store(true, Ordering::SeqCst);
1864        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1865
1866        let request_count = Arc::clone(&client.access_list_requests);
1867        let client = FullBlockClient::test_client(client);
1868
1869        let response = timeout(
1870            Duration::from_secs(1),
1871            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1872        )
1873        .await
1874        .expect("range request should complete without access lists");
1875
1876        let blocks = response;
1877        assert_eq!(blocks.len(), 3);
1878        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
1879        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1880    }
1881
1882    #[tokio::test]
1883    async fn download_full_block_range_with_access_lists_uses_requested_requirement() {
1884        let client = FullBlockWithAccessListsClient::default();
1885        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1886
1887        let requirement = Arc::clone(&client.last_access_list_requirement);
1888        let client = FullBlockClient::test_client(client);
1889
1890        let blocks = timeout(
1891            Duration::from_secs(1),
1892            client.get_full_block_range_with_optional_access_lists_with_requirement(
1893                header.hash(),
1894                3,
1895                BalRequirement::Mandatory,
1896            ),
1897        )
1898        .await
1899        .expect("range request should complete");
1900
1901        assert_eq!(blocks.len(), 3);
1902        assert_eq!(*requirement.lock(), Some(BalRequirement::Mandatory));
1903    }
1904
1905    #[tokio::test]
1906    async fn download_full_block_range_with_access_lists_preserves_short_response() {
1907        let client = FullBlockWithAccessListsClient::default();
1908        client.set_access_list_soft_limit(2);
1909        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..5);
1910
1911        let access_lists = Arc::clone(&client.access_lists);
1912        let request_count = Arc::clone(&client.access_list_requests);
1913        let client = FullBlockClient::test_client(client);
1914
1915        let blocks = timeout(
1916            Duration::from_secs(1),
1917            client.get_full_block_range_with_optional_access_lists(header.hash(), 5),
1918        )
1919        .await
1920        .expect("range request should complete without access lists");
1921
1922        assert_eq!(blocks.len(), 5);
1923        let expected = {
1924            let access_lists = access_lists.lock();
1925            blocks
1926                .iter()
1927                .enumerate()
1928                .map(|(idx, block)| {
1929                    if idx >= 2 {
1930                        return None
1931                    }
1932
1933                    let access_list = access_lists
1934                        .get(&block.block().hash())
1935                        .cloned()
1936                        .expect("access list exists");
1937                    Some(sealed_access_list(access_list))
1938                })
1939                .collect::<Vec<_>>()
1940        };
1941        assert_eq!(range_access_lists(&blocks), expected);
1942        assert_eq!(request_count.load(Ordering::SeqCst), 1);
1943    }
1944
1945    #[tokio::test]
1946    async fn download_full_block_range_with_access_lists_preserves_unavailable_entries() {
1947        let client = FullBlockWithAccessListsClient::default();
1948        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1949        client.access_lists.lock().remove(&header.hash());
1950
1951        let access_lists = Arc::clone(&client.access_lists);
1952        let bad_messages = Arc::clone(&client.bad_messages);
1953        let client = FullBlockClient::test_client(client);
1954
1955        let blocks = timeout(
1956            Duration::from_secs(1),
1957            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1958        )
1959        .await
1960        .expect("range request should complete");
1961
1962        assert_eq!(blocks.len(), 3);
1963        let expected = {
1964            let access_lists = access_lists.lock();
1965            blocks
1966                .iter()
1967                .map(|block| {
1968                    if block.block().hash() == header.hash() {
1969                        return None
1970                    }
1971
1972                    let access_list = access_lists
1973                        .get(&block.block().hash())
1974                        .cloned()
1975                        .expect("access list exists");
1976                    Some(sealed_access_list(access_list))
1977                })
1978                .collect::<Vec<_>>()
1979        };
1980        assert_eq!(range_access_lists(&blocks), expected);
1981        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
1982    }
1983
1984    #[tokio::test]
1985    async fn download_full_block_range_with_access_lists_returns_none_when_unavailable() {
1986        let client = FullBlockWithAccessListsClient::default();
1987        client.set_access_lists_unsupported(true);
1988        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
1989
1990        let request_count = Arc::clone(&client.access_list_requests);
1991        let client = FullBlockClient::test_client(client);
1992
1993        let blocks = timeout(
1994            Duration::from_secs(1),
1995            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
1996        )
1997        .await
1998        .expect("range request should complete without access lists");
1999
2000        assert_eq!(blocks.len(), 3);
2001        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2002        assert_eq!(request_count.load(Ordering::SeqCst), 1);
2003    }
2004
2005    #[tokio::test]
2006    async fn download_full_block_range_with_access_lists_ignores_long_response() {
2007        let client = FullBlockWithAccessListsClient::default();
2008        client.set_extra_access_list_entries(1);
2009        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2010
2011        let request_count = Arc::clone(&client.access_list_requests);
2012        let bad_messages = Arc::clone(&client.bad_messages);
2013        let client = FullBlockClient::test_client(client);
2014
2015        let blocks = timeout(
2016            Duration::from_secs(1),
2017            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2018        )
2019        .await
2020        .expect("range request should complete without access lists");
2021
2022        assert_eq!(blocks.len(), 3);
2023        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2024        assert_eq!(request_count.load(Ordering::SeqCst), 1);
2025        assert_eq!(bad_messages.load(Ordering::SeqCst), 0);
2026    }
2027
2028    #[tokio::test]
2029    async fn download_full_block_range_with_access_lists_rejects_wrong_hash() {
2030        let client = FullBlockWithAccessListsClient::default();
2031        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2032        client.access_lists.lock().insert(header.hash(), Bytes::from_static(&[0xc1, 0x7f]));
2033
2034        let bad_messages = Arc::clone(&client.bad_messages);
2035        let client = FullBlockClient::test_client(client);
2036
2037        let blocks = timeout(
2038            Duration::from_secs(1),
2039            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2040        )
2041        .await
2042        .expect("range request should complete without access lists");
2043
2044        assert_eq!(blocks.len(), 3);
2045        assert_eq!(range_access_lists(&blocks), vec![None; blocks.len()]);
2046        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2047    }
2048
2049    #[tokio::test]
2050    async fn download_full_block_range_with_access_lists_preserves_valid_prefix_until_wrong_hash() {
2051        let client = FullBlockWithAccessListsClient::default();
2052        let (header, _) = insert_headers_with_access_lists_into_client(&client, 0..3);
2053        let first_access_list =
2054            client.access_lists.lock().get(&header.hash()).cloned().expect("access list exists");
2055        let second_hash = header.parent_hash;
2056        client.access_lists.lock().insert(second_hash, Bytes::from_static(&[0xc1, 0x7f]));
2057
2058        let bad_messages = Arc::clone(&client.bad_messages);
2059        let client = FullBlockClient::test_client(client);
2060
2061        let blocks = timeout(
2062            Duration::from_secs(1),
2063            client.get_full_block_range_with_optional_access_lists(header.hash(), 3),
2064        )
2065        .await
2066        .expect("range request should complete without unvalidated access lists");
2067
2068        assert_eq!(blocks.len(), 3);
2069        assert_eq!(blocks[1].block().hash(), second_hash);
2070        assert_eq!(
2071            range_access_lists(&blocks),
2072            vec![Some(sealed_access_list(first_access_list)), None, None]
2073        );
2074        assert_eq!(bad_messages.load(Ordering::SeqCst), 1);
2075    }
2076
2077    #[tokio::test]
2078    async fn download_full_block_range_with_invalid_header() {
2079        let client = TestFullBlockClient::default();
2080        let range_length: usize = 3;
2081        let (header, _) = insert_headers_into_client(&client, 0..range_length);
2082
2083        let test_consensus = reth_consensus::test_utils::TestConsensus::default();
2084        test_consensus.set_fail_validation(true);
2085        test_consensus.set_fail_body_against_header(false);
2086        let client = FullBlockClient::new(client, Arc::new(test_consensus));
2087
2088        let received = client.get_full_block_range(header.hash(), range_length as u64).await;
2089
2090        assert_eq!(received.len(), range_length);
2091        for (i, block) in received.iter().enumerate() {
2092            let expected_number = header.number - i as u64;
2093            assert_eq!(block.number, expected_number);
2094        }
2095    }
2096}