1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11 Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives,
12};
13use reth_network_api::test_utils::PeersHandle;
14use reth_network_p2p::{
15 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
16 headers::client::HeadersRequest,
17 priority::Priority,
18};
19use reth_network_peers::PeerId;
20use reth_network_types::ReputationChangeKind;
21use std::{
22 collections::{HashMap, VecDeque},
23 ops::RangeInclusive,
24 sync::{
25 atomic::{AtomicU64, AtomicUsize, Ordering},
26 Arc,
27 },
28 task::{Context, Poll},
29};
30use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
31use tokio_stream::wrappers::UnboundedReceiverStream;
32
/// An in-flight headers request: the original [`HeadersRequest`] together with the channel on
/// which the peer's response is delivered.
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
/// An in-flight bodies request: only the response channel is kept, the request payload itself is
/// not retained (`()`).
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
35
/// Assigns download requests (block headers and block bodies) to connected peers and routes the
/// peers' responses back to whoever issued the request.
///
/// Requests arrive over an unbounded channel, are queued, and are handed to an idle peer when
/// the fetcher is polled.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Header requests currently awaiting a response, keyed by the peer they were sent to.
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Body requests currently awaiting a response, keyed by the peer they were sent to.
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// Peers registered with the fetcher and the state used to schedule requests on them.
    peers: HashMap<PeerId, Peer>,
    /// Handle that is cloned into every [`FetchClient`] created by this fetcher.
    peers_handle: PeersHandle,
    /// Shared counter that is cloned into every [`FetchClient`]; it is not modified by this
    /// type. NOTE(review): presumably the number of active peer sessions — confirm with caller.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests that were received but not yet assigned to a peer. High-priority requests are
    /// kept in front of normal-priority ones.
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiving half of the channel on which clients submit [`DownloadRequest`]s.
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sending half of the request channel; cloned into every [`FetchClient`]. Because one
    /// sender is always held here, the channel never closes while the fetcher is alive.
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}
61
62impl<N: NetworkPrimitives> StateFetcher<N> {
65 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
66 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
67 Self {
68 inflight_headers_requests: Default::default(),
69 inflight_bodies_requests: Default::default(),
70 peers: Default::default(),
71 peers_handle,
72 num_active_peers,
73 queued_requests: Default::default(),
74 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
75 download_requests_tx,
76 }
77 }
78
79 pub(crate) fn new_active_peer(
81 &mut self,
82 peer_id: PeerId,
83 best_hash: B256,
84 best_number: u64,
85 capabilities: Arc<Capabilities>,
86 timeout: Arc<AtomicU64>,
87 range_info: Option<BlockRangeInfo>,
88 ) {
89 self.peers.insert(
90 peer_id,
91 Peer {
92 state: PeerState::Idle,
93 best_hash,
94 best_number,
95 capabilities,
96 timeout,
97 last_response_likely_bad: false,
98 range_info,
99 },
100 );
101 }
102
103 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
110 self.peers.remove(peer);
111 if let Some(req) = self.inflight_headers_requests.remove(peer) {
112 let _ = req.response.send(Err(RequestError::ConnectionDropped));
113 }
114 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
115 let _ = req.response.send(Err(RequestError::ConnectionDropped));
116 }
117 }
118
119 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
123 if let Some(peer) = self.peers.get_mut(peer_id) &&
124 number > peer.best_number
125 {
126 peer.best_hash = hash;
127 peer.best_number = number;
128 return true
129 }
130 false
131 }
132
133 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
135 if let Some(peer) = self.peers.get_mut(peer_id) {
136 peer.state = PeerState::Closing;
137 }
138 }
139
140 fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
145 let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
146
147 let mut best_peer = idle.next()?;
148
149 for maybe_better in idle {
150 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
152 best_peer = maybe_better;
153 continue
154 }
155
156 if maybe_better.1.is_better(best_peer.1, &requirement) {
158 best_peer = maybe_better;
159 continue
160 }
161
162 if maybe_better.1.timeout() < best_peer.1.timeout() &&
164 !maybe_better.1.last_response_likely_bad
165 {
166 best_peer = maybe_better;
167 }
168 }
169
170 Some(*best_peer.0)
171 }
172
173 fn poll_action(&mut self) -> PollAction {
175 if self.queued_requests.is_empty() {
177 return PollAction::NoRequests
178 }
179
180 let request = self.queued_requests.pop_front().expect("not empty");
181 let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
182 self.queued_requests.push_front(request);
184 return PollAction::NoPeersAvailable
185 };
186
187 let request = self.prepare_block_request(peer_id, request);
188
189 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
190 }
191
192 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
194 loop {
196 let no_peers_available = match self.poll_action() {
197 PollAction::Ready(action) => return Poll::Ready(action),
198 PollAction::NoRequests => false,
199 PollAction::NoPeersAvailable => true,
200 };
201
202 loop {
203 match self.download_requests_rx.poll_next_unpin(cx) {
205 Poll::Ready(Some(request)) => match request.get_priority() {
206 Priority::High => {
207 let pos = self
210 .queued_requests
211 .iter()
212 .position(|req| req.is_normal_priority())
213 .unwrap_or(0);
214 self.queued_requests.insert(pos, request);
215 }
216 Priority::Normal => {
217 self.queued_requests.push_back(request);
218 }
219 },
220 Poll::Ready(None) => {
221 unreachable!("channel can't close")
222 }
223 Poll::Pending => break,
224 }
225 }
226
227 if self.queued_requests.is_empty() || no_peers_available {
228 return Poll::Pending
229 }
230 }
231 }
232
233 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
237 if let Some(peer) = self.peers.get_mut(&peer_id) {
239 peer.state = req.peer_state();
240 }
241
242 match req {
243 DownloadRequest::GetBlockHeaders { request, response, .. } => {
244 let inflight = Request { request: request.clone(), response };
245 self.inflight_headers_requests.insert(peer_id, inflight);
246 let HeadersRequest { start, limit, direction } = request;
247 BlockRequest::GetBlockHeaders(GetBlockHeaders {
248 start_block: start,
249 limit,
250 skip: 0,
251 direction,
252 })
253 }
254 DownloadRequest::GetBlockBodies { request, response, .. } => {
255 let inflight = Request { request: (), response };
256 self.inflight_bodies_requests.insert(peer_id, inflight);
257 BlockRequest::GetBlockBodies(GetBlockBodies(request))
258 }
259 }
260 }
261
262 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
266 let req = self.queued_requests.pop_front()?;
267 let req = self.prepare_block_request(peer_id, req);
268 Some(BlockResponseOutcome::Request(peer_id, req))
269 }
270
271 pub(crate) fn on_block_headers_response(
277 &mut self,
278 peer_id: PeerId,
279 res: RequestResult<Vec<N::BlockHeader>>,
280 ) -> Option<BlockResponseOutcome> {
281 let is_error = res.is_err();
282 let maybe_reputation_change = res.reputation_change_err();
283
284 let resp = self.inflight_headers_requests.remove(&peer_id);
285
286 let is_likely_bad_response =
287 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
288
289 if let Some(resp) = resp {
290 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
292 }
293
294 if let Some(peer) = self.peers.get_mut(&peer_id) {
295 peer.last_response_likely_bad = is_likely_bad_response;
297
298 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
301 return self.followup_request(peer_id)
302 }
303 }
304
305 maybe_reputation_change
308 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
309 }
310
311 pub(crate) fn on_block_bodies_response(
313 &mut self,
314 peer_id: PeerId,
315 res: RequestResult<Vec<N::BlockBody>>,
316 ) -> Option<BlockResponseOutcome> {
317 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
318
319 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
320 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
321 }
322 if let Some(peer) = self.peers.get_mut(&peer_id) {
323 peer.last_response_likely_bad = is_likely_bad_response;
325
326 if peer.state.on_request_finished() && !is_likely_bad_response {
327 return self.followup_request(peer_id)
328 }
329 }
330 None
331 }
332
333 pub(crate) fn client(&self) -> FetchClient<N> {
335 FetchClient {
336 request_tx: self.download_requests_tx.clone(),
337 peers_handle: self.peers_handle.clone(),
338 num_active_peers: Arc::clone(&self.num_active_peers),
339 }
340 }
341}
342
/// Result of trying to dispatch the request at the front of the queue.
enum PollAction {
    /// A request was assigned to a peer and is ready to be sent.
    Ready(FetchAction),
    /// The queue is empty; there is nothing to dispatch.
    NoRequests,
    /// A request is queued but no idle peer is available to serve it.
    NoPeersAvailable,
}
349
/// Scheduling state the fetcher keeps for a single connected peer.
#[derive(Debug)]
struct Peer {
    /// What the peer is currently doing; only idle peers are selected for new requests.
    state: PeerState,
    /// Hash of the best block the peer is known to have.
    best_hash: B256,
    /// Number of the best block the peer is known to have.
    best_number: u64,
    /// Capabilities supplied when the peer was registered. Currently not read by the fetcher,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    capabilities: Arc<Capabilities>,
    /// Shared timeout value for the peer; peers with a lower value are preferred during peer
    /// selection. NOTE(review): the unit is not visible in this file — confirm with the owner
    /// of the atomic.
    timeout: Arc<AtomicU64>,
    /// Whether the peer's most recent response was flagged as likely bad by the response
    /// handlers. Such peers are de-prioritised when other idle peers are available.
    last_response_likely_bad: bool,
    /// Block range information supplied for the peer, if any. Without it the peer is treated
    /// as having an earliest block of `0`, i.e. full history.
    range_info: Option<BlockRangeInfo>,
}
374
375impl Peer {
376 fn timeout(&self) -> u64 {
377 self.timeout.load(Ordering::Relaxed)
378 }
379
380 fn earliest(&self) -> u64 {
382 self.range_info.as_ref().map_or(0, |info| info.earliest())
383 }
384
385 fn has_full_history(&self) -> bool {
387 self.earliest() == 0
388 }
389
390 fn range(&self) -> Option<RangeInclusive<u64>> {
391 self.range_info.as_ref().map(|info| info.range())
392 }
393
394 fn has_better_range(&self, other: &Self, range: RangeInclusive<u64>) -> bool {
403 let self_range = self.range();
404 let other_range = other.range();
405
406 match (self_range, other_range) {
407 (Some(self_r), Some(other_r)) => {
408 let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
410 let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
411
412 #[allow(clippy::match_same_arms)]
413 match (self_covers, other_covers) {
414 (true, false) => true, (false, true) => false, (true, true) => false, (false, false) => {
418 self_r.start() < other_r.start()
420 }
421 }
422 }
423 (Some(self_r), None) => {
424 self_r.contains(range.start()) && self_r.contains(range.end())
427 }
428 (None, Some(other_r)) => {
429 !(other_r.contains(range.start()) && other_r.contains(range.end()))
432 }
433 (None, None) => false, }
435 }
436
437 fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
439 match requirement {
440 BestPeerRequirements::None => false,
441 BestPeerRequirements::FullBlockRange(range) => {
442 self.has_better_range(other, range.clone())
443 }
444 BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
445 }
446 }
447}
448
/// What a peer tracked by the fetcher is currently doing.
#[derive(Debug)]
enum PeerState {
    /// The peer has no request in flight and can be given one.
    Idle,
    /// The peer is serving a block headers request.
    GetBlockHeaders,
    /// The peer is serving a block bodies request.
    GetBlockBodies,
    /// The peer is about to be disconnected and must not receive new requests.
    Closing,
}

impl PeerState {
    /// Returns `true` only for [`PeerState::Idle`].
    const fn is_idle(&self) -> bool {
        match self {
            Self::Idle => true,
            Self::GetBlockHeaders | Self::GetBlockBodies | Self::Closing => false,
        }
    }

    /// Resets the state to [`PeerState::Idle`] after a request finished, unless the peer is
    /// closing.
    ///
    /// Returns `true` if the peer is idle afterwards, `false` if it stays in
    /// [`PeerState::Closing`].
    const fn on_request_finished(&mut self) -> bool {
        match self {
            Self::Closing => false,
            Self::Idle | Self::GetBlockHeaders | Self::GetBlockBodies => {
                *self = Self::Idle;
                true
            }
        }
    }
}
483
/// A request that was sent to a peer, paired with the channel used to return the outcome to
/// whoever issued it.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The request payload that is kept while the request is in flight.
    request: Req,
    /// One-shot channel on which the response (or an error) is delivered.
    response: oneshot::Sender<Resp>,
}
493
/// A download request submitted to the fetcher, waiting to be assigned to a peer.
#[derive(Debug)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Request for block headers.
    GetBlockHeaders {
        /// Which headers to fetch (start, limit and direction).
        request: HeadersRequest,
        /// Channel on which the headers, or an error, are returned to the requester.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
        /// Priority that determines the request's position in the queue.
        priority: Priority,
    },
    /// Request for block bodies.
    GetBlockBodies {
        /// Block hashes that are forwarded in the `GetBlockBodies` message.
        request: Vec<B256>,
        /// Channel on which the bodies, or an error, are returned to the requester.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
        /// Priority that determines the request's position in the queue.
        priority: Priority,
        /// Optional block number range of the requested bodies. When present, peers whose
        /// range info covers it are preferred; when absent, peers with full history are
        /// preferred.
        range_hint: Option<RangeInclusive<u64>>,
    },
}
511
512impl<N: NetworkPrimitives> DownloadRequest<N> {
515 const fn peer_state(&self) -> PeerState {
517 match self {
518 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
519 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
520 }
521 }
522
523 const fn get_priority(&self) -> &Priority {
525 match self {
526 Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
527 priority
528 }
529 }
530 }
531
532 const fn is_normal_priority(&self) -> bool {
534 self.get_priority().is_normal()
535 }
536
537 fn best_peer_requirements(&self) -> BestPeerRequirements {
539 match self {
540 Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
541 Self::GetBlockBodies { range_hint, .. } => {
542 if let Some(range) = range_hint {
543 BestPeerRequirements::FullBlockRange(range.clone())
544 } else {
545 BestPeerRequirements::FullBlock
546 }
547 }
548 }
549 }
550}
551
/// An action the fetcher asks its caller to perform.
pub(crate) enum FetchAction {
    /// Send `request` to the peer identified by `peer_id`.
    BlockRequest {
        /// The peer the fetcher selected for the request.
        peer_id: PeerId,
        /// The request message to send to that peer.
        request: BlockRequest,
    },
}
562
/// What the caller should do after the fetcher processed a peer's response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Send the given follow-up request to the peer that just responded.
    Request(PeerId, BlockRequest),
    /// The response warrants the given reputation change for the peer.
    BadResponse(PeerId, ReputationChangeKind),
}
573
/// Criteria used to prefer one idle peer over another when assigning a request.
enum BestPeerRequirements {
    /// No special requirement; no peer is considered better on these grounds.
    None,
    /// Prefer peers whose range info covers the given block number range.
    FullBlockRange(RangeInclusive<u64>),
    /// Prefer peers with full history, i.e. an earliest block of `0`.
    FullBlock,
}
583
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{peers::PeersManager, PeersConfig};
    use alloy_consensus::Header;
    use alloy_primitives::B512;
    use std::future::poll_fn;

    /// Builds an idle peer with a random best hash, a timeout of `10` and no bad response on
    /// record.
    fn idle_peer(best_number: u64, range_info: Option<BlockRangeInfo>) -> Peer {
        Peer {
            state: PeerState::Idle,
            best_hash: B256::random(),
            best_number,
            capabilities: Arc::new(Capabilities::new(vec![])),
            timeout: Arc::new(AtomicU64::new(10)),
            last_response_likely_bad: false,
            range_info,
        }
    }

    /// Range info spanning `earliest..=latest` with a random hash.
    fn block_range(earliest: u64, latest: u64) -> Option<BlockRangeInfo> {
        Some(BlockRangeInfo::new(earliest, latest, B256::random()))
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_poll_fetcher() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        poll_fn(move |cx| {
            // nothing queued yet
            assert!(fetcher.poll(cx).is_pending());
            let (tx, _rx) = oneshot::channel();
            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::default(),
                range_hint: None,
            });
            // a request is queued but there is no peer to serve it
            assert!(fetcher.poll(cx).is_pending());

            Poll::Ready(())
        })
        .await;
    }

    #[tokio::test]
    async fn test_peer_rotation() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // add two idle peers with identical timeouts
        let peer1 = B512::random();
        let peer2 = B512::random();
        let capabilities = Arc::new(Capabilities::from(vec![]));
        fetcher.new_active_peer(
            peer1,
            B256::random(),
            1,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(1)),
            None,
        );
        fetcher.new_active_peer(
            peer2,
            B256::random(),
            2,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(1)),
            None,
        );

        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
        assert!(first_peer == peer1 || first_peer == peer2);
        // a pending disconnect takes the first peer out of rotation
        fetcher.on_pending_disconnect(&first_peer);
        // the first peer is no longer idle, so the other peer must be returned
        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
        assert!(second_peer == peer1 || second_peer == peer2);
        assert_ne!(first_peer, second_peer);
        // once both peers are closing there is nothing left to pick
        fetcher.on_pending_disconnect(&second_peer);
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
    }

    #[tokio::test]
    async fn test_peer_prioritization() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer1 = B512::random();
        let peer2 = B512::random();
        let peer3 = B512::random();

        let peer2_timeout = Arc::new(AtomicU64::new(300));

        let capabilities = Arc::new(Capabilities::from(vec![]));
        fetcher.new_active_peer(
            peer1,
            B256::random(),
            1,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(30)),
            None,
        );
        fetcher.new_active_peer(
            peer2,
            B256::random(),
            2,
            Arc::clone(&capabilities),
            Arc::clone(&peer2_timeout),
            None,
        );
        fetcher.new_active_peer(
            peer3,
            B256::random(),
            3,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(50)),
            None,
        );

        // peer1 has the lowest timeout; selecting must not change any state
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        // lowering peer2's timeout below peer1's makes peer2 the best peer
        peer2_timeout.store(10, Ordering::Relaxed);
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
    }

    #[tokio::test]
    async fn test_on_block_headers_response() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);

        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
            None
        );
    }

    #[tokio::test]
    async fn test_header_response_outcome() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        let request_pair = || {
            let (tx, _rx) = oneshot::channel();
            let req = Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            };
            let header = Header { number: 0, ..Default::default() };
            (req, header)
        };

        fetcher.new_active_peer(
            peer_id,
            Default::default(),
            Default::default(),
            Arc::new(Capabilities::from(vec![])),
            Default::default(),
            None,
        );

        let (req, header) = request_pair();
        fetcher.inflight_headers_requests.insert(peer_id, req);

        // a good response with nothing queued yields no outcome and frees the peer
        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
        assert!(outcome.is_none());
        assert!(fetcher.peers[&peer_id].state.is_idle());

        let outcome =
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();

        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
            RequestError::Timeout
        ))
        .is_some());

        match outcome {
            BlockResponseOutcome::BadResponse(peer, _) => {
                assert_eq!(peer, peer_id)
            }
            BlockResponseOutcome::Request(_, _) => {
                unreachable!()
            }
        };

        assert!(fetcher.peers[&peer_id].state.is_idle());
    }

    #[test]
    fn test_peer_is_better_none_requirement() {
        let peer1 = idle_peer(100, block_range(0, 100));
        let peer2 = Peer { timeout: Arc::new(AtomicU64::new(20)), ..idle_peer(50, None) };

        // without a requirement neither peer is preferred
        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
    }

    #[test]
    fn test_peer_is_better_full_block_requirement() {
        // explicit full history (earliest = 0)
        let peer_full = idle_peer(100, block_range(0, 100));
        // partial history (earliest > 0)
        let peer_partial = idle_peer(100, block_range(50, 100));
        // no range info is treated as full history
        let peer_no_range = idle_peer(100, None);

        // full history beats partial history
        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));

        // missing range info beats partial history as well
        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));

        // two full-history peers are equally good
        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
    }

    #[test]
    fn test_peer_is_better_full_block_range_requirement() {
        let range = 40..=60;

        let peer_covers = idle_peer(100, block_range(0, 100));
        let peer_no_cover = idle_peer(100, block_range(70, 100));

        // the peer covering the requested range is better
        assert!(peer_covers
            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(
            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
        );
    }

    #[test]
    fn test_peer_is_better_both_cover_range() {
        let range = 30..=50;

        let peer_wide = idle_peer(100, block_range(0, 50));
        let peer_exact = idle_peer(100, block_range(30, 50));

        // both cover the range, so neither is preferred
        assert!(!peer_wide
            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_exact.is_better(&peer_wide, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_lower_start() {
        let range = 30..=60;

        let peer_lower_start = idle_peer(100, block_range(0, 50));
        let peer_higher_start = idle_peer(100, block_range(30, 50));

        // neither covers the end of the range, so the lower start wins
        assert!(peer_lower_start
            .is_better(&peer_higher_start, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_higher_start
            .is_better(&peer_lower_start, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_neither_covers_range() {
        let range = 40..=60;

        let peer_lower_start = idle_peer(30, block_range(0, 30));
        let peer_higher_start = idle_peer(30, block_range(10, 30));

        // neither covers the range, so the lower start wins
        assert!(peer_lower_start
            .is_better(&peer_higher_start, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(!peer_higher_start
            .is_better(&peer_lower_start, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_no_range_info() {
        let range = 40..=60;

        let peer_with_range = idle_peer(100, block_range(30, 100));
        let peer_no_range = idle_peer(100, None);

        // a peer without range info does not beat a peer that covers the range
        assert!(!peer_no_range
            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        // a peer that covers the range beats a peer without range info
        assert!(
            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
        );
    }

    #[test]
    fn test_peer_is_better_one_peer_no_range_covers() {
        let range = 40..=60;

        let peer_with_range_covers = idle_peer(100, block_range(30, 100));
        let peer_no_range = idle_peer(100, None);

        // the peer whose range covers the request is better than one without range info
        assert!(peer_with_range_covers
            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        assert!(!peer_no_range
            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
        let range = 40..=60;

        let peer_with_range_no_cover = idle_peer(100, block_range(70, 100));
        let peer_no_range = idle_peer(100, None);

        // a peer whose range misses the request is not better than one without range info
        assert!(!peer_with_range_no_cover
            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));

        // the peer without range info is preferred over one known not to cover the range
        assert!(peer_no_range
            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
    }

    #[test]
    fn test_peer_is_better_edge_cases() {
        // exact range boundaries
        let range = 50..=100;

        // covers the range exactly
        let peer_exact = idle_peer(100, block_range(50, 100));
        // starts one block too late
        let peer_short_start = idle_peer(100, block_range(51, 100));
        // ends one block too early
        let peer_short_end = idle_peer(100, block_range(50, 99));

        // the exact match beats both peers that miss a boundary
        assert!(peer_exact
            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(peer_exact
            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));

        // and never the other way around
        assert!(!peer_short_start
            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
        assert!(
            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
        );
    }
}