1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11 Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts,
12 NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
17 headers::client::HeadersRequest,
18 priority::Priority,
19 receipts::client::ReceiptsResponse,
20};
21use reth_network_peers::PeerId;
22use reth_network_types::ReputationChangeKind;
23use std::{
24 collections::{HashMap, VecDeque},
25 ops::RangeInclusive,
26 sync::{
27 atomic::{AtomicU64, AtomicUsize, Ordering},
28 Arc,
29 },
30 task::{Context, Poll},
31};
32use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
35type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
36type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
37type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
38
39#[derive(Debug)]
46pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
47 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
49 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
51 inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
53 peers: HashMap<PeerId, Peer>,
55 peers_handle: PeersHandle,
57 num_active_peers: Arc<AtomicUsize>,
59 queued_requests: VecDeque<DownloadRequest<N>>,
61 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
63 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
65}
66
67impl<N: NetworkPrimitives> StateFetcher<N> {
70 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
71 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
72 Self {
73 inflight_headers_requests: Default::default(),
74 inflight_bodies_requests: Default::default(),
75 inflight_receipts_requests: Default::default(),
76 peers: Default::default(),
77 peers_handle,
78 num_active_peers,
79 queued_requests: Default::default(),
80 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
81 download_requests_tx,
82 }
83 }
84
85 pub(crate) fn new_active_peer(
87 &mut self,
88 peer_id: PeerId,
89 best_hash: B256,
90 best_number: u64,
91 capabilities: Arc<Capabilities>,
92 timeout: Arc<AtomicU64>,
93 range_info: Option<BlockRangeInfo>,
94 ) {
95 self.peers.insert(
96 peer_id,
97 Peer {
98 state: PeerState::Idle,
99 best_hash,
100 best_number,
101 capabilities,
102 timeout,
103 last_response_likely_bad: false,
104 range_info,
105 },
106 );
107 }
108
109 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
116 self.peers.remove(peer);
117 if let Some(req) = self.inflight_headers_requests.remove(peer) {
118 let _ = req.response.send(Err(RequestError::ConnectionDropped));
119 }
120 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
121 let _ = req.response.send(Err(RequestError::ConnectionDropped));
122 }
123 if let Some(req) = self.inflight_receipts_requests.remove(peer) {
124 let _ = req.response.send(Err(RequestError::ConnectionDropped));
125 }
126 }
127
128 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
132 if let Some(peer) = self.peers.get_mut(peer_id) &&
133 number > peer.best_number
134 {
135 peer.best_hash = hash;
136 peer.best_number = number;
137 return true
138 }
139 false
140 }
141
142 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
144 if let Some(peer) = self.peers.get_mut(peer_id) {
145 peer.state = PeerState::Closing;
146 }
147 }
148
149 fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
154 let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
155
156 let mut best_peer = idle.next()?;
157
158 for maybe_better in idle {
159 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
161 best_peer = maybe_better;
162 continue
163 }
164
165 if maybe_better.1.is_better(best_peer.1, &requirement) {
167 best_peer = maybe_better;
168 continue
169 }
170
171 if maybe_better.1.timeout() < best_peer.1.timeout() &&
173 !maybe_better.1.last_response_likely_bad
174 {
175 best_peer = maybe_better;
176 }
177 }
178
179 Some(*best_peer.0)
180 }
181
182 fn poll_action(&mut self) -> PollAction {
184 if self.queued_requests.is_empty() {
186 return PollAction::NoRequests
187 }
188
189 let request = self.queued_requests.pop_front().expect("not empty");
190 let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
191 self.queued_requests.push_front(request);
193 return PollAction::NoPeersAvailable
194 };
195
196 let request = self.prepare_block_request(peer_id, request);
197
198 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
199 }
200
201 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
203 loop {
205 let no_peers_available = match self.poll_action() {
206 PollAction::Ready(action) => return Poll::Ready(action),
207 PollAction::NoRequests => false,
208 PollAction::NoPeersAvailable => true,
209 };
210
211 loop {
212 match self.download_requests_rx.poll_next_unpin(cx) {
214 Poll::Ready(Some(request)) => match request.get_priority() {
215 Priority::High => {
216 let pos = self
219 .queued_requests
220 .iter()
221 .position(|req| req.is_normal_priority())
222 .unwrap_or(0);
223 self.queued_requests.insert(pos, request);
224 }
225 Priority::Normal => {
226 self.queued_requests.push_back(request);
227 }
228 },
229 Poll::Ready(None) => {
230 unreachable!("channel can't close")
231 }
232 Poll::Pending => break,
233 }
234 }
235
236 if self.queued_requests.is_empty() || no_peers_available {
237 return Poll::Pending
238 }
239 }
240 }
241
242 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
246 if let Some(peer) = self.peers.get_mut(&peer_id) {
248 peer.state = req.peer_state();
249 }
250
251 match req {
252 DownloadRequest::GetBlockHeaders { request, response, .. } => {
253 let inflight = Request { request: request.clone(), response };
254 self.inflight_headers_requests.insert(peer_id, inflight);
255 let HeadersRequest { start, limit, direction } = request;
256 BlockRequest::GetBlockHeaders(GetBlockHeaders {
257 start_block: start,
258 limit,
259 skip: 0,
260 direction,
261 })
262 }
263 DownloadRequest::GetBlockBodies { request, response, .. } => {
264 let inflight = Request { request: (), response };
265 self.inflight_bodies_requests.insert(peer_id, inflight);
266 BlockRequest::GetBlockBodies(GetBlockBodies(request))
267 }
268 DownloadRequest::GetReceipts { request, response, .. } => {
269 let inflight = Request { request: (), response };
270 self.inflight_receipts_requests.insert(peer_id, inflight);
271 BlockRequest::GetReceipts(GetReceipts(request))
272 }
273 }
274 }
275
276 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
280 let req = self.queued_requests.pop_front()?;
281 let req = self.prepare_block_request(peer_id, req);
282 Some(BlockResponseOutcome::Request(peer_id, req))
283 }
284
285 pub(crate) fn on_block_headers_response(
291 &mut self,
292 peer_id: PeerId,
293 res: RequestResult<Vec<N::BlockHeader>>,
294 ) -> Option<BlockResponseOutcome> {
295 let is_error = res.is_err();
296 let maybe_reputation_change = res.reputation_change_err();
297
298 let resp = self.inflight_headers_requests.remove(&peer_id);
299
300 let is_likely_bad_response =
301 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
302
303 if let Some(resp) = resp {
304 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
306 }
307
308 if let Some(peer) = self.peers.get_mut(&peer_id) {
309 peer.last_response_likely_bad = is_likely_bad_response;
311
312 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
315 return self.followup_request(peer_id)
316 }
317 }
318
319 maybe_reputation_change
322 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
323 }
324
325 pub(crate) fn on_block_bodies_response(
327 &mut self,
328 peer_id: PeerId,
329 res: RequestResult<Vec<N::BlockBody>>,
330 ) -> Option<BlockResponseOutcome> {
331 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
332
333 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
334 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
335 }
336 if let Some(peer) = self.peers.get_mut(&peer_id) {
337 peer.last_response_likely_bad = is_likely_bad_response;
339
340 if peer.state.on_request_finished() && !is_likely_bad_response {
341 return self.followup_request(peer_id)
342 }
343 }
344 None
345 }
346
347 pub(crate) fn on_receipts_response(
352 &mut self,
353 peer_id: PeerId,
354 res: RequestResult<ReceiptsResponse<N::Receipt>>,
355 ) -> Option<BlockResponseOutcome> {
356 let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());
357
358 if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
359 let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
360 }
361 if let Some(peer) = self.peers.get_mut(&peer_id) {
362 peer.last_response_likely_bad = is_likely_bad_response;
363
364 if peer.state.on_request_finished() && !is_likely_bad_response {
365 return self.followup_request(peer_id)
366 }
367 }
368 None
369 }
370
371 pub(crate) fn client(&self) -> FetchClient<N> {
373 FetchClient {
374 request_tx: self.download_requests_tx.clone(),
375 peers_handle: self.peers_handle.clone(),
376 num_active_peers: Arc::clone(&self.num_active_peers),
377 }
378 }
379}
380
381enum PollAction {
383 Ready(FetchAction),
384 NoRequests,
385 NoPeersAvailable,
386}
387
388#[derive(Debug)]
390struct Peer {
391 state: PeerState,
393 best_hash: B256,
395 best_number: u64,
397 #[allow(dead_code)]
399 capabilities: Arc<Capabilities>,
400 timeout: Arc<AtomicU64>,
402 last_response_likely_bad: bool,
409 range_info: Option<BlockRangeInfo>,
411}
412
413impl Peer {
414 fn timeout(&self) -> u64 {
415 self.timeout.load(Ordering::Relaxed)
416 }
417
418 fn earliest(&self) -> u64 {
420 self.range_info.as_ref().map_or(0, |info| info.earliest())
421 }
422
423 fn has_full_history(&self) -> bool {
425 self.earliest() == 0
426 }
427
428 fn range(&self) -> Option<RangeInclusive<u64>> {
429 self.range_info.as_ref().map(|info| info.range())
430 }
431
432 fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
441 let self_range = self.range();
442 let other_range = other.range();
443
444 match (self_range, other_range) {
445 (Some(self_r), Some(other_r)) => {
446 let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
448 let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
449
450 #[allow(clippy::match_same_arms)]
451 match (self_covers, other_covers) {
452 (true, false) => true, (false, true) => false, (true, true) => false, (false, false) => {
456 self_r.start() < other_r.start()
458 }
459 }
460 }
461 (Some(self_r), None) => {
462 self_r.contains(range.start()) && self_r.contains(range.end())
465 }
466 (None, Some(other_r)) => {
467 !(other_r.contains(range.start()) && other_r.contains(range.end()))
470 }
471 (None, None) => false, }
473 }
474
475 fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
477 match requirement {
478 BestPeerRequirements::None => false,
479 BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
480 BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
481 }
482 }
483}
484
485#[derive(Debug)]
487enum PeerState {
488 Idle,
490 GetBlockHeaders,
492 GetBlockBodies,
494 GetReceipts,
496 Closing,
498}
499
500impl PeerState {
503 const fn is_idle(&self) -> bool {
505 matches!(self, Self::Idle)
506 }
507
508 const fn on_request_finished(&mut self) -> bool {
514 if !matches!(self, Self::Closing) {
515 *self = Self::Idle;
516 return true
517 }
518 false
519 }
520}
521
522#[derive(Debug)]
525struct Request<Req, Resp> {
526 request: Req,
529 response: oneshot::Sender<Resp>,
530}
531
532#[derive(Debug)]
534#[allow(clippy::enum_variant_names)]
535pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
536 GetBlockHeaders {
538 request: HeadersRequest,
539 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
540 priority: Priority,
541 },
542 GetBlockBodies {
544 request: Vec<B256>,
545 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
546 priority: Priority,
547 range_hint: Option<RangeInclusive<u64>>,
548 },
549 GetReceipts {
551 request: Vec<B256>,
552 response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
553 priority: Priority,
554 },
555}
556
557impl<N: NetworkPrimitives> DownloadRequest<N> {
560 const fn peer_state(&self) -> PeerState {
562 match self {
563 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
564 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
565 Self::GetReceipts { .. } => PeerState::GetReceipts,
566 }
567 }
568
569 const fn get_priority(&self) -> &Priority {
571 match self {
572 Self::GetBlockHeaders { priority, .. } |
573 Self::GetBlockBodies { priority, .. } |
574 Self::GetReceipts { priority, .. } => priority,
575 }
576 }
577
578 const fn is_normal_priority(&self) -> bool {
580 self.get_priority().is_normal()
581 }
582
583 fn best_peer_requirements(&self) -> BestPeerRequirements {
585 match self {
586 Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
587 Self::GetBlockBodies { range_hint, .. } => {
588 if let Some(range) = range_hint {
589 BestPeerRequirements::FullBlockRange(range.clone())
590 } else {
591 BestPeerRequirements::FullBlock
592 }
593 }
594 Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
595 }
596 }
597}
598
599pub(crate) enum FetchAction {
601 BlockRequest {
603 peer_id: PeerId,
605 request: BlockRequest,
607 },
608}
609
610#[derive(Debug, PartialEq, Eq)]
614pub(crate) enum BlockResponseOutcome {
615 Request(PeerId, BlockRequest),
617 BadResponse(PeerId, ReputationChangeKind),
619}
620
621enum BestPeerRequirements {
623 None,
625 FullBlockRange(RangeInclusive<u64>),
627 FullBlock,
629}
630
631#[cfg(test)]
632mod tests {
633 use super::*;
634 use crate::{peers::PeersManager, PeersConfig};
635 use alloy_consensus::Header;
636 use alloy_primitives::B512;
637 use std::future::poll_fn;
638
639 #[tokio::test(flavor = "multi_thread")]
640 async fn test_poll_fetcher() {
641 let manager = PeersManager::new(PeersConfig::default());
642 let mut fetcher =
643 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
644
645 poll_fn(move |cx| {
646 assert!(fetcher.poll(cx).is_pending());
647 let (tx, _rx) = oneshot::channel();
648 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
649 request: vec![],
650 response: tx,
651 priority: Priority::default(),
652 range_hint: None,
653 });
654 assert!(fetcher.poll(cx).is_pending());
655
656 Poll::Ready(())
657 })
658 .await;
659 }
660
661 #[tokio::test]
662 async fn test_peer_rotation() {
663 let manager = PeersManager::new(PeersConfig::default());
664 let mut fetcher =
665 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
666 let peer1 = B512::random();
668 let peer2 = B512::random();
669 let capabilities = Arc::new(Capabilities::from(vec![]));
670 fetcher.new_active_peer(
671 peer1,
672 B256::random(),
673 1,
674 Arc::clone(&capabilities),
675 Arc::new(AtomicU64::new(1)),
676 None,
677 );
678 fetcher.new_active_peer(
679 peer2,
680 B256::random(),
681 2,
682 Arc::clone(&capabilities),
683 Arc::new(AtomicU64::new(1)),
684 None,
685 );
686
687 let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
688 assert!(first_peer == peer1 || first_peer == peer2);
689 fetcher.on_pending_disconnect(&first_peer);
691 let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
693 assert!(first_peer == peer1 || first_peer == peer2);
694 assert_ne!(first_peer, second_peer);
695 fetcher.on_pending_disconnect(&second_peer);
697 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
698 }
699
700 #[tokio::test]
701 async fn test_peer_prioritization() {
702 let manager = PeersManager::new(PeersConfig::default());
703 let mut fetcher =
704 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
705 let peer1 = B512::random();
707 let peer2 = B512::random();
708 let peer3 = B512::random();
709
710 let peer2_timeout = Arc::new(AtomicU64::new(300));
711
712 let capabilities = Arc::new(Capabilities::from(vec![]));
713 fetcher.new_active_peer(
714 peer1,
715 B256::random(),
716 1,
717 Arc::clone(&capabilities),
718 Arc::new(AtomicU64::new(30)),
719 None,
720 );
721 fetcher.new_active_peer(
722 peer2,
723 B256::random(),
724 2,
725 Arc::clone(&capabilities),
726 Arc::clone(&peer2_timeout),
727 None,
728 );
729 fetcher.new_active_peer(
730 peer3,
731 B256::random(),
732 3,
733 Arc::clone(&capabilities),
734 Arc::new(AtomicU64::new(50)),
735 None,
736 );
737
738 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
740 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
741 peer2_timeout.store(10, Ordering::Relaxed);
743 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
745 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
746 }
747
748 #[tokio::test]
749 async fn test_on_block_headers_response() {
750 let manager = PeersManager::new(PeersConfig::default());
751 let mut fetcher =
752 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
753 let peer_id = B512::random();
754
755 assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
756
757 assert_eq!(
758 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
759 Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
760 );
761 assert_eq!(
762 fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
763 None
764 );
765 assert_eq!(
766 fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
767 None
768 );
769 assert_eq!(
770 fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
771 None
772 );
773 assert_eq!(
774 fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
775 None
776 );
777 }
778
779 #[tokio::test]
780 async fn test_header_response_outcome() {
781 let manager = PeersManager::new(PeersConfig::default());
782 let mut fetcher =
783 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
784 let peer_id = B512::random();
785
786 let request_pair = || {
787 let (tx, _rx) = oneshot::channel();
788 let req = Request {
789 request: HeadersRequest {
790 start: 0u64.into(),
791 limit: 1,
792 direction: Default::default(),
793 },
794 response: tx,
795 };
796 let header = Header { number: 0, ..Default::default() };
797 (req, header)
798 };
799
800 fetcher.new_active_peer(
801 peer_id,
802 Default::default(),
803 Default::default(),
804 Arc::new(Capabilities::from(vec![])),
805 Default::default(),
806 None,
807 );
808
809 let (req, header) = request_pair();
810 fetcher.inflight_headers_requests.insert(peer_id, req);
811
812 let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
813 assert!(outcome.is_none());
814 assert!(fetcher.peers[&peer_id].state.is_idle());
815
816 let outcome =
817 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
818
819 assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
820 RequestError::Timeout
821 ))
822 .is_some());
823
824 match outcome {
825 BlockResponseOutcome::BadResponse(peer, _) => {
826 assert_eq!(peer, peer_id)
827 }
828 BlockResponseOutcome::Request(_, _) => {
829 unreachable!()
830 }
831 };
832
833 assert!(fetcher.peers[&peer_id].state.is_idle());
834 }
835
836 #[test]
837 fn test_peer_is_better_none_requirement() {
838 let peer1 = Peer {
839 state: PeerState::Idle,
840 best_hash: B256::random(),
841 best_number: 100,
842 capabilities: Arc::new(Capabilities::new(vec![])),
843 timeout: Arc::new(AtomicU64::new(10)),
844 last_response_likely_bad: false,
845 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
846 };
847
848 let peer2 = Peer {
849 state: PeerState::Idle,
850 best_hash: B256::random(),
851 best_number: 50,
852 capabilities: Arc::new(Capabilities::new(vec![])),
853 timeout: Arc::new(AtomicU64::new(20)),
854 last_response_likely_bad: false,
855 range_info: None,
856 };
857
858 assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
860 assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
861 }
862
863 #[test]
864 fn test_peer_is_better_full_block_requirement() {
865 let peer_full = Peer {
867 state: PeerState::Idle,
868 best_hash: B256::random(),
869 best_number: 100,
870 capabilities: Arc::new(Capabilities::new(vec![])),
871 timeout: Arc::new(AtomicU64::new(10)),
872 last_response_likely_bad: false,
873 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
874 };
875
876 let peer_partial = Peer {
878 state: PeerState::Idle,
879 best_hash: B256::random(),
880 best_number: 100,
881 capabilities: Arc::new(Capabilities::new(vec![])),
882 timeout: Arc::new(AtomicU64::new(10)),
883 last_response_likely_bad: false,
884 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
885 };
886
887 let peer_no_range = Peer {
889 state: PeerState::Idle,
890 best_hash: B256::random(),
891 best_number: 100,
892 capabilities: Arc::new(Capabilities::new(vec![])),
893 timeout: Arc::new(AtomicU64::new(10)),
894 last_response_likely_bad: false,
895 range_info: None,
896 };
897
898 assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
900 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
901
902 assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
904 assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
905
906 assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
908 assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
909 }
910
911 #[test]
912 fn test_peer_is_better_full_block_range_requirement() {
913 let range = RangeInclusive::new(40, 60);
914
915 let peer_covers = Peer {
917 state: PeerState::Idle,
918 best_hash: B256::random(),
919 best_number: 100,
920 capabilities: Arc::new(Capabilities::new(vec![])),
921 timeout: Arc::new(AtomicU64::new(10)),
922 last_response_likely_bad: false,
923 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
924 };
925
926 let peer_no_cover = Peer {
928 state: PeerState::Idle,
929 best_hash: B256::random(),
930 best_number: 100,
931 capabilities: Arc::new(Capabilities::new(vec![])),
932 timeout: Arc::new(AtomicU64::new(10)),
933 last_response_likely_bad: false,
934 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
935 };
936
937 assert!(peer_covers
939 .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
940 assert!(
941 !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
942 );
943 }
944
945 #[test]
946 fn test_peer_is_better_both_cover_range() {
947 let range = RangeInclusive::new(30, 50);
948
949 let peer_full = Peer {
951 state: PeerState::Idle,
952 best_hash: B256::random(),
953 best_number: 100,
954 capabilities: Arc::new(Capabilities::new(vec![])),
955 timeout: Arc::new(AtomicU64::new(10)),
956 last_response_likely_bad: false,
957 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
958 };
959
960 let peer_partial = Peer {
962 state: PeerState::Idle,
963 best_hash: B256::random(),
964 best_number: 100,
965 capabilities: Arc::new(Capabilities::new(vec![])),
966 timeout: Arc::new(AtomicU64::new(10)),
967 last_response_likely_bad: false,
968 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
969 };
970
971 assert!(!peer_full
973 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
974 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
975 }
976
977 #[test]
978 fn test_peer_is_better_lower_start() {
979 let range = RangeInclusive::new(30, 60);
980
981 let peer_full = Peer {
983 state: PeerState::Idle,
984 best_hash: B256::random(),
985 best_number: 100,
986 capabilities: Arc::new(Capabilities::new(vec![])),
987 timeout: Arc::new(AtomicU64::new(10)),
988 last_response_likely_bad: false,
989 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
990 };
991
992 let peer_partial = Peer {
994 state: PeerState::Idle,
995 best_hash: B256::random(),
996 best_number: 100,
997 capabilities: Arc::new(Capabilities::new(vec![])),
998 timeout: Arc::new(AtomicU64::new(10)),
999 last_response_likely_bad: false,
1000 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1001 };
1002
1003 assert!(peer_full
1005 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1006 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1007 }
1008
1009 #[test]
1010 fn test_peer_is_better_neither_covers_range() {
1011 let range = RangeInclusive::new(40, 60);
1012
1013 let peer_full = Peer {
1015 state: PeerState::Idle,
1016 best_hash: B256::random(),
1017 best_number: 30,
1018 capabilities: Arc::new(Capabilities::new(vec![])),
1019 timeout: Arc::new(AtomicU64::new(10)),
1020 last_response_likely_bad: false,
1021 range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1022 };
1023
1024 let peer_partial = Peer {
1026 state: PeerState::Idle,
1027 best_hash: B256::random(),
1028 best_number: 30,
1029 capabilities: Arc::new(Capabilities::new(vec![])),
1030 timeout: Arc::new(AtomicU64::new(10)),
1031 last_response_likely_bad: false,
1032 range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1033 };
1034
1035 assert!(peer_full
1037 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1038 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1039 }
1040
1041 #[test]
1042 fn test_peer_is_better_no_range_info() {
1043 let range = RangeInclusive::new(40, 60);
1044
1045 let peer_with_range = Peer {
1047 state: PeerState::Idle,
1048 best_hash: B256::random(),
1049 best_number: 100,
1050 capabilities: Arc::new(Capabilities::new(vec![])),
1051 timeout: Arc::new(AtomicU64::new(10)),
1052 last_response_likely_bad: false,
1053 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1054 };
1055
1056 let peer_no_range = Peer {
1058 state: PeerState::Idle,
1059 best_hash: B256::random(),
1060 best_number: 100,
1061 capabilities: Arc::new(Capabilities::new(vec![])),
1062 timeout: Arc::new(AtomicU64::new(10)),
1063 last_response_likely_bad: false,
1064 range_info: None,
1065 };
1066
1067 assert!(!peer_no_range
1069 .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1070
1071 assert!(
1073 peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1074 );
1075 }
1076
1077 #[test]
1078 fn test_peer_is_better_one_peer_no_range_covers() {
1079 let range = RangeInclusive::new(40, 60);
1080
1081 let peer_with_range_covers = Peer {
1083 state: PeerState::Idle,
1084 best_hash: B256::random(),
1085 best_number: 100,
1086 capabilities: Arc::new(Capabilities::new(vec![])),
1087 timeout: Arc::new(AtomicU64::new(10)),
1088 last_response_likely_bad: false,
1089 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1090 };
1091
1092 let peer_no_range = Peer {
1094 state: PeerState::Idle,
1095 best_hash: B256::random(),
1096 best_number: 100,
1097 capabilities: Arc::new(Capabilities::new(vec![])),
1098 timeout: Arc::new(AtomicU64::new(10)),
1099 last_response_likely_bad: false,
1100 range_info: None,
1101 };
1102
1103 assert!(peer_with_range_covers
1105 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1106
1107 assert!(!peer_no_range
1109 .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1110 }
1111
1112 #[test]
1113 fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1114 let range = RangeInclusive::new(40, 60);
1115
1116 let peer_with_range_no_cover = Peer {
1118 state: PeerState::Idle,
1119 best_hash: B256::random(),
1120 best_number: 100,
1121 capabilities: Arc::new(Capabilities::new(vec![])),
1122 timeout: Arc::new(AtomicU64::new(10)),
1123 last_response_likely_bad: false,
1124 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1125 };
1126
1127 let peer_no_range = Peer {
1129 state: PeerState::Idle,
1130 best_hash: B256::random(),
1131 best_number: 100,
1132 capabilities: Arc::new(Capabilities::new(vec![])),
1133 timeout: Arc::new(AtomicU64::new(10)),
1134 last_response_likely_bad: false,
1135 range_info: None,
1136 };
1137
1138 assert!(!peer_with_range_no_cover
1140 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1141
1142 assert!(peer_no_range
1144 .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1145 }
1146
1147 #[test]
1148 fn test_peer_is_better_edge_cases() {
1149 let range = RangeInclusive::new(50, 100);
1151
1152 let peer_exact = Peer {
1154 state: PeerState::Idle,
1155 best_hash: B256::random(),
1156 best_number: 100,
1157 capabilities: Arc::new(Capabilities::new(vec![])),
1158 timeout: Arc::new(AtomicU64::new(10)),
1159 last_response_likely_bad: false,
1160 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1161 };
1162
1163 let peer_short_start = Peer {
1165 state: PeerState::Idle,
1166 best_hash: B256::random(),
1167 best_number: 100,
1168 capabilities: Arc::new(Capabilities::new(vec![])),
1169 timeout: Arc::new(AtomicU64::new(10)),
1170 last_response_likely_bad: false,
1171 range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1172 };
1173
1174 let peer_short_end = Peer {
1176 state: PeerState::Idle,
1177 best_hash: B256::random(),
1178 best_number: 100,
1179 capabilities: Arc::new(Capabilities::new(vec![])),
1180 timeout: Arc::new(AtomicU64::new(10)),
1181 last_response_likely_bad: false,
1182 range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1183 };
1184
1185 assert!(peer_exact
1187 .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1188 assert!(peer_exact
1189 .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1190
1191 assert!(!peer_short_start
1193 .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1194 assert!(
1195 !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1196 );
1197 }
1198
1199 fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1201 let manager = PeersManager::new(PeersConfig::default());
1202 let mut fetcher =
1203 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1204 let peer_id = B512::random();
1205
1206 fetcher.new_active_peer(
1207 peer_id,
1208 Default::default(),
1209 Default::default(),
1210 Arc::new(Capabilities::from(vec![])),
1211 Default::default(),
1212 None,
1213 );
1214 (fetcher, peer_id)
1215 }
1216
1217 fn insert_inflight_receipts(
1220 fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1221 peer_id: PeerId,
1222 ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1223 {
1224 let (tx, rx) = oneshot::channel();
1225 fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1226 fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1227 rx
1228 }
1229
1230 #[tokio::test]
1233 async fn test_poll_dispatches_receipts_to_peer() {
1234 let (mut fetcher, peer_id) = fetcher_with_peer();
1235
1236 poll_fn(move |cx| {
1237 let (tx, _rx) = oneshot::channel();
1238 fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1239 request: vec![B256::ZERO],
1240 response: tx,
1241 priority: Priority::default(),
1242 });
1243
1244 let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1245 fetcher.poll(cx)
1246 else {
1247 panic!("expected Ready(BlockRequest)");
1248 };
1249 assert_eq!(dispatched_peer, peer_id);
1250 assert!(matches!(request, BlockRequest::GetReceipts(_)));
1251
1252 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1254 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1256
1257 Poll::Ready(())
1258 })
1259 .await;
1260 }
1261
1262 #[tokio::test]
1265 async fn test_receipts_complete_response_resolves_and_idles_peer() {
1266 let (mut fetcher, peer_id) = fetcher_with_peer();
1267
1268 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1269
1270 let resp = ReceiptsResponse::new(vec![vec![]]);
1271 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1272
1273 assert!(outcome.is_none());
1275 assert!(fetcher.peers[&peer_id].state.is_idle());
1277 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1279
1280 let result = rx.await.unwrap().unwrap();
1282 assert_eq!(result.1.receipts.len(), 1);
1283 }
1284
1285 #[tokio::test]
1286 async fn test_receipts_empty_response_marks_peer_bad() {
1287 let (mut fetcher, peer_id) = fetcher_with_peer();
1288 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1289
1290 let resp = ReceiptsResponse::new(vec![]);
1291 let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1292
1293 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1294 }
1295
1296 #[tokio::test]
1297 async fn test_receipts_error_forwards_and_marks_peer_bad() {
1298 let (mut fetcher, peer_id) = fetcher_with_peer();
1299 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1300
1301 let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1302
1303 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1304 let result = rx.await.unwrap();
1306 assert_eq!(result.unwrap_err(), RequestError::Timeout);
1307 }
1308
1309 #[tokio::test]
1310 async fn test_session_closed_cancels_inflight_receipts() {
1311 let (mut fetcher, peer_id) = fetcher_with_peer();
1312 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1313
1314 fetcher.on_session_closed(&peer_id);
1315
1316 assert!(!fetcher.peers.contains_key(&peer_id));
1317 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1318
1319 let result = rx.await.unwrap();
1320 assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1321 }
1322
1323 #[tokio::test]
1324 async fn test_receipts_response_triggers_followup() {
1325 let (mut fetcher, peer_id) = fetcher_with_peer();
1326
1327 let (followup_tx, _followup_rx) = oneshot::channel();
1329 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1330 request: vec![B256::random()],
1331 response: followup_tx,
1332 priority: Priority::default(),
1333 range_hint: None,
1334 });
1335
1336 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1337
1338 let resp = ReceiptsResponse::new(vec![vec![]]);
1339 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1340
1341 assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1342 }
1343
1344 #[tokio::test]
1345 async fn test_prepare_block_request_creates_inflight_receipts() {
1346 let (mut fetcher, peer_id) = fetcher_with_peer();
1347 let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1348
1349 let (tx, _rx) = oneshot::channel();
1350 let req = DownloadRequest::GetReceipts {
1351 request: hashes.clone(),
1352 response: tx,
1353 priority: Priority::default(),
1354 };
1355
1356 let block_request = fetcher.prepare_block_request(peer_id, req);
1357
1358 match block_request {
1360 BlockRequest::GetReceipts(ref get) => {
1361 assert_eq!(get.0, hashes);
1362 }
1363 other => panic!("expected GetReceipts, got {other:?}"),
1364 }
1365
1366 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1368
1369 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1371 }
1372}