1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11 BlockAccessLists, Capabilities, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
12 GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16 block_access_lists::client::BalRequirement,
17 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
18 headers::client::HeadersRequest,
19 priority::Priority,
20 receipts::client::ReceiptsResponse,
21};
22use reth_network_peers::PeerId;
23use reth_network_types::ReputationChangeKind;
24use std::{
25 collections::{HashMap, VecDeque},
26 ops::RangeInclusive,
27 sync::{
28 atomic::{AtomicU64, AtomicUsize, Ordering},
29 Arc,
30 },
31 task::{Context, Poll},
32};
33use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
34use tokio_stream::wrappers::UnboundedReceiverStream;
35
36type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
37type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
38type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
39type InflightBlockAccessListsRequest = Request<(), PeerRequestResult<BlockAccessLists>>;
40
41#[derive(Debug)]
48pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
49 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
51 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
53 inflight_bals_requests: HashMap<PeerId, InflightBlockAccessListsRequest>,
55 inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
57 peers: HashMap<PeerId, Peer>,
59 peers_handle: PeersHandle,
61 num_active_peers: Arc<AtomicUsize>,
63 queued_requests: VecDeque<DownloadRequest<N>>,
65 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
67 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
69}
70
71impl<N: NetworkPrimitives> StateFetcher<N> {
74 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
75 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
76 Self {
77 inflight_headers_requests: Default::default(),
78 inflight_bodies_requests: Default::default(),
79 inflight_bals_requests: Default::default(),
80 inflight_receipts_requests: Default::default(),
81 peers: Default::default(),
82 peers_handle,
83 num_active_peers,
84 queued_requests: Default::default(),
85 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
86 download_requests_tx,
87 }
88 }
89
90 pub(crate) fn new_active_peer(
92 &mut self,
93 peer_id: PeerId,
94 best_hash: B256,
95 best_number: u64,
96 capabilities: Arc<Capabilities>,
97 timeout: Arc<AtomicU64>,
98 range_info: Option<BlockRangeInfo>,
99 ) {
100 self.peers.insert(
101 peer_id,
102 Peer {
103 state: PeerState::Idle,
104 best_hash,
105 best_number,
106 capabilities,
107 timeout,
108 last_response_likely_bad: false,
109 range_info,
110 },
111 );
112 }
113
114 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
121 self.peers.remove(peer);
122 if let Some(req) = self.inflight_headers_requests.remove(peer) {
123 let _ = req.response.send(Err(RequestError::ConnectionDropped));
124 }
125 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
126 let _ = req.response.send(Err(RequestError::ConnectionDropped));
127 }
128 if let Some(req) = self.inflight_bals_requests.remove(peer) {
129 let _ = req.response.send(Err(RequestError::ConnectionDropped));
130 }
131 if let Some(req) = self.inflight_receipts_requests.remove(peer) {
132 let _ = req.response.send(Err(RequestError::ConnectionDropped));
133 }
134 }
135
136 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
140 if let Some(peer) = self.peers.get_mut(peer_id) &&
141 number > peer.best_number
142 {
143 peer.best_hash = hash;
144 peer.best_number = number;
145 return true
146 }
147 false
148 }
149
150 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
152 if let Some(peer) = self.peers.get_mut(peer_id) {
153 peer.state = PeerState::Closing;
154 }
155 }
156
157 fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
162 let mut idle = self
164 .peers
165 .iter()
166 .filter(|(_, peer)| peer.state.is_idle() && peer.satisfies(&requirement));
167
168 let mut best_peer = idle.next()?;
169
170 for maybe_better in idle {
171 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
173 best_peer = maybe_better;
174 continue
175 }
176
177 if maybe_better.1.is_better(best_peer.1, &requirement) {
179 best_peer = maybe_better;
180 continue
181 }
182
183 if maybe_better.1.timeout() < best_peer.1.timeout() &&
185 !maybe_better.1.last_response_likely_bad
186 {
187 best_peer = maybe_better;
188 }
189 }
190
191 Some(*best_peer.0)
192 }
193
194 fn has_eth71_peer(&self) -> bool {
196 self.peers.values().any(|peer| {
197 !matches!(peer.state, PeerState::Closing) &&
198 peer.capabilities.supports_eth_at_least(&EthVersion::Eth71)
199 })
200 }
201
202 fn poll_action(&mut self) -> PollAction {
204 if self.queued_requests.is_empty() {
206 return PollAction::NoRequests
207 }
208
209 if self.peers.is_empty() {
210 return PollAction::NoPeersAvailable
211 }
212
213 let request = self.queued_requests.pop_front().expect("not empty");
214 let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
215 if request.is_optional_bal() && !self.has_eth71_peer() {
218 request.send_err_response(RequestError::UnsupportedCapability);
219 } else {
220 self.queued_requests.push_back(request);
223 }
224 return PollAction::NoPeersAvailable
225 };
226
227 let request = self.prepare_block_request(peer_id, request);
228
229 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
230 }
231
232 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
234 loop {
236 let no_peers_available = match self.poll_action() {
237 PollAction::Ready(action) => return Poll::Ready(action),
238 PollAction::NoRequests => false,
239 PollAction::NoPeersAvailable => true,
240 };
241
242 loop {
243 match self.download_requests_rx.poll_next_unpin(cx) {
245 Poll::Ready(Some(request)) => {
246 if request.is_optional_bal() && !self.has_eth71_peer() {
249 request.send_err_response(RequestError::UnsupportedCapability);
250 continue
251 }
252
253 match request.get_priority() {
254 Priority::High => {
255 let pos = self
258 .queued_requests
259 .iter()
260 .position(|req| req.is_normal_priority())
261 .unwrap_or(0);
262 self.queued_requests.insert(pos, request);
263 }
264 Priority::Normal => {
265 self.queued_requests.push_back(request);
266 }
267 }
268 }
269 Poll::Ready(None) => {
270 unreachable!("channel can't close")
271 }
272 Poll::Pending => break,
273 }
274 }
275
276 if self.queued_requests.is_empty() || no_peers_available {
277 return Poll::Pending
278 }
279 }
280 }
281
282 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
286 if let Some(peer) = self.peers.get_mut(&peer_id) {
288 peer.state = req.peer_state();
289 }
290
291 self.prepare_inflight_block_request(peer_id, req)
292 }
293
294 fn prepare_inflight_block_request(
296 &mut self,
297 peer_id: PeerId,
298 req: DownloadRequest<N>,
299 ) -> BlockRequest {
300 match req {
301 DownloadRequest::GetBlockHeaders { request, response, .. } => {
302 let inflight = Request { request: request.clone(), response };
303 self.inflight_headers_requests.insert(peer_id, inflight);
304 let HeadersRequest { start, limit, direction } = request;
305 BlockRequest::GetBlockHeaders(GetBlockHeaders {
306 start_block: start,
307 limit,
308 skip: 0,
309 direction,
310 })
311 }
312 DownloadRequest::GetBlockBodies { request, response, .. } => {
313 let inflight = Request { request: (), response };
314 self.inflight_bodies_requests.insert(peer_id, inflight);
315 BlockRequest::GetBlockBodies(GetBlockBodies(request))
316 }
317 DownloadRequest::GetBlockAccessLists { request, response, .. } => {
318 let inflight = Request { request: (), response };
319 self.inflight_bals_requests.insert(peer_id, inflight);
320 BlockRequest::GetBlockAccessLists(GetBlockAccessLists(request))
321 }
322 DownloadRequest::GetReceipts { request, response, .. } => {
323 let inflight = Request { request: (), response };
324 self.inflight_receipts_requests.insert(peer_id, inflight);
325 BlockRequest::GetReceipts(GetReceipts(request))
326 }
327 }
328 }
329
330 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
338 let peer = self.peers.get_mut(&peer_id)?;
339 let req_idx = self.queued_requests.iter().position(|req| {
340 peer.satisfies(&req.best_peer_requirements())
342 })?;
343 let req = self.queued_requests.remove(req_idx).expect("valid request index");
344
345 peer.state = req.peer_state();
346 let req = self.prepare_inflight_block_request(peer_id, req);
347 Some(BlockResponseOutcome::Request(peer_id, req))
348 }
349
350 pub(crate) fn on_block_headers_response(
356 &mut self,
357 peer_id: PeerId,
358 res: RequestResult<Vec<N::BlockHeader>>,
359 ) -> Option<BlockResponseOutcome> {
360 let is_error = res.is_err();
361 let maybe_reputation_change = res.reputation_change_err();
362
363 let resp = self.inflight_headers_requests.remove(&peer_id);
364
365 let is_likely_bad_response =
366 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
367
368 if let Some(resp) = resp {
369 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
371 }
372
373 if let Some(peer) = self.peers.get_mut(&peer_id) {
374 peer.last_response_likely_bad = is_likely_bad_response;
376
377 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
380 return self.followup_request(peer_id)
381 }
382 }
383
384 maybe_reputation_change
387 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
388 }
389
390 pub(crate) fn on_block_bodies_response(
392 &mut self,
393 peer_id: PeerId,
394 res: RequestResult<Vec<N::BlockBody>>,
395 ) -> Option<BlockResponseOutcome> {
396 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
397
398 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
399 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
400 }
401 if let Some(peer) = self.peers.get_mut(&peer_id) {
402 peer.last_response_likely_bad = is_likely_bad_response;
404
405 if peer.state.on_request_finished() && !is_likely_bad_response {
406 return self.followup_request(peer_id)
407 }
408 }
409 None
410 }
411
412 pub(crate) fn on_block_access_lists_response(
414 &mut self,
415 peer_id: PeerId,
416 res: RequestResult<BlockAccessLists>,
417 ) -> Option<BlockResponseOutcome> {
418 let is_likely_bad_response = res.is_err();
419
420 if let Some(resp) = self.inflight_bals_requests.remove(&peer_id) {
421 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
422 }
423 if let Some(peer) = self.peers.get_mut(&peer_id) {
424 peer.last_response_likely_bad = is_likely_bad_response;
425
426 if peer.state.on_request_finished() && !is_likely_bad_response {
427 return self.followup_request(peer_id)
428 }
429 }
430 None
431 }
432
433 pub(crate) fn on_receipts_response(
438 &mut self,
439 peer_id: PeerId,
440 res: RequestResult<ReceiptsResponse<N::Receipt>>,
441 ) -> Option<BlockResponseOutcome> {
442 let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());
443
444 if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
445 let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
446 }
447 if let Some(peer) = self.peers.get_mut(&peer_id) {
448 peer.last_response_likely_bad = is_likely_bad_response;
449
450 if peer.state.on_request_finished() && !is_likely_bad_response {
451 return self.followup_request(peer_id)
452 }
453 }
454 None
455 }
456
457 pub(crate) fn client(&self) -> FetchClient<N> {
459 FetchClient {
460 request_tx: self.download_requests_tx.clone(),
461 peers_handle: self.peers_handle.clone(),
462 num_active_peers: Arc::clone(&self.num_active_peers),
463 }
464 }
465}
466
467enum PollAction {
469 Ready(FetchAction),
470 NoRequests,
471 NoPeersAvailable,
472}
473
474#[derive(Debug)]
476struct Peer {
477 state: PeerState,
479 best_hash: B256,
481 best_number: u64,
483 #[allow(dead_code)]
485 capabilities: Arc<Capabilities>,
486 timeout: Arc<AtomicU64>,
488 last_response_likely_bad: bool,
495 range_info: Option<BlockRangeInfo>,
497}
498
499impl Peer {
500 fn timeout(&self) -> u64 {
501 self.timeout.load(Ordering::Relaxed)
502 }
503
504 fn earliest(&self) -> u64 {
506 self.range_info.as_ref().map_or(0, |info| info.earliest())
507 }
508
509 fn has_full_history(&self) -> bool {
511 self.earliest() == 0
512 }
513
514 fn range(&self) -> Option<RangeInclusive<u64>> {
515 self.range_info.as_ref().map(|info| info.range())
516 }
517
518 fn satisfies(&self, requirement: &BestPeerRequirements) -> bool {
520 match requirement {
521 BestPeerRequirements::EthVersion(ver) => self.capabilities.supports_eth_at_least(ver),
522 BestPeerRequirements::None |
523 BestPeerRequirements::FullBlock |
524 BestPeerRequirements::FullBlockRange(_) => true,
525 }
526 }
527
528 fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
537 let self_range = self.range();
538 let other_range = other.range();
539
540 match (self_range, other_range) {
541 (Some(self_r), Some(other_r)) => {
542 let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
544 let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
545
546 #[expect(clippy::match_same_arms)]
547 match (self_covers, other_covers) {
548 (true, false) => true, (false, true) => false, (true, true) => false, (false, false) => {
552 self_r.start() < other_r.start()
554 }
555 }
556 }
557 (Some(self_r), None) => {
558 self_r.contains(range.start()) && self_r.contains(range.end())
561 }
562 (None, Some(other_r)) => {
563 !(other_r.contains(range.start()) && other_r.contains(range.end()))
566 }
567 (None, None) => false, }
569 }
570
571 fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
573 match requirement {
574 BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
575 BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
576 BestPeerRequirements::None | BestPeerRequirements::EthVersion(_) => false,
579 }
580 }
581}
582
583#[derive(Debug)]
585enum PeerState {
586 Idle,
588 GetBlockHeaders,
590 GetBlockBodies,
592 GetBlockAccessLists,
594 GetReceipts,
596 Closing,
598}
599
600impl PeerState {
603 const fn is_idle(&self) -> bool {
605 matches!(self, Self::Idle)
606 }
607
608 const fn on_request_finished(&mut self) -> bool {
614 if !matches!(self, Self::Closing) {
615 *self = Self::Idle;
616 return true
617 }
618 false
619 }
620}
621
622#[derive(Debug)]
625struct Request<Req, Resp> {
626 request: Req,
629 response: oneshot::Sender<Resp>,
630}
631
632#[derive(Debug)]
634#[expect(clippy::enum_variant_names)]
635pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
636 GetBlockHeaders {
638 request: HeadersRequest,
639 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
640 priority: Priority,
641 },
642 GetBlockBodies {
644 request: Vec<B256>,
645 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
646 priority: Priority,
647 range_hint: Option<RangeInclusive<u64>>,
648 },
649 GetBlockAccessLists {
651 request: Vec<B256>,
652 response: oneshot::Sender<PeerRequestResult<BlockAccessLists>>,
653 priority: Priority,
654 requirement: BalRequirement,
655 },
656 GetReceipts {
658 request: Vec<B256>,
659 response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
660 priority: Priority,
661 },
662}
663
664impl<N: NetworkPrimitives> DownloadRequest<N> {
667 const fn peer_state(&self) -> PeerState {
669 match self {
670 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
671 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
672 Self::GetBlockAccessLists { .. } => PeerState::GetBlockAccessLists,
673 Self::GetReceipts { .. } => PeerState::GetReceipts,
674 }
675 }
676
677 const fn get_priority(&self) -> &Priority {
679 match self {
680 Self::GetBlockHeaders { priority, .. } |
681 Self::GetBlockBodies { priority, .. } |
682 Self::GetBlockAccessLists { priority, .. } |
683 Self::GetReceipts { priority, .. } => priority,
684 }
685 }
686
687 const fn is_normal_priority(&self) -> bool {
689 self.get_priority().is_normal()
690 }
691
692 const fn is_optional_bal(&self) -> bool {
694 matches!(self, Self::GetBlockAccessLists { requirement: BalRequirement::Optional, .. })
695 }
696
697 fn send_err_response(self, err: RequestError) {
699 let _ = match self {
700 Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
701 Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
702 Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
703 Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
704 };
705 }
706
707 fn best_peer_requirements(&self) -> BestPeerRequirements {
709 match self {
710 Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
711 Self::GetBlockAccessLists { .. } => BestPeerRequirements::EthVersion(EthVersion::Eth71),
712 Self::GetBlockBodies { range_hint, .. } => {
713 if let Some(range) = range_hint {
714 BestPeerRequirements::FullBlockRange(range.clone())
715 } else {
716 BestPeerRequirements::FullBlock
717 }
718 }
719 Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
720 }
721 }
722}
723
724pub(crate) enum FetchAction {
726 BlockRequest {
728 peer_id: PeerId,
730 request: BlockRequest,
732 },
733}
734
735#[derive(Debug, PartialEq, Eq)]
739pub(crate) enum BlockResponseOutcome {
740 Request(PeerId, BlockRequest),
742 BadResponse(PeerId, ReputationChangeKind),
744}
745
746enum BestPeerRequirements {
748 None,
750 FullBlockRange(RangeInclusive<u64>),
752 FullBlock,
754 EthVersion(EthVersion),
756}
757
758#[cfg(test)]
759mod tests {
760 use super::*;
761 use crate::{peers::PeersManager, PeersConfig};
762 use alloy_consensus::Header;
763 use alloy_primitives::B512;
764 use reth_eth_wire::Capability;
765 use std::future::poll_fn;
766
767 #[tokio::test(flavor = "multi_thread")]
768 async fn test_poll_fetcher() {
769 let manager = PeersManager::new(PeersConfig::default());
770 let mut fetcher =
771 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
772
773 poll_fn(move |cx| {
774 assert!(fetcher.poll(cx).is_pending());
775 let (tx, _rx) = oneshot::channel();
776 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
777 request: vec![],
778 response: tx,
779 priority: Priority::default(),
780 range_hint: None,
781 });
782 assert!(fetcher.poll(cx).is_pending());
783
784 Poll::Ready(())
785 })
786 .await;
787 }
788
789 #[tokio::test]
790 async fn test_peer_rotation() {
791 let manager = PeersManager::new(PeersConfig::default());
792 let mut fetcher =
793 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
794 let peer1 = B512::random();
796 let peer2 = B512::random();
797 let capabilities = Arc::new(Capabilities::from(vec![]));
798 fetcher.new_active_peer(
799 peer1,
800 B256::random(),
801 1,
802 Arc::clone(&capabilities),
803 Arc::new(AtomicU64::new(1)),
804 None,
805 );
806 fetcher.new_active_peer(
807 peer2,
808 B256::random(),
809 2,
810 Arc::clone(&capabilities),
811 Arc::new(AtomicU64::new(1)),
812 None,
813 );
814
815 let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
816 assert!(first_peer == peer1 || first_peer == peer2);
817 fetcher.on_pending_disconnect(&first_peer);
819 let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
821 assert!(first_peer == peer1 || first_peer == peer2);
822 assert_ne!(first_peer, second_peer);
823 fetcher.on_pending_disconnect(&second_peer);
825 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
826 }
827
828 #[tokio::test]
829 async fn test_peer_prioritization() {
830 let manager = PeersManager::new(PeersConfig::default());
831 let mut fetcher =
832 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
833 let peer1 = B512::random();
835 let peer2 = B512::random();
836 let peer3 = B512::random();
837
838 let peer2_timeout = Arc::new(AtomicU64::new(300));
839
840 let capabilities = Arc::new(Capabilities::from(vec![]));
841 fetcher.new_active_peer(
842 peer1,
843 B256::random(),
844 1,
845 Arc::clone(&capabilities),
846 Arc::new(AtomicU64::new(30)),
847 None,
848 );
849 fetcher.new_active_peer(
850 peer2,
851 B256::random(),
852 2,
853 Arc::clone(&capabilities),
854 Arc::clone(&peer2_timeout),
855 None,
856 );
857 fetcher.new_active_peer(
858 peer3,
859 B256::random(),
860 3,
861 Arc::clone(&capabilities),
862 Arc::new(AtomicU64::new(50)),
863 None,
864 );
865
866 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
868 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
869 peer2_timeout.store(10, Ordering::Relaxed);
871 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
873 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
874 }
875
876 #[tokio::test]
877 async fn test_on_block_headers_response() {
878 let manager = PeersManager::new(PeersConfig::default());
879 let mut fetcher =
880 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
881 let peer_id = B512::random();
882
883 assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
884
885 assert_eq!(
886 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
887 Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
888 );
889 assert_eq!(
890 fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
891 None
892 );
893 assert_eq!(
894 fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
895 None
896 );
897 assert_eq!(
898 fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
899 None
900 );
901 assert_eq!(
902 fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
903 None
904 );
905 }
906
907 #[tokio::test]
908 async fn test_header_response_outcome() {
909 let manager = PeersManager::new(PeersConfig::default());
910 let mut fetcher =
911 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
912 let peer_id = B512::random();
913
914 let request_pair = || {
915 let (tx, _rx) = oneshot::channel();
916 let req = Request {
917 request: HeadersRequest {
918 start: 0u64.into(),
919 limit: 1,
920 direction: Default::default(),
921 },
922 response: tx,
923 };
924 let header = Header { number: 0, ..Default::default() };
925 (req, header)
926 };
927
928 fetcher.new_active_peer(
929 peer_id,
930 Default::default(),
931 Default::default(),
932 Arc::new(Capabilities::from(vec![])),
933 Default::default(),
934 None,
935 );
936
937 let (req, header) = request_pair();
938 fetcher.inflight_headers_requests.insert(peer_id, req);
939
940 let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
941 assert!(outcome.is_none());
942 assert!(fetcher.peers[&peer_id].state.is_idle());
943
944 let outcome =
945 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
946
947 assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
948 RequestError::Timeout
949 ))
950 .is_some());
951
952 match outcome {
953 BlockResponseOutcome::BadResponse(peer, _) => {
954 assert_eq!(peer, peer_id)
955 }
956 BlockResponseOutcome::Request(_, _) => {
957 unreachable!()
958 }
959 };
960
961 assert!(fetcher.peers[&peer_id].state.is_idle());
962 }
963
964 #[test]
965 fn test_peer_is_better_none_requirement() {
966 let peer1 = Peer {
967 state: PeerState::Idle,
968 best_hash: B256::random(),
969 best_number: 100,
970 capabilities: Arc::new(Capabilities::new(vec![])),
971 timeout: Arc::new(AtomicU64::new(10)),
972 last_response_likely_bad: false,
973 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
974 };
975
976 let peer2 = Peer {
977 state: PeerState::Idle,
978 best_hash: B256::random(),
979 best_number: 50,
980 capabilities: Arc::new(Capabilities::new(vec![])),
981 timeout: Arc::new(AtomicU64::new(20)),
982 last_response_likely_bad: false,
983 range_info: None,
984 };
985
986 assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
988 assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
989 }
990
991 #[test]
992 fn test_peer_is_better_full_block_requirement() {
993 let peer_full = Peer {
995 state: PeerState::Idle,
996 best_hash: B256::random(),
997 best_number: 100,
998 capabilities: Arc::new(Capabilities::new(vec![])),
999 timeout: Arc::new(AtomicU64::new(10)),
1000 last_response_likely_bad: false,
1001 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
1002 };
1003
1004 let peer_partial = Peer {
1006 state: PeerState::Idle,
1007 best_hash: B256::random(),
1008 best_number: 100,
1009 capabilities: Arc::new(Capabilities::new(vec![])),
1010 timeout: Arc::new(AtomicU64::new(10)),
1011 last_response_likely_bad: false,
1012 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1013 };
1014
1015 let peer_no_range = Peer {
1017 state: PeerState::Idle,
1018 best_hash: B256::random(),
1019 best_number: 100,
1020 capabilities: Arc::new(Capabilities::new(vec![])),
1021 timeout: Arc::new(AtomicU64::new(10)),
1022 last_response_likely_bad: false,
1023 range_info: None,
1024 };
1025
1026 assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
1028 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
1029
1030 assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
1032 assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
1033
1034 assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
1036 assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
1037 }
1038
1039 #[test]
1040 fn test_peer_is_better_full_block_range_requirement() {
1041 let range = RangeInclusive::new(40, 60);
1042
1043 let peer_covers = Peer {
1045 state: PeerState::Idle,
1046 best_hash: B256::random(),
1047 best_number: 100,
1048 capabilities: Arc::new(Capabilities::new(vec![])),
1049 timeout: Arc::new(AtomicU64::new(10)),
1050 last_response_likely_bad: false,
1051 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
1052 };
1053
1054 let peer_no_cover = Peer {
1056 state: PeerState::Idle,
1057 best_hash: B256::random(),
1058 best_number: 100,
1059 capabilities: Arc::new(Capabilities::new(vec![])),
1060 timeout: Arc::new(AtomicU64::new(10)),
1061 last_response_likely_bad: false,
1062 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1063 };
1064
1065 assert!(peer_covers
1067 .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
1068 assert!(
1069 !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
1070 );
1071 }
1072
1073 #[test]
1074 fn test_peer_is_better_both_cover_range() {
1075 let range = RangeInclusive::new(30, 50);
1076
1077 let peer_full = Peer {
1079 state: PeerState::Idle,
1080 best_hash: B256::random(),
1081 best_number: 100,
1082 capabilities: Arc::new(Capabilities::new(vec![])),
1083 timeout: Arc::new(AtomicU64::new(10)),
1084 last_response_likely_bad: false,
1085 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1086 };
1087
1088 let peer_partial = Peer {
1090 state: PeerState::Idle,
1091 best_hash: B256::random(),
1092 best_number: 100,
1093 capabilities: Arc::new(Capabilities::new(vec![])),
1094 timeout: Arc::new(AtomicU64::new(10)),
1095 last_response_likely_bad: false,
1096 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1097 };
1098
1099 assert!(!peer_full
1101 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1102 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1103 }
1104
1105 #[test]
1106 fn test_peer_is_better_lower_start() {
1107 let range = RangeInclusive::new(30, 60);
1108
1109 let peer_full = Peer {
1111 state: PeerState::Idle,
1112 best_hash: B256::random(),
1113 best_number: 100,
1114 capabilities: Arc::new(Capabilities::new(vec![])),
1115 timeout: Arc::new(AtomicU64::new(10)),
1116 last_response_likely_bad: false,
1117 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1118 };
1119
1120 let peer_partial = Peer {
1122 state: PeerState::Idle,
1123 best_hash: B256::random(),
1124 best_number: 100,
1125 capabilities: Arc::new(Capabilities::new(vec![])),
1126 timeout: Arc::new(AtomicU64::new(10)),
1127 last_response_likely_bad: false,
1128 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1129 };
1130
1131 assert!(peer_full
1133 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1134 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1135 }
1136
1137 #[test]
1138 fn test_peer_is_better_neither_covers_range() {
1139 let range = RangeInclusive::new(40, 60);
1140
1141 let peer_full = Peer {
1143 state: PeerState::Idle,
1144 best_hash: B256::random(),
1145 best_number: 30,
1146 capabilities: Arc::new(Capabilities::new(vec![])),
1147 timeout: Arc::new(AtomicU64::new(10)),
1148 last_response_likely_bad: false,
1149 range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1150 };
1151
1152 let peer_partial = Peer {
1154 state: PeerState::Idle,
1155 best_hash: B256::random(),
1156 best_number: 30,
1157 capabilities: Arc::new(Capabilities::new(vec![])),
1158 timeout: Arc::new(AtomicU64::new(10)),
1159 last_response_likely_bad: false,
1160 range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1161 };
1162
1163 assert!(peer_full
1165 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1166 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1167 }
1168
1169 #[test]
1170 fn test_peer_is_better_no_range_info() {
1171 let range = RangeInclusive::new(40, 60);
1172
1173 let peer_with_range = Peer {
1175 state: PeerState::Idle,
1176 best_hash: B256::random(),
1177 best_number: 100,
1178 capabilities: Arc::new(Capabilities::new(vec![])),
1179 timeout: Arc::new(AtomicU64::new(10)),
1180 last_response_likely_bad: false,
1181 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1182 };
1183
1184 let peer_no_range = Peer {
1186 state: PeerState::Idle,
1187 best_hash: B256::random(),
1188 best_number: 100,
1189 capabilities: Arc::new(Capabilities::new(vec![])),
1190 timeout: Arc::new(AtomicU64::new(10)),
1191 last_response_likely_bad: false,
1192 range_info: None,
1193 };
1194
1195 assert!(!peer_no_range
1197 .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1198
1199 assert!(
1201 peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1202 );
1203 }
1204
1205 #[test]
1206 fn test_peer_is_better_one_peer_no_range_covers() {
1207 let range = RangeInclusive::new(40, 60);
1208
1209 let peer_with_range_covers = Peer {
1211 state: PeerState::Idle,
1212 best_hash: B256::random(),
1213 best_number: 100,
1214 capabilities: Arc::new(Capabilities::new(vec![])),
1215 timeout: Arc::new(AtomicU64::new(10)),
1216 last_response_likely_bad: false,
1217 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1218 };
1219
1220 let peer_no_range = Peer {
1222 state: PeerState::Idle,
1223 best_hash: B256::random(),
1224 best_number: 100,
1225 capabilities: Arc::new(Capabilities::new(vec![])),
1226 timeout: Arc::new(AtomicU64::new(10)),
1227 last_response_likely_bad: false,
1228 range_info: None,
1229 };
1230
1231 assert!(peer_with_range_covers
1233 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1234
1235 assert!(!peer_no_range
1237 .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1238 }
1239
1240 #[test]
1241 fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1242 let range = RangeInclusive::new(40, 60);
1243
1244 let peer_with_range_no_cover = Peer {
1246 state: PeerState::Idle,
1247 best_hash: B256::random(),
1248 best_number: 100,
1249 capabilities: Arc::new(Capabilities::new(vec![])),
1250 timeout: Arc::new(AtomicU64::new(10)),
1251 last_response_likely_bad: false,
1252 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1253 };
1254
1255 let peer_no_range = Peer {
1257 state: PeerState::Idle,
1258 best_hash: B256::random(),
1259 best_number: 100,
1260 capabilities: Arc::new(Capabilities::new(vec![])),
1261 timeout: Arc::new(AtomicU64::new(10)),
1262 last_response_likely_bad: false,
1263 range_info: None,
1264 };
1265
1266 assert!(!peer_with_range_no_cover
1268 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1269
1270 assert!(peer_no_range
1272 .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1273 }
1274
1275 #[test]
1276 fn test_peer_is_better_edge_cases() {
1277 let range = RangeInclusive::new(50, 100);
1279
1280 let peer_exact = Peer {
1282 state: PeerState::Idle,
1283 best_hash: B256::random(),
1284 best_number: 100,
1285 capabilities: Arc::new(Capabilities::new(vec![])),
1286 timeout: Arc::new(AtomicU64::new(10)),
1287 last_response_likely_bad: false,
1288 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1289 };
1290
1291 let peer_short_start = Peer {
1293 state: PeerState::Idle,
1294 best_hash: B256::random(),
1295 best_number: 100,
1296 capabilities: Arc::new(Capabilities::new(vec![])),
1297 timeout: Arc::new(AtomicU64::new(10)),
1298 last_response_likely_bad: false,
1299 range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1300 };
1301
1302 let peer_short_end = Peer {
1304 state: PeerState::Idle,
1305 best_hash: B256::random(),
1306 best_number: 100,
1307 capabilities: Arc::new(Capabilities::new(vec![])),
1308 timeout: Arc::new(AtomicU64::new(10)),
1309 last_response_likely_bad: false,
1310 range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1311 };
1312
1313 assert!(peer_exact
1315 .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1316 assert!(peer_exact
1317 .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1318
1319 assert!(!peer_short_start
1321 .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1322 assert!(
1323 !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1324 );
1325 }
1326
1327 fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1329 let manager = PeersManager::new(PeersConfig::default());
1330 let mut fetcher =
1331 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1332 let peer_id = B512::random();
1333
1334 fetcher.new_active_peer(
1335 peer_id,
1336 Default::default(),
1337 Default::default(),
1338 Arc::new(Capabilities::from(vec![])),
1339 Default::default(),
1340 None,
1341 );
1342 (fetcher, peer_id)
1343 }
1344
1345 fn insert_inflight_receipts(
1348 fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1349 peer_id: PeerId,
1350 ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1351 {
1352 let (tx, rx) = oneshot::channel();
1353 fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1354 fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1355 rx
1356 }
1357
1358 #[tokio::test]
1361 async fn test_poll_dispatches_receipts_to_peer() {
1362 let (mut fetcher, peer_id) = fetcher_with_peer();
1363
1364 poll_fn(move |cx| {
1365 let (tx, _rx) = oneshot::channel();
1366 fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1367 request: vec![B256::ZERO],
1368 response: tx,
1369 priority: Priority::default(),
1370 });
1371
1372 let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1373 fetcher.poll(cx)
1374 else {
1375 panic!("expected Ready(BlockRequest)");
1376 };
1377 assert_eq!(dispatched_peer, peer_id);
1378 assert!(matches!(request, BlockRequest::GetReceipts(_)));
1379
1380 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1382 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1384
1385 Poll::Ready(())
1386 })
1387 .await;
1388 }
1389
1390 #[tokio::test]
1393 async fn test_receipts_complete_response_resolves_and_idles_peer() {
1394 let (mut fetcher, peer_id) = fetcher_with_peer();
1395
1396 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1397
1398 let resp = ReceiptsResponse::new(vec![vec![]]);
1399 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1400
1401 assert!(outcome.is_none());
1403 assert!(fetcher.peers[&peer_id].state.is_idle());
1405 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1407
1408 let result = rx.await.unwrap().unwrap();
1410 assert_eq!(result.1.receipts.len(), 1);
1411 }
1412
1413 #[tokio::test]
1414 async fn test_receipts_empty_response_marks_peer_bad() {
1415 let (mut fetcher, peer_id) = fetcher_with_peer();
1416 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1417
1418 let resp = ReceiptsResponse::new(vec![]);
1419 let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1420
1421 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1422 }
1423
1424 #[tokio::test]
1425 async fn test_receipts_error_forwards_and_marks_peer_bad() {
1426 let (mut fetcher, peer_id) = fetcher_with_peer();
1427 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1428
1429 let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1430
1431 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1432 let result = rx.await.unwrap();
1434 assert_eq!(result.unwrap_err(), RequestError::Timeout);
1435 }
1436
1437 #[tokio::test]
1438 async fn test_session_closed_cancels_inflight_receipts() {
1439 let (mut fetcher, peer_id) = fetcher_with_peer();
1440 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1441
1442 fetcher.on_session_closed(&peer_id);
1443
1444 assert!(!fetcher.peers.contains_key(&peer_id));
1445 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1446
1447 let result = rx.await.unwrap();
1448 assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1449 }
1450
1451 #[tokio::test]
1452 async fn test_receipts_response_triggers_followup() {
1453 let (mut fetcher, peer_id) = fetcher_with_peer();
1454
1455 let (followup_tx, _followup_rx) = oneshot::channel();
1457 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1458 request: vec![B256::random()],
1459 response: followup_tx,
1460 priority: Priority::default(),
1461 range_hint: None,
1462 });
1463
1464 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1465
1466 let resp = ReceiptsResponse::new(vec![vec![]]);
1467 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1468
1469 assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1470 }
1471
1472 #[tokio::test]
1473 async fn test_followup_skips_request_peer_cannot_serve() {
1474 let (mut fetcher, peer_id) = fetcher_with_peer();
1475
1476 let peer_71 = B512::random();
1477 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1478 fetcher.new_active_peer(
1479 peer_71,
1480 B256::random(),
1481 100,
1482 caps_71,
1483 Arc::new(AtomicU64::new(10)),
1484 None,
1485 );
1486 fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1487
1488 let (followup_tx, _followup_rx) = oneshot::channel();
1489 fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1490 request: vec![B256::random()],
1491 response: followup_tx,
1492 priority: Priority::Normal,
1493 requirement: BalRequirement::Optional,
1494 });
1495
1496 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1497
1498 let resp = ReceiptsResponse::new(vec![vec![]]);
1499 assert!(fetcher.on_receipts_response(peer_id, Ok(resp)).is_none());
1500 assert!(fetcher.peers[&peer_id].state.is_idle());
1501 assert!(!fetcher.inflight_bals_requests.contains_key(&peer_id));
1502 assert!(matches!(
1503 fetcher.queued_requests.front(),
1504 Some(DownloadRequest::GetBlockAccessLists {
1505 requirement: BalRequirement::Optional,
1506 ..
1507 })
1508 ));
1509 }
1510
1511 #[tokio::test]
1512 async fn test_followup_uses_first_satisfiable_request() {
1513 let (mut fetcher, peer_id) = fetcher_with_peer();
1514
1515 let peer_71 = B512::random();
1516 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1517 fetcher.new_active_peer(
1518 peer_71,
1519 B256::random(),
1520 100,
1521 caps_71,
1522 Arc::new(AtomicU64::new(10)),
1523 None,
1524 );
1525 fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1526
1527 let (bal_tx, _bal_rx) = oneshot::channel();
1528 fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1529 request: vec![B256::random()],
1530 response: bal_tx,
1531 priority: Priority::Normal,
1532 requirement: BalRequirement::Optional,
1533 });
1534
1535 let (bodies_tx, _bodies_rx) = oneshot::channel();
1536 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1537 request: vec![B256::random()],
1538 response: bodies_tx,
1539 priority: Priority::Normal,
1540 range_hint: None,
1541 });
1542
1543 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1544
1545 let resp = ReceiptsResponse::new(vec![vec![]]);
1546 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1547
1548 assert!(matches!(
1549 outcome,
1550 Some(BlockResponseOutcome::Request(pid, BlockRequest::GetBlockBodies(_))) if pid == peer_id
1551 ));
1552 assert!(fetcher.inflight_bodies_requests.contains_key(&peer_id));
1553 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetBlockBodies));
1554 assert_eq!(fetcher.queued_requests.len(), 1);
1555 assert!(matches!(
1556 fetcher.queued_requests.front(),
1557 Some(DownloadRequest::GetBlockAccessLists {
1558 requirement: BalRequirement::Optional,
1559 ..
1560 })
1561 ));
1562 }
1563
1564 #[tokio::test]
1565 async fn test_prepare_block_request_creates_inflight_receipts() {
1566 let (mut fetcher, peer_id) = fetcher_with_peer();
1567 let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1568
1569 let (tx, _rx) = oneshot::channel();
1570 let req = DownloadRequest::GetReceipts {
1571 request: hashes.clone(),
1572 response: tx,
1573 priority: Priority::default(),
1574 };
1575
1576 let block_request = fetcher.prepare_block_request(peer_id, req);
1577
1578 match block_request {
1580 BlockRequest::GetReceipts(ref get) => {
1581 assert_eq!(get.0, hashes);
1582 }
1583 other => panic!("expected GetReceipts, got {other:?}"),
1584 }
1585
1586 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1588
1589 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1591 }
1592 #[tokio::test]
1593 async fn test_next_best_peer_eth71_no_support() {
1594 let manager = PeersManager::new(PeersConfig::default());
1595 let mut fetcher =
1596 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1597
1598 let peer = B512::random();
1599
1600 let capabilities = Arc::new(Capabilities::new(vec![]));
1602
1603 fetcher.new_active_peer(
1604 peer,
1605 B256::random(),
1606 100,
1607 capabilities,
1608 Arc::new(AtomicU64::new(10)),
1609 None,
1610 );
1611
1612 assert_eq!(
1614 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1615 None
1616 );
1617 }
1618
1619 #[tokio::test]
1620 async fn test_next_best_peer_eth71_supported() {
1621 let manager = PeersManager::new(PeersConfig::default());
1622 let mut fetcher =
1623 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1624
1625 let peer = B512::random();
1626
1627 let capabilities = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1629
1630 fetcher.new_active_peer(
1631 peer,
1632 B256::random(),
1633 100,
1634 capabilities,
1635 Arc::new(AtomicU64::new(10)),
1636 None,
1637 );
1638
1639 assert_eq!(
1640 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1641 Some(peer)
1642 );
1643 }
1644
1645 #[tokio::test]
1646 async fn test_next_best_peer_eth71_filters_correctly() {
1647 let manager = PeersManager::new(PeersConfig::default());
1648 let mut fetcher =
1649 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1650
1651 let peer_no_71 = B512::random();
1652 let peer_with_71 = B512::random();
1653
1654 let caps_old = Arc::new(Capabilities::new(vec![]));
1656
1657 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1659
1660 fetcher.new_active_peer(
1661 peer_no_71,
1662 B256::random(),
1663 100,
1664 caps_old,
1665 Arc::new(AtomicU64::new(5)),
1666 None,
1667 );
1668
1669 fetcher.new_active_peer(
1670 peer_with_71,
1671 B256::random(),
1672 100,
1673 caps_71,
1674 Arc::new(AtomicU64::new(50)),
1675 None,
1676 );
1677
1678 assert_eq!(
1681 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1682 Some(peer_with_71)
1683 );
1684 }
1685
1686 #[tokio::test]
1687 async fn test_wakes_when_eth71_peer_connects() {
1688 use futures::task::noop_waker;
1689 use std::task::{Context, Poll};
1690
1691 let manager = PeersManager::new(PeersConfig::default());
1692 let mut fetcher =
1693 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1694
1695 let (tx, _rx) = oneshot::channel();
1697 fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1698 request: vec![],
1699 response: tx,
1700 priority: Priority::Normal,
1701 requirement: BalRequirement::Mandatory,
1702 });
1703
1704 let waker = noop_waker();
1705 let mut cx = Context::from_waker(&waker);
1706
1707 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1709
1710 let peer_old = B512::random();
1712 let caps_old = Arc::new(Capabilities::new(vec![]));
1713
1714 fetcher.new_active_peer(
1715 peer_old,
1716 B256::random(),
1717 100,
1718 caps_old,
1719 Arc::new(AtomicU64::new(10)),
1720 None,
1721 );
1722
1723 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1725
1726 let peer_71 = B512::random();
1728 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1729
1730 fetcher.new_active_peer(
1731 peer_71,
1732 B256::random(),
1733 100,
1734 caps_71,
1735 Arc::new(AtomicU64::new(10)),
1736 None,
1737 );
1738
1739 if let Poll::Ready(FetchAction::BlockRequest { peer_id, .. }) = fetcher.poll(&mut cx) {
1741 assert_eq!(peer_id, peer_71);
1742 }
1743 }
1744
1745 #[tokio::test]
1746 async fn test_optional_bal_request_rejected_without_eth71_peer() {
1747 use futures::task::noop_waker;
1748 use std::task::{Context, Poll};
1749
1750 let manager = PeersManager::new(PeersConfig::default());
1751 let mut fetcher =
1752 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1753
1754 let peer_old = B512::random();
1755 let caps_old = Arc::new(Capabilities::new(vec![]));
1756 fetcher.new_active_peer(
1757 peer_old,
1758 B256::random(),
1759 100,
1760 caps_old,
1761 Arc::new(AtomicU64::new(10)),
1762 None,
1763 );
1764
1765 let (tx, rx) = oneshot::channel();
1766 fetcher
1767 .download_requests_tx
1768 .send(DownloadRequest::GetBlockAccessLists {
1769 request: vec![],
1770 response: tx,
1771 priority: Priority::Normal,
1772 requirement: BalRequirement::Optional,
1773 })
1774 .unwrap();
1775
1776 let waker = noop_waker();
1777 let mut cx = Context::from_waker(&waker);
1778
1779 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1780 assert!(fetcher.queued_requests.is_empty());
1781 assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
1782 }
1783
1784 #[tokio::test]
1785 async fn test_optional_bal_request_waits_for_busy_eth71_peer() {
1786 use futures::task::noop_waker;
1787 use std::task::{Context, Poll};
1788
1789 let manager = PeersManager::new(PeersConfig::default());
1790 let mut fetcher =
1791 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1792
1793 let peer_71 = B512::random();
1794 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1795 fetcher.new_active_peer(
1796 peer_71,
1797 B256::random(),
1798 100,
1799 caps_71,
1800 Arc::new(AtomicU64::new(10)),
1801 None,
1802 );
1803 fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1804
1805 let (tx, _rx) = oneshot::channel();
1806 fetcher
1807 .download_requests_tx
1808 .send(DownloadRequest::GetBlockAccessLists {
1809 request: vec![],
1810 response: tx,
1811 priority: Priority::Normal,
1812 requirement: BalRequirement::Optional,
1813 })
1814 .unwrap();
1815
1816 let waker = noop_waker();
1817 let mut cx = Context::from_waker(&waker);
1818
1819 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1820 assert_eq!(fetcher.queued_requests.len(), 1);
1821 }
1822
1823 #[tokio::test]
1824 async fn test_queued_optional_bal_request_rejected_after_eth71_disconnect() {
1825 use futures::task::noop_waker;
1826 use std::task::{Context, Poll};
1827
1828 let manager = PeersManager::new(PeersConfig::default());
1829 let mut fetcher =
1830 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1831
1832 let peer_old = B512::random();
1833 let caps_old = Arc::new(Capabilities::new(vec![]));
1834 fetcher.new_active_peer(
1835 peer_old,
1836 B256::random(),
1837 100,
1838 caps_old,
1839 Arc::new(AtomicU64::new(10)),
1840 None,
1841 );
1842
1843 let peer_71 = B512::random();
1844 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1845 fetcher.new_active_peer(
1846 peer_71,
1847 B256::random(),
1848 100,
1849 caps_71,
1850 Arc::new(AtomicU64::new(10)),
1851 None,
1852 );
1853 fetcher.peers.get_mut(&peer_71).expect("peer exists").state = PeerState::GetBlockHeaders;
1854
1855 let (tx, rx) = oneshot::channel();
1856 fetcher
1857 .download_requests_tx
1858 .send(DownloadRequest::GetBlockAccessLists {
1859 request: vec![],
1860 response: tx,
1861 priority: Priority::Normal,
1862 requirement: BalRequirement::Optional,
1863 })
1864 .unwrap();
1865
1866 let waker = noop_waker();
1867 let mut cx = Context::from_waker(&waker);
1868
1869 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1870 assert_eq!(fetcher.queued_requests.len(), 1);
1871
1872 fetcher.on_session_closed(&peer_71);
1873
1874 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1875 assert!(fetcher.queued_requests.is_empty());
1876 assert_eq!(rx.await.unwrap().unwrap_err(), RequestError::UnsupportedCapability);
1877 }
1878}