// reth_network/fetch/mod.rs
1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11    BlockAccessLists, Capabilities, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
12    GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16    block_access_lists::client::BalRequirement,
17    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
18    headers::client::HeadersRequest,
19    priority::Priority,
20    receipts::client::ReceiptsResponse,
21};
22use reth_network_peers::PeerId;
23use reth_network_types::ReputationChangeKind;
24use std::{
25    collections::{HashMap, VecDeque},
26    ops::RangeInclusive,
27    sync::{
28        atomic::{AtomicU64, AtomicUsize, Ordering},
29        Arc,
30    },
31    task::{Context, Poll},
32};
33use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
/// An inflight `GetBlockHeaders` request; retains the issued [`HeadersRequest`] so the response
/// can be validated against it (see `is_likely_bad_headers_response`).
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
/// An inflight `GetBlockBodies` request; the original request is not retained (unit).
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
/// An inflight `GetReceipts` request; the original request is not retained (unit).
type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
/// An inflight `GetBlockAccessLists` request; the original request is not retained (unit).
type InflightBlockAccessListsRequest = Request<(), PeerRequestResult<BlockAccessLists>>;
40
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
/// peers and sends the response once ready.
///
/// This type maintains a list of connected peers that are available for requests.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Currently active [`GetBlockHeaders`] requests
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Currently active [`GetBlockBodies`] requests
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// Currently active [`GetBlockAccessLists`] requests
    inflight_bals_requests: HashMap<PeerId, InflightBlockAccessListsRequest>,
    /// Currently active `GetReceipts` requests
    inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
    /// The list of _available_ peers for requests.
    peers: HashMap<PeerId, Peer>,
    /// The handle to the peers manager
    peers_handle: PeersHandle,
    /// Number of active peer sessions the node's currently handling.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests queued for processing, high-priority requests are kept before normal ones.
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiver for new incoming download requests
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sender for download requests, used to detach a [`FetchClient`]
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}
70
// === impl StateFetcher ===
72
73impl<N: NetworkPrimitives> StateFetcher<N> {
74    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
75        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
76        Self {
77            inflight_headers_requests: Default::default(),
78            inflight_bodies_requests: Default::default(),
79            inflight_bals_requests: Default::default(),
80            inflight_receipts_requests: Default::default(),
81            peers: Default::default(),
82            peers_handle,
83            num_active_peers,
84            queued_requests: Default::default(),
85            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
86            download_requests_tx,
87        }
88    }
89
90    /// Invoked when connected to a new peer.
91    pub(crate) fn new_active_peer(
92        &mut self,
93        peer_id: PeerId,
94        best_hash: B256,
95        best_number: u64,
96        capabilities: Arc<Capabilities>,
97        timeout: Arc<AtomicU64>,
98        range_info: Option<BlockRangeInfo>,
99    ) {
100        self.peers.insert(
101            peer_id,
102            Peer {
103                state: PeerState::Idle,
104                best_hash,
105                best_number,
106                capabilities,
107                timeout,
108                last_response_likely_bad: false,
109                range_info,
110            },
111        );
112    }
113
114    /// Removes the peer from the peer list, after which it is no longer available for future
115    /// requests.
116    ///
117    /// Invoked when an active session was closed.
118    ///
119    /// This cancels also inflight request and sends an error to the receiver.
120    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
121        self.peers.remove(peer);
122        if let Some(req) = self.inflight_headers_requests.remove(peer) {
123            let _ = req.response.send(Err(RequestError::ConnectionDropped));
124        }
125        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
126            let _ = req.response.send(Err(RequestError::ConnectionDropped));
127        }
128        if let Some(req) = self.inflight_bals_requests.remove(peer) {
129            let _ = req.response.send(Err(RequestError::ConnectionDropped));
130        }
131        if let Some(req) = self.inflight_receipts_requests.remove(peer) {
132            let _ = req.response.send(Err(RequestError::ConnectionDropped));
133        }
134    }
135
136    /// Updates the block information for the peer.
137    ///
138    /// Returns `true` if this a newer block
139    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
140        if let Some(peer) = self.peers.get_mut(peer_id) &&
141            number > peer.best_number
142        {
143            peer.best_hash = hash;
144            peer.best_number = number;
145            return true
146        }
147        false
148    }
149
150    /// Invoked when an active session is about to be disconnected.
151    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
152        if let Some(peer) = self.peers.get_mut(peer_id) {
153            peer.state = PeerState::Closing;
154        }
155    }
156
157    /// Returns the _next_ idle peer that's ready to accept a request,
158    /// prioritizing those with the lowest timeout/latency and those that recently responded with
159    /// adequate data. Additionally, if full blocks are required this prioritizes peers that have
160    /// full history available
161    fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
162        // filter out peers that aren't idle or don't meet the requirement
163        let mut idle = self
164            .peers
165            .iter()
166            .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement));
167
168        let mut best_peer = idle.next()?;
169
170        for maybe_better in idle {
171            // replace best peer if our current best peer sent us a bad response last time
172            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
173                best_peer = maybe_better;
174                continue
175            }
176
177            // replace best peer if this peer meets the requirements better
178            if maybe_better.1.is_better(best_peer.1, &requirement) {
179                best_peer = maybe_better;
180                continue
181            }
182
183            // replace best peer if this peer has better rtt and both have same range quality
184            if maybe_better.1.timeout() < best_peer.1.timeout() &&
185                !maybe_better.1.last_response_likely_bad
186            {
187                best_peer = maybe_better;
188            }
189        }
190
191        Some(*best_peer.0)
192    }
193
194    /// Returns whether any connected peer can serve BAL requests.
195    fn has_eth71_peer(&self) -> bool {
196        self.peers.values().any(|peer| {
197            !matches!(peer.state, PeerState::Closing) &&
198                peer.capabilities.supports_eth_at_least(&EthVersion::Eth71)
199        })
200    }
201
202    /// Returns the next action to return
203    fn poll_action(&mut self) -> PollAction {
204        // we only check and not pop here since we don't know yet whether a peer is available.
205        if self.queued_requests.is_empty() {
206            return PollAction::NoRequests
207        }
208
209        if self.peers.is_empty() {
210            return PollAction::NoPeersAvailable
211        }
212
213        let request = self.queued_requests.pop_front().expect("not empty");
214        let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
215            // Optional BAL requests can lose their eth/71 peer while queued; complete them
216            // instead of waiting for future peer churn.
217            if request.is_optional_bal() && !self.has_eth71_peer() {
218                request.send_err_response(RequestError::UnsupportedCapability);
219            } else {
220                // no peer matches this request's requirements; requeue at the back so other
221                // queued requests get a chance on the next poll instead of head-of-line blocking.
222                self.queued_requests.push_back(request);
223            }
224            return PollAction::NoPeersAvailable
225        };
226
227        let request = self.prepare_block_request(peer_id, request);
228
229        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
230    }
231
232    /// Advance the state the syncer
233    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
234        // drain buffered actions first
235        loop {
236            let no_peers_available = match self.poll_action() {
237                PollAction::Ready(action) => return Poll::Ready(action),
238                PollAction::NoRequests => false,
239                PollAction::NoPeersAvailable => true,
240            };
241
242            loop {
243                // poll incoming requests
244                match self.download_requests_rx.poll_next_unpin(cx) {
245                    Poll::Ready(Some(request)) => {
246                        // Optional BAL requests should not wait for future peer churn if no
247                        // connected peer can serve them right now.
248                        if request.is_optional_bal() && !self.has_eth71_peer() {
249                            request.send_err_response(RequestError::UnsupportedCapability);
250                            continue
251                        }
252
253                        match request.get_priority() {
254                            Priority::High => {
255                                // find first normal request and queue before it; add this request
256                                // to the back of the high-priority queue
257                                let pos = self
258                                    .queued_requests
259                                    .iter()
260                                    .position(|req| req.is_normal_priority())
261                                    .unwrap_or(0);
262                                self.queued_requests.insert(pos, request);
263                            }
264                            Priority::Normal => {
265                                self.queued_requests.push_back(request);
266                            }
267                        }
268                    }
269                    Poll::Ready(None) => {
270                        unreachable!("channel can't close")
271                    }
272                    Poll::Pending => break,
273                }
274            }
275
276            if self.queued_requests.is_empty() || no_peers_available {
277                return Poll::Pending
278            }
279        }
280    }
281
282    /// Handles a new request to a peer.
283    ///
284    /// Caution: this assumes the peer exists and is idle
285    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
286        // update the peer's state
287        if let Some(peer) = self.peers.get_mut(&peer_id) {
288            peer.state = req.peer_state();
289        }
290
291        self.prepare_inflight_block_request(peer_id, req)
292    }
293
294    /// Tracks an inflight request and converts it into a peer request.
295    fn prepare_inflight_block_request(
296        &mut self,
297        peer_id: PeerId,
298        req: DownloadRequest<N>,
299    ) -> BlockRequest {
300        match req {
301            DownloadRequest::GetBlockHeaders { request, response, .. } => {
302                let inflight = Request { request: request.clone(), response };
303                self.inflight_headers_requests.insert(peer_id, inflight);
304                let HeadersRequest { start, limit, direction } = request;
305                BlockRequest::GetBlockHeaders(GetBlockHeaders {
306                    start_block: start,
307                    limit,
308                    skip: 0,
309                    direction,
310                })
311            }
312            DownloadRequest::GetBlockBodies { request, response, .. } => {
313                let inflight = Request { request: (), response };
314                self.inflight_bodies_requests.insert(peer_id, inflight);
315                BlockRequest::GetBlockBodies(GetBlockBodies(request))
316            }
317            DownloadRequest::GetBlockAccessLists { request, response, .. } => {
318                let inflight = Request { request: (), response };
319                self.inflight_bals_requests.insert(peer_id, inflight);
320                BlockRequest::GetBlockAccessLists(GetBlockAccessLists(request))
321            }
322            DownloadRequest::GetReceipts { request, response, .. } => {
323                let inflight = Request { request: (), response };
324                self.inflight_receipts_requests.insert(peer_id, inflight);
325                BlockRequest::GetReceipts(GetReceipts(request))
326            }
327        }
328    }
329
330    /// Returns a queued followup request the peer can serve.
331    ///
332    /// This is an immediate scheduling shortcut after a successful response. It skips queued
333    /// requests whose hard requirements do not match this peer, leaving them for the regular peer
334    /// selection path.
335    ///
336    /// Caution: this expects that the peer is _not_ closed.
337    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
338        let peer = self.peers.get_mut(&peer_id)?;
339        let req_idx = self.queued_requests.iter().position(|req| {
340            // Find the first queued request this peer can serve.
341            peer.satisfies(&req.best_peer_requirements())
342        })?;
343        let req = self.queued_requests.remove(req_idx).expect("valid request index");
344
345        peer.state = req.peer_state();
346        let req = self.prepare_inflight_block_request(peer_id, req);
347        Some(BlockResponseOutcome::Request(peer_id, req))
348    }
349
350    /// Called on a `GetBlockHeaders` response from a peer.
351    ///
352    /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a
353    /// direct followup request or get the peer reported if the response was a
354    /// [`EthResponseValidator::reputation_change_err`]
355    pub(crate) fn on_block_headers_response(
356        &mut self,
357        peer_id: PeerId,
358        res: RequestResult<Vec<N::BlockHeader>>,
359    ) -> Option<BlockResponseOutcome> {
360        let is_error = res.is_err();
361        let maybe_reputation_change = res.reputation_change_err();
362
363        let resp = self.inflight_headers_requests.remove(&peer_id);
364
365        let is_likely_bad_response =
366            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
367
368        if let Some(resp) = resp {
369            // delegate the response
370            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
371        }
372
373        if let Some(peer) = self.peers.get_mut(&peer_id) {
374            // update the peer's response state
375            peer.last_response_likely_bad = is_likely_bad_response;
376
377            // If the peer is still ready to accept new requests, we try to send a followup
378            // request immediately.
379            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
380                return self.followup_request(peer_id)
381            }
382        }
383
384        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
385        // outcome
386        maybe_reputation_change
387            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
388    }
389
390    /// Called on a `GetBlockBodies` response from a peer
391    pub(crate) fn on_block_bodies_response(
392        &mut self,
393        peer_id: PeerId,
394        res: RequestResult<Vec<N::BlockBody>>,
395    ) -> Option<BlockResponseOutcome> {
396        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
397
398        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
399            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
400        }
401        if let Some(peer) = self.peers.get_mut(&peer_id) {
402            // update the peer's response state
403            peer.last_response_likely_bad = is_likely_bad_response;
404
405            if peer.state.on_request_finished() && !is_likely_bad_response {
406                return self.followup_request(peer_id)
407            }
408        }
409        None
410    }
411
412    /// Called on a `GetBlockAccessLists` response from a peer
413    pub(crate) fn on_block_access_lists_response(
414        &mut self,
415        peer_id: PeerId,
416        res: RequestResult<BlockAccessLists>,
417    ) -> Option<BlockResponseOutcome> {
418        let is_likely_bad_response = res.is_err();
419
420        if let Some(resp) = self.inflight_bals_requests.remove(&peer_id) {
421            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
422        }
423        if let Some(peer) = self.peers.get_mut(&peer_id) {
424            peer.last_response_likely_bad = is_likely_bad_response;
425
426            if peer.state.on_request_finished() && !is_likely_bad_response {
427                return self.followup_request(peer_id)
428            }
429        }
430        None
431    }
432
433    /// Called on a `GetReceipts` response from a peer.
434    ///
435    /// All receipt variants (legacy with bloom, eth/69, eth/70) are expected to be normalized
436    /// to [`ReceiptsResponse`] by the caller before invoking this method.
437    pub(crate) fn on_receipts_response(
438        &mut self,
439        peer_id: PeerId,
440        res: RequestResult<ReceiptsResponse<N::Receipt>>,
441    ) -> Option<BlockResponseOutcome> {
442        let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());
443
444        if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
445            let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
446        }
447        if let Some(peer) = self.peers.get_mut(&peer_id) {
448            peer.last_response_likely_bad = is_likely_bad_response;
449
450            if peer.state.on_request_finished() && !is_likely_bad_response {
451                return self.followup_request(peer_id)
452            }
453        }
454        None
455    }
456
457    /// Returns a new [`FetchClient`] that can send requests to this type.
458    pub(crate) fn client(&self) -> FetchClient<N> {
459        FetchClient {
460            request_tx: self.download_requests_tx.clone(),
461            peers_handle: self.peers_handle.clone(),
462            num_active_peers: Arc::clone(&self.num_active_peers),
463        }
464    }
465}
466
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
    /// A request was matched with a peer and is ready to be dispatched.
    Ready(FetchAction),
    /// The request queue is empty; nothing to dispatch.
    NoRequests,
    /// Requests are queued but no suitable peer is currently available.
    NoPeersAvailable,
}
473
474/// Represents a connected peer
475#[derive(Debug)]
476struct Peer {
477    /// The state this peer currently resides in.
478    state: PeerState,
479    /// Best known hash that the peer has
480    best_hash: B256,
481    /// Tracks the best number of the peer.
482    best_number: u64,
483    /// Capabilities announced by the peer.
484    #[allow(dead_code)]
485    capabilities: Arc<Capabilities>,
486    /// Tracks the current timeout value we use for the peer.
487    timeout: Arc<AtomicU64>,
488    /// Tracks whether the peer has recently responded with a likely bad response.
489    ///
490    /// This is used to de-rank the peer if there are other peers available.
491    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
492    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
493    /// lowest timeout.
494    last_response_likely_bad: bool,
495    /// Tracks the range info for the peer.
496    range_info: Option<BlockRangeInfo>,
497}
498
499impl Peer {
500    fn timeout(&self) -> u64 {
501        self.timeout.load(Ordering::Relaxed)
502    }
503
504    /// Returns the earliest block number available from the peer.
505    fn earliest(&self) -> u64 {
506        self.range_info.as_ref().map_or(0, |info| info.earliest())
507    }
508
509    /// Returns true if the peer has the full history available.
510    fn has_full_history(&self) -> bool {
511        self.earliest() == 0
512    }
513
514    fn range(&self) -> Option<RangeInclusive<u64>> {
515        self.range_info.as_ref().map(|info| info.range())
516    }
517
518    /// Returns whether this peer can serve requests with the given hard requirements.
519    fn satisfies(&self, requirement: &BestPeerRequirements) -> bool {
520        match requirement {
521            BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver),
522            BestPeerRequirements::None |
523            BestPeerRequirements::FullBlock |
524            BestPeerRequirements::FullBlockRange(_) => true,
525        }
526    }
527
528    /// Returns true if this peer has a better range than the other peer for serving the requested
529    /// range.
530    ///
531    /// A peer has a "better range" if:
532    /// 1. It can fully cover the requested range while the other cannot
533    /// 2. None can fully cover the range, but this peer has lower start value
534    /// 3. If a peer doesn't announce a range we assume it has full history, but check the other's
535    ///    range and treat that as better if it can cover the range
536    fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
537        let self_range = self.range();
538        let other_range = other.range();
539
540        match (self_range, other_range) {
541            (Some(self_r), Some(other_r)) => {
542                // Check if each peer can fully cover the requested range
543                let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
544                let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
545
546                #[expect(clippy::match_same_arms)]
547                match (self_covers, other_covers) {
548                    (true, false) => true,  // Only self covers the range
549                    (false, true) => false, // Only other covers the range
550                    (true, true) => false,  // Both cover
551                    (false, false) => {
552                        // neither covers - prefer if peer has lower (better) start range
553                        self_r.start() < other_r.start()
554                    }
555                }
556            }
557            (Some(self_r), None) => {
558                // Self has range info, other doesn't (treated as full history with unknown latest)
559                // Self is better only if it covers the range
560                self_r.contains(range.start()) && self_r.contains(range.end())
561            }
562            (None, Some(other_r)) => {
563                // Self has no range info (full history), other has range info
564                // Self is better only if other doesn't cover the range
565                !(other_r.contains(range.start()) && other_r.contains(range.end()))
566            }
567            (None, None) => false, // Neither has range info - no one is better
568        }
569    }
570
571    /// Returns true if this peer is better than the other peer based on the given requirements.
572    fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
573        match requirement {
574            BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
575            BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
576            // Version-based filtering happens in `next_best_peer`, so by the time we get here
577            // both peers already satisfy the version requirement.
578            BestPeerRequirements::None | BestPeerRequirements::EthVersion(_) => false,
579        }
580    }
581}
582
/// Tracks the state of an individual peer
#[derive(Debug)]
enum PeerState {
    /// Peer is currently not handling requests and is available.
    Idle,
    /// Peer is handling a `GetBlockHeaders` request.
    GetBlockHeaders,
    /// Peer is handling a `GetBlockBodies` request.
    GetBlockBodies,
    /// Peer is handling a `GetBlockAccessLists` request.
    GetBlockAccessLists,
    /// Peer session is about to close; no further requests are dispatched to it.
    GetReceipts,
    /// Peer session is about to close
    Closing,
}
599
600// === impl PeerState ===
601
602impl PeerState {
603    /// Returns true if the peer is currently idle.
604    const fn is_idle(&self) -> bool {
605        matches!(self, Self::Idle)
606    }
607
608    /// Resets the state on a received response.
609    ///
610    /// If the state was already marked as `Closing` do nothing.
611    ///
612    /// Returns `true` if the peer is ready for another request.
613    const fn on_request_finished(&mut self) -> bool {
614        if !matches!(self, Self::Closing) {
615            *self = Self::Idle;
616            return true
617        }
618        false
619    }
620}
621
/// A request that waits for a response from the network, so it can send it back through the
/// response channel.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The issued request object
    ///
    /// Kept so the response can be validated against it (unit `()` when no validation is needed).
    // TODO: this can be attached to the response in error case
    request: Req,
    /// One-shot channel the response (or error) is delivered on.
    response: oneshot::Sender<Resp>,
}
631
/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
#[expect(clippy::enum_variant_names)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Download the requested headers and send response through channel
    GetBlockHeaders {
        /// The headers request (start block, limit, direction).
        request: HeadersRequest,
        /// Channel the downloaded headers are delivered on.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
        /// Scheduling priority of the request.
        priority: Priority,
    },
    /// Download the requested bodies and send response through channel
    GetBlockBodies {
        /// Hashes of the blocks whose bodies are requested.
        request: Vec<B256>,
        /// Channel the downloaded bodies are delivered on.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
        /// Scheduling priority of the request.
        priority: Priority,
        /// Optional block-number range hint used to prefer peers that cover it.
        range_hint: Option<RangeInclusive<u64>>,
    },
    /// Download the requested access lists and send response through channel
    GetBlockAccessLists {
        /// Hashes of the blocks whose access lists are requested.
        request: Vec<B256>,
        /// Channel the downloaded access lists are delivered on.
        response: oneshot::Sender<PeerRequestResult<BlockAccessLists>>,
        /// Scheduling priority of the request.
        priority: Priority,
        /// Whether the request may be failed early when no eth/71 peer is available.
        requirement: BalRequirement,
    },
    /// Download receipts for the given block hashes and send response through channel
    GetReceipts {
        /// Hashes of the blocks whose receipts are requested.
        request: Vec<B256>,
        /// Channel the downloaded receipts are delivered on.
        response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
        /// Scheduling priority of the request.
        priority: Priority,
    },
}
663
664// === impl DownloadRequest ===
665
666impl<N: NetworkPrimitives> DownloadRequest<N> {
667    /// Returns the corresponding state for a peer that handles the request.
668    const fn peer_state(&self) -> PeerState {
669        match self {
670            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
671            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
672            Self::GetBlockAccessLists { .. } => PeerState::GetBlockAccessLists,
673            Self::GetReceipts { .. } => PeerState::GetReceipts,
674        }
675    }
676
677    /// Returns the requested priority of this request
678    const fn get_priority(&self) -> &Priority {
679        match self {
680            Self::GetBlockHeaders { priority, .. } |
681            Self::GetBlockBodies { priority, .. } |
682            Self::GetBlockAccessLists { priority, .. } |
683            Self::GetReceipts { priority, .. } => priority,
684        }
685    }
686
687    /// Returns `true` if this request is normal priority.
688    const fn is_normal_priority(&self) -> bool {
689        self.get_priority().is_normal()
690    }
691
692    /// Returns `true` if this is an optional BAL request.
693    const fn is_optional_bal(&self) -> bool {
694        matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. })
695    }
696
697    /// Sends an error response to the waiting caller.
698    fn send_err_response(self, err: RequestError) {
699        let _ = match self {
700            Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
701            Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
702            Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
703            Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
704        };
705    }
706
707    /// Returns the best peer requirements for this request.
708    fn best_peer_requirements(&self) -> BestPeerRequirements {
709        match self {
710            Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
711            Self::GetBlockAccessLists { .. } => BestPeerRequirements::EthVersion(EthVersion::Eth71),
712            Self::GetBlockBodies { range_hint, .. } => {
713                if let Some(range) = range_hint {
714                    BestPeerRequirements::FullBlockRange(range.clone())
715                } else {
716                    BestPeerRequirements::FullBlock
717                }
718            }
719            Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
720        }
721    }
722}
723
/// An action the syncer can emit.
///
/// Produced by [`StateFetcher::poll`] for the network manager to execute.
pub(crate) enum FetchAction {
    /// Dispatch an eth request to the given peer.
    BlockRequest {
        /// The targeted recipient for the request
        peer_id: PeerId,
        /// The request to send
        request: BlockRequest,
    },
}
734
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Continue with another request to the peer.
    Request(PeerId, BlockRequest),
    /// How to handle a bad response and the reputation change to apply, if any.
    BadResponse(PeerId, ReputationChangeKind),
}
745
/// Additional requirements for how to rank peers during selection.
///
/// Only `EthVersion` is a hard filter; the range variants are soft preferences used
/// for ranking (see [`Peer::satisfies`] and [`Peer::is_better`]).
enum BestPeerRequirements {
    /// No additional requirements
    None,
    /// Peer must have this block range available.
    FullBlockRange(RangeInclusive<u64>),
    /// Peer must have full range.
    FullBlock,
    /// Peer must support at least this eth protocol version.
    EthVersion(EthVersion),
}
757
758#[cfg(test)]
759mod tests {
760    use super::*;
761    use crate::{peers::PeersManager, PeersConfig};
762    use alloy_consensus::Header;
763    use alloy_primitives::B512;
764    use reth_eth_wire::Capability;
765    use std::future::poll_fn;
766
767    #[tokio::test(flavor = "multi_thread")]
768    async fn test_poll_fetcher() {
769        let manager = PeersManager::new(PeersConfig::default());
770        let mut fetcher =
771            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
772
773        poll_fn(move |cx| {
774            assert!(fetcher.poll(cx).is_pending());
775            let (tx, _rx) = oneshot::channel();
776            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
777                request: vec![],
778                response: tx,
779                priority: Priority::default(),
780                range_hint: None,
781            });
782            assert!(fetcher.poll(cx).is_pending());
783
784            Poll::Ready(())
785        })
786        .await;
787    }
788
789    #[tokio::test]
790    async fn test_peer_rotation() {
791        let manager = PeersManager::new(PeersConfig::default());
792        let mut fetcher =
793            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
794        // Add a few random peers
795        let peer1 = B512::random();
796        let peer2 = B512::random();
797        let capabilities = Arc::new(Capabilities::from(vec![]));
798        fetcher.new_active_peer(
799            peer1,
800            B256::random(),
801            1,
802            Arc::clone(&capabilities),
803            Arc::new(AtomicU64::new(1)),
804            None,
805        );
806        fetcher.new_active_peer(
807            peer2,
808            B256::random(),
809            2,
810            Arc::clone(&capabilities),
811            Arc::new(AtomicU64::new(1)),
812            None,
813        );
814
815        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
816        assert!(first_peer == peer1 || first_peer == peer2);
817        // Pending disconnect for first_peer
818        fetcher.on_pending_disconnect(&first_peer);
819        // first_peer now isn't idle, so we should get other peer
820        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
821        assert!(first_peer == peer1 || first_peer == peer2);
822        assert_ne!(first_peer, second_peer);
823        // without idle peers, returns None
824        fetcher.on_pending_disconnect(&second_peer);
825        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
826    }
827
828    #[tokio::test]
829    async fn test_peer_prioritization() {
830        let manager = PeersManager::new(PeersConfig::default());
831        let mut fetcher =
832            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
833        // Add a few random peers
834        let peer1 = B512::random();
835        let peer2 = B512::random();
836        let peer3 = B512::random();
837
838        let peer2_timeout = Arc::new(AtomicU64::new(300));
839
840        let capabilities = Arc::new(Capabilities::from(vec![]));
841        fetcher.new_active_peer(
842            peer1,
843            B256::random(),
844            1,
845            Arc::clone(&capabilities),
846            Arc::new(AtomicU64::new(30)),
847            None,
848        );
849        fetcher.new_active_peer(
850            peer2,
851            B256::random(),
852            2,
853            Arc::clone(&capabilities),
854            Arc::clone(&peer2_timeout),
855            None,
856        );
857        fetcher.new_active_peer(
858            peer3,
859            B256::random(),
860            3,
861            Arc::clone(&capabilities),
862            Arc::new(AtomicU64::new(50)),
863            None,
864        );
865
866        // Must always get peer1 (lowest timeout)
867        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
868        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
869        // peer2's timeout changes below peer1's
870        peer2_timeout.store(10, Ordering::Relaxed);
871        // Then we get peer 2 always (now lowest)
872        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
873        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
874    }
875
876    #[tokio::test]
877    async fn test_on_block_headers_response() {
878        let manager = PeersManager::new(PeersConfig::default());
879        let mut fetcher =
880            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
881        let peer_id = B512::random();
882
883        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
884
885        assert_eq!(
886            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
887            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
888        );
889        assert_eq!(
890            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
891            None
892        );
893        assert_eq!(
894            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
895            None
896        );
897        assert_eq!(
898            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
899            None
900        );
901        assert_eq!(
902            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
903            None
904        );
905    }
906
907    #[tokio::test]
908    async fn test_header_response_outcome() {
909        let manager = PeersManager::new(PeersConfig::default());
910        let mut fetcher =
911            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
912        let peer_id = B512::random();
913
914        let request_pair = || {
915            let (tx, _rx) = oneshot::channel();
916            let req = Request {
917                request: HeadersRequest {
918                    start: 0u64.into(),
919                    limit: 1,
920                    direction: Default::default(),
921                },
922                response: tx,
923            };
924            let header = Header { number: 0, ..Default::default() };
925            (req, header)
926        };
927
928        fetcher.new_active_peer(
929            peer_id,
930            Default::default(),
931            Default::default(),
932            Arc::new(Capabilities::from(vec![])),
933            Default::default(),
934            None,
935        );
936
937        let (req, header) = request_pair();
938        fetcher.inflight_headers_requests.insert(peer_id, req);
939
940        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
941        assert!(outcome.is_none());
942        assert!(fetcher.peers[&peer_id].state.is_idle());
943
944        let outcome =
945            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
946
947        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
948            RequestError::Timeout
949        ))
950        .is_some());
951
952        match outcome {
953            BlockResponseOutcome::BadResponse(peer, _) => {
954                assert_eq!(peer, peer_id)
955            }
956            BlockResponseOutcome::Request(_, _) => {
957                unreachable!()
958            }
959        };
960
961        assert!(fetcher.peers[&peer_id].state.is_idle());
962    }
963
964    #[test]
965    fn test_peer_is_better_none_requirement() {
966        let peer1 = Peer {
967            state: PeerState::Idle,
968            best_hash: B256::random(),
969            best_number: 100,
970            capabilities: Arc::new(Capabilities::new(vec![])),
971            timeout: Arc::new(AtomicU64::new(10)),
972            last_response_likely_bad: false,
973            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
974        };
975
976        let peer2 = Peer {
977            state: PeerState::Idle,
978            best_hash: B256::random(),
979            best_number: 50,
980            capabilities: Arc::new(Capabilities::new(vec![])),
981            timeout: Arc::new(AtomicU64::new(20)),
982            last_response_likely_bad: false,
983            range_info: None,
984        };
985
986        // With None requirement, is_better should always return false
987        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
988        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
989    }
990
991    #[test]
992    fn test_peer_is_better_full_block_requirement() {
993        // Peer with full history (earliest = 0)
994        let peer_full = Peer {
995            state: PeerState::Idle,
996            best_hash: B256::random(),
997            best_number: 100,
998            capabilities: Arc::new(Capabilities::new(vec![])),
999            timeout: Arc::new(AtomicU64::new(10)),
1000            last_response_likely_bad: false,
1001            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
1002        };
1003
1004        // Peer without full history (earliest = 50)
1005        let peer_partial = Peer {
1006            state: PeerState::Idle,
1007            best_hash: B256::random(),
1008            best_number: 100,
1009            capabilities: Arc::new(Capabilities::new(vec![])),
1010            timeout: Arc::new(AtomicU64::new(10)),
1011            last_response_likely_bad: false,
1012            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1013        };
1014
1015        // Peer without range info (treated as full history)
1016        let peer_no_range = Peer {
1017            state: PeerState::Idle,
1018            best_hash: B256::random(),
1019            best_number: 100,
1020            capabilities: Arc::new(Capabilities::new(vec![])),
1021            timeout: Arc::new(AtomicU64::new(10)),
1022            last_response_likely_bad: false,
1023            range_info: None,
1024        };
1025
1026        // Peer with full history is better than peer without
1027        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
1028        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
1029
1030        // Peer without range info (full history) is better than partial
1031        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
1032        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
1033
1034        // Both have full history - no improvement
1035        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
1036        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
1037    }
1038
1039    #[test]
1040    fn test_peer_is_better_full_block_range_requirement() {
1041        let range = RangeInclusive::new(40, 60);
1042
1043        // Peer that covers the requested range
1044        let peer_covers = Peer {
1045            state: PeerState::Idle,
1046            best_hash: B256::random(),
1047            best_number: 100,
1048            capabilities: Arc::new(Capabilities::new(vec![])),
1049            timeout: Arc::new(AtomicU64::new(10)),
1050            last_response_likely_bad: false,
1051            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
1052        };
1053
1054        // Peer that doesn't cover the range (earliest too high)
1055        let peer_no_cover = Peer {
1056            state: PeerState::Idle,
1057            best_hash: B256::random(),
1058            best_number: 100,
1059            capabilities: Arc::new(Capabilities::new(vec![])),
1060            timeout: Arc::new(AtomicU64::new(10)),
1061            last_response_likely_bad: false,
1062            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1063        };
1064
1065        // Peer that covers the requested range is better than one that doesn't
1066        assert!(peer_covers
1067            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
1068        assert!(
1069            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
1070        );
1071    }
1072
1073    #[test]
1074    fn test_peer_is_better_both_cover_range() {
1075        let range = RangeInclusive::new(30, 50);
1076
1077        // Peer with full history that covers the range
1078        let peer_full = Peer {
1079            state: PeerState::Idle,
1080            best_hash: B256::random(),
1081            best_number: 100,
1082            capabilities: Arc::new(Capabilities::new(vec![])),
1083            timeout: Arc::new(AtomicU64::new(10)),
1084            last_response_likely_bad: false,
1085            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1086        };
1087
1088        // Peer without full history that also covers the range
1089        let peer_partial = Peer {
1090            state: PeerState::Idle,
1091            best_hash: B256::random(),
1092            best_number: 100,
1093            capabilities: Arc::new(Capabilities::new(vec![])),
1094            timeout: Arc::new(AtomicU64::new(10)),
1095            last_response_likely_bad: false,
1096            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1097        };
1098
1099        // When both cover the range, prefer none
1100        assert!(!peer_full
1101            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1102        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1103    }
1104
1105    #[test]
1106    fn test_peer_is_better_lower_start() {
1107        let range = RangeInclusive::new(30, 60);
1108
1109        // Peer with full history that covers the range
1110        let peer_full = Peer {
1111            state: PeerState::Idle,
1112            best_hash: B256::random(),
1113            best_number: 100,
1114            capabilities: Arc::new(Capabilities::new(vec![])),
1115            timeout: Arc::new(AtomicU64::new(10)),
1116            last_response_likely_bad: false,
1117            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1118        };
1119
1120        // Peer without full history that also covers the range
1121        let peer_partial = Peer {
1122            state: PeerState::Idle,
1123            best_hash: B256::random(),
1124            best_number: 100,
1125            capabilities: Arc::new(Capabilities::new(vec![])),
1126            timeout: Arc::new(AtomicU64::new(10)),
1127            last_response_likely_bad: false,
1128            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1129        };
1130
1131        // When both cover the range, prefer lower start value
1132        assert!(peer_full
1133            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1134        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1135    }
1136
1137    #[test]
1138    fn test_peer_is_better_neither_covers_range() {
1139        let range = RangeInclusive::new(40, 60);
1140
1141        // Peer with full history that doesn't cover the range (latest too low)
1142        let peer_full = Peer {
1143            state: PeerState::Idle,
1144            best_hash: B256::random(),
1145            best_number: 30,
1146            capabilities: Arc::new(Capabilities::new(vec![])),
1147            timeout: Arc::new(AtomicU64::new(10)),
1148            last_response_likely_bad: false,
1149            range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1150        };
1151
1152        // Peer without full history that also doesn't cover the range
1153        let peer_partial = Peer {
1154            state: PeerState::Idle,
1155            best_hash: B256::random(),
1156            best_number: 30,
1157            capabilities: Arc::new(Capabilities::new(vec![])),
1158            timeout: Arc::new(AtomicU64::new(10)),
1159            last_response_likely_bad: false,
1160            range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1161        };
1162
1163        // When neither covers the range, prefer full history
1164        assert!(peer_full
1165            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1166        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1167    }
1168
1169    #[test]
1170    fn test_peer_is_better_no_range_info() {
1171        let range = RangeInclusive::new(40, 60);
1172
1173        // Peer with range info
1174        let peer_with_range = Peer {
1175            state: PeerState::Idle,
1176            best_hash: B256::random(),
1177            best_number: 100,
1178            capabilities: Arc::new(Capabilities::new(vec![])),
1179            timeout: Arc::new(AtomicU64::new(10)),
1180            last_response_likely_bad: false,
1181            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1182        };
1183
1184        // Peer without range info
1185        let peer_no_range = Peer {
1186            state: PeerState::Idle,
1187            best_hash: B256::random(),
1188            best_number: 100,
1189            capabilities: Arc::new(Capabilities::new(vec![])),
1190            timeout: Arc::new(AtomicU64::new(10)),
1191            last_response_likely_bad: false,
1192            range_info: None,
1193        };
1194
1195        // Peer without range info is not better (we prefer peers with known ranges)
1196        assert!(!peer_no_range
1197            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1198
1199        // Peer with range info is better than peer without
1200        assert!(
1201            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1202        );
1203    }
1204
1205    #[test]
1206    fn test_peer_is_better_one_peer_no_range_covers() {
1207        let range = RangeInclusive::new(40, 60);
1208
1209        // Peer with range info that covers the requested range
1210        let peer_with_range_covers = Peer {
1211            state: PeerState::Idle,
1212            best_hash: B256::random(),
1213            best_number: 100,
1214            capabilities: Arc::new(Capabilities::new(vec![])),
1215            timeout: Arc::new(AtomicU64::new(10)),
1216            last_response_likely_bad: false,
1217            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1218        };
1219
1220        // Peer without range info (treated as full history with unknown latest)
1221        let peer_no_range = Peer {
1222            state: PeerState::Idle,
1223            best_hash: B256::random(),
1224            best_number: 100,
1225            capabilities: Arc::new(Capabilities::new(vec![])),
1226            timeout: Arc::new(AtomicU64::new(10)),
1227            last_response_likely_bad: false,
1228            range_info: None,
1229        };
1230
1231        // Peer with range that covers is better than peer without range info
1232        assert!(peer_with_range_covers
1233            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1234
1235        // Peer without range info is not better when other covers
1236        assert!(!peer_no_range
1237            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1238    }
1239
1240    #[test]
1241    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1242        let range = RangeInclusive::new(40, 60);
1243
1244        // Peer with range info that does NOT cover the requested range (too high)
1245        let peer_with_range_no_cover = Peer {
1246            state: PeerState::Idle,
1247            best_hash: B256::random(),
1248            best_number: 100,
1249            capabilities: Arc::new(Capabilities::new(vec![])),
1250            timeout: Arc::new(AtomicU64::new(10)),
1251            last_response_likely_bad: false,
1252            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1253        };
1254
1255        // Peer without range info (treated as full history)
1256        let peer_no_range = Peer {
1257            state: PeerState::Idle,
1258            best_hash: B256::random(),
1259            best_number: 100,
1260            capabilities: Arc::new(Capabilities::new(vec![])),
1261            timeout: Arc::new(AtomicU64::new(10)),
1262            last_response_likely_bad: false,
1263            range_info: None,
1264        };
1265
1266        // Peer with range that doesn't cover is not better
1267        assert!(!peer_with_range_no_cover
1268            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1269
1270        // Peer without range info (full history) is better when other doesn't cover
1271        assert!(peer_no_range
1272            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1273    }
1274
1275    #[test]
1276    fn test_peer_is_better_edge_cases() {
1277        // Test exact range boundaries
1278        let range = RangeInclusive::new(50, 100);
1279
1280        // Peer that exactly covers the range
1281        let peer_exact = Peer {
1282            state: PeerState::Idle,
1283            best_hash: B256::random(),
1284            best_number: 100,
1285            capabilities: Arc::new(Capabilities::new(vec![])),
1286            timeout: Arc::new(AtomicU64::new(10)),
1287            last_response_likely_bad: false,
1288            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1289        };
1290
1291        // Peer that's one block short at the start
1292        let peer_short_start = Peer {
1293            state: PeerState::Idle,
1294            best_hash: B256::random(),
1295            best_number: 100,
1296            capabilities: Arc::new(Capabilities::new(vec![])),
1297            timeout: Arc::new(AtomicU64::new(10)),
1298            last_response_likely_bad: false,
1299            range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1300        };
1301
1302        // Peer that's one block short at the end
1303        let peer_short_end = Peer {
1304            state: PeerState::Idle,
1305            best_hash: B256::random(),
1306            best_number: 100,
1307            capabilities: Arc::new(Capabilities::new(vec![])),
1308            timeout: Arc::new(AtomicU64::new(10)),
1309            last_response_likely_bad: false,
1310            range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1311        };
1312
1313        // Exact coverage is better than short coverage
1314        assert!(peer_exact
1315            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1316        assert!(peer_exact
1317            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1318
1319        // Short coverage is not better than exact coverage
1320        assert!(!peer_short_start
1321            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1322        assert!(
1323            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1324        );
1325    }
1326
1327    /// Creates a `StateFetcher` with a single idle peer and returns both.
1328    fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1329        let manager = PeersManager::new(PeersConfig::default());
1330        let mut fetcher =
1331            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1332        let peer_id = B512::random();
1333
1334        fetcher.new_active_peer(
1335            peer_id,
1336            Default::default(),
1337            Default::default(),
1338            Arc::new(Capabilities::from(vec![])),
1339            Default::default(),
1340            None,
1341        );
1342        (fetcher, peer_id)
1343    }
1344
1345    /// Inserts an inflight receipts request into the fetcher and returns the
1346    /// `oneshot::Receiver` that the final response will be sent through.
1347    fn insert_inflight_receipts(
1348        fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1349        peer_id: PeerId,
1350    ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1351    {
1352        let (tx, rx) = oneshot::channel();
1353        fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1354        fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1355        rx
1356    }
1357
1358    // ---- Receipts: basic dispatch ----
1359
1360    #[tokio::test]
1361    async fn test_poll_dispatches_receipts_to_peer() {
1362        let (mut fetcher, peer_id) = fetcher_with_peer();
1363
1364        poll_fn(move |cx| {
1365            let (tx, _rx) = oneshot::channel();
1366            fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1367                request: vec![B256::ZERO],
1368                response: tx,
1369                priority: Priority::default(),
1370            });
1371
1372            let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1373                fetcher.poll(cx)
1374            else {
1375                panic!("expected Ready(BlockRequest)");
1376            };
1377            assert_eq!(dispatched_peer, peer_id);
1378            assert!(matches!(request, BlockRequest::GetReceipts(_)));
1379
1380            // Peer should now be in GetReceipts state
1381            assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1382            // Inflight request should be tracked
1383            assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1384
1385            Poll::Ready(())
1386        })
1387        .await;
1388    }
1389
1390    // ---- Receipts: response handling ----
1391
1392    #[tokio::test]
1393    async fn test_receipts_complete_response_resolves_and_idles_peer() {
1394        let (mut fetcher, peer_id) = fetcher_with_peer();
1395
1396        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1397
1398        let resp = ReceiptsResponse::new(vec![vec![]]);
1399        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1400
1401        // No queued requests, so no followup
1402        assert!(outcome.is_none());
1403        // Peer back to idle
1404        assert!(fetcher.peers[&peer_id].state.is_idle());
1405        // Inflight cleaned up
1406        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1407
1408        // Caller receives the response
1409        let result = rx.await.unwrap().unwrap();
1410        assert_eq!(result.1.receipts.len(), 1);
1411    }
1412
1413    #[tokio::test]
1414    async fn test_receipts_empty_response_marks_peer_bad() {
1415        let (mut fetcher, peer_id) = fetcher_with_peer();
1416        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1417
1418        let resp = ReceiptsResponse::new(vec![]);
1419        let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1420
1421        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1422    }
1423
1424    #[tokio::test]
1425    async fn test_receipts_error_forwards_and_marks_peer_bad() {
1426        let (mut fetcher, peer_id) = fetcher_with_peer();
1427        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1428
1429        let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1430
1431        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1432        // Error is forwarded to the caller
1433        let result = rx.await.unwrap();
1434        assert_eq!(result.unwrap_err(), RequestError::Timeout);
1435    }
1436
1437    #[tokio::test]
1438    async fn test_session_closed_cancels_inflight_receipts() {
1439        let (mut fetcher, peer_id) = fetcher_with_peer();
1440        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1441
1442        fetcher.on_session_closed(&peer_id);
1443
1444        assert!(!fetcher.peers.contains_key(&peer_id));
1445        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1446
1447        let result = rx.await.unwrap();
1448        assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1449    }
1450
1451    #[tokio::test]
1452    async fn test_receipts_response_triggers_followup() {
1453        let (mut fetcher, peer_id) = fetcher_with_peer();
1454
1455        // Queue a bodies request as a followup candidate
1456        let (followup_tx, _followup_rx) = oneshot::channel();
1457        fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1458            request: vec![B256::random()],
1459            response: followup_tx,
1460            priority: Priority::default(),
1461            range_hint: None,
1462        });
1463
1464        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1465
1466        let resp = ReceiptsResponse::new(vec![vec![]]);
1467        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1468
1469        assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1470    }
1471
1472    #[tokio::test]
1473    async fn test_followup_skips_request_peer_cannot_serve() {
1474        let (mut fetcher, peer_id) = fetcher_with_peer();
1475
1476        let peer_71 = B512::random();
1477        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1478        fetcher.new_active_peer(
1479            peer_71,
1480            B256::random(),
1481            100,
1482            caps_71,
1483            Arc::new(AtomicU64::new(10)),
1484            None,
1485        );
1486        fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1487
1488        let (followup_tx, _followup_rx) = oneshot::channel();
1489        fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1490            request: vec![B256::random()],
1491            response: followup_tx,
1492            priority: Priority::Normal,
1493            requirement: BalRequirement::Optional,
1494        });
1495
1496        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1497
1498        let resp = ReceiptsResponse::new(vec![vec![]]);
1499        assert!(fetcher.on_receipts_response(peer_id, Ok(resp)).is_none());
1500        assert!(fetcher.peers[&peer_id].state.is_idle());
1501        assert!(!fetcher.inflight_bals_requests.contains_key(&peer_id));
1502        assert!(matches!(
1503            fetcher.queued_requests.front(),
1504            Some(DownloadRequest::GetBlockAccessLists {
1505                requirement: BalRequirement::Optional,
1506                ..
1507            })
1508        ));
1509    }
1510
1511    #[tokio::test]
1512    async fn test_followup_uses_first_satisfiable_request() {
1513        let (mut fetcher, peer_id) = fetcher_with_peer();
1514
1515        let peer_71 = B512::random();
1516        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1517        fetcher.new_active_peer(
1518            peer_71,
1519            B256::random(),
1520            100,
1521            caps_71,
1522            Arc::new(AtomicU64::new(10)),
1523            None,
1524        );
1525        fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1526
1527        let (bal_tx, _bal_rx) = oneshot::channel();
1528        fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1529            request: vec![B256::random()],
1530            response: bal_tx,
1531            priority: Priority::Normal,
1532            requirement: BalRequirement::Optional,
1533        });
1534
1535        let (bodies_tx, _bodies_rx) = oneshot::channel();
1536        fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1537            request: vec![B256::random()],
1538            response: bodies_tx,
1539            priority: Priority::Normal,
1540            range_hint: None,
1541        });
1542
1543        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1544
1545        let resp = ReceiptsResponse::new(vec![vec![]]);
1546        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1547
1548        assert!(matches!(
1549            outcome,
1550            Some(BlockResponseOutcome::Request(pid, BlockRequest::GetBlockBodies(_))) if pid == peer_id
1551        ));
1552        assert!(fetcher.inflight_bodies_requests.contains_key(&peer_id));
1553        assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetBlockBodies));
1554        assert_eq!(fetcher.queued_requests.len(), 1);
1555        assert!(matches!(
1556            fetcher.queued_requests.front(),
1557            Some(DownloadRequest::GetBlockAccessLists {
1558                requirement: BalRequirement::Optional,
1559                ..
1560            })
1561        ));
1562    }
1563
1564    #[tokio::test]
1565    async fn test_prepare_block_request_creates_inflight_receipts() {
1566        let (mut fetcher, peer_id) = fetcher_with_peer();
1567        let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1568
1569        let (tx, _rx) = oneshot::channel();
1570        let req = DownloadRequest::GetReceipts {
1571            request: hashes.clone(),
1572            response: tx,
1573            priority: Priority::default(),
1574        };
1575
1576        let block_request = fetcher.prepare_block_request(peer_id, req);
1577
1578        // Returns a GetReceipts block request with the same hashes
1579        match block_request {
1580            BlockRequest::GetReceipts(ref get) => {
1581                assert_eq!(get.0, hashes);
1582            }
1583            other => panic!("expected GetReceipts, got {other:?}"),
1584        }
1585
1586        // Peer state transitions to GetReceipts
1587        assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1588
1589        // Inflight request is tracked
1590        assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1591    }
1592    #[tokio::test]
1593    async fn test_next_best_peer_eth71_no_support() {
1594        let manager = PeersManager::new(PeersConfig::default());
1595        let mut fetcher =
1596            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1597
1598        let peer = B512::random();
1599
1600        // Capabilities WITHOUT eth71
1601        let capabilities = Arc::new(Capabilities::new(vec![]));
1602
1603        fetcher.new_active_peer(
1604            peer,
1605            B256::random(),
1606            100,
1607            capabilities,
1608            Arc::new(AtomicU64::new(10)),
1609            None,
1610        );
1611
1612        // Should return None because peer doesn't support eth71
1613        assert_eq!(
1614            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1615            None
1616        );
1617    }
1618
1619    #[tokio::test]
1620    async fn test_next_best_peer_eth71_supported() {
1621        let manager = PeersManager::new(PeersConfig::default());
1622        let mut fetcher =
1623            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1624
1625        let peer = B512::random();
1626
1627        // Build capability list that includes Eth71
1628        let capabilities = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1629
1630        fetcher.new_active_peer(
1631            peer,
1632            B256::random(),
1633            100,
1634            capabilities,
1635            Arc::new(AtomicU64::new(10)),
1636            None,
1637        );
1638
1639        assert_eq!(
1640            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1641            Some(peer)
1642        );
1643    }
1644
1645    #[tokio::test]
1646    async fn test_next_best_peer_eth71_filters_correctly() {
1647        let manager = PeersManager::new(PeersConfig::default());
1648        let mut fetcher =
1649            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1650
1651        let peer_no_71 = B512::random();
1652        let peer_with_71 = B512::random();
1653
1654        // Peer without eth71
1655        let caps_old = Arc::new(Capabilities::new(vec![]));
1656
1657        // Peer with eth71
1658        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1659
1660        fetcher.new_active_peer(
1661            peer_no_71,
1662            B256::random(),
1663            100,
1664            caps_old,
1665            Arc::new(AtomicU64::new(5)),
1666            None,
1667        );
1668
1669        fetcher.new_active_peer(
1670            peer_with_71,
1671            B256::random(),
1672            100,
1673            caps_71,
1674            Arc::new(AtomicU64::new(50)),
1675            None,
1676        );
1677
1678        // Even though peer_no_71 has lower timeout,
1679        // it must NOT be selected.
1680        assert_eq!(
1681            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1682            Some(peer_with_71)
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn test_wakes_when_eth71_peer_connects() {
1688        use futures::task::noop_waker;
1689        use std::task::{Context, Poll};
1690
1691        let manager = PeersManager::new(PeersConfig::default());
1692        let mut fetcher =
1693            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1694
1695        // Queue Eth71-required request
1696        let (tx, _rx) = oneshot::channel();
1697        fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1698            request: vec![],
1699            response: tx,
1700            priority: Priority::Normal,
1701            requirement: BalRequirement::Mandatory,
1702        });
1703
1704        let waker = noop_waker();
1705        let mut cx = Context::from_waker(&waker);
1706
1707        // No peers -> must be Pending
1708        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1709
1710        // Add peer WITHOUT Eth71 support
1711        let peer_old = B512::random();
1712        let caps_old = Arc::new(Capabilities::new(vec![]));
1713
1714        fetcher.new_active_peer(
1715            peer_old,
1716            B256::random(),
1717            100,
1718            caps_old,
1719            Arc::new(AtomicU64::new(10)),
1720            None,
1721        );
1722
1723        // Still Pending
1724        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1725
1726        // Add peer WITH Eth71 support
1727        let peer_71 = B512::random();
1728        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1729
1730        fetcher.new_active_peer(
1731            peer_71,
1732            B256::random(),
1733            100,
1734            caps_71,
1735            Arc::new(AtomicU64::new(10)),
1736            None,
1737        );
1738
1739        // Now we must get Ready(BlockRequest)
1740        if let Poll::Ready(FetchAction::BlockRequest { peer_id, .. }) = fetcher.poll(&mut cx) {
1741            assert_eq!(peer_id, peer_71);
1742        }
1743    }
1744
1745    #[tokio::test]
1746    async fn test_optional_bal_request_rejected_without_eth71_peer() {
1747        use futures::task::noop_waker;
1748        use std::task::{Context, Poll};
1749
1750        let manager = PeersManager::new(PeersConfig::default());
1751        let mut fetcher =
1752            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1753
1754        let peer_old = B512::random();
1755        let caps_old = Arc::new(Capabilities::new(vec![]));
1756        fetcher.new_active_peer(
1757            peer_old,
1758            B256::random(),
1759            100,
1760            caps_old,
1761            Arc::new(AtomicU64::new(10)),
1762            None,
1763        );
1764
1765        let (tx, rx) = oneshot::channel();
1766        fetcher
1767            .download_requests_tx
1768            .send(DownloadRequest::GetBlockAccessLists {
1769                request: vec![],
1770                response: tx,
1771                priority: Priority::Normal,
1772                requirement: BalRequirement::Optional,
1773            })
1774            .unwrap();
1775
1776        let waker = noop_waker();
1777        let mut cx = Context::from_waker(&waker);
1778
1779        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1780        assert!(fetcher.queued_requests.is_empty());
1781        assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
1782    }
1783
1784    #[tokio::test]
1785    async fn test_optional_bal_request_waits_for_busy_eth71_peer() {
1786        use futures::task::noop_waker;
1787        use std::task::{Context, Poll};
1788
1789        let manager = PeersManager::new(PeersConfig::default());
1790        let mut fetcher =
1791            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1792
1793        let peer_71 = B512::random();
1794        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1795        fetcher.new_active_peer(
1796            peer_71,
1797            B256::random(),
1798            100,
1799            caps_71,
1800            Arc::new(AtomicU64::new(10)),
1801            None,
1802        );
1803        fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1804
1805        let (tx, _rx) = oneshot::channel();
1806        fetcher
1807            .download_requests_tx
1808            .send(DownloadRequest::GetBlockAccessLists {
1809                request: vec![],
1810                response: tx,
1811                priority: Priority::Normal,
1812                requirement: BalRequirement::Optional,
1813            })
1814            .unwrap();
1815
1816        let waker = noop_waker();
1817        let mut cx = Context::from_waker(&waker);
1818
1819        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1820        assert_eq!(fetcher.queued_requests.len(), 1);
1821    }
1822
1823    #[tokio::test]
1824    async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() {
1825        use futures::task::noop_waker;
1826        use std::task::{Context, Poll};
1827
1828        let manager = PeersManager::new(PeersConfig::default());
1829        let mut fetcher =
1830            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1831
1832        let peer_old = B512::random();
1833        let caps_old = Arc::new(Capabilities::new(vec![]));
1834        fetcher.new_active_peer(
1835            peer_old,
1836            B256::random(),
1837            100,
1838            caps_old,
1839            Arc::new(AtomicU64::new(10)),
1840            None,
1841        );
1842
1843        let peer_71 = B512::random();
1844        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1845        fetcher.new_active_peer(
1846            peer_71,
1847            B256::random(),
1848            100,
1849            caps_71,
1850            Arc::new(AtomicU64::new(10)),
1851            None,
1852        );
1853        fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1854
1855        let (tx, rx) = oneshot::channel();
1856        fetcher
1857            .download_requests_tx
1858            .send(DownloadRequest::GetBlockAccessLists {
1859                request: vec![],
1860                response: tx,
1861                priority: Priority::Normal,
1862                requirement: BalRequirement::Optional,
1863            })
1864            .unwrap();
1865
1866        let waker = noop_waker();
1867        let mut cx = Context::from_waker(&waker);
1868
1869        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1870        assert_eq!(fetcher.queued_requests.len(), 1);
1871
1872        fetcher.on_session_closed(&peer_71);
1873
1874        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1875        assert!(fetcher.queued_requests.is_empty());
1876        assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
1877    }
1878}