reth_network/fetch/mod.rs

1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11    Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
12};
13use reth_network_api::test_utils::PeersHandle;
14use reth_network_p2p::{
15    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
16    headers::client::HeadersRequest,
17    priority::Priority,
18};
19use reth_network_peers::PeerId;
20use reth_network_types::ReputationChangeKind;
21use std::{
22    collections::{HashMap, VecDeque},
23    ops::RangeInclusive,
24    sync::{
25        atomic::{AtomicU64, AtomicUsize, Ordering},
26        Arc,
27    },
28    task::{Context, Poll},
29};
30use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32
33type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
34type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
35
36/// Manages data fetching operations.
37///
/// This type is hooked into the staged sync pipeline and delegates download requests to available
/// peers, sending the response back once it is ready.
40///
41/// This type maintains a list of connected peers that are available for requests.
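///
/// # Example
///
/// A minimal sketch of how the fetcher is typically wired up (illustrative only; `peers_handle`
/// and `num_active_peers` are assumed to come from the peer manager, and the poll loop normally
/// lives inside the network manager's event loop):
///
/// ```ignore
/// let mut fetcher: StateFetcher = StateFetcher::new(peers_handle, num_active_peers);
/// // hand out a client that downloaders can use to issue header/body requests
/// let client = fetcher.client();
/// // drive the fetcher; emitted actions are dispatched to the corresponding peer sessions
/// while let Poll::Ready(FetchAction::BlockRequest { peer_id, request }) = fetcher.poll(cx) {
///     // send `request` to the session of `peer_id`
/// }
/// ```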
42#[derive(Debug)]
43pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
44    /// Currently active [`GetBlockHeaders`] requests
45    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
46    /// Currently active [`GetBlockBodies`] requests
47    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
48    /// The list of _available_ peers for requests.
49    peers: HashMap<PeerId, Peer>,
50    /// The handle to the peers manager
51    peers_handle: PeersHandle,
52    /// Number of active peer sessions the node's currently handling.
53    num_active_peers: Arc<AtomicUsize>,
54    /// Requests queued for processing
55    queued_requests: VecDeque<DownloadRequest<N>>,
56    /// Receiver for new incoming download requests
57    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
58    /// Sender for download requests, used to detach a [`FetchClient`]
59    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
60}
61
// === impl StateFetcher ===
63
64impl<N: NetworkPrimitives> StateFetcher<N> {
65    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
66        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
67        Self {
68            inflight_headers_requests: Default::default(),
69            inflight_bodies_requests: Default::default(),
70            peers: Default::default(),
71            peers_handle,
72            num_active_peers,
73            queued_requests: Default::default(),
74            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
75            download_requests_tx,
76        }
77    }
78
79    /// Invoked when connected to a new peer.
80    pub(crate) fn new_active_peer(
81        &mut self,
82        peer_id: PeerId,
83        best_hash: B256,
84        best_number: u64,
85        capabilities: Arc<Capabilities>,
86        timeout: Arc<AtomicU64>,
87        range_info: Option<BlockRangeInfo>,
88    ) {
89        self.peers.insert(
90            peer_id,
91            Peer {
92                state: PeerState::Idle,
93                best_hash,
94                best_number,
95                capabilities,
96                timeout,
97                last_response_likely_bad: false,
98                range_info,
99            },
100        );
101    }
102
103    /// Removes the peer from the peer list, after which it is no longer available for future
104    /// requests.
105    ///
106    /// Invoked when an active session was closed.
107    ///
    /// This also cancels any inflight requests and sends an error to their receivers.
109    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
110        self.peers.remove(peer);
111        if let Some(req) = self.inflight_headers_requests.remove(peer) {
112            let _ = req.response.send(Err(RequestError::ConnectionDropped));
113        }
114        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
115            let _ = req.response.send(Err(RequestError::ConnectionDropped));
116        }
117    }
118
119    /// Updates the block information for the peer.
120    ///
    /// Returns `true` if this is a newer block.
122    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
123        if let Some(peer) = self.peers.get_mut(peer_id) &&
124            number > peer.best_number
125        {
126            peer.best_hash = hash;
127            peer.best_number = number;
128            return true
129        }
130        false
131    }
132
133    /// Invoked when an active session is about to be disconnected.
134    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
135        if let Some(peer) = self.peers.get_mut(peer_id) {
136            peer.state = PeerState::Closing;
137        }
138    }
139
140    /// Returns the _next_ idle peer that's ready to accept a request,
141    /// prioritizing those with the lowest timeout/latency and those that recently responded with
    /// adequate data. Additionally, if full blocks are required, this prioritizes peers that have
    /// full history available.
144    fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
145        let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
146
147        let mut best_peer = idle.next()?;
148
149        for maybe_better in idle {
150            // replace best peer if our current best peer sent us a bad response last time
151            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
152                best_peer = maybe_better;
153                continue
154            }
155
156            // replace best peer if this peer meets the requirements better
157            if maybe_better.1.is_better(best_peer.1, &requirement) {
158                best_peer = maybe_better;
159                continue
160            }
161
            // otherwise, prefer the peer with the lower timeout (rtt), provided its last
            // response wasn't likely bad
163            if maybe_better.1.timeout() < best_peer.1.timeout() &&
164                !maybe_better.1.last_response_likely_bad
165            {
166                best_peer = maybe_better;
167            }
168        }
169
170        Some(*best_peer.0)
171    }
172
    /// Returns the next action to take.
174    fn poll_action(&mut self) -> PollAction {
        // check for emptiness first; the request is popped below and re-queued if no peer
        // is available.
176        if self.queued_requests.is_empty() {
177            return PollAction::NoRequests
178        }
179
180        let request = self.queued_requests.pop_front().expect("not empty");
181        let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
            // need to put the request back
183            self.queued_requests.push_front(request);
184            return PollAction::NoPeersAvailable
185        };
186
187        let request = self.prepare_block_request(peer_id, request);
188
189        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
190    }
191
    /// Advances the state of the syncer.
193    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
194        // drain buffered actions first
195        loop {
196            let no_peers_available = match self.poll_action() {
197                PollAction::Ready(action) => return Poll::Ready(action),
198                PollAction::NoRequests => false,
199                PollAction::NoPeersAvailable => true,
200            };
201
202            loop {
203                // poll incoming requests
204                match self.download_requests_rx.poll_next_unpin(cx) {
205                    Poll::Ready(Some(request)) => match request.get_priority() {
206                        Priority::High => {
                            // insert the request just before the first normal-priority request,
                            // i.e. at the back of the high-priority section of the queue
209                            let pos = self
210                                .queued_requests
211                                .iter()
212                                .position(|req| req.is_normal_priority())
213                                .unwrap_or(0);
214                            self.queued_requests.insert(pos, request);
215                        }
216                        Priority::Normal => {
217                            self.queued_requests.push_back(request);
218                        }
219                    },
220                    Poll::Ready(None) => {
221                        unreachable!("channel can't close")
222                    }
223                    Poll::Pending => break,
224                }
225            }
226
227            if self.queued_requests.is_empty() || no_peers_available {
228                return Poll::Pending
229            }
230        }
231    }
232
233    /// Handles a new request to a peer.
234    ///
235    /// Caution: this assumes the peer exists and is idle
236    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
237        // update the peer's state
238        if let Some(peer) = self.peers.get_mut(&peer_id) {
239            peer.state = req.peer_state();
240        }
241
242        match req {
243            DownloadRequest::GetBlockHeaders { request, response, .. } => {
244                let inflight = Request { request: request.clone(), response };
245                self.inflight_headers_requests.insert(peer_id, inflight);
246                let HeadersRequest { start, limit, direction } = request;
247                BlockRequest::GetBlockHeaders(GetBlockHeaders {
248                    start_block: start,
249                    limit,
250                    skip: 0,
251                    direction,
252                })
253            }
254            DownloadRequest::GetBlockBodies { request, response, .. } => {
255                let inflight = Request { request: (), response };
256                self.inflight_bodies_requests.insert(peer_id, inflight);
257                BlockRequest::GetBlockBodies(GetBlockBodies(request))
258            }
259        }
260    }
261
262    /// Returns a new followup request for the peer.
263    ///
264    /// Caution: this expects that the peer is _not_ closed.
265    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
266        let req = self.queued_requests.pop_front()?;
267        let req = self.prepare_block_request(peer_id, req);
268        Some(BlockResponseOutcome::Request(peer_id, req))
269    }
270
271    /// Called on a `GetBlockHeaders` response from a peer.
272    ///
    /// This delegates the response and returns a [`BlockResponseOutcome`] that either issues a
    /// direct followup request or reports the peer, if the response warrants a reputation change
    /// according to [`EthResponseValidator::reputation_change_err`].
276    pub(crate) fn on_block_headers_response(
277        &mut self,
278        peer_id: PeerId,
279        res: RequestResult<Vec<N::BlockHeader>>,
280    ) -> Option<BlockResponseOutcome> {
281        let is_error = res.is_err();
282        let maybe_reputation_change = res.reputation_change_err();
283
284        let resp = self.inflight_headers_requests.remove(&peer_id);
285
286        let is_likely_bad_response =
287            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
288
289        if let Some(resp) = resp {
290            // delegate the response
291            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
292        }
293
294        if let Some(peer) = self.peers.get_mut(&peer_id) {
295            // update the peer's response state
296            peer.last_response_likely_bad = is_likely_bad_response;
297
298            // If the peer is still ready to accept new requests, we try to send a followup
299            // request immediately.
300            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
301                return self.followup_request(peer_id)
302            }
303        }
304
305        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
306        // outcome
307        maybe_reputation_change
308            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
309    }
310
311    /// Called on a `GetBlockBodies` response from a peer
312    pub(crate) fn on_block_bodies_response(
313        &mut self,
314        peer_id: PeerId,
315        res: RequestResult<Vec<N::BlockBody>>,
316    ) -> Option<BlockResponseOutcome> {
317        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
318
319        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
320            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
321        }
322        if let Some(peer) = self.peers.get_mut(&peer_id) {
323            // update the peer's response state
324            peer.last_response_likely_bad = is_likely_bad_response;
325
326            if peer.state.on_request_finished() && !is_likely_bad_response {
327                return self.followup_request(peer_id)
328            }
329        }
330        None
331    }
332
333    /// Returns a new [`FetchClient`] that can send requests to this type.
334    pub(crate) fn client(&self) -> FetchClient<N> {
335        FetchClient {
336            request_tx: self.download_requests_tx.clone(),
337            peers_handle: self.peers_handle.clone(),
338            num_active_peers: Arc::clone(&self.num_active_peers),
339        }
340    }
341}
342
343/// The outcome of [`StateFetcher::poll_action`]
344enum PollAction {
345    Ready(FetchAction),
346    NoRequests,
347    NoPeersAvailable,
348}
349
350/// Represents a connected peer
351#[derive(Debug)]
352struct Peer {
353    /// The state this peer currently resides in.
354    state: PeerState,
355    /// Best known hash that the peer has
356    best_hash: B256,
357    /// Tracks the best number of the peer.
358    best_number: u64,
359    /// Capabilities announced by the peer.
360    #[allow(dead_code)]
361    capabilities: Arc<Capabilities>,
362    /// Tracks the current timeout value we use for the peer.
363    timeout: Arc<AtomicU64>,
364    /// Tracks whether the peer has recently responded with a likely bad response.
365    ///
366    /// This is used to de-rank the peer if there are other peers available.
367    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
368    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
369    /// lowest timeout.
370    last_response_likely_bad: bool,
371    /// Tracks the range info for the peer.
372    range_info: Option<BlockRangeInfo>,
373}
374
375impl Peer {
376    fn timeout(&self) -> u64 {
377        self.timeout.load(Ordering::Relaxed)
378    }
379
380    /// Returns the earliest block number available from the peer.
381    fn earliest(&self) -> u64 {
382        self.range_info.as_ref().map_or(0, |info| info.earliest())
383    }
384
385    /// Returns true if the peer has the full history available.
386    fn has_full_history(&self) -> bool {
387        self.earliest() == 0
388    }
389
390    fn range(&self) -> Option<RangeInclusive<u64>> {
391        self.range_info.as_ref().map(|info| info.range())
392    }
393
394    /// Returns true if this peer has a better range than the other peer for serving the requested
395    /// range.
396    ///
397    /// A peer has a "better range" if:
398    /// 1. It can fully cover the requested range while the other cannot
    /// 2. Neither can fully cover the range, but this peer has the lower start value
    /// 3. If a peer doesn't announce a range we assume it has full history, but if the other peer
    ///    announces a range that covers the request, the other peer is treated as better
402    fn has_better_range(&self, other: &Self, range: RangeInclusive<u64>) -> bool {
403        let self_range = self.range();
404        let other_range = other.range();
405
406        match (self_range, other_range) {
407            (Some(self_r), Some(other_r)) => {
408                // Check if each peer can fully cover the requested range
409                let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
410                let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
411
412                #[allow(clippy::match_same_arms)]
413                match (self_covers, other_covers) {
414                    (true, false) => true,  // Only self covers the range
415                    (false, true) => false, // Only other covers the range
416                    (true, true) => false,  // Both cover
417                    (false, false) => {
418                        // neither covers - prefer if peer has lower (better) start range
419                        self_r.start() < other_r.start()
420                    }
421                }
422            }
423            (Some(self_r), None) => {
424                // Self has range info, other doesn't (treated as full history with unknown latest)
425                // Self is better only if it covers the range
426                self_r.contains(range.start()) && self_r.contains(range.end())
427            }
428            (None, Some(other_r)) => {
429                // Self has no range info (full history), other has range info
430                // Self is better only if other doesn't cover the range
431                !(other_r.contains(range.start()) && other_r.contains(range.end()))
432            }
433            (None, None) => false, // Neither has range info - no one is better
434        }
435    }
436
437    /// Returns true if this peer is better than the other peer based on the given requirements.
438    fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
439        match requirement {
440            BestPeerRequirements::None => false,
441            BestPeerRequirements::FullBlockRange(range) => {
442                self.has_better_range(other, range.clone())
443            }
444            BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
445        }
446    }
447}
448
449/// Tracks the state of an individual peer
450#[derive(Debug)]
451enum PeerState {
452    /// Peer is currently not handling requests and is available.
453    Idle,
454    /// Peer is handling a `GetBlockHeaders` request.
455    GetBlockHeaders,
456    /// Peer is handling a `GetBlockBodies` request.
457    GetBlockBodies,
458    /// Peer session is about to close
459    Closing,
460}
461
462// === impl PeerState ===
463
464impl PeerState {
465    /// Returns true if the peer is currently idle.
466    const fn is_idle(&self) -> bool {
467        matches!(self, Self::Idle)
468    }
469
470    /// Resets the state on a received response.
471    ///
472    /// If the state was already marked as `Closing` do nothing.
473    ///
474    /// Returns `true` if the peer is ready for another request.
475    const fn on_request_finished(&mut self) -> bool {
476        if !matches!(self, Self::Closing) {
477            *self = Self::Idle;
478            return true
479        }
480        false
481    }
482}
483
484/// A request that waits for a response from the network, so it can send it back through the
485/// response channel.
486#[derive(Debug)]
487struct Request<Req, Resp> {
488    /// The issued request object
489    // TODO: this can be attached to the response in error case
490    request: Req,
491    response: oneshot::Sender<Resp>,
492}
493
494/// Requests that can be sent to the Syncer from a [`FetchClient`]
495#[derive(Debug)]
496pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Download the requested headers and send the response through the channel
498    GetBlockHeaders {
499        request: HeadersRequest,
500        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
501        priority: Priority,
502    },
    /// Download the requested block bodies and send the response through the channel
504    GetBlockBodies {
505        request: Vec<B256>,
506        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
507        priority: Priority,
508        range_hint: Option<RangeInclusive<u64>>,
509    },
510}
511
512// === impl DownloadRequest ===
513
514impl<N: NetworkPrimitives> DownloadRequest<N> {
515    /// Returns the corresponding state for a peer that handles the request.
516    const fn peer_state(&self) -> PeerState {
517        match self {
518            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
519            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
520        }
521    }
522
523    /// Returns the requested priority of this request
524    const fn get_priority(&self) -> &Priority {
525        match self {
526            Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
527                priority
528            }
529        }
530    }
531
532    /// Returns `true` if this request is normal priority.
533    const fn is_normal_priority(&self) -> bool {
534        self.get_priority().is_normal()
535    }
536
537    /// Returns the best peer requirements for this request.
538    fn best_peer_requirements(&self) -> BestPeerRequirements {
539        match self {
540            Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
541            Self::GetBlockBodies { range_hint, .. } => {
542                if let Some(range) = range_hint {
543                    BestPeerRequirements::FullBlockRange(range.clone())
544                } else {
545                    BestPeerRequirements::FullBlock
546                }
547            }
548        }
549    }
550}
551
552/// An action the syncer can emit.
553pub(crate) enum FetchAction {
554    /// Dispatch an eth request to the given peer.
555    BlockRequest {
556        /// The targeted recipient for the request
557        peer_id: PeerId,
558        /// The request to send
559        request: BlockRequest,
560    },
561}
562
/// The outcome of a processed block response.
566#[derive(Debug, PartialEq, Eq)]
567pub(crate) enum BlockResponseOutcome {
568    /// Continue with another request to the peer.
569    Request(PeerId, BlockRequest),
    /// The peer sent a bad response; contains the reputation change to apply.
571    BadResponse(PeerId, ReputationChangeKind),
572}
573
574/// Additional requirements for how to rank peers during selection.
575enum BestPeerRequirements {
576    /// No additional requirements
577    None,
578    /// Peer must have this block range available.
579    FullBlockRange(RangeInclusive<u64>),
    /// Peer must have the full block history available.
581    FullBlock,
582}
583
584#[cfg(test)]
585mod tests {
586    use super::*;
587    use crate::{peers::PeersManager, PeersConfig};
588    use alloy_consensus::Header;
589    use alloy_primitives::B512;
590    use std::future::poll_fn;
591
592    #[tokio::test(flavor = "multi_thread")]
593    async fn test_poll_fetcher() {
594        let manager = PeersManager::new(PeersConfig::default());
595        let mut fetcher =
596            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
597
598        poll_fn(move |cx| {
599            assert!(fetcher.poll(cx).is_pending());
600            let (tx, _rx) = oneshot::channel();
601            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
602                request: vec![],
603                response: tx,
604                priority: Priority::default(),
605                range_hint: None,
606            });
607            assert!(fetcher.poll(cx).is_pending());
608
609            Poll::Ready(())
610        })
611        .await;
612    }
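
    /// Illustrative sketch (not part of the upstream test suite): demonstrates that a
    /// `Priority::High` request received over the download channel is queued ahead of a
    /// previously received normal-priority request when the fetcher is polled.
    #[tokio::test]
    async fn test_high_priority_request_is_queued_first() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        // send a normal-priority bodies request followed by a high-priority headers request
        let (tx, _normal_rx) = oneshot::channel();
        fetcher
            .download_requests_tx
            .send(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::Normal,
                range_hint: None,
            })
            .unwrap();
        let (tx, _high_rx) = oneshot::channel();
        fetcher
            .download_requests_tx
            .send(DownloadRequest::GetBlockHeaders {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
                priority: Priority::High,
            })
            .unwrap();

        poll_fn(move |cx| {
            // no peers are connected, so both requests can only be queued
            assert!(fetcher.poll(cx).is_pending());
            // the high-priority request was inserted ahead of the normal-priority one
            assert!(!fetcher.queued_requests[0].is_normal_priority());
            assert!(fetcher.queued_requests[1].is_normal_priority());
            Poll::Ready(())
        })
        .await;
    }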
613
614    #[tokio::test]
615    async fn test_peer_rotation() {
616        let manager = PeersManager::new(PeersConfig::default());
617        let mut fetcher =
618            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
619        // Add a few random peers
620        let peer1 = B512::random();
621        let peer2 = B512::random();
622        let capabilities = Arc::new(Capabilities::from(vec![]));
623        fetcher.new_active_peer(
624            peer1,
625            B256::random(),
626            1,
627            Arc::clone(&capabilities),
628            Arc::new(AtomicU64::new(1)),
629            None,
630        );
631        fetcher.new_active_peer(
632            peer2,
633            B256::random(),
634            2,
635            Arc::clone(&capabilities),
636            Arc::new(AtomicU64::new(1)),
637            None,
638        );
639
640        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
641        assert!(first_peer == peer1 || first_peer == peer2);
642        // Pending disconnect for first_peer
643        fetcher.on_pending_disconnect(&first_peer);
644        // first_peer now isn't idle, so we should get other peer
645        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
        assert!(second_peer == peer1 || second_peer == peer2);
647        assert_ne!(first_peer, second_peer);
648        // without idle peers, returns None
649        fetcher.on_pending_disconnect(&second_peer);
650        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
651    }
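
    /// Illustrative sketch (not part of the upstream test suite): `update_peer_block` only
    /// records a block that is strictly newer than the peer's currently tracked best block.
    #[tokio::test]
    async fn test_update_peer_block_requires_newer_block() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer = B512::random();
        fetcher.new_active_peer(
            peer,
            B256::random(),
            10,
            Arc::new(Capabilities::from(vec![])),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        // an equal (or older) block number is ignored
        assert!(!fetcher.update_peer_block(&peer, B256::random(), 10));
        assert_eq!(fetcher.peers[&peer].best_number, 10);

        // a strictly newer block number updates the tracked best block
        assert!(fetcher.update_peer_block(&peer, B256::random(), 11));
        assert_eq!(fetcher.peers[&peer].best_number, 11);

        // unknown peers are ignored entirely
        assert!(!fetcher.update_peer_block(&B512::random(), B256::random(), 100));
    }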
652
653    #[tokio::test]
654    async fn test_peer_prioritization() {
655        let manager = PeersManager::new(PeersConfig::default());
656        let mut fetcher =
657            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
658        // Add a few random peers
659        let peer1 = B512::random();
660        let peer2 = B512::random();
661        let peer3 = B512::random();
662
663        let peer2_timeout = Arc::new(AtomicU64::new(300));
664
665        let capabilities = Arc::new(Capabilities::from(vec![]));
666        fetcher.new_active_peer(
667            peer1,
668            B256::random(),
669            1,
670            Arc::clone(&capabilities),
671            Arc::new(AtomicU64::new(30)),
672            None,
673        );
674        fetcher.new_active_peer(
675            peer2,
676            B256::random(),
677            2,
678            Arc::clone(&capabilities),
679            Arc::clone(&peer2_timeout),
680            None,
681        );
682        fetcher.new_active_peer(
683            peer3,
684            B256::random(),
685            3,
686            Arc::clone(&capabilities),
687            Arc::new(AtomicU64::new(50)),
688            None,
689        );
690
691        // Must always get peer1 (lowest timeout)
692        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
693        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
694        // peer2's timeout changes below peer1's
695        peer2_timeout.store(10, Ordering::Relaxed);
696        // Then we get peer 2 always (now lowest)
697        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
698        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
699    }
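
    /// Illustrative sketch (not part of the upstream test suite): how
    /// `DownloadRequest::best_peer_requirements` maps a request onto peer-selection requirements.
    /// Header requests impose no requirement; bodies requests ask for the hinted range or fall
    /// back to requiring full history.
    #[test]
    fn test_best_peer_requirements_mapping() {
        let (tx, _rx) = oneshot::channel();
        let headers = DownloadRequest::<EthNetworkPrimitives>::GetBlockHeaders {
            request: HeadersRequest {
                start: 0u64.into(),
                limit: 1,
                direction: Default::default(),
            },
            response: tx,
            priority: Priority::default(),
        };
        assert!(matches!(headers.best_peer_requirements(), BestPeerRequirements::None));

        let (tx, _rx) = oneshot::channel();
        let bodies_with_hint = DownloadRequest::<EthNetworkPrimitives>::GetBlockBodies {
            request: vec![],
            response: tx,
            priority: Priority::default(),
            range_hint: Some(10..=20),
        };
        assert!(matches!(
            bodies_with_hint.best_peer_requirements(),
            BestPeerRequirements::FullBlockRange(range) if range == (10..=20)
        ));

        let (tx, _rx) = oneshot::channel();
        let bodies_without_hint = DownloadRequest::<EthNetworkPrimitives>::GetBlockBodies {
            request: vec![],
            response: tx,
            priority: Priority::default(),
            range_hint: None,
        };
        assert!(matches!(
            bodies_without_hint.best_peer_requirements(),
            BestPeerRequirements::FullBlock
        ));
    }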
700
701    #[tokio::test]
702    async fn test_on_block_headers_response() {
703        let manager = PeersManager::new(PeersConfig::default());
704        let mut fetcher =
705            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
706        let peer_id = B512::random();
707
708        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
709
710        assert_eq!(
711            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
712            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
713        );
714        assert_eq!(
715            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
716            None
717        );
718        assert_eq!(
719            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
720            None
721        );
722        assert_eq!(
723            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
724            None
725        );
726        assert_eq!(
727            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
728            None
729        );
730    }
731
732    #[tokio::test]
733    async fn test_header_response_outcome() {
734        let manager = PeersManager::new(PeersConfig::default());
735        let mut fetcher =
736            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
737        let peer_id = B512::random();
738
739        let request_pair = || {
740            let (tx, _rx) = oneshot::channel();
741            let req = Request {
742                request: HeadersRequest {
743                    start: 0u64.into(),
744                    limit: 1,
745                    direction: Default::default(),
746                },
747                response: tx,
748            };
749            let header = Header { number: 0, ..Default::default() };
750            (req, header)
751        };
752
753        fetcher.new_active_peer(
754            peer_id,
755            Default::default(),
756            Default::default(),
757            Arc::new(Capabilities::from(vec![])),
758            Default::default(),
759            None,
760        );
761
762        let (req, header) = request_pair();
763        fetcher.inflight_headers_requests.insert(peer_id, req);
764
765        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
766        assert!(outcome.is_none());
767        assert!(fetcher.peers[&peer_id].state.is_idle());
768
769        let outcome =
770            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
771
772        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
773            RequestError::Timeout
774        ))
775        .is_some());
776
777        match outcome {
778            BlockResponseOutcome::BadResponse(peer, _) => {
779                assert_eq!(peer, peer_id)
780            }
781            BlockResponseOutcome::Request(_, _) => {
782                unreachable!()
783            }
784        };
785
786        assert!(fetcher.peers[&peer_id].state.is_idle());
787    }
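
    /// Illustrative sketch (not part of the upstream test suite): closing a session removes the
    /// peer and cancels its inflight request, so the waiting receiver observes a
    /// `ConnectionDropped` error.
    #[tokio::test]
    async fn test_on_session_closed_cancels_inflight() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();
        fetcher.new_active_peer(
            peer_id,
            B256::random(),
            1,
            Arc::new(Capabilities::from(vec![])),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        // register an inflight headers request for the peer
        let (tx, rx) = oneshot::channel();
        fetcher.inflight_headers_requests.insert(
            peer_id,
            Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            },
        );

        fetcher.on_session_closed(&peer_id);
        // the peer is removed and the pending request is answered with an error
        assert!(!fetcher.peers.contains_key(&peer_id));
        assert!(matches!(rx.await, Ok(Err(RequestError::ConnectionDropped))));
    }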
788
789    #[test]
790    fn test_peer_is_better_none_requirement() {
791        let peer1 = Peer {
792            state: PeerState::Idle,
793            best_hash: B256::random(),
794            best_number: 100,
795            capabilities: Arc::new(Capabilities::new(vec![])),
796            timeout: Arc::new(AtomicU64::new(10)),
797            last_response_likely_bad: false,
798            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
799        };
800
801        let peer2 = Peer {
802            state: PeerState::Idle,
803            best_hash: B256::random(),
804            best_number: 50,
805            capabilities: Arc::new(Capabilities::new(vec![])),
806            timeout: Arc::new(AtomicU64::new(20)),
807            last_response_likely_bad: false,
808            range_info: None,
809        };
810
811        // With None requirement, is_better should always return false
812        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
813        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
814    }
815
816    #[test]
817    fn test_peer_is_better_full_block_requirement() {
818        // Peer with full history (earliest = 0)
819        let peer_full = Peer {
820            state: PeerState::Idle,
821            best_hash: B256::random(),
822            best_number: 100,
823            capabilities: Arc::new(Capabilities::new(vec![])),
824            timeout: Arc::new(AtomicU64::new(10)),
825            last_response_likely_bad: false,
826            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
827        };
828
829        // Peer without full history (earliest = 50)
830        let peer_partial = Peer {
831            state: PeerState::Idle,
832            best_hash: B256::random(),
833            best_number: 100,
834            capabilities: Arc::new(Capabilities::new(vec![])),
835            timeout: Arc::new(AtomicU64::new(10)),
836            last_response_likely_bad: false,
837            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
838        };
839
840        // Peer without range info (treated as full history)
841        let peer_no_range = Peer {
842            state: PeerState::Idle,
843            best_hash: B256::random(),
844            best_number: 100,
845            capabilities: Arc::new(Capabilities::new(vec![])),
846            timeout: Arc::new(AtomicU64::new(10)),
847            last_response_likely_bad: false,
848            range_info: None,
849        };
850
851        // Peer with full history is better than peer without
852        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
853        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
854
855        // Peer without range info (full history) is better than partial
856        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
857        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
858
859        // Both have full history - no improvement
860        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
861        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
862    }
863
864    #[test]
865    fn test_peer_is_better_full_block_range_requirement() {
866        let range = RangeInclusive::new(40, 60);
867
868        // Peer that covers the requested range
869        let peer_covers = Peer {
870            state: PeerState::Idle,
871            best_hash: B256::random(),
872            best_number: 100,
873            capabilities: Arc::new(Capabilities::new(vec![])),
874            timeout: Arc::new(AtomicU64::new(10)),
875            last_response_likely_bad: false,
876            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
877        };
878
879        // Peer that doesn't cover the range (earliest too high)
880        let peer_no_cover = Peer {
881            state: PeerState::Idle,
882            best_hash: B256::random(),
883            best_number: 100,
884            capabilities: Arc::new(Capabilities::new(vec![])),
885            timeout: Arc::new(AtomicU64::new(10)),
886            last_response_likely_bad: false,
887            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
888        };
889
890        // Peer that covers the requested range is better than one that doesn't
891        assert!(peer_covers
892            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
893        assert!(
894            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
895        );
896    }
897
898    #[test]
899    fn test_peer_is_better_both_cover_range() {
900        let range = RangeInclusive::new(30, 50);
901
902        // Peer with full history that covers the range
903        let peer_full = Peer {
904            state: PeerState::Idle,
905            best_hash: B256::random(),
906            best_number: 100,
907            capabilities: Arc::new(Capabilities::new(vec![])),
908            timeout: Arc::new(AtomicU64::new(10)),
909            last_response_likely_bad: false,
910            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
911        };
912
913        // Peer without full history that also covers the range
914        let peer_partial = Peer {
915            state: PeerState::Idle,
916            best_hash: B256::random(),
917            best_number: 100,
918            capabilities: Arc::new(Capabilities::new(vec![])),
919            timeout: Arc::new(AtomicU64::new(10)),
920            last_response_likely_bad: false,
921            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
922        };
923
        // When both cover the range, neither peer is preferred
925        assert!(!peer_full
926            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
927        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
928    }
929
930    #[test]
931    fn test_peer_is_better_lower_start() {
932        let range = RangeInclusive::new(30, 60);
933
        // Peer with full history whose range stops short of the requested end
935        let peer_full = Peer {
936            state: PeerState::Idle,
937            best_hash: B256::random(),
938            best_number: 100,
939            capabilities: Arc::new(Capabilities::new(vec![])),
940            timeout: Arc::new(AtomicU64::new(10)),
941            last_response_likely_bad: false,
942            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
943        };
944
        // Peer without full history whose range also stops short of the requested end
946        let peer_partial = Peer {
947            state: PeerState::Idle,
948            best_hash: B256::random(),
949            best_number: 100,
950            capabilities: Arc::new(Capabilities::new(vec![])),
951            timeout: Arc::new(AtomicU64::new(10)),
952            last_response_likely_bad: false,
953            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
954        };
955
        // When neither fully covers the range, prefer the peer with the lower start value
957        assert!(peer_full
958            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
959        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
960    }
961
962    #[test]
963    fn test_peer_is_better_neither_covers_range() {
964        let range = RangeInclusive::new(40, 60);
965
966        // Peer with full history that doesn't cover the range (latest too low)
967        let peer_full = Peer {
968            state: PeerState::Idle,
969            best_hash: B256::random(),
970            best_number: 30,
971            capabilities: Arc::new(Capabilities::new(vec![])),
972            timeout: Arc::new(AtomicU64::new(10)),
973            last_response_likely_bad: false,
974            range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
975        };
976
977        // Peer without full history that also doesn't cover the range
978        let peer_partial = Peer {
979            state: PeerState::Idle,
980            best_hash: B256::random(),
981            best_number: 30,
982            capabilities: Arc::new(Capabilities::new(vec![])),
983            timeout: Arc::new(AtomicU64::new(10)),
984            last_response_likely_bad: false,
985            range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
986        };
987
        // When neither covers the range, prefer the lower start (here, the full-history peer)
989        assert!(peer_full
990            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
991        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
992    }
993
994    #[test]
995    fn test_peer_is_better_no_range_info() {
996        let range = RangeInclusive::new(40, 60);
997
998        // Peer with range info
999        let peer_with_range = Peer {
1000            state: PeerState::Idle,
1001            best_hash: B256::random(),
1002            best_number: 100,
1003            capabilities: Arc::new(Capabilities::new(vec![])),
1004            timeout: Arc::new(AtomicU64::new(10)),
1005            last_response_likely_bad: false,
1006            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1007        };
1008
1009        // Peer without range info
1010        let peer_no_range = Peer {
1011            state: PeerState::Idle,
1012            best_hash: B256::random(),
1013            best_number: 100,
1014            capabilities: Arc::new(Capabilities::new(vec![])),
1015            timeout: Arc::new(AtomicU64::new(10)),
1016            last_response_likely_bad: false,
1017            range_info: None,
1018        };
1019
        // Peer without range info is not better, since the other peer's range covers the request
1021        assert!(!peer_no_range
1022            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1023
1024        // Peer with range info is better than peer without
1025        assert!(
1026            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1027        );
1028    }
1029
1030    #[test]
1031    fn test_peer_is_better_one_peer_no_range_covers() {
1032        let range = RangeInclusive::new(40, 60);
1033
1034        // Peer with range info that covers the requested range
1035        let peer_with_range_covers = Peer {
1036            state: PeerState::Idle,
1037            best_hash: B256::random(),
1038            best_number: 100,
1039            capabilities: Arc::new(Capabilities::new(vec![])),
1040            timeout: Arc::new(AtomicU64::new(10)),
1041            last_response_likely_bad: false,
1042            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1043        };
1044
1045        // Peer without range info (treated as full history with unknown latest)
1046        let peer_no_range = Peer {
1047            state: PeerState::Idle,
1048            best_hash: B256::random(),
1049            best_number: 100,
1050            capabilities: Arc::new(Capabilities::new(vec![])),
1051            timeout: Arc::new(AtomicU64::new(10)),
1052            last_response_likely_bad: false,
1053            range_info: None,
1054        };
1055
1056        // Peer with range that covers is better than peer without range info
1057        assert!(peer_with_range_covers
1058            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1059
1060        // Peer without range info is not better when other covers
1061        assert!(!peer_no_range
1062            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1063    }
1064
1065    #[test]
1066    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1067        let range = RangeInclusive::new(40, 60);
1068
1069        // Peer with range info that does NOT cover the requested range (too high)
1070        let peer_with_range_no_cover = Peer {
1071            state: PeerState::Idle,
1072            best_hash: B256::random(),
1073            best_number: 100,
1074            capabilities: Arc::new(Capabilities::new(vec![])),
1075            timeout: Arc::new(AtomicU64::new(10)),
1076            last_response_likely_bad: false,
1077            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1078        };
1079
1080        // Peer without range info (treated as full history)
1081        let peer_no_range = Peer {
1082            state: PeerState::Idle,
1083            best_hash: B256::random(),
1084            best_number: 100,
1085            capabilities: Arc::new(Capabilities::new(vec![])),
1086            timeout: Arc::new(AtomicU64::new(10)),
1087            last_response_likely_bad: false,
1088            range_info: None,
1089        };
1090
1091        // Peer with range that doesn't cover is not better
1092        assert!(!peer_with_range_no_cover
1093            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1094
1095        // Peer without range info (full history) is better when other doesn't cover
1096        assert!(peer_no_range
1097            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1098    }
1099
1100    #[test]
1101    fn test_peer_is_better_edge_cases() {
1102        // Test exact range boundaries
1103        let range = RangeInclusive::new(50, 100);
1104
1105        // Peer that exactly covers the range
1106        let peer_exact = Peer {
1107            state: PeerState::Idle,
1108            best_hash: B256::random(),
1109            best_number: 100,
1110            capabilities: Arc::new(Capabilities::new(vec![])),
1111            timeout: Arc::new(AtomicU64::new(10)),
1112            last_response_likely_bad: false,
1113            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1114        };
1115
1116        // Peer that's one block short at the start
1117        let peer_short_start = Peer {
1118            state: PeerState::Idle,
1119            best_hash: B256::random(),
1120            best_number: 100,
1121            capabilities: Arc::new(Capabilities::new(vec![])),
1122            timeout: Arc::new(AtomicU64::new(10)),
1123            last_response_likely_bad: false,
1124            range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1125        };
1126
1127        // Peer that's one block short at the end
1128        let peer_short_end = Peer {
1129            state: PeerState::Idle,
1130            best_hash: B256::random(),
1131            best_number: 100,
1132            capabilities: Arc::new(Capabilities::new(vec![])),
1133            timeout: Arc::new(AtomicU64::new(10)),
1134            last_response_likely_bad: false,
1135            range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1136        };
1137
1138        // Exact coverage is better than short coverage
1139        assert!(peer_exact
1140            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1141        assert!(peer_exact
1142            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1143
1144        // Short coverage is not better than exact coverage
1145        assert!(!peer_short_start
1146            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1147        assert!(
1148            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1149        );
1150    }
1151}