reth_network/fetch/
mod.rs

//! Fetch data from the network.

mod client;

pub use client::FetchClient;

use crate::{message::BlockRequest, session::BlockRangeInfo};
use alloy_primitives::B256;
use futures::StreamExt;
use reth_eth_wire::{
    Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
    headers::client::HeadersRequest,
    priority::Priority,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::{
    collections::{HashMap, VecDeque},
    ops::RangeInclusive,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;

/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download requests to
/// available peers, sending the response back once it is ready.
///
/// This type maintains a list of connected peers that are available for requests.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Currently active [`GetBlockHeaders`] requests
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Currently active [`GetBlockBodies`] requests
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// The list of _available_ peers for requests.
    peers: HashMap<PeerId, Peer>,
    /// The handle to the peers manager
    peers_handle: PeersHandle,
    /// Number of active peer sessions the node is currently handling.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests queued for processing
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiver for new incoming download requests
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sender for download requests, used to detach a [`FetchClient`]
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}

// === impl StateFetcher ===

impl<N: NetworkPrimitives> StateFetcher<N> {
    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
        Self {
            inflight_headers_requests: Default::default(),
            inflight_bodies_requests: Default::default(),
            peers: Default::default(),
            peers_handle,
            num_active_peers,
            queued_requests: Default::default(),
            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
            download_requests_tx,
        }
    }

    /// Invoked when connected to a new peer.
    pub(crate) fn new_active_peer(
        &mut self,
        peer_id: PeerId,
        best_hash: B256,
        best_number: u64,
        capabilities: Arc<Capabilities>,
        timeout: Arc<AtomicU64>,
        range_info: Option<BlockRangeInfo>,
    ) {
        self.peers.insert(
            peer_id,
            Peer {
                state: PeerState::Idle,
                best_hash,
                best_number,
                capabilities,
                timeout,
                last_response_likely_bad: false,
                range_info,
            },
        );
    }

    /// Removes the peer from the peer list, after which it is no longer available for future
    /// requests.
    ///
    /// Invoked when an active session was closed.
    ///
    /// This also cancels any inflight requests and sends an error to their receivers.
    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
        self.peers.remove(peer);
        if let Some(req) = self.inflight_headers_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
    }

    /// Updates the block information for the peer.
    ///
    /// Returns `true` if this is a newer block.
    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
        if let Some(peer) = self.peers.get_mut(peer_id) &&
            number > peer.best_number
        {
            peer.best_hash = hash;
            peer.best_number = number;
            return true
        }
        false
    }

    /// Invoked when an active session is about to be disconnected.
    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
        if let Some(peer) = self.peers.get_mut(peer_id) {
            peer.state = PeerState::Closing;
        }
    }

    /// Returns the _next_ idle peer that's ready to accept a request,
    /// prioritizing those with the lowest timeout/latency and those that recently responded with
    /// adequate data. Additionally, if full blocks are required, this prioritizes peers that have
    /// the full history available.
    fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
        let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());

        let mut best_peer = idle.next()?;

        for maybe_better in idle {
            // replace best peer if our current best peer sent us a bad response last time
            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer meets the requirements better
            if maybe_better.1.is_better(best_peer.1, &requirement) {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer has a lower timeout and its last response wasn't
            // likely bad
            if maybe_better.1.timeout() < best_peer.1.timeout() &&
                !maybe_better.1.last_response_likely_bad
            {
                best_peer = maybe_better;
            }
        }

        Some(*best_peer.0)
    }

    /// Returns the next action to take.
    fn poll_action(&mut self) -> PollAction {
        // we only check and don't pop here, since we don't know yet whether a peer is available.
        if self.queued_requests.is_empty() {
            return PollAction::NoRequests
        }

        let request = self.queued_requests.pop_front().expect("not empty");
        let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
            // need to put the request back
            self.queued_requests.push_front(request);
            return PollAction::NoPeersAvailable
        };

        let request = self.prepare_block_request(peer_id, request);

        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
    }

    /// Advances the state of the syncer.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
        // drain buffered actions first
        loop {
            let no_peers_available = match self.poll_action() {
                PollAction::Ready(action) => return Poll::Ready(action),
                PollAction::NoRequests => false,
                PollAction::NoPeersAvailable => true,
            };

            loop {
                // poll incoming requests
                match self.download_requests_rx.poll_next_unpin(cx) {
                    Poll::Ready(Some(request)) => match request.get_priority() {
                        Priority::High => {
                            // insert this request before the first normal-priority request, i.e.
                            // at the back of the high-priority section of the queue
                            let pos = self
                                .queued_requests
                                .iter()
                                .position(|req| req.is_normal_priority())
                                .unwrap_or(0);
                            self.queued_requests.insert(pos, request);
                        }
                        Priority::Normal => {
                            self.queued_requests.push_back(request);
                        }
                    },
                    Poll::Ready(None) => {
                        unreachable!("channel can't close")
                    }
                    Poll::Pending => break,
                }
            }

            if self.queued_requests.is_empty() || no_peers_available {
                return Poll::Pending
            }
        }
    }

    /// Handles a new request to a peer.
    ///
    /// Caution: this assumes the peer exists and is idle
    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
        // update the peer's state
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.state = req.peer_state();
        }

        match req {
            DownloadRequest::GetBlockHeaders { request, response, .. } => {
                let inflight = Request { request: request.clone(), response };
                self.inflight_headers_requests.insert(peer_id, inflight);
                let HeadersRequest { start, limit, direction } = request;
                BlockRequest::GetBlockHeaders(GetBlockHeaders {
                    start_block: start,
                    limit,
                    skip: 0,
                    direction,
                })
            }
            DownloadRequest::GetBlockBodies { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_bodies_requests.insert(peer_id, inflight);
                BlockRequest::GetBlockBodies(GetBlockBodies(request))
            }
        }
    }

    /// Returns a new followup request for the peer.
    ///
    /// Caution: this expects that the peer is _not_ closed.
    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
        let req = self.queued_requests.pop_front()?;
        let req = self.prepare_block_request(peer_id, req);
        Some(BlockResponseOutcome::Request(peer_id, req))
    }

    /// Called on a `GetBlockHeaders` response from a peer.
    ///
    /// This delegates the response and returns a [`BlockResponseOutcome`] that either queues a
    /// direct followup request or reports the peer if the response warrants an
    /// [`EthResponseValidator::reputation_change_err`]
    pub(crate) fn on_block_headers_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockHeader>>,
    ) -> Option<BlockResponseOutcome> {
        let is_error = res.is_err();
        let maybe_reputation_change = res.reputation_change_err();

        let resp = self.inflight_headers_requests.remove(&peer_id);

        let is_likely_bad_response =
            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));

        if let Some(resp) = resp {
            // delegate the response
            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
        }

        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            // If the peer is still ready to accept new requests, we try to send a followup
            // request immediately.
            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }

        // if the response was an `Err` worth reporting the peer for, then we return a
        // `BadResponse` outcome
        maybe_reputation_change
            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
    }

    /// Called on a `GetBlockBodies` response from a peer
    pub(crate) fn on_block_bodies_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockBody>>,
    ) -> Option<BlockResponseOutcome> {
        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());

        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Returns a new [`FetchClient`] that can send requests to this type.
    pub(crate) fn client(&self) -> FetchClient<N> {
        FetchClient {
            request_tx: self.download_requests_tx.clone(),
            peers_handle: self.peers_handle.clone(),
            num_active_peers: Arc::clone(&self.num_active_peers),
        }
    }
}

/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
    Ready(FetchAction),
    NoRequests,
    NoPeersAvailable,
}

/// Represents a connected peer
#[derive(Debug)]
struct Peer {
    /// The state this peer currently resides in.
    state: PeerState,
    /// Best known hash that the peer has
    best_hash: B256,
    /// Tracks the best number of the peer.
    best_number: u64,
    /// Capabilities announced by the peer.
    #[allow(dead_code)]
    capabilities: Arc<Capabilities>,
    /// Tracks the current timeout value we use for the peer.
    timeout: Arc<AtomicU64>,
    /// Tracks whether the peer has recently responded with a likely bad response.
    ///
    /// This is used to de-rank the peer if there are other peers available.
    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
    /// lowest timeout.
    last_response_likely_bad: bool,
    /// Tracks the range info for the peer.
    range_info: Option<BlockRangeInfo>,
}

impl Peer {
    fn timeout(&self) -> u64 {
        self.timeout.load(Ordering::Relaxed)
    }

    /// Returns the earliest block number available from the peer.
    fn earliest(&self) -> u64 {
        self.range_info.as_ref().map_or(0, |info| info.earliest())
    }

    /// Returns true if the peer has the full history available.
    fn has_full_history(&self) -> bool {
        self.earliest() == 0
    }

    fn range(&self) -> Option<RangeInclusive<u64>> {
        self.range_info.as_ref().map(|info| info.range())
    }

    /// Returns true if this peer has a better range than the other peer for serving the requested
    /// range.
    ///
    /// A peer has a "better range" if:
    /// 1. It can fully cover the requested range while the other cannot
    /// 2. Neither can fully cover the range, but this peer has a lower start value
    /// 3. If a peer doesn't announce a range, we assume it has full history, but we check the
    ///    other's range and treat that peer as better if it can cover the requested range
    fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
        let self_range = self.range();
        let other_range = other.range();

        match (self_range, other_range) {
            (Some(self_r), Some(other_r)) => {
                // Check if each peer can fully cover the requested range
                let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
                let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());

                #[allow(clippy::match_same_arms)]
                match (self_covers, other_covers) {
                    (true, false) => true,  // Only self covers the range
                    (false, true) => false, // Only other covers the range
                    (true, true) => false,  // Both cover
                    (false, false) => {
                        // neither covers - prefer the peer with the lower (better) start
                        self_r.start() < other_r.start()
                    }
                }
            }
            (Some(self_r), None) => {
                // Self has range info, other doesn't (treated as full history with unknown latest)
                // Self is better only if it covers the range
                self_r.contains(range.start()) && self_r.contains(range.end())
            }
            (None, Some(other_r)) => {
                // Self has no range info (full history), other has range info
                // Self is better only if other doesn't cover the range
                !(other_r.contains(range.start()) && other_r.contains(range.end()))
            }
            (None, None) => false, // Neither has range info - no one is better
        }
    }

    /// Returns true if this peer is better than the other peer based on the given requirements.
    fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
        match requirement {
            BestPeerRequirements::None => false,
            BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
            BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
        }
    }
}

/// Tracks the state of an individual peer
#[derive(Debug)]
enum PeerState {
    /// Peer is currently not handling requests and is available.
    Idle,
    /// Peer is handling a `GetBlockHeaders` request.
    GetBlockHeaders,
    /// Peer is handling a `GetBlockBodies` request.
    GetBlockBodies,
    /// Peer session is about to close
    Closing,
}

// === impl PeerState ===

impl PeerState {
    /// Returns true if the peer is currently idle.
    const fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Resets the state on a received response.
    ///
    /// If the state was already marked as `Closing` do nothing.
    ///
    /// Returns `true` if the peer is ready for another request.
    const fn on_request_finished(&mut self) -> bool {
        if !matches!(self, Self::Closing) {
            *self = Self::Idle;
            return true
        }
        false
    }
}

/// A request that waits for a response from the network, so it can send it back through the
/// response channel.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The issued request object
    // TODO: this can be attached to the response in error case
    request: Req,
    response: oneshot::Sender<Resp>,
}

/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Download the requested headers and send the response through the channel
    GetBlockHeaders {
        request: HeadersRequest,
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
        priority: Priority,
    },
    /// Download the requested block bodies and send the response through the channel
    GetBlockBodies {
        request: Vec<B256>,
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
        priority: Priority,
        range_hint: Option<RangeInclusive<u64>>,
    },
}

// === impl DownloadRequest ===

impl<N: NetworkPrimitives> DownloadRequest<N> {
    /// Returns the corresponding state for a peer that handles the request.
    const fn peer_state(&self) -> PeerState {
        match self {
            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
        }
    }

    /// Returns the requested priority of this request
    const fn get_priority(&self) -> &Priority {
        match self {
            Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
                priority
            }
        }
    }

    /// Returns `true` if this request is normal priority.
    const fn is_normal_priority(&self) -> bool {
        self.get_priority().is_normal()
    }

    /// Returns the best peer requirements for this request.
    fn best_peer_requirements(&self) -> BestPeerRequirements {
        match self {
            Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
            Self::GetBlockBodies { range_hint, .. } => {
                if let Some(range) = range_hint {
                    BestPeerRequirements::FullBlockRange(range.clone())
                } else {
                    BestPeerRequirements::FullBlock
                }
            }
        }
    }
}

/// An action the syncer can emit.
pub(crate) enum FetchAction {
    /// Dispatch an eth request to the given peer.
    BlockRequest {
        /// The targeted recipient for the request
        peer_id: PeerId,
        /// The request to send
        request: BlockRequest,
    },
}

/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Continue with another request to the peer.
    Request(PeerId, BlockRequest),
    /// The peer sent a bad response; report it with the given reputation change.
    BadResponse(PeerId, ReputationChangeKind),
}

/// Additional requirements for how to rank peers during selection.
enum BestPeerRequirements {
    /// No additional requirements
    None,
    /// Peer must have this block range available.
    FullBlockRange(RangeInclusive<u64>),
    /// Peer should have the full block history available.
    FullBlock,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{peers::PeersManager, PeersConfig};
    use alloy_consensus::Header;
    use alloy_primitives::B512;
    use std::future::poll_fn;

    #[tokio::test(flavor = "multi_thread")]
    async fn test_poll_fetcher() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        poll_fn(move |cx| {
            assert!(fetcher.poll(cx).is_pending());
            let (tx, _rx) = oneshot::channel();
            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::default(),
                range_hint: None,
            });
            assert!(fetcher.poll(cx).is_pending());

            Poll::Ready(())
        })
        .await;
    }
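
    // An illustrative sketch of the priority handling in `poll`: assuming no peers are
    // connected, queued requests cannot be dispatched, so the resulting queue order remains
    // observable and high-priority requests should end up ahead of normal-priority ones.
    #[tokio::test]
    async fn test_poll_queues_high_priority_first() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        // a normal-priority headers request followed by a high-priority bodies request
        let (tx, _rx) = oneshot::channel();
        let _ = fetcher.download_requests_tx.send(DownloadRequest::GetBlockHeaders {
            request: HeadersRequest {
                start: 0u64.into(),
                limit: 1,
                direction: Default::default(),
            },
            response: tx,
            priority: Priority::Normal,
        });
        let (tx, _rx) = oneshot::channel();
        let _ = fetcher.download_requests_tx.send(DownloadRequest::GetBlockBodies {
            request: vec![],
            response: tx,
            priority: Priority::High,
            range_hint: None,
        });

        poll_fn(|cx| {
            // without peers nothing can be dispatched, but the incoming requests get queued
            assert!(fetcher.poll(cx).is_pending());
            Poll::Ready(())
        })
        .await;

        // the high-priority request is queued ahead of the normal-priority one
        assert_eq!(fetcher.queued_requests.len(), 2);
        assert!(!fetcher.queued_requests[0].is_normal_priority());
        assert!(fetcher.queued_requests[1].is_normal_priority());
    }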

    #[tokio::test]
    async fn test_peer_rotation() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // Add a few random peers
        let peer1 = B512::random();
        let peer2 = B512::random();
        let capabilities = Arc::new(Capabilities::from(vec![]));
        fetcher.new_active_peer(
            peer1,
            B256::random(),
            1,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(1)),
            None,
        );
        fetcher.new_active_peer(
            peer2,
            B256::random(),
            2,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
        assert!(first_peer == peer1 || first_peer == peer2);
        // Pending disconnect for first_peer
        fetcher.on_pending_disconnect(&first_peer);
        // first_peer now isn't idle, so we should get the other peer
        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
        assert!(second_peer == peer1 || second_peer == peer2);
        assert_ne!(first_peer, second_peer);
        // without idle peers, returns None
        fetcher.on_pending_disconnect(&second_peer);
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
    }
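
    // An illustrative sketch of `on_session_closed`: closing a session removes the peer and
    // fails any inflight request with `RequestError::ConnectionDropped`.
    #[tokio::test]
    async fn test_on_session_closed_cancels_inflight() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        fetcher.new_active_peer(
            peer_id,
            B256::random(),
            1,
            Arc::new(Capabilities::from(vec![])),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        // register an inflight headers request for the peer
        let (tx, mut rx) = oneshot::channel();
        fetcher.inflight_headers_requests.insert(
            peer_id,
            Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            },
        );

        fetcher.on_session_closed(&peer_id);

        // the peer is gone and the inflight request was answered with a connection error
        assert!(!fetcher.peers.contains_key(&peer_id));
        assert!(matches!(rx.try_recv(), Ok(Err(RequestError::ConnectionDropped))));
    }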

    #[tokio::test]
    async fn test_peer_prioritization() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // Add a few random peers
        let peer1 = B512::random();
        let peer2 = B512::random();
        let peer3 = B512::random();

        let peer2_timeout = Arc::new(AtomicU64::new(300));

        let capabilities = Arc::new(Capabilities::from(vec![]));
        fetcher.new_active_peer(
            peer1,
            B256::random(),
            1,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(30)),
            None,
        );
        fetcher.new_active_peer(
            peer2,
            B256::random(),
            2,
            Arc::clone(&capabilities),
            Arc::clone(&peer2_timeout),
            None,
        );
        fetcher.new_active_peer(
            peer3,
            B256::random(),
            3,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(50)),
            None,
        );

        // Must always get peer1 (lowest timeout)
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        // peer2's timeout changes below peer1's
        peer2_timeout.store(10, Ordering::Relaxed);
        // Then we get peer 2 always (now lowest)
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
    }
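
    // An illustrative sketch of `update_peer_block`: only a strictly newer block number updates
    // the tracked best block and returns `true`; unknown peers and stale numbers return `false`.
    #[tokio::test]
    async fn test_update_peer_block() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        // unknown peer: nothing to update
        assert!(!fetcher.update_peer_block(&peer_id, B256::random(), 5));

        fetcher.new_active_peer(
            peer_id,
            B256::random(),
            1,
            Arc::new(Capabilities::from(vec![])),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        // a higher block number updates the peer
        let new_hash = B256::random();
        assert!(fetcher.update_peer_block(&peer_id, new_hash, 2));
        assert_eq!(fetcher.peers[&peer_id].best_number, 2);
        assert_eq!(fetcher.peers[&peer_id].best_hash, new_hash);

        // the same or a lower number is ignored
        assert!(!fetcher.update_peer_block(&peer_id, B256::random(), 2));
        assert!(!fetcher.update_peer_block(&peer_id, B256::random(), 1));
        assert_eq!(fetcher.peers[&peer_id].best_number, 2);
    }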

    #[tokio::test]
    async fn test_on_block_headers_response() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);

        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
            None
        );
    }

    #[tokio::test]
    async fn test_header_response_outcome() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        let request_pair = || {
            let (tx, _rx) = oneshot::channel();
            let req = Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            };
            let header = Header { number: 0, ..Default::default() };
            (req, header)
        };

        fetcher.new_active_peer(
            peer_id,
            Default::default(),
            Default::default(),
            Arc::new(Capabilities::from(vec![])),
            Default::default(),
            None,
        );

        let (req, header) = request_pair();
        fetcher.inflight_headers_requests.insert(peer_id, req);

        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
        assert!(outcome.is_none());
        assert!(fetcher.peers[&peer_id].state.is_idle());

        let outcome =
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();

        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
            RequestError::Timeout
        ))
        .is_some());

        match outcome {
            BlockResponseOutcome::BadResponse(peer, _) => {
                assert_eq!(peer, peer_id)
            }
            BlockResponseOutcome::Request(_, _) => {
                unreachable!()
            }
        };

        assert!(fetcher.peers[&peer_id].state.is_idle());
    }
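
    // An illustrative sketch of `on_block_bodies_response`: an empty body list is treated as a
    // likely bad response, so the peer is de-ranked and no followup request is issued.
    #[tokio::test]
    async fn test_empty_bodies_response_marks_peer() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        fetcher.new_active_peer(
            peer_id,
            B256::random(),
            1,
            Arc::new(Capabilities::from(vec![])),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        // register an inflight bodies request and mark the peer as busy with it
        let (tx, _rx) = oneshot::channel();
        fetcher.inflight_bodies_requests.insert(peer_id, Request { request: (), response: tx });
        fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetBlockBodies;

        // an empty response yields no followup and flags the peer
        let outcome = fetcher.on_block_bodies_response(peer_id, Ok(vec![]));
        assert_eq!(outcome, None);
        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
        assert!(fetcher.peers[&peer_id].state.is_idle());
    }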

    #[test]
    fn test_peer_is_better_none_requirement() {
        let peer1 = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
        };

        let peer2 = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 50,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(20)),
            last_response_likely_bad: false,
            range_info: None,
        };

        // With None requirement, is_better should always return false
        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
    }

    #[test]
    fn test_peer_is_better_full_block_requirement() {
        // Peer with full history (earliest = 0)
        let peer_full = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
        };

        // Peer without full history (earliest = 50)
        let peer_partial = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
        };

        // Peer without range info (treated as full history)
        let peer_no_range = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: None,
        };

        // Peer with full history is better than peer without
        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));

        // Peer without range info (full history) is better than partial
        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));

        // Both have full history - no improvement
        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
    }

    #[test]
    fn test_peer_is_better_full_block_range_requirement() {
        let range = RangeInclusive::new(40, 60);

        // Peer that covers the requested range
        let peer_covers = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
        };

        // Peer that doesn't cover the range (earliest too high)
        let peer_no_cover = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
        };

        // Peer that covers the requested range is better than one that doesn't
        assert!(peer_covers
            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(
            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
        );
    }

    #[test]
    fn test_peer_is_better_both_cover_range() {
        let range = RangeInclusive::new(30, 50);

        // Peer with full history that covers the range
        let peer_full = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
        };

        // Peer without full history that also covers the range
        let peer_partial = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
        };

        // When both cover the range, neither peer is better
        assert!(!peer_full
            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_lower_start() {
        let range = RangeInclusive::new(30, 60);

        // Peer with full history whose range falls short of the requested end
        let peer_full = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
        };

        // Peer without full history whose range also falls short of the requested end
        let peer_partial = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
        };

        // When neither fully covers the range, prefer the lower start value
        assert!(peer_full
            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_neither_covers_range() {
        let range = RangeInclusive::new(40, 60);

        // Peer with full history that doesn't cover the range (latest too low)
        let peer_full = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 30,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
        };

        // Peer without full history that also doesn't cover the range
        let peer_partial = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 30,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
        };

        // When neither covers the range, prefer the lower start (here, the full-history peer)
        assert!(peer_full
            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_no_range_info() {
        let range = RangeInclusive::new(40, 60);

        // Peer with range info
        let peer_with_range = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
        };

        // Peer without range info
        let peer_no_range = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: None,
        };

        // Peer without range info is not better, since the other peer covers the range
        assert!(!peer_no_range
            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        // Peer with range info is better than peer without
        assert!(
            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
        );
    }

    #[test]
    fn test_peer_is_better_one_peer_no_range_covers() {
        let range = RangeInclusive::new(40, 60);

        // Peer with range info that covers the requested range
        let peer_with_range_covers = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
        };

        // Peer without range info (treated as full history with unknown latest)
        let peer_no_range = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: None,
        };

        // Peer with range that covers is better than peer without range info
        assert!(peer_with_range_covers
            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        // Peer without range info is not better when other covers
        assert!(!peer_no_range
            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
        let range = RangeInclusive::new(40, 60);

        // Peer with range info that does NOT cover the requested range (too high)
        let peer_with_range_no_cover = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
        };

        // Peer without range info (treated as full history)
        let peer_no_range = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: None,
        };

        // Peer with range that doesn't cover is not better
        assert!(!peer_with_range_no_cover
            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        // Peer without range info (full history) is better when other doesn't cover
        assert!(peer_no_range
            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_edge_cases() {
        // Test exact range boundaries
        let range = RangeInclusive::new(50, 100);

        // Peer that exactly covers the range
        let peer_exact = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
        };

        // Peer that's one block short at the start
        let peer_short_start = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
        };

        // Peer that's one block short at the end
        let peer_short_end = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
        };

        // Exact coverage is better than short coverage
        assert!(peer_exact
            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(peer_exact
            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));

        // Short coverage is not better than exact coverage
        assert!(!peer_short_start
            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(
            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
        );
    }
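
    // An illustrative sketch of the `earliest`/`has_full_history` helpers: a missing range
    // announcement or an earliest block of 0 counts as full history, anything else does not.
    #[test]
    fn test_peer_full_history_detection() {
        let full_history = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
        };
        let pruned = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: Some(BlockRangeInfo::new(25, 100, B256::random())),
        };
        let unannounced = Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number: 100,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info: None,
        };

        assert!(full_history.has_full_history());
        assert!(!pruned.has_full_history());
        assert_eq!(pruned.earliest(), 25);
        // without an announced range, the peer is assumed to have full history
        assert!(unannounced.has_full_history());
        assert_eq!(unannounced.earliest(), 0);
    }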
}