1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11 Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
12};
13use reth_network_api::test_utils::PeersHandle;
14use reth_network_p2p::{
15 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
16 headers::client::HeadersRequest,
17 priority::Priority,
18};
19use reth_network_peers::PeerId;
20use reth_network_types::ReputationChangeKind;
21use std::{
22 collections::{HashMap, VecDeque},
23 ops::RangeInclusive,
24 sync::{
25 atomic::{AtomicU64, AtomicUsize, Ordering},
26 Arc,
27 },
28 task::{Context, Poll},
29};
30use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32
33type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
34type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
35
36#[derive(Debug)]
43pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
44 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
46 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
48 peers: HashMap<PeerId, Peer>,
50 peers_handle: PeersHandle,
52 num_active_peers: Arc<AtomicUsize>,
54 queued_requests: VecDeque<DownloadRequest<N>>,
56 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
58 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
60}
61
62impl<N: NetworkPrimitives> StateFetcher<N> {
65 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
66 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
67 Self {
68 inflight_headers_requests: Default::default(),
69 inflight_bodies_requests: Default::default(),
70 peers: Default::default(),
71 peers_handle,
72 num_active_peers,
73 queued_requests: Default::default(),
74 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
75 download_requests_tx,
76 }
77 }
78
79 pub(crate) fn new_active_peer(
81 &mut self,
82 peer_id: PeerId,
83 best_hash: B256,
84 best_number: u64,
85 capabilities: Arc<Capabilities>,
86 timeout: Arc<AtomicU64>,
87 range_info: Option<BlockRangeInfo>,
88 ) {
89 self.peers.insert(
90 peer_id,
91 Peer {
92 state: PeerState::Idle,
93 best_hash,
94 best_number,
95 capabilities,
96 timeout,
97 last_response_likely_bad: false,
98 range_info,
99 },
100 );
101 }
102
103 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
110 self.peers.remove(peer);
111 if let Some(req) = self.inflight_headers_requests.remove(peer) {
112 let _ = req.response.send(Err(RequestError::ConnectionDropped));
113 }
114 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
115 let _ = req.response.send(Err(RequestError::ConnectionDropped));
116 }
117 }
118
119 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
123 if let Some(peer) = self.peers.get_mut(peer_id) &&
124 number > peer.best_number
125 {
126 peer.best_hash = hash;
127 peer.best_number = number;
128 return true
129 }
130 false
131 }
132
133 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
135 if let Some(peer) = self.peers.get_mut(peer_id) {
136 peer.state = PeerState::Closing;
137 }
138 }
139
140 fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
145 let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
146
147 let mut best_peer = idle.next()?;
148
149 for maybe_better in idle {
150 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
152 best_peer = maybe_better;
153 continue
154 }
155
156 if maybe_better.1.is_better(best_peer.1, &requirement) {
158 best_peer = maybe_better;
159 continue
160 }
161
162 if maybe_better.1.timeout() < best_peer.1.timeout() &&
164 !maybe_better.1.last_response_likely_bad
165 {
166 best_peer = maybe_better;
167 }
168 }
169
170 Some(*best_peer.0)
171 }
172
173 fn poll_action(&mut self) -> PollAction {
175 if self.queued_requests.is_empty() {
177 return PollAction::NoRequests
178 }
179
180 let request = self.queued_requests.pop_front().expect("not empty");
181 let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
182 self.queued_requests.push_front(request);
184 return PollAction::NoPeersAvailable
185 };
186
187 let request = self.prepare_block_request(peer_id, request);
188
189 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
190 }
191
192 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
194 loop {
196 let no_peers_available = match self.poll_action() {
197 PollAction::Ready(action) => return Poll::Ready(action),
198 PollAction::NoRequests => false,
199 PollAction::NoPeersAvailable => true,
200 };
201
202 loop {
203 match self.download_requests_rx.poll_next_unpin(cx) {
205 Poll::Ready(Some(request)) => match request.get_priority() {
206 Priority::High => {
207 let pos = self
210 .queued_requests
211 .iter()
212 .position(|req| req.is_normal_priority())
213 .unwrap_or(0);
214 self.queued_requests.insert(pos, request);
215 }
216 Priority::Normal => {
217 self.queued_requests.push_back(request);
218 }
219 },
220 Poll::Ready(None) => {
221 unreachable!("channel can't close")
222 }
223 Poll::Pending => break,
224 }
225 }
226
227 if self.queued_requests.is_empty() || no_peers_available {
228 return Poll::Pending
229 }
230 }
231 }
232
233 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
237 if let Some(peer) = self.peers.get_mut(&peer_id) {
239 peer.state = req.peer_state();
240 }
241
242 match req {
243 DownloadRequest::GetBlockHeaders { request, response, .. } => {
244 let inflight = Request { request: request.clone(), response };
245 self.inflight_headers_requests.insert(peer_id, inflight);
246 let HeadersRequest { start, limit, direction } = request;
247 BlockRequest::GetBlockHeaders(GetBlockHeaders {
248 start_block: start,
249 limit,
250 skip: 0,
251 direction,
252 })
253 }
254 DownloadRequest::GetBlockBodies { request, response, .. } => {
255 let inflight = Request { request: (), response };
256 self.inflight_bodies_requests.insert(peer_id, inflight);
257 BlockRequest::GetBlockBodies(GetBlockBodies(request))
258 }
259 }
260 }
261
262 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
266 let req = self.queued_requests.pop_front()?;
267 let req = self.prepare_block_request(peer_id, req);
268 Some(BlockResponseOutcome::Request(peer_id, req))
269 }
270
271 pub(crate) fn on_block_headers_response(
277 &mut self,
278 peer_id: PeerId,
279 res: RequestResult<Vec<N::BlockHeader>>,
280 ) -> Option<BlockResponseOutcome> {
281 let is_error = res.is_err();
282 let maybe_reputation_change = res.reputation_change_err();
283
284 let resp = self.inflight_headers_requests.remove(&peer_id);
285
286 let is_likely_bad_response =
287 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
288
289 if let Some(resp) = resp {
290 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
292 }
293
294 if let Some(peer) = self.peers.get_mut(&peer_id) {
295 peer.last_response_likely_bad = is_likely_bad_response;
297
298 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
301 return self.followup_request(peer_id)
302 }
303 }
304
305 maybe_reputation_change
308 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
309 }
310
311 pub(crate) fn on_block_bodies_response(
313 &mut self,
314 peer_id: PeerId,
315 res: RequestResult<Vec<N::BlockBody>>,
316 ) -> Option<BlockResponseOutcome> {
317 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
318
319 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
320 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
321 }
322 if let Some(peer) = self.peers.get_mut(&peer_id) {
323 peer.last_response_likely_bad = is_likely_bad_response;
325
326 if peer.state.on_request_finished() && !is_likely_bad_response {
327 return self.followup_request(peer_id)
328 }
329 }
330 None
331 }
332
333 pub(crate) fn client(&self) -> FetchClient<N> {
335 FetchClient {
336 request_tx: self.download_requests_tx.clone(),
337 peers_handle: self.peers_handle.clone(),
338 num_active_peers: Arc::clone(&self.num_active_peers),
339 }
340 }
341}
342
343enum PollAction {
345 Ready(FetchAction),
346 NoRequests,
347 NoPeersAvailable,
348}
349
350#[derive(Debug)]
352struct Peer {
353 state: PeerState,
355 best_hash: B256,
357 best_number: u64,
359 #[allow(dead_code)]
361 capabilities: Arc<Capabilities>,
362 timeout: Arc<AtomicU64>,
364 last_response_likely_bad: bool,
371 range_info: Option<BlockRangeInfo>,
373}
374
375impl Peer {
376 fn timeout(&self) -> u64 {
377 self.timeout.load(Ordering::Relaxed)
378 }
379
380 fn earliest(&self) -> u64 {
382 self.range_info.as_ref().map_or(0, |info| info.earliest())
383 }
384
385 fn has_full_history(&self) -> bool {
387 self.earliest() == 0
388 }
389
390 fn range(&self) -> Option<RangeInclusive<u64>> {
391 self.range_info.as_ref().map(|info| info.range())
392 }
393
394 fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
403 let self_range = self.range();
404 let other_range = other.range();
405
406 match (self_range, other_range) {
407 (Some(self_r), Some(other_r)) => {
408 let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
410 let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
411
412 #[allow(clippy::match_same_arms)]
413 match (self_covers, other_covers) {
414 (true, false) => true, (false, true) => false, (true, true) => false, (false, false) => {
418 self_r.start() < other_r.start()
420 }
421 }
422 }
423 (Some(self_r), None) => {
424 self_r.contains(range.start()) && self_r.contains(range.end())
427 }
428 (None, Some(other_r)) => {
429 !(other_r.contains(range.start()) && other_r.contains(range.end()))
432 }
433 (None, None) => false, }
435 }
436
437 fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
439 match requirement {
440 BestPeerRequirements::None => false,
441 BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
442 BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
443 }
444 }
445}
446
447#[derive(Debug)]
449enum PeerState {
450 Idle,
452 GetBlockHeaders,
454 GetBlockBodies,
456 Closing,
458}
459
460impl PeerState {
463 const fn is_idle(&self) -> bool {
465 matches!(self, Self::Idle)
466 }
467
468 const fn on_request_finished(&mut self) -> bool {
474 if !matches!(self, Self::Closing) {
475 *self = Self::Idle;
476 return true
477 }
478 false
479 }
480}
481
482#[derive(Debug)]
485struct Request<Req, Resp> {
486 request: Req,
489 response: oneshot::Sender<Resp>,
490}
491
492#[derive(Debug)]
494pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
495 GetBlockHeaders {
497 request: HeadersRequest,
498 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
499 priority: Priority,
500 },
501 GetBlockBodies {
503 request: Vec<B256>,
504 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
505 priority: Priority,
506 range_hint: Option<RangeInclusive<u64>>,
507 },
508}
509
510impl<N: NetworkPrimitives> DownloadRequest<N> {
513 const fn peer_state(&self) -> PeerState {
515 match self {
516 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
517 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
518 }
519 }
520
521 const fn get_priority(&self) -> &Priority {
523 match self {
524 Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
525 priority
526 }
527 }
528 }
529
530 const fn is_normal_priority(&self) -> bool {
532 self.get_priority().is_normal()
533 }
534
535 fn best_peer_requirements(&self) -> BestPeerRequirements {
537 match self {
538 Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
539 Self::GetBlockBodies { range_hint, .. } => {
540 if let Some(range) = range_hint {
541 BestPeerRequirements::FullBlockRange(range.clone())
542 } else {
543 BestPeerRequirements::FullBlock
544 }
545 }
546 }
547 }
548}
549
550pub(crate) enum FetchAction {
552 BlockRequest {
554 peer_id: PeerId,
556 request: BlockRequest,
558 },
559}
560
561#[derive(Debug, PartialEq, Eq)]
565pub(crate) enum BlockResponseOutcome {
566 Request(PeerId, BlockRequest),
568 BadResponse(PeerId, ReputationChangeKind),
570}
571
572enum BestPeerRequirements {
574 None,
576 FullBlockRange(RangeInclusive<u64>),
578 FullBlock,
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585 use crate::{peers::PeersManager, PeersConfig};
586 use alloy_consensus::Header;
587 use alloy_primitives::B512;
588 use std::future::poll_fn;
589
590 #[tokio::test(flavor = "multi_thread")]
591 async fn test_poll_fetcher() {
592 let manager = PeersManager::new(PeersConfig::default());
593 let mut fetcher =
594 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
595
596 poll_fn(move |cx| {
597 assert!(fetcher.poll(cx).is_pending());
598 let (tx, _rx) = oneshot::channel();
599 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
600 request: vec![],
601 response: tx,
602 priority: Priority::default(),
603 range_hint: None,
604 });
605 assert!(fetcher.poll(cx).is_pending());
606
607 Poll::Ready(())
608 })
609 .await;
610 }
611
612 #[tokio::test]
613 async fn test_peer_rotation() {
614 let manager = PeersManager::new(PeersConfig::default());
615 let mut fetcher =
616 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
617 let peer1 = B512::random();
619 let peer2 = B512::random();
620 let capabilities = Arc::new(Capabilities::from(vec![]));
621 fetcher.new_active_peer(
622 peer1,
623 B256::random(),
624 1,
625 Arc::clone(&capabilities),
626 Arc::new(AtomicU64::new(1)),
627 None,
628 );
629 fetcher.new_active_peer(
630 peer2,
631 B256::random(),
632 2,
633 Arc::clone(&capabilities),
634 Arc::new(AtomicU64::new(1)),
635 None,
636 );
637
638 let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
639 assert!(first_peer == peer1 || first_peer == peer2);
640 fetcher.on_pending_disconnect(&first_peer);
642 let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
644 assert!(first_peer == peer1 || first_peer == peer2);
645 assert_ne!(first_peer, second_peer);
646 fetcher.on_pending_disconnect(&second_peer);
648 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
649 }
650
651 #[tokio::test]
652 async fn test_peer_prioritization() {
653 let manager = PeersManager::new(PeersConfig::default());
654 let mut fetcher =
655 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
656 let peer1 = B512::random();
658 let peer2 = B512::random();
659 let peer3 = B512::random();
660
661 let peer2_timeout = Arc::new(AtomicU64::new(300));
662
663 let capabilities = Arc::new(Capabilities::from(vec![]));
664 fetcher.new_active_peer(
665 peer1,
666 B256::random(),
667 1,
668 Arc::clone(&capabilities),
669 Arc::new(AtomicU64::new(30)),
670 None,
671 );
672 fetcher.new_active_peer(
673 peer2,
674 B256::random(),
675 2,
676 Arc::clone(&capabilities),
677 Arc::clone(&peer2_timeout),
678 None,
679 );
680 fetcher.new_active_peer(
681 peer3,
682 B256::random(),
683 3,
684 Arc::clone(&capabilities),
685 Arc::new(AtomicU64::new(50)),
686 None,
687 );
688
689 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
691 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
692 peer2_timeout.store(10, Ordering::Relaxed);
694 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
696 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
697 }
698
699 #[tokio::test]
700 async fn test_on_block_headers_response() {
701 let manager = PeersManager::new(PeersConfig::default());
702 let mut fetcher =
703 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
704 let peer_id = B512::random();
705
706 assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
707
708 assert_eq!(
709 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
710 Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
711 );
712 assert_eq!(
713 fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
714 None
715 );
716 assert_eq!(
717 fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
718 None
719 );
720 assert_eq!(
721 fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
722 None
723 );
724 assert_eq!(
725 fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
726 None
727 );
728 }
729
730 #[tokio::test]
731 async fn test_header_response_outcome() {
732 let manager = PeersManager::new(PeersConfig::default());
733 let mut fetcher =
734 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
735 let peer_id = B512::random();
736
737 let request_pair = || {
738 let (tx, _rx) = oneshot::channel();
739 let req = Request {
740 request: HeadersRequest {
741 start: 0u64.into(),
742 limit: 1,
743 direction: Default::default(),
744 },
745 response: tx,
746 };
747 let header = Header { number: 0, ..Default::default() };
748 (req, header)
749 };
750
751 fetcher.new_active_peer(
752 peer_id,
753 Default::default(),
754 Default::default(),
755 Arc::new(Capabilities::from(vec![])),
756 Default::default(),
757 None,
758 );
759
760 let (req, header) = request_pair();
761 fetcher.inflight_headers_requests.insert(peer_id, req);
762
763 let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
764 assert!(outcome.is_none());
765 assert!(fetcher.peers[&peer_id].state.is_idle());
766
767 let outcome =
768 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
769
770 assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
771 RequestError::Timeout
772 ))
773 .is_some());
774
775 match outcome {
776 BlockResponseOutcome::BadResponse(peer, _) => {
777 assert_eq!(peer, peer_id)
778 }
779 BlockResponseOutcome::Request(_, _) => {
780 unreachable!()
781 }
782 };
783
784 assert!(fetcher.peers[&peer_id].state.is_idle());
785 }
786
787 #[test]
788 fn test_peer_is_better_none_requirement() {
789 let peer1 = Peer {
790 state: PeerState::Idle,
791 best_hash: B256::random(),
792 best_number: 100,
793 capabilities: Arc::new(Capabilities::new(vec![])),
794 timeout: Arc::new(AtomicU64::new(10)),
795 last_response_likely_bad: false,
796 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
797 };
798
799 let peer2 = Peer {
800 state: PeerState::Idle,
801 best_hash: B256::random(),
802 best_number: 50,
803 capabilities: Arc::new(Capabilities::new(vec![])),
804 timeout: Arc::new(AtomicU64::new(20)),
805 last_response_likely_bad: false,
806 range_info: None,
807 };
808
809 assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
811 assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
812 }
813
814 #[test]
815 fn test_peer_is_better_full_block_requirement() {
816 let peer_full = Peer {
818 state: PeerState::Idle,
819 best_hash: B256::random(),
820 best_number: 100,
821 capabilities: Arc::new(Capabilities::new(vec![])),
822 timeout: Arc::new(AtomicU64::new(10)),
823 last_response_likely_bad: false,
824 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
825 };
826
827 let peer_partial = Peer {
829 state: PeerState::Idle,
830 best_hash: B256::random(),
831 best_number: 100,
832 capabilities: Arc::new(Capabilities::new(vec![])),
833 timeout: Arc::new(AtomicU64::new(10)),
834 last_response_likely_bad: false,
835 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
836 };
837
838 let peer_no_range = Peer {
840 state: PeerState::Idle,
841 best_hash: B256::random(),
842 best_number: 100,
843 capabilities: Arc::new(Capabilities::new(vec![])),
844 timeout: Arc::new(AtomicU64::new(10)),
845 last_response_likely_bad: false,
846 range_info: None,
847 };
848
849 assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
851 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
852
853 assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
855 assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
856
857 assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
859 assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
860 }
861
862 #[test]
863 fn test_peer_is_better_full_block_range_requirement() {
864 let range = RangeInclusive::new(40, 60);
865
866 let peer_covers = Peer {
868 state: PeerState::Idle,
869 best_hash: B256::random(),
870 best_number: 100,
871 capabilities: Arc::new(Capabilities::new(vec![])),
872 timeout: Arc::new(AtomicU64::new(10)),
873 last_response_likely_bad: false,
874 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
875 };
876
877 let peer_no_cover = Peer {
879 state: PeerState::Idle,
880 best_hash: B256::random(),
881 best_number: 100,
882 capabilities: Arc::new(Capabilities::new(vec![])),
883 timeout: Arc::new(AtomicU64::new(10)),
884 last_response_likely_bad: false,
885 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
886 };
887
888 assert!(peer_covers
890 .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
891 assert!(
892 !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
893 );
894 }
895
896 #[test]
897 fn test_peer_is_better_both_cover_range() {
898 let range = RangeInclusive::new(30, 50);
899
900 let peer_full = Peer {
902 state: PeerState::Idle,
903 best_hash: B256::random(),
904 best_number: 100,
905 capabilities: Arc::new(Capabilities::new(vec![])),
906 timeout: Arc::new(AtomicU64::new(10)),
907 last_response_likely_bad: false,
908 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
909 };
910
911 let peer_partial = Peer {
913 state: PeerState::Idle,
914 best_hash: B256::random(),
915 best_number: 100,
916 capabilities: Arc::new(Capabilities::new(vec![])),
917 timeout: Arc::new(AtomicU64::new(10)),
918 last_response_likely_bad: false,
919 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
920 };
921
922 assert!(!peer_full
924 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
925 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
926 }
927
928 #[test]
929 fn test_peer_is_better_lower_start() {
930 let range = RangeInclusive::new(30, 60);
931
932 let peer_full = Peer {
934 state: PeerState::Idle,
935 best_hash: B256::random(),
936 best_number: 100,
937 capabilities: Arc::new(Capabilities::new(vec![])),
938 timeout: Arc::new(AtomicU64::new(10)),
939 last_response_likely_bad: false,
940 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
941 };
942
943 let peer_partial = Peer {
945 state: PeerState::Idle,
946 best_hash: B256::random(),
947 best_number: 100,
948 capabilities: Arc::new(Capabilities::new(vec![])),
949 timeout: Arc::new(AtomicU64::new(10)),
950 last_response_likely_bad: false,
951 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
952 };
953
954 assert!(peer_full
956 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
957 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
958 }
959
960 #[test]
961 fn test_peer_is_better_neither_covers_range() {
962 let range = RangeInclusive::new(40, 60);
963
964 let peer_full = Peer {
966 state: PeerState::Idle,
967 best_hash: B256::random(),
968 best_number: 30,
969 capabilities: Arc::new(Capabilities::new(vec![])),
970 timeout: Arc::new(AtomicU64::new(10)),
971 last_response_likely_bad: false,
972 range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
973 };
974
975 let peer_partial = Peer {
977 state: PeerState::Idle,
978 best_hash: B256::random(),
979 best_number: 30,
980 capabilities: Arc::new(Capabilities::new(vec![])),
981 timeout: Arc::new(AtomicU64::new(10)),
982 last_response_likely_bad: false,
983 range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
984 };
985
986 assert!(peer_full
988 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
989 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
990 }
991
992 #[test]
993 fn test_peer_is_better_no_range_info() {
994 let range = RangeInclusive::new(40, 60);
995
996 let peer_with_range = Peer {
998 state: PeerState::Idle,
999 best_hash: B256::random(),
1000 best_number: 100,
1001 capabilities: Arc::new(Capabilities::new(vec![])),
1002 timeout: Arc::new(AtomicU64::new(10)),
1003 last_response_likely_bad: false,
1004 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1005 };
1006
1007 let peer_no_range = Peer {
1009 state: PeerState::Idle,
1010 best_hash: B256::random(),
1011 best_number: 100,
1012 capabilities: Arc::new(Capabilities::new(vec![])),
1013 timeout: Arc::new(AtomicU64::new(10)),
1014 last_response_likely_bad: false,
1015 range_info: None,
1016 };
1017
1018 assert!(!peer_no_range
1020 .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1021
1022 assert!(
1024 peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1025 );
1026 }
1027
1028 #[test]
1029 fn test_peer_is_better_one_peer_no_range_covers() {
1030 let range = RangeInclusive::new(40, 60);
1031
1032 let peer_with_range_covers = Peer {
1034 state: PeerState::Idle,
1035 best_hash: B256::random(),
1036 best_number: 100,
1037 capabilities: Arc::new(Capabilities::new(vec![])),
1038 timeout: Arc::new(AtomicU64::new(10)),
1039 last_response_likely_bad: false,
1040 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1041 };
1042
1043 let peer_no_range = Peer {
1045 state: PeerState::Idle,
1046 best_hash: B256::random(),
1047 best_number: 100,
1048 capabilities: Arc::new(Capabilities::new(vec![])),
1049 timeout: Arc::new(AtomicU64::new(10)),
1050 last_response_likely_bad: false,
1051 range_info: None,
1052 };
1053
1054 assert!(peer_with_range_covers
1056 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1057
1058 assert!(!peer_no_range
1060 .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1061 }
1062
1063 #[test]
1064 fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1065 let range = RangeInclusive::new(40, 60);
1066
1067 let peer_with_range_no_cover = Peer {
1069 state: PeerState::Idle,
1070 best_hash: B256::random(),
1071 best_number: 100,
1072 capabilities: Arc::new(Capabilities::new(vec![])),
1073 timeout: Arc::new(AtomicU64::new(10)),
1074 last_response_likely_bad: false,
1075 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1076 };
1077
1078 let peer_no_range = Peer {
1080 state: PeerState::Idle,
1081 best_hash: B256::random(),
1082 best_number: 100,
1083 capabilities: Arc::new(Capabilities::new(vec![])),
1084 timeout: Arc::new(AtomicU64::new(10)),
1085 last_response_likely_bad: false,
1086 range_info: None,
1087 };
1088
1089 assert!(!peer_with_range_no_cover
1091 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1092
1093 assert!(peer_no_range
1095 .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1096 }
1097
1098 #[test]
1099 fn test_peer_is_better_edge_cases() {
1100 let range = RangeInclusive::new(50, 100);
1102
1103 let peer_exact = Peer {
1105 state: PeerState::Idle,
1106 best_hash: B256::random(),
1107 best_number: 100,
1108 capabilities: Arc::new(Capabilities::new(vec![])),
1109 timeout: Arc::new(AtomicU64::new(10)),
1110 last_response_likely_bad: false,
1111 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1112 };
1113
1114 let peer_short_start = Peer {
1116 state: PeerState::Idle,
1117 best_hash: B256::random(),
1118 best_number: 100,
1119 capabilities: Arc::new(Capabilities::new(vec![])),
1120 timeout: Arc::new(AtomicU64::new(10)),
1121 last_response_likely_bad: false,
1122 range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1123 };
1124
1125 let peer_short_end = Peer {
1127 state: PeerState::Idle,
1128 best_hash: B256::random(),
1129 best_number: 100,
1130 capabilities: Arc::new(Capabilities::new(vec![])),
1131 timeout: Arc::new(AtomicU64::new(10)),
1132 last_response_likely_bad: false,
1133 range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1134 };
1135
1136 assert!(peer_exact
1138 .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1139 assert!(peer_exact
1140 .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1141
1142 assert!(!peer_short_start
1144 .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1145 assert!(
1146 !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1147 );
1148 }
1149}