// reth_network/fetch/mod.rs

1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11    BlockAccessLists, Capabilities, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
12    GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
17    headers::client::HeadersRequest,
18    priority::Priority,
19    receipts::client::ReceiptsResponse,
20};
21use reth_network_peers::PeerId;
22use reth_network_types::ReputationChangeKind;
23use std::{
24    collections::{HashMap, VecDeque},
25    ops::RangeInclusive,
26    sync::{
27        atomic::{AtomicU64, AtomicUsize, Ordering},
28        Arc,
29    },
30    task::{Context, Poll},
31};
32use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
/// Inflight `GetBlockHeaders` request; the issued [`HeadersRequest`] is kept so the response
/// can be validated against it (see `is_likely_bad_headers_response`).
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
/// Inflight `GetBlockBodies` request; only the response channel is retained.
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
/// Inflight `GetReceipts` request; only the response channel is retained.
type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
/// Inflight `GetBlockAccessLists` request; only the response channel is retained.
type InflightBlockAccessListsRequest = Request<(), PeerRequestResult<BlockAccessLists>>;
39
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
/// peers and sends the response once ready.
///
/// This type maintains a list of connected peers that are available for requests.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Currently active [`GetBlockHeaders`] requests, keyed by the peer handling them.
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Currently active [`GetBlockBodies`] requests, keyed by the peer handling them.
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// Currently active [`GetBlockAccessLists`] requests, keyed by the peer handling them.
    inflight_bals_requests: HashMap<PeerId, InflightBlockAccessListsRequest>,
    /// Currently active `GetReceipts` requests, keyed by the peer handling them.
    inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
    /// The list of _available_ peers for requests.
    peers: HashMap<PeerId, Peer>,
    /// The handle to the peers manager
    peers_handle: PeersHandle,
    /// Number of active peer sessions the node's currently handling.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests queued for processing; high-priority requests are inserted ahead of
    /// normal-priority ones (see `poll`).
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiver for new incoming download requests
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sender for download requests, used to detach a [`FetchClient`]
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}
69
// === impl StateFetcher ===
71
impl<N: NetworkPrimitives> StateFetcher<N> {
    /// Creates a new fetcher with no connected peers and an empty request queue.
    ///
    /// The paired unbounded channel is how detached [`FetchClient`]s deliver
    /// [`DownloadRequest`]s back into this fetcher.
    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
        Self {
            inflight_headers_requests: Default::default(),
            inflight_bodies_requests: Default::default(),
            inflight_bals_requests: Default::default(),
            inflight_receipts_requests: Default::default(),
            peers: Default::default(),
            peers_handle,
            num_active_peers,
            queued_requests: Default::default(),
            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
            download_requests_tx,
        }
    }

    /// Invoked when connected to a new peer.
    ///
    /// Registers the peer as available for requests; an existing entry for the same
    /// `peer_id` is overwritten.
    pub(crate) fn new_active_peer(
        &mut self,
        peer_id: PeerId,
        best_hash: B256,
        best_number: u64,
        capabilities: Arc<Capabilities>,
        timeout: Arc<AtomicU64>,
        range_info: Option<BlockRangeInfo>,
    ) {
        self.peers.insert(
            peer_id,
            Peer {
                // new peers start out idle and are immediately eligible for requests
                state: PeerState::Idle,
                best_hash,
                best_number,
                capabilities,
                timeout,
                last_response_likely_bad: false,
                range_info,
            },
        );
    }

    /// Removes the peer from the peer list, after which it is no longer available for future
    /// requests.
    ///
    /// Invoked when an active session was closed.
    ///
    /// This also cancels any inflight request to that peer and sends a
    /// [`RequestError::ConnectionDropped`] error to the waiting receiver.
    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
        self.peers.remove(peer);
        // unblock any caller waiting on a response from this peer; send errors are ignored
        // because the receiver may already have been dropped
        if let Some(req) = self.inflight_headers_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_bals_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_receipts_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
    }

    /// Updates the block information for the peer.
    ///
    /// Returns `true` if this is a newer block than the peer's currently tracked best block.
    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
        if let Some(peer) = self.peers.get_mut(peer_id) &&
            number > peer.best_number
        {
            peer.best_hash = hash;
            peer.best_number = number;
            return true
        }
        false
    }

    /// Invoked when an active session is about to be disconnected.
    ///
    /// Marks the peer as `Closing` so it's no longer considered idle for new requests.
    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
        if let Some(peer) = self.peers.get_mut(peer_id) {
            peer.state = PeerState::Closing;
        }
    }

    /// Returns the _next_ idle peer that's ready to accept a request,
    /// prioritizing those with the lowest timeout/latency and those that recently responded with
    /// adequate data. Additionally, if full blocks are required this prioritizes peers that have
    /// full history available
    fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
        // filter out peers that aren't idle or don't meet the requirement
        let mut idle = self.peers.iter().filter(|(_, peer)| {
            peer.state.is_idle() &&
                match &requirement {
                    // version requirements are a hard filter; the other requirements only
                    // affect ranking below (via `is_better`)
                    BestPeerRequirements::EthVersion(ver) => {
                        peer.capabilities.supports_eth_at_least(ver)
                    }
                    _ => true,
                }
        });

        let mut best_peer = idle.next()?;

        for maybe_better in idle {
            // replace best peer if our current best peer sent us a bad response last time
            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer meets the requirements better
            if maybe_better.1.is_better(best_peer.1, &requirement) {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer has better rtt and both have same range quality
            if maybe_better.1.timeout() < best_peer.1.timeout() &&
                !maybe_better.1.last_response_likely_bad
            {
                best_peer = maybe_better;
            }
        }

        Some(*best_peer.0)
    }

    /// Returns the next action to return
    fn poll_action(&mut self) -> PollAction {
        // we only check and not pop here since we don't know yet whether a peer is available.
        if self.queued_requests.is_empty() {
            return PollAction::NoRequests
        }

        if self.peers.is_empty() {
            return PollAction::NoPeersAvailable
        }

        let request = self.queued_requests.pop_front().expect("not empty");
        let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
            // no peer matches this request's requirements; requeue at the back so other
            // queued requests get a chance on the next poll instead of head-of-line blocking.
            self.queued_requests.push_back(request);
            return PollAction::NoPeersAvailable
        };

        let request = self.prepare_block_request(peer_id, request);

        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
    }

    /// Advances the state of the syncer.
    ///
    /// Dispatches queued requests to suitable peers and drains newly submitted download
    /// requests into the queue; returns `Poll::Pending` once no more progress can be made.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
        // drain buffered actions first
        loop {
            let no_peers_available = match self.poll_action() {
                PollAction::Ready(action) => return Poll::Ready(action),
                PollAction::NoRequests => false,
                PollAction::NoPeersAvailable => true,
            };

            loop {
                // poll incoming requests
                match self.download_requests_rx.poll_next_unpin(cx) {
                    Poll::Ready(Some(request)) => match request.get_priority() {
                        Priority::High => {
                            // find the first normal request and queue before, add this request to
                            // the back of the high-priority queue
                            let pos = self
                                .queued_requests
                                .iter()
                                .position(|req| req.is_normal_priority())
                                .unwrap_or(0);
                            self.queued_requests.insert(pos, request);
                        }
                        Priority::Normal => {
                            self.queued_requests.push_back(request);
                        }
                    },
                    Poll::Ready(None) => {
                        // `self` holds a clone of the sender, so the channel can never close
                        unreachable!("channel can't close")
                    }
                    Poll::Pending => break,
                }
            }

            // nothing left to dispatch now: either the queue is drained or no peer can serve it
            if self.queued_requests.is_empty() || no_peers_available {
                return Poll::Pending
            }
        }
    }

    /// Handles a new request to a peer.
    ///
    /// Tracks the request as inflight for the peer and converts it into the wire-level
    /// [`BlockRequest`] to be dispatched.
    ///
    /// Caution: this assumes the peer exists and is idle
    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
        // update the peer's state
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.state = req.peer_state();
        }

        match req {
            DownloadRequest::GetBlockHeaders { request, response, .. } => {
                // keep a copy of the request so the eventual response can be validated against it
                let inflight = Request { request: request.clone(), response };
                self.inflight_headers_requests.insert(peer_id, inflight);
                let HeadersRequest { start, limit, direction } = request;
                BlockRequest::GetBlockHeaders(GetBlockHeaders {
                    start_block: start,
                    limit,
                    skip: 0,
                    direction,
                })
            }
            DownloadRequest::GetBlockBodies { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_bodies_requests.insert(peer_id, inflight);
                BlockRequest::GetBlockBodies(GetBlockBodies(request))
            }
            DownloadRequest::GetBlockAccessLists { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_bals_requests.insert(peer_id, inflight);
                BlockRequest::GetBlockAccessLists(GetBlockAccessLists(request))
            }
            DownloadRequest::GetReceipts { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_receipts_requests.insert(peer_id, inflight);
                BlockRequest::GetReceipts(GetReceipts(request))
            }
        }
    }

    /// Returns a new followup request for the peer.
    ///
    /// Caution: this expects that the peer is _not_ closed.
    // NOTE(review): this dispatches the next queued request without re-checking its
    // best-peer requirements against this peer — confirm this is intended for followups.
    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
        let req = self.queued_requests.pop_front()?;
        let req = self.prepare_block_request(peer_id, req);
        Some(BlockResponseOutcome::Request(peer_id, req))
    }

    /// Called on a `GetBlockHeaders` response from a peer.
    ///
    /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a
    /// direct followup request or get the peer reported if the response was a
    /// [`EthResponseValidator::reputation_change_err`]
    pub(crate) fn on_block_headers_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockHeader>>,
    ) -> Option<BlockResponseOutcome> {
        let is_error = res.is_err();
        let maybe_reputation_change = res.reputation_change_err();

        let resp = self.inflight_headers_requests.remove(&peer_id);

        // validate the response against the original request we kept when dispatching
        let is_likely_bad_response =
            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));

        if let Some(resp) = resp {
            // delegate the response
            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
        }

        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            // If the peer is still ready to accept new requests, we try to send a followup
            // request immediately.
            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }

        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
        // outcome
        maybe_reputation_change
            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
    }

    /// Called on a `GetBlockBodies` response from a peer
    pub(crate) fn on_block_bodies_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockBody>>,
    ) -> Option<BlockResponseOutcome> {
        // an error or an empty body list both count as a likely-bad response for de-ranking
        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());

        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Called on a `GetBlockAccessLists` response from a peer
    pub(crate) fn on_block_access_lists_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<BlockAccessLists>,
    ) -> Option<BlockResponseOutcome> {
        let is_likely_bad_response = res.is_err();

        if let Some(resp) = self.inflight_bals_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Called on a `GetReceipts` response from a peer.
    ///
    /// All receipt variants (legacy with bloom, eth/69, eth/70) are expected to be normalized
    /// to [`ReceiptsResponse`] by the caller before invoking this method.
    pub(crate) fn on_receipts_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<ReceiptsResponse<N::Receipt>>,
    ) -> Option<BlockResponseOutcome> {
        // an error or an empty receipts list both count as a likely-bad response
        let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());

        if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Returns a new [`FetchClient`] that can send requests to this type.
    pub(crate) fn client(&self) -> FetchClient<N> {
        FetchClient {
            request_tx: self.download_requests_tx.clone(),
            peers_handle: self.peers_handle.clone(),
            num_active_peers: Arc::clone(&self.num_active_peers),
        }
    }
}
427
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
    /// A fetch action is ready to be handed to the caller.
    Ready(FetchAction),
    /// There are no queued requests to dispatch.
    NoRequests,
    /// There are queued requests but no (suitable) peer available to serve them.
    NoPeersAvailable,
}
434
435/// Represents a connected peer
436#[derive(Debug)]
437struct Peer {
438    /// The state this peer currently resides in.
439    state: PeerState,
440    /// Best known hash that the peer has
441    best_hash: B256,
442    /// Tracks the best number of the peer.
443    best_number: u64,
444    /// Capabilities announced by the peer.
445    #[allow(dead_code)]
446    capabilities: Arc<Capabilities>,
447    /// Tracks the current timeout value we use for the peer.
448    timeout: Arc<AtomicU64>,
449    /// Tracks whether the peer has recently responded with a likely bad response.
450    ///
451    /// This is used to de-rank the peer if there are other peers available.
452    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
453    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
454    /// lowest timeout.
455    last_response_likely_bad: bool,
456    /// Tracks the range info for the peer.
457    range_info: Option<BlockRangeInfo>,
458}
459
460impl Peer {
461    fn timeout(&self) -> u64 {
462        self.timeout.load(Ordering::Relaxed)
463    }
464
465    /// Returns the earliest block number available from the peer.
466    fn earliest(&self) -> u64 {
467        self.range_info.as_ref().map_or(0, |info| info.earliest())
468    }
469
470    /// Returns true if the peer has the full history available.
471    fn has_full_history(&self) -> bool {
472        self.earliest() == 0
473    }
474
475    fn range(&self) -> Option<RangeInclusive<u64>> {
476        self.range_info.as_ref().map(|info| info.range())
477    }
478
479    /// Returns true if this peer has a better range than the other peer for serving the requested
480    /// range.
481    ///
482    /// A peer has a "better range" if:
483    /// 1. It can fully cover the requested range while the other cannot
484    /// 2. None can fully cover the range, but this peer has lower start value
485    /// 3. If a peer doesn't announce a range we assume it has full history, but check the other's
486    ///    range and treat that as better if it can cover the range
487    fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
488        let self_range = self.range();
489        let other_range = other.range();
490
491        match (self_range, other_range) {
492            (Some(self_r), Some(other_r)) => {
493                // Check if each peer can fully cover the requested range
494                let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
495                let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
496
497                #[expect(clippy::match_same_arms)]
498                match (self_covers, other_covers) {
499                    (true, false) => true,  // Only self covers the range
500                    (false, true) => false, // Only other covers the range
501                    (true, true) => false,  // Both cover
502                    (false, false) => {
503                        // neither covers - prefer if peer has lower (better) start range
504                        self_r.start() < other_r.start()
505                    }
506                }
507            }
508            (Some(self_r), None) => {
509                // Self has range info, other doesn't (treated as full history with unknown latest)
510                // Self is better only if it covers the range
511                self_r.contains(range.start()) && self_r.contains(range.end())
512            }
513            (None, Some(other_r)) => {
514                // Self has no range info (full history), other has range info
515                // Self is better only if other doesn't cover the range
516                !(other_r.contains(range.start()) && other_r.contains(range.end()))
517            }
518            (None, None) => false, // Neither has range info - no one is better
519        }
520    }
521
522    /// Returns true if this peer is better than the other peer based on the given requirements.
523    fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
524        match requirement {
525            BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
526            BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
527            // Version-based filtering happens in `next_best_peer`, so by the time we get here
528            // both peers already satisfy the version requirement.
529            BestPeerRequirements::None | BestPeerRequirements::EthVersion(_) => false,
530        }
531    }
532}
533
/// Tracks the state of an individual peer
#[derive(Debug)]
enum PeerState {
    /// Peer is currently not handling requests and is available.
    Idle,
    /// Peer is handling a `GetBlockHeaders` request.
    GetBlockHeaders,
    /// Peer is handling a `GetBlockBodies` request.
    GetBlockBodies,
    /// Peer is handling a `GetBlockAccessLists` request.
    GetBlockAccessLists,
    /// Peer is handling a `GetReceipts` request.
    GetReceipts,
    /// Peer session is about to close; it no longer transitions back to `Idle`
    /// (see `on_request_finished`).
    Closing,
}
550
551// === impl PeerState ===
552
553impl PeerState {
554    /// Returns true if the peer is currently idle.
555    const fn is_idle(&self) -> bool {
556        matches!(self, Self::Idle)
557    }
558
559    /// Resets the state on a received response.
560    ///
561    /// If the state was already marked as `Closing` do nothing.
562    ///
563    /// Returns `true` if the peer is ready for another request.
564    const fn on_request_finished(&mut self) -> bool {
565        if !matches!(self, Self::Closing) {
566            *self = Self::Idle;
567            return true
568        }
569        false
570    }
571}
572
/// A request that waits for a response from the network, so it can send it back through the
/// response channel.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The issued request object
    // TODO: this can be attached to the response in error case
    request: Req,
    /// Sender half used to deliver the peer's response (or an error) to the requester.
    response: oneshot::Sender<Resp>,
}
582
/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
#[expect(clippy::enum_variant_names)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Download the requested headers and send response through channel
    GetBlockHeaders {
        /// The header request parameters (start, limit, direction).
        request: HeadersRequest,
        /// Channel on which the result is delivered.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
        /// Scheduling priority; high-priority requests are queued ahead of normal ones.
        priority: Priority,
    },
    /// Download the requested bodies and send response through channel
    GetBlockBodies {
        /// Hashes of the blocks whose bodies are requested.
        request: Vec<B256>,
        /// Channel on which the result is delivered.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
        /// Scheduling priority; high-priority requests are queued ahead of normal ones.
        priority: Priority,
        /// Optional hint of the block number range the hashes belong to, used to prefer
        /// peers that can serve that range (see `best_peer_requirements`).
        range_hint: Option<RangeInclusive<u64>>,
    },
    /// Download the requested access lists and send response through channel
    GetBlockAccessLists {
        /// Hashes of the blocks whose access lists are requested.
        request: Vec<B256>,
        /// Channel on which the result is delivered.
        response: oneshot::Sender<PeerRequestResult<BlockAccessLists>>,
        /// Scheduling priority; high-priority requests are queued ahead of normal ones.
        priority: Priority,
    },
    /// Download receipts for the given block hashes and send response through channel
    GetReceipts {
        /// Hashes of the blocks whose receipts are requested.
        request: Vec<B256>,
        /// Channel on which the result is delivered.
        response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
        /// Scheduling priority; high-priority requests are queued ahead of normal ones.
        priority: Priority,
    },
}
613
614// === impl DownloadRequest ===
615
616impl<N: NetworkPrimitives> DownloadRequest<N> {
617    /// Returns the corresponding state for a peer that handles the request.
618    const fn peer_state(&self) -> PeerState {
619        match self {
620            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
621            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
622            Self::GetBlockAccessLists { .. } => PeerState::GetBlockAccessLists,
623            Self::GetReceipts { .. } => PeerState::GetReceipts,
624        }
625    }
626
627    /// Returns the requested priority of this request
628    const fn get_priority(&self) -> &Priority {
629        match self {
630            Self::GetBlockHeaders { priority, .. } |
631            Self::GetBlockBodies { priority, .. } |
632            Self::GetBlockAccessLists { priority, .. } |
633            Self::GetReceipts { priority, .. } => priority,
634        }
635    }
636
637    /// Returns `true` if this request is normal priority.
638    const fn is_normal_priority(&self) -> bool {
639        self.get_priority().is_normal()
640    }
641
642    /// Returns the best peer requirements for this request.
643    fn best_peer_requirements(&self) -> BestPeerRequirements {
644        match self {
645            Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
646            Self::GetBlockAccessLists { .. } => BestPeerRequirements::EthVersion(EthVersion::Eth71),
647            Self::GetBlockBodies { range_hint, .. } => {
648                if let Some(range) = range_hint {
649                    BestPeerRequirements::FullBlockRange(range.clone())
650                } else {
651                    BestPeerRequirements::FullBlock
652                }
653            }
654            Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
655        }
656    }
657}
658
/// An action the syncer can emit.
pub(crate) enum FetchAction {
    /// Dispatch an eth request to the given peer.
    BlockRequest {
        /// The targeted recipient for the request
        peer_id: PeerId,
        /// The request to send
        request: BlockRequest,
    },
}
669
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Continue with another (followup) request to the peer.
    Request(PeerId, BlockRequest),
    /// How to handle a bad response and the reputation change to apply, if any.
    BadResponse(PeerId, ReputationChangeKind),
}
680
/// Additional requirements for how to rank peers during selection.
enum BestPeerRequirements {
    /// No additional requirements
    None,
    /// Peer must have this block range available.
    FullBlockRange(RangeInclusive<u64>),
    /// Peer must have full range (history starting at genesis).
    FullBlock,
    /// Peer must support at least this eth protocol version.
    EthVersion(EthVersion),
}
692
693#[cfg(test)]
694mod tests {
695    use super::*;
696    use crate::{peers::PeersManager, PeersConfig};
697    use alloy_consensus::Header;
698    use alloy_primitives::B512;
699    use reth_eth_wire::Capability;
700    use std::future::poll_fn;
701
702    #[tokio::test(flavor = "multi_thread")]
703    async fn test_poll_fetcher() {
704        let manager = PeersManager::new(PeersConfig::default());
705        let mut fetcher =
706            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
707
708        poll_fn(move |cx| {
709            assert!(fetcher.poll(cx).is_pending());
710            let (tx, _rx) = oneshot::channel();
711            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
712                request: vec![],
713                response: tx,
714                priority: Priority::default(),
715                range_hint: None,
716            });
717            assert!(fetcher.poll(cx).is_pending());
718
719            Poll::Ready(())
720        })
721        .await;
722    }
723
724    #[tokio::test]
725    async fn test_peer_rotation() {
726        let manager = PeersManager::new(PeersConfig::default());
727        let mut fetcher =
728            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
729        // Add a few random peers
730        let peer1 = B512::random();
731        let peer2 = B512::random();
732        let capabilities = Arc::new(Capabilities::from(vec![]));
733        fetcher.new_active_peer(
734            peer1,
735            B256::random(),
736            1,
737            Arc::clone(&capabilities),
738            Arc::new(AtomicU64::new(1)),
739            None,
740        );
741        fetcher.new_active_peer(
742            peer2,
743            B256::random(),
744            2,
745            Arc::clone(&capabilities),
746            Arc::new(AtomicU64::new(1)),
747            None,
748        );
749
750        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
751        assert!(first_peer == peer1 || first_peer == peer2);
752        // Pending disconnect for first_peer
753        fetcher.on_pending_disconnect(&first_peer);
754        // first_peer now isn't idle, so we should get other peer
755        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
756        assert!(first_peer == peer1 || first_peer == peer2);
757        assert_ne!(first_peer, second_peer);
758        // without idle peers, returns None
759        fetcher.on_pending_disconnect(&second_peer);
760        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
761    }
762
763    #[tokio::test]
764    async fn test_peer_prioritization() {
765        let manager = PeersManager::new(PeersConfig::default());
766        let mut fetcher =
767            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
768        // Add a few random peers
769        let peer1 = B512::random();
770        let peer2 = B512::random();
771        let peer3 = B512::random();
772
773        let peer2_timeout = Arc::new(AtomicU64::new(300));
774
775        let capabilities = Arc::new(Capabilities::from(vec![]));
776        fetcher.new_active_peer(
777            peer1,
778            B256::random(),
779            1,
780            Arc::clone(&capabilities),
781            Arc::new(AtomicU64::new(30)),
782            None,
783        );
784        fetcher.new_active_peer(
785            peer2,
786            B256::random(),
787            2,
788            Arc::clone(&capabilities),
789            Arc::clone(&peer2_timeout),
790            None,
791        );
792        fetcher.new_active_peer(
793            peer3,
794            B256::random(),
795            3,
796            Arc::clone(&capabilities),
797            Arc::new(AtomicU64::new(50)),
798            None,
799        );
800
801        // Must always get peer1 (lowest timeout)
802        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
803        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
804        // peer2's timeout changes below peer1's
805        peer2_timeout.store(10, Ordering::Relaxed);
806        // Then we get peer 2 always (now lowest)
807        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
808        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
809    }
810
811    #[tokio::test]
812    async fn test_on_block_headers_response() {
813        let manager = PeersManager::new(PeersConfig::default());
814        let mut fetcher =
815            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
816        let peer_id = B512::random();
817
818        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
819
820        assert_eq!(
821            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
822            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
823        );
824        assert_eq!(
825            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
826            None
827        );
828        assert_eq!(
829            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
830            None
831        );
832        assert_eq!(
833            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
834            None
835        );
836        assert_eq!(
837            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
838            None
839        );
840    }
841
842    #[tokio::test]
843    async fn test_header_response_outcome() {
844        let manager = PeersManager::new(PeersConfig::default());
845        let mut fetcher =
846            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
847        let peer_id = B512::random();
848
849        let request_pair = || {
850            let (tx, _rx) = oneshot::channel();
851            let req = Request {
852                request: HeadersRequest {
853                    start: 0u64.into(),
854                    limit: 1,
855                    direction: Default::default(),
856                },
857                response: tx,
858            };
859            let header = Header { number: 0, ..Default::default() };
860            (req, header)
861        };
862
863        fetcher.new_active_peer(
864            peer_id,
865            Default::default(),
866            Default::default(),
867            Arc::new(Capabilities::from(vec![])),
868            Default::default(),
869            None,
870        );
871
872        let (req, header) = request_pair();
873        fetcher.inflight_headers_requests.insert(peer_id, req);
874
875        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
876        assert!(outcome.is_none());
877        assert!(fetcher.peers[&peer_id].state.is_idle());
878
879        let outcome =
880            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
881
882        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
883            RequestError::Timeout
884        ))
885        .is_some());
886
887        match outcome {
888            BlockResponseOutcome::BadResponse(peer, _) => {
889                assert_eq!(peer, peer_id)
890            }
891            BlockResponseOutcome::Request(_, _) => {
892                unreachable!()
893            }
894        };
895
896        assert!(fetcher.peers[&peer_id].state.is_idle());
897    }
898
899    #[test]
900    fn test_peer_is_better_none_requirement() {
901        let peer1 = Peer {
902            state: PeerState::Idle,
903            best_hash: B256::random(),
904            best_number: 100,
905            capabilities: Arc::new(Capabilities::new(vec![])),
906            timeout: Arc::new(AtomicU64::new(10)),
907            last_response_likely_bad: false,
908            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
909        };
910
911        let peer2 = Peer {
912            state: PeerState::Idle,
913            best_hash: B256::random(),
914            best_number: 50,
915            capabilities: Arc::new(Capabilities::new(vec![])),
916            timeout: Arc::new(AtomicU64::new(20)),
917            last_response_likely_bad: false,
918            range_info: None,
919        };
920
921        // With None requirement, is_better should always return false
922        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
923        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
924    }
925
926    #[test]
927    fn test_peer_is_better_full_block_requirement() {
928        // Peer with full history (earliest = 0)
929        let peer_full = Peer {
930            state: PeerState::Idle,
931            best_hash: B256::random(),
932            best_number: 100,
933            capabilities: Arc::new(Capabilities::new(vec![])),
934            timeout: Arc::new(AtomicU64::new(10)),
935            last_response_likely_bad: false,
936            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
937        };
938
939        // Peer without full history (earliest = 50)
940        let peer_partial = Peer {
941            state: PeerState::Idle,
942            best_hash: B256::random(),
943            best_number: 100,
944            capabilities: Arc::new(Capabilities::new(vec![])),
945            timeout: Arc::new(AtomicU64::new(10)),
946            last_response_likely_bad: false,
947            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
948        };
949
950        // Peer without range info (treated as full history)
951        let peer_no_range = Peer {
952            state: PeerState::Idle,
953            best_hash: B256::random(),
954            best_number: 100,
955            capabilities: Arc::new(Capabilities::new(vec![])),
956            timeout: Arc::new(AtomicU64::new(10)),
957            last_response_likely_bad: false,
958            range_info: None,
959        };
960
961        // Peer with full history is better than peer without
962        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
963        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
964
965        // Peer without range info (full history) is better than partial
966        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
967        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
968
969        // Both have full history - no improvement
970        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
971        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
972    }
973
974    #[test]
975    fn test_peer_is_better_full_block_range_requirement() {
976        let range = RangeInclusive::new(40, 60);
977
978        // Peer that covers the requested range
979        let peer_covers = Peer {
980            state: PeerState::Idle,
981            best_hash: B256::random(),
982            best_number: 100,
983            capabilities: Arc::new(Capabilities::new(vec![])),
984            timeout: Arc::new(AtomicU64::new(10)),
985            last_response_likely_bad: false,
986            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
987        };
988
989        // Peer that doesn't cover the range (earliest too high)
990        let peer_no_cover = Peer {
991            state: PeerState::Idle,
992            best_hash: B256::random(),
993            best_number: 100,
994            capabilities: Arc::new(Capabilities::new(vec![])),
995            timeout: Arc::new(AtomicU64::new(10)),
996            last_response_likely_bad: false,
997            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
998        };
999
1000        // Peer that covers the requested range is better than one that doesn't
1001        assert!(peer_covers
1002            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
1003        assert!(
1004            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
1005        );
1006    }
1007
1008    #[test]
1009    fn test_peer_is_better_both_cover_range() {
1010        let range = RangeInclusive::new(30, 50);
1011
1012        // Peer with full history that covers the range
1013        let peer_full = Peer {
1014            state: PeerState::Idle,
1015            best_hash: B256::random(),
1016            best_number: 100,
1017            capabilities: Arc::new(Capabilities::new(vec![])),
1018            timeout: Arc::new(AtomicU64::new(10)),
1019            last_response_likely_bad: false,
1020            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1021        };
1022
1023        // Peer without full history that also covers the range
1024        let peer_partial = Peer {
1025            state: PeerState::Idle,
1026            best_hash: B256::random(),
1027            best_number: 100,
1028            capabilities: Arc::new(Capabilities::new(vec![])),
1029            timeout: Arc::new(AtomicU64::new(10)),
1030            last_response_likely_bad: false,
1031            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1032        };
1033
1034        // When both cover the range, prefer none
1035        assert!(!peer_full
1036            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1037        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1038    }
1039
    #[test]
    fn test_peer_is_better_lower_start() {
        let range = RangeInclusive::new(30, 60);

        // Full-history peer (earliest = 0). NOTE(review): its latest (50) is
        // below the range end (60), so it does not fully cover 30..=60.
        let peer_full = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
        };

        // Partial-history peer (earliest = 30); its latest (50) likewise stops
        // short of the requested range end.
        let peer_partial = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
        };

        // The peer with the lower start (full history) is preferred.
        // NOTE(review): the previous comment claimed "both cover the range",
        // but with latest = 50 neither reaches 60 — confirm the intended
        // scenario against `is_better`.
        assert!(peer_full
            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
    }
1071
1072    #[test]
1073    fn test_peer_is_better_neither_covers_range() {
1074        let range = RangeInclusive::new(40, 60);
1075
1076        // Peer with full history that doesn't cover the range (latest too low)
1077        let peer_full = Peer {
1078            state: PeerState::Idle,
1079            best_hash: B256::random(),
1080            best_number: 30,
1081            capabilities: Arc::new(Capabilities::new(vec![])),
1082            timeout: Arc::new(AtomicU64::new(10)),
1083            last_response_likely_bad: false,
1084            range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1085        };
1086
1087        // Peer without full history that also doesn't cover the range
1088        let peer_partial = Peer {
1089            state: PeerState::Idle,
1090            best_hash: B256::random(),
1091            best_number: 30,
1092            capabilities: Arc::new(Capabilities::new(vec![])),
1093            timeout: Arc::new(AtomicU64::new(10)),
1094            last_response_likely_bad: false,
1095            range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1096        };
1097
1098        // When neither covers the range, prefer full history
1099        assert!(peer_full
1100            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1101        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1102    }
1103
1104    #[test]
1105    fn test_peer_is_better_no_range_info() {
1106        let range = RangeInclusive::new(40, 60);
1107
1108        // Peer with range info
1109        let peer_with_range = Peer {
1110            state: PeerState::Idle,
1111            best_hash: B256::random(),
1112            best_number: 100,
1113            capabilities: Arc::new(Capabilities::new(vec![])),
1114            timeout: Arc::new(AtomicU64::new(10)),
1115            last_response_likely_bad: false,
1116            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1117        };
1118
1119        // Peer without range info
1120        let peer_no_range = Peer {
1121            state: PeerState::Idle,
1122            best_hash: B256::random(),
1123            best_number: 100,
1124            capabilities: Arc::new(Capabilities::new(vec![])),
1125            timeout: Arc::new(AtomicU64::new(10)),
1126            last_response_likely_bad: false,
1127            range_info: None,
1128        };
1129
1130        // Peer without range info is not better (we prefer peers with known ranges)
1131        assert!(!peer_no_range
1132            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1133
1134        // Peer with range info is better than peer without
1135        assert!(
1136            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1137        );
1138    }
1139
1140    #[test]
1141    fn test_peer_is_better_one_peer_no_range_covers() {
1142        let range = RangeInclusive::new(40, 60);
1143
1144        // Peer with range info that covers the requested range
1145        let peer_with_range_covers = Peer {
1146            state: PeerState::Idle,
1147            best_hash: B256::random(),
1148            best_number: 100,
1149            capabilities: Arc::new(Capabilities::new(vec![])),
1150            timeout: Arc::new(AtomicU64::new(10)),
1151            last_response_likely_bad: false,
1152            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1153        };
1154
1155        // Peer without range info (treated as full history with unknown latest)
1156        let peer_no_range = Peer {
1157            state: PeerState::Idle,
1158            best_hash: B256::random(),
1159            best_number: 100,
1160            capabilities: Arc::new(Capabilities::new(vec![])),
1161            timeout: Arc::new(AtomicU64::new(10)),
1162            last_response_likely_bad: false,
1163            range_info: None,
1164        };
1165
1166        // Peer with range that covers is better than peer without range info
1167        assert!(peer_with_range_covers
1168            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1169
1170        // Peer without range info is not better when other covers
1171        assert!(!peer_no_range
1172            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1173    }
1174
1175    #[test]
1176    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1177        let range = RangeInclusive::new(40, 60);
1178
1179        // Peer with range info that does NOT cover the requested range (too high)
1180        let peer_with_range_no_cover = Peer {
1181            state: PeerState::Idle,
1182            best_hash: B256::random(),
1183            best_number: 100,
1184            capabilities: Arc::new(Capabilities::new(vec![])),
1185            timeout: Arc::new(AtomicU64::new(10)),
1186            last_response_likely_bad: false,
1187            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1188        };
1189
1190        // Peer without range info (treated as full history)
1191        let peer_no_range = Peer {
1192            state: PeerState::Idle,
1193            best_hash: B256::random(),
1194            best_number: 100,
1195            capabilities: Arc::new(Capabilities::new(vec![])),
1196            timeout: Arc::new(AtomicU64::new(10)),
1197            last_response_likely_bad: false,
1198            range_info: None,
1199        };
1200
1201        // Peer with range that doesn't cover is not better
1202        assert!(!peer_with_range_no_cover
1203            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1204
1205        // Peer without range info (full history) is better when other doesn't cover
1206        assert!(peer_no_range
1207            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1208    }
1209
1210    #[test]
1211    fn test_peer_is_better_edge_cases() {
1212        // Test exact range boundaries
1213        let range = RangeInclusive::new(50, 100);
1214
1215        // Peer that exactly covers the range
1216        let peer_exact = Peer {
1217            state: PeerState::Idle,
1218            best_hash: B256::random(),
1219            best_number: 100,
1220            capabilities: Arc::new(Capabilities::new(vec![])),
1221            timeout: Arc::new(AtomicU64::new(10)),
1222            last_response_likely_bad: false,
1223            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1224        };
1225
1226        // Peer that's one block short at the start
1227        let peer_short_start = Peer {
1228            state: PeerState::Idle,
1229            best_hash: B256::random(),
1230            best_number: 100,
1231            capabilities: Arc::new(Capabilities::new(vec![])),
1232            timeout: Arc::new(AtomicU64::new(10)),
1233            last_response_likely_bad: false,
1234            range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1235        };
1236
1237        // Peer that's one block short at the end
1238        let peer_short_end = Peer {
1239            state: PeerState::Idle,
1240            best_hash: B256::random(),
1241            best_number: 100,
1242            capabilities: Arc::new(Capabilities::new(vec![])),
1243            timeout: Arc::new(AtomicU64::new(10)),
1244            last_response_likely_bad: false,
1245            range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1246        };
1247
1248        // Exact coverage is better than short coverage
1249        assert!(peer_exact
1250            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1251        assert!(peer_exact
1252            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1253
1254        // Short coverage is not better than exact coverage
1255        assert!(!peer_short_start
1256            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1257        assert!(
1258            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1259        );
1260    }
1261
1262    /// Creates a `StateFetcher` with a single idle peer and returns both.
1263    fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1264        let manager = PeersManager::new(PeersConfig::default());
1265        let mut fetcher =
1266            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1267        let peer_id = B512::random();
1268
1269        fetcher.new_active_peer(
1270            peer_id,
1271            Default::default(),
1272            Default::default(),
1273            Arc::new(Capabilities::from(vec![])),
1274            Default::default(),
1275            None,
1276        );
1277        (fetcher, peer_id)
1278    }
1279
1280    /// Inserts an inflight receipts request into the fetcher and returns the
1281    /// `oneshot::Receiver` that the final response will be sent through.
1282    fn insert_inflight_receipts(
1283        fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1284        peer_id: PeerId,
1285    ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1286    {
1287        let (tx, rx) = oneshot::channel();
1288        fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1289        fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1290        rx
1291    }
1292
1293    // ---- Receipts: basic dispatch ----
1294
1295    #[tokio::test]
1296    async fn test_poll_dispatches_receipts_to_peer() {
1297        let (mut fetcher, peer_id) = fetcher_with_peer();
1298
1299        poll_fn(move |cx| {
1300            let (tx, _rx) = oneshot::channel();
1301            fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1302                request: vec![B256::ZERO],
1303                response: tx,
1304                priority: Priority::default(),
1305            });
1306
1307            let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1308                fetcher.poll(cx)
1309            else {
1310                panic!("expected Ready(BlockRequest)");
1311            };
1312            assert_eq!(dispatched_peer, peer_id);
1313            assert!(matches!(request, BlockRequest::GetReceipts(_)));
1314
1315            // Peer should now be in GetReceipts state
1316            assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1317            // Inflight request should be tracked
1318            assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1319
1320            Poll::Ready(())
1321        })
1322        .await;
1323    }
1324
1325    // ---- Receipts: response handling ----
1326
1327    #[tokio::test]
1328    async fn test_receipts_complete_response_resolves_and_idles_peer() {
1329        let (mut fetcher, peer_id) = fetcher_with_peer();
1330
1331        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1332
1333        let resp = ReceiptsResponse::new(vec![vec![]]);
1334        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1335
1336        // No queued requests, so no followup
1337        assert!(outcome.is_none());
1338        // Peer back to idle
1339        assert!(fetcher.peers[&peer_id].state.is_idle());
1340        // Inflight cleaned up
1341        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1342
1343        // Caller receives the response
1344        let result = rx.await.unwrap().unwrap();
1345        assert_eq!(result.1.receipts.len(), 1);
1346    }
1347
1348    #[tokio::test]
1349    async fn test_receipts_empty_response_marks_peer_bad() {
1350        let (mut fetcher, peer_id) = fetcher_with_peer();
1351        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1352
1353        let resp = ReceiptsResponse::new(vec![]);
1354        let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1355
1356        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1357    }
1358
1359    #[tokio::test]
1360    async fn test_receipts_error_forwards_and_marks_peer_bad() {
1361        let (mut fetcher, peer_id) = fetcher_with_peer();
1362        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1363
1364        let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1365
1366        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1367        // Error is forwarded to the caller
1368        let result = rx.await.unwrap();
1369        assert_eq!(result.unwrap_err(), RequestError::Timeout);
1370    }
1371
1372    #[tokio::test]
1373    async fn test_session_closed_cancels_inflight_receipts() {
1374        let (mut fetcher, peer_id) = fetcher_with_peer();
1375        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1376
1377        fetcher.on_session_closed(&peer_id);
1378
1379        assert!(!fetcher.peers.contains_key(&peer_id));
1380        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1381
1382        let result = rx.await.unwrap();
1383        assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1384    }
1385
1386    #[tokio::test]
1387    async fn test_receipts_response_triggers_followup() {
1388        let (mut fetcher, peer_id) = fetcher_with_peer();
1389
1390        // Queue a bodies request as a followup candidate
1391        let (followup_tx, _followup_rx) = oneshot::channel();
1392        fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1393            request: vec![B256::random()],
1394            response: followup_tx,
1395            priority: Priority::default(),
1396            range_hint: None,
1397        });
1398
1399        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1400
1401        let resp = ReceiptsResponse::new(vec![vec![]]);
1402        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1403
1404        assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1405    }
1406
1407    #[tokio::test]
1408    async fn test_prepare_block_request_creates_inflight_receipts() {
1409        let (mut fetcher, peer_id) = fetcher_with_peer();
1410        let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1411
1412        let (tx, _rx) = oneshot::channel();
1413        let req = DownloadRequest::GetReceipts {
1414            request: hashes.clone(),
1415            response: tx,
1416            priority: Priority::default(),
1417        };
1418
1419        let block_request = fetcher.prepare_block_request(peer_id, req);
1420
1421        // Returns a GetReceipts block request with the same hashes
1422        match block_request {
1423            BlockRequest::GetReceipts(ref get) => {
1424                assert_eq!(get.0, hashes);
1425            }
1426            other => panic!("expected GetReceipts, got {other:?}"),
1427        }
1428
1429        // Peer state transitions to GetReceipts
1430        assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1431
1432        // Inflight request is tracked
1433        assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1434    }
1435    #[tokio::test]
1436    async fn test_next_best_peer_eth71_no_support() {
1437        let manager = PeersManager::new(PeersConfig::default());
1438        let mut fetcher =
1439            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1440
1441        let peer = B512::random();
1442
1443        // Capabilities WITHOUT eth71
1444        let capabilities = Arc::new(Capabilities::new(vec![]));
1445
1446        fetcher.new_active_peer(
1447            peer,
1448            B256::random(),
1449            100,
1450            capabilities,
1451            Arc::new(AtomicU64::new(10)),
1452            None,
1453        );
1454
1455        // Should return None because peer doesn't support eth71
1456        assert_eq!(
1457            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1458            None
1459        );
1460    }
1461
1462    #[tokio::test]
1463    async fn test_next_best_peer_eth71_supported() {
1464        let manager = PeersManager::new(PeersConfig::default());
1465        let mut fetcher =
1466            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1467
1468        let peer = B512::random();
1469
1470        // Build capability list that includes Eth71
1471        let capabilities = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1472
1473        fetcher.new_active_peer(
1474            peer,
1475            B256::random(),
1476            100,
1477            capabilities,
1478            Arc::new(AtomicU64::new(10)),
1479            None,
1480        );
1481
1482        assert_eq!(
1483            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1484            Some(peer)
1485        );
1486    }
1487
1488    #[tokio::test]
1489    async fn test_next_best_peer_eth71_filters_correctly() {
1490        let manager = PeersManager::new(PeersConfig::default());
1491        let mut fetcher =
1492            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1493
1494        let peer_no_71 = B512::random();
1495        let peer_with_71 = B512::random();
1496
1497        // Peer without eth71
1498        let caps_old = Arc::new(Capabilities::new(vec![]));
1499
1500        // Peer with eth71
1501        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1502
1503        fetcher.new_active_peer(
1504            peer_no_71,
1505            B256::random(),
1506            100,
1507            caps_old,
1508            Arc::new(AtomicU64::new(5)),
1509            None,
1510        );
1511
1512        fetcher.new_active_peer(
1513            peer_with_71,
1514            B256::random(),
1515            100,
1516            caps_71,
1517            Arc::new(AtomicU64::new(50)),
1518            None,
1519        );
1520
1521        // Even though peer_no_71 has lower timeout,
1522        // it must NOT be selected.
1523        assert_eq!(
1524            fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1525            Some(peer_with_71)
1526        );
1527    }
1528
1529    #[tokio::test]
1530    async fn test_wakes_when_eth71_peer_connects() {
1531        use futures::task::noop_waker;
1532        use std::task::{Context, Poll};
1533
1534        let manager = PeersManager::new(PeersConfig::default());
1535        let mut fetcher =
1536            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1537
1538        // Queue Eth71-required request
1539        let (tx, _rx) = oneshot::channel();
1540        fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1541            request: vec![],
1542            response: tx,
1543            priority: Priority::Normal,
1544        });
1545
1546        let waker = noop_waker();
1547        let mut cx = Context::from_waker(&waker);
1548
1549        // No peers -> must be Pending
1550        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1551
1552        // Add peer WITHOUT Eth71 support
1553        let peer_old = B512::random();
1554        let caps_old = Arc::new(Capabilities::new(vec![]));
1555
1556        fetcher.new_active_peer(
1557            peer_old,
1558            B256::random(),
1559            100,
1560            caps_old,
1561            Arc::new(AtomicU64::new(10)),
1562            None,
1563        );
1564
1565        // Still Pending
1566        assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1567
1568        // Add peer WITH Eth71 support
1569        let peer_71 = B512::random();
1570        let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1571
1572        fetcher.new_active_peer(
1573            peer_71,
1574            B256::random(),
1575            100,
1576            caps_71,
1577            Arc::new(AtomicU64::new(10)),
1578            None,
1579        );
1580
1581        // Now we must get Ready(BlockRequest)
1582        if let Poll::Ready(FetchAction::BlockRequest { peer_id, .. }) = fetcher.poll(&mut cx) {
1583            assert_eq!(peer_id, peer_71);
1584        }
1585    }
1586}