1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11 BlockAccessLists, Capabilities, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
12 GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
17 headers::client::HeadersRequest,
18 priority::Priority,
19 receipts::client::ReceiptsResponse,
20};
21use reth_network_peers::PeerId;
22use reth_network_types::ReputationChangeKind;
23use std::{
24 collections::{HashMap, VecDeque},
25 ops::RangeInclusive,
26 sync::{
27 atomic::{AtomicU64, AtomicUsize, Ordering},
28 Arc,
29 },
30 task::{Context, Poll},
31};
32use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
35type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
36type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
37type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
38type InflightBlockAccessListsRequest = Request<(), PeerRequestResult<BlockAccessLists>>;
39
40#[derive(Debug)]
47pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
48 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
50 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
52 inflight_bals_requests: HashMap<PeerId, InflightBlockAccessListsRequest>,
54 inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
56 peers: HashMap<PeerId, Peer>,
58 peers_handle: PeersHandle,
60 num_active_peers: Arc<AtomicUsize>,
62 queued_requests: VecDeque<DownloadRequest<N>>,
64 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
66 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
68}
69
70impl<N: NetworkPrimitives> StateFetcher<N> {
73 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
74 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
75 Self {
76 inflight_headers_requests: Default::default(),
77 inflight_bodies_requests: Default::default(),
78 inflight_bals_requests: Default::default(),
79 inflight_receipts_requests: Default::default(),
80 peers: Default::default(),
81 peers_handle,
82 num_active_peers,
83 queued_requests: Default::default(),
84 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
85 download_requests_tx,
86 }
87 }
88
89 pub(crate) fn new_active_peer(
91 &mut self,
92 peer_id: PeerId,
93 best_hash: B256,
94 best_number: u64,
95 capabilities: Arc<Capabilities>,
96 timeout: Arc<AtomicU64>,
97 range_info: Option<BlockRangeInfo>,
98 ) {
99 self.peers.insert(
100 peer_id,
101 Peer {
102 state: PeerState::Idle,
103 best_hash,
104 best_number,
105 capabilities,
106 timeout,
107 last_response_likely_bad: false,
108 range_info,
109 },
110 );
111 }
112
113 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
120 self.peers.remove(peer);
121 if let Some(req) = self.inflight_headers_requests.remove(peer) {
122 let _ = req.response.send(Err(RequestError::ConnectionDropped));
123 }
124 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
125 let _ = req.response.send(Err(RequestError::ConnectionDropped));
126 }
127 if let Some(req) = self.inflight_bals_requests.remove(peer) {
128 let _ = req.response.send(Err(RequestError::ConnectionDropped));
129 }
130 if let Some(req) = self.inflight_receipts_requests.remove(peer) {
131 let _ = req.response.send(Err(RequestError::ConnectionDropped));
132 }
133 }
134
135 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
139 if let Some(peer) = self.peers.get_mut(peer_id) &&
140 number > peer.best_number
141 {
142 peer.best_hash = hash;
143 peer.best_number = number;
144 return true
145 }
146 false
147 }
148
149 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
151 if let Some(peer) = self.peers.get_mut(peer_id) {
152 peer.state = PeerState::Closing;
153 }
154 }
155
156 fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
161 let mut idle = self.peers.iter().filter(|(_, peer)| {
163 peer.state.is_idle() &&
164 match &requirement {
165 BestPeerRequirements::EthVersion(ver) => {
166 peer.capabilities.supports_eth_at_least(ver)
167 }
168 _ => true,
169 }
170 });
171
172 let mut best_peer = idle.next()?;
173
174 for maybe_better in idle {
175 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
177 best_peer = maybe_better;
178 continue
179 }
180
181 if maybe_better.1.is_better(best_peer.1, &requirement) {
183 best_peer = maybe_better;
184 continue
185 }
186
187 if maybe_better.1.timeout() < best_peer.1.timeout() &&
189 !maybe_better.1.last_response_likely_bad
190 {
191 best_peer = maybe_better;
192 }
193 }
194
195 Some(*best_peer.0)
196 }
197
198 fn poll_action(&mut self) -> PollAction {
200 if self.queued_requests.is_empty() {
202 return PollAction::NoRequests
203 }
204
205 if self.peers.is_empty() {
206 return PollAction::NoPeersAvailable
207 }
208
209 let request = self.queued_requests.pop_front().expect("not empty");
210 let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
211 self.queued_requests.push_back(request);
214 return PollAction::NoPeersAvailable
215 };
216
217 let request = self.prepare_block_request(peer_id, request);
218
219 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
220 }
221
222 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
224 loop {
226 let no_peers_available = match self.poll_action() {
227 PollAction::Ready(action) => return Poll::Ready(action),
228 PollAction::NoRequests => false,
229 PollAction::NoPeersAvailable => true,
230 };
231
232 loop {
233 match self.download_requests_rx.poll_next_unpin(cx) {
235 Poll::Ready(Some(request)) => match request.get_priority() {
236 Priority::High => {
237 let pos = self
240 .queued_requests
241 .iter()
242 .position(|req| req.is_normal_priority())
243 .unwrap_or(0);
244 self.queued_requests.insert(pos, request);
245 }
246 Priority::Normal => {
247 self.queued_requests.push_back(request);
248 }
249 },
250 Poll::Ready(None) => {
251 unreachable!("channel can't close")
252 }
253 Poll::Pending => break,
254 }
255 }
256
257 if self.queued_requests.is_empty() || no_peers_available {
258 return Poll::Pending
259 }
260 }
261 }
262
263 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
267 if let Some(peer) = self.peers.get_mut(&peer_id) {
269 peer.state = req.peer_state();
270 }
271
272 match req {
273 DownloadRequest::GetBlockHeaders { request, response, .. } => {
274 let inflight = Request { request: request.clone(), response };
275 self.inflight_headers_requests.insert(peer_id, inflight);
276 let HeadersRequest { start, limit, direction } = request;
277 BlockRequest::GetBlockHeaders(GetBlockHeaders {
278 start_block: start,
279 limit,
280 skip: 0,
281 direction,
282 })
283 }
284 DownloadRequest::GetBlockBodies { request, response, .. } => {
285 let inflight = Request { request: (), response };
286 self.inflight_bodies_requests.insert(peer_id, inflight);
287 BlockRequest::GetBlockBodies(GetBlockBodies(request))
288 }
289 DownloadRequest::GetBlockAccessLists { request, response, .. } => {
290 let inflight = Request { request: (), response };
291 self.inflight_bals_requests.insert(peer_id, inflight);
292 BlockRequest::GetBlockAccessLists(GetBlockAccessLists(request))
293 }
294 DownloadRequest::GetReceipts { request, response, .. } => {
295 let inflight = Request { request: (), response };
296 self.inflight_receipts_requests.insert(peer_id, inflight);
297 BlockRequest::GetReceipts(GetReceipts(request))
298 }
299 }
300 }
301
302 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
306 let req = self.queued_requests.pop_front()?;
307 let req = self.prepare_block_request(peer_id, req);
308 Some(BlockResponseOutcome::Request(peer_id, req))
309 }
310
311 pub(crate) fn on_block_headers_response(
317 &mut self,
318 peer_id: PeerId,
319 res: RequestResult<Vec<N::BlockHeader>>,
320 ) -> Option<BlockResponseOutcome> {
321 let is_error = res.is_err();
322 let maybe_reputation_change = res.reputation_change_err();
323
324 let resp = self.inflight_headers_requests.remove(&peer_id);
325
326 let is_likely_bad_response =
327 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
328
329 if let Some(resp) = resp {
330 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
332 }
333
334 if let Some(peer) = self.peers.get_mut(&peer_id) {
335 peer.last_response_likely_bad = is_likely_bad_response;
337
338 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
341 return self.followup_request(peer_id)
342 }
343 }
344
345 maybe_reputation_change
348 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
349 }
350
351 pub(crate) fn on_block_bodies_response(
353 &mut self,
354 peer_id: PeerId,
355 res: RequestResult<Vec<N::BlockBody>>,
356 ) -> Option<BlockResponseOutcome> {
357 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
358
359 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
360 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
361 }
362 if let Some(peer) = self.peers.get_mut(&peer_id) {
363 peer.last_response_likely_bad = is_likely_bad_response;
365
366 if peer.state.on_request_finished() && !is_likely_bad_response {
367 return self.followup_request(peer_id)
368 }
369 }
370 None
371 }
372
373 pub(crate) fn on_block_access_lists_response(
375 &mut self,
376 peer_id: PeerId,
377 res: RequestResult<BlockAccessLists>,
378 ) -> Option<BlockResponseOutcome> {
379 let is_likely_bad_response = res.is_err();
380
381 if let Some(resp) = self.inflight_bals_requests.remove(&peer_id) {
382 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
383 }
384 if let Some(peer) = self.peers.get_mut(&peer_id) {
385 peer.last_response_likely_bad = is_likely_bad_response;
386
387 if peer.state.on_request_finished() && !is_likely_bad_response {
388 return self.followup_request(peer_id)
389 }
390 }
391 None
392 }
393
394 pub(crate) fn on_receipts_response(
399 &mut self,
400 peer_id: PeerId,
401 res: RequestResult<ReceiptsResponse<N::Receipt>>,
402 ) -> Option<BlockResponseOutcome> {
403 let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());
404
405 if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
406 let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
407 }
408 if let Some(peer) = self.peers.get_mut(&peer_id) {
409 peer.last_response_likely_bad = is_likely_bad_response;
410
411 if peer.state.on_request_finished() && !is_likely_bad_response {
412 return self.followup_request(peer_id)
413 }
414 }
415 None
416 }
417
418 pub(crate) fn client(&self) -> FetchClient<N> {
420 FetchClient {
421 request_tx: self.download_requests_tx.clone(),
422 peers_handle: self.peers_handle.clone(),
423 num_active_peers: Arc::clone(&self.num_active_peers),
424 }
425 }
426}
427
428enum PollAction {
430 Ready(FetchAction),
431 NoRequests,
432 NoPeersAvailable,
433}
434
435#[derive(Debug)]
437struct Peer {
438 state: PeerState,
440 best_hash: B256,
442 best_number: u64,
444 #[allow(dead_code)]
446 capabilities: Arc<Capabilities>,
447 timeout: Arc<AtomicU64>,
449 last_response_likely_bad: bool,
456 range_info: Option<BlockRangeInfo>,
458}
459
460impl Peer {
461 fn timeout(&self) -> u64 {
462 self.timeout.load(Ordering::Relaxed)
463 }
464
465 fn earliest(&self) -> u64 {
467 self.range_info.as_ref().map_or(0, |info| info.earliest())
468 }
469
470 fn has_full_history(&self) -> bool {
472 self.earliest() == 0
473 }
474
475 fn range(&self) -> Option<RangeInclusive<u64>> {
476 self.range_info.as_ref().map(|info| info.range())
477 }
478
479 fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
488 let self_range = self.range();
489 let other_range = other.range();
490
491 match (self_range, other_range) {
492 (Some(self_r), Some(other_r)) => {
493 let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
495 let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
496
497 #[expect(clippy::match_same_arms)]
498 match (self_covers, other_covers) {
499 (true, false) => true, (false, true) => false, (true, true) => false, (false, false) => {
503 self_r.start() < other_r.start()
505 }
506 }
507 }
508 (Some(self_r), None) => {
509 self_r.contains(range.start()) && self_r.contains(range.end())
512 }
513 (None, Some(other_r)) => {
514 !(other_r.contains(range.start()) && other_r.contains(range.end()))
517 }
518 (None, None) => false, }
520 }
521
522 fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
524 match requirement {
525 BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
526 BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
527 BestPeerRequirements::None | BestPeerRequirements::EthVersion(_) => false,
530 }
531 }
532}
533
534#[derive(Debug)]
536enum PeerState {
537 Idle,
539 GetBlockHeaders,
541 GetBlockBodies,
543 GetBlockAccessLists,
545 GetReceipts,
547 Closing,
549}
550
551impl PeerState {
554 const fn is_idle(&self) -> bool {
556 matches!(self, Self::Idle)
557 }
558
559 const fn on_request_finished(&mut self) -> bool {
565 if !matches!(self, Self::Closing) {
566 *self = Self::Idle;
567 return true
568 }
569 false
570 }
571}
572
573#[derive(Debug)]
576struct Request<Req, Resp> {
577 request: Req,
580 response: oneshot::Sender<Resp>,
581}
582
583#[derive(Debug)]
585#[expect(clippy::enum_variant_names)]
586pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
587 GetBlockHeaders {
589 request: HeadersRequest,
590 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
591 priority: Priority,
592 },
593 GetBlockBodies {
595 request: Vec<B256>,
596 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
597 priority: Priority,
598 range_hint: Option<RangeInclusive<u64>>,
599 },
600 GetBlockAccessLists {
602 request: Vec<B256>,
603 response: oneshot::Sender<PeerRequestResult<BlockAccessLists>>,
604 priority: Priority,
605 },
606 GetReceipts {
608 request: Vec<B256>,
609 response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
610 priority: Priority,
611 },
612}
613
614impl<N: NetworkPrimitives> DownloadRequest<N> {
617 const fn peer_state(&self) -> PeerState {
619 match self {
620 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
621 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
622 Self::GetBlockAccessLists { .. } => PeerState::GetBlockAccessLists,
623 Self::GetReceipts { .. } => PeerState::GetReceipts,
624 }
625 }
626
627 const fn get_priority(&self) -> &Priority {
629 match self {
630 Self::GetBlockHeaders { priority, .. } |
631 Self::GetBlockBodies { priority, .. } |
632 Self::GetBlockAccessLists { priority, .. } |
633 Self::GetReceipts { priority, .. } => priority,
634 }
635 }
636
637 const fn is_normal_priority(&self) -> bool {
639 self.get_priority().is_normal()
640 }
641
642 fn best_peer_requirements(&self) -> BestPeerRequirements {
644 match self {
645 Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
646 Self::GetBlockAccessLists { .. } => BestPeerRequirements::EthVersion(EthVersion::Eth71),
647 Self::GetBlockBodies { range_hint, .. } => {
648 if let Some(range) = range_hint {
649 BestPeerRequirements::FullBlockRange(range.clone())
650 } else {
651 BestPeerRequirements::FullBlock
652 }
653 }
654 Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
655 }
656 }
657}
658
659pub(crate) enum FetchAction {
661 BlockRequest {
663 peer_id: PeerId,
665 request: BlockRequest,
667 },
668}
669
670#[derive(Debug, PartialEq, Eq)]
674pub(crate) enum BlockResponseOutcome {
675 Request(PeerId, BlockRequest),
677 BadResponse(PeerId, ReputationChangeKind),
679}
680
681enum BestPeerRequirements {
683 None,
685 FullBlockRange(RangeInclusive<u64>),
687 FullBlock,
689 EthVersion(EthVersion),
691}
692
693#[cfg(test)]
694mod tests {
695 use super::*;
696 use crate::{peers::PeersManager, PeersConfig};
697 use alloy_consensus::Header;
698 use alloy_primitives::B512;
699 use reth_eth_wire::Capability;
700 use std::future::poll_fn;
701
702 #[tokio::test(flavor = "multi_thread")]
703 async fn test_poll_fetcher() {
704 let manager = PeersManager::new(PeersConfig::default());
705 let mut fetcher =
706 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
707
708 poll_fn(move |cx| {
709 assert!(fetcher.poll(cx).is_pending());
710 let (tx, _rx) = oneshot::channel();
711 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
712 request: vec![],
713 response: tx,
714 priority: Priority::default(),
715 range_hint: None,
716 });
717 assert!(fetcher.poll(cx).is_pending());
718
719 Poll::Ready(())
720 })
721 .await;
722 }
723
724 #[tokio::test]
725 async fn test_peer_rotation() {
726 let manager = PeersManager::new(PeersConfig::default());
727 let mut fetcher =
728 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
729 let peer1 = B512::random();
731 let peer2 = B512::random();
732 let capabilities = Arc::new(Capabilities::from(vec![]));
733 fetcher.new_active_peer(
734 peer1,
735 B256::random(),
736 1,
737 Arc::clone(&capabilities),
738 Arc::new(AtomicU64::new(1)),
739 None,
740 );
741 fetcher.new_active_peer(
742 peer2,
743 B256::random(),
744 2,
745 Arc::clone(&capabilities),
746 Arc::new(AtomicU64::new(1)),
747 None,
748 );
749
750 let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
751 assert!(first_peer == peer1 || first_peer == peer2);
752 fetcher.on_pending_disconnect(&first_peer);
754 let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
756 assert!(first_peer == peer1 || first_peer == peer2);
757 assert_ne!(first_peer, second_peer);
758 fetcher.on_pending_disconnect(&second_peer);
760 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
761 }
762
763 #[tokio::test]
764 async fn test_peer_prioritization() {
765 let manager = PeersManager::new(PeersConfig::default());
766 let mut fetcher =
767 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
768 let peer1 = B512::random();
770 let peer2 = B512::random();
771 let peer3 = B512::random();
772
773 let peer2_timeout = Arc::new(AtomicU64::new(300));
774
775 let capabilities = Arc::new(Capabilities::from(vec![]));
776 fetcher.new_active_peer(
777 peer1,
778 B256::random(),
779 1,
780 Arc::clone(&capabilities),
781 Arc::new(AtomicU64::new(30)),
782 None,
783 );
784 fetcher.new_active_peer(
785 peer2,
786 B256::random(),
787 2,
788 Arc::clone(&capabilities),
789 Arc::clone(&peer2_timeout),
790 None,
791 );
792 fetcher.new_active_peer(
793 peer3,
794 B256::random(),
795 3,
796 Arc::clone(&capabilities),
797 Arc::new(AtomicU64::new(50)),
798 None,
799 );
800
801 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
803 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
804 peer2_timeout.store(10, Ordering::Relaxed);
806 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
808 assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
809 }
810
811 #[tokio::test]
812 async fn test_on_block_headers_response() {
813 let manager = PeersManager::new(PeersConfig::default());
814 let mut fetcher =
815 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
816 let peer_id = B512::random();
817
818 assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
819
820 assert_eq!(
821 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
822 Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
823 );
824 assert_eq!(
825 fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
826 None
827 );
828 assert_eq!(
829 fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
830 None
831 );
832 assert_eq!(
833 fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
834 None
835 );
836 assert_eq!(
837 fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
838 None
839 );
840 }
841
842 #[tokio::test]
843 async fn test_header_response_outcome() {
844 let manager = PeersManager::new(PeersConfig::default());
845 let mut fetcher =
846 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
847 let peer_id = B512::random();
848
849 let request_pair = || {
850 let (tx, _rx) = oneshot::channel();
851 let req = Request {
852 request: HeadersRequest {
853 start: 0u64.into(),
854 limit: 1,
855 direction: Default::default(),
856 },
857 response: tx,
858 };
859 let header = Header { number: 0, ..Default::default() };
860 (req, header)
861 };
862
863 fetcher.new_active_peer(
864 peer_id,
865 Default::default(),
866 Default::default(),
867 Arc::new(Capabilities::from(vec![])),
868 Default::default(),
869 None,
870 );
871
872 let (req, header) = request_pair();
873 fetcher.inflight_headers_requests.insert(peer_id, req);
874
875 let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
876 assert!(outcome.is_none());
877 assert!(fetcher.peers[&peer_id].state.is_idle());
878
879 let outcome =
880 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
881
882 assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
883 RequestError::Timeout
884 ))
885 .is_some());
886
887 match outcome {
888 BlockResponseOutcome::BadResponse(peer, _) => {
889 assert_eq!(peer, peer_id)
890 }
891 BlockResponseOutcome::Request(_, _) => {
892 unreachable!()
893 }
894 };
895
896 assert!(fetcher.peers[&peer_id].state.is_idle());
897 }
898
899 #[test]
900 fn test_peer_is_better_none_requirement() {
901 let peer1 = Peer {
902 state: PeerState::Idle,
903 best_hash: B256::random(),
904 best_number: 100,
905 capabilities: Arc::new(Capabilities::new(vec![])),
906 timeout: Arc::new(AtomicU64::new(10)),
907 last_response_likely_bad: false,
908 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
909 };
910
911 let peer2 = Peer {
912 state: PeerState::Idle,
913 best_hash: B256::random(),
914 best_number: 50,
915 capabilities: Arc::new(Capabilities::new(vec![])),
916 timeout: Arc::new(AtomicU64::new(20)),
917 last_response_likely_bad: false,
918 range_info: None,
919 };
920
921 assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
923 assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
924 }
925
926 #[test]
927 fn test_peer_is_better_full_block_requirement() {
928 let peer_full = Peer {
930 state: PeerState::Idle,
931 best_hash: B256::random(),
932 best_number: 100,
933 capabilities: Arc::new(Capabilities::new(vec![])),
934 timeout: Arc::new(AtomicU64::new(10)),
935 last_response_likely_bad: false,
936 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
937 };
938
939 let peer_partial = Peer {
941 state: PeerState::Idle,
942 best_hash: B256::random(),
943 best_number: 100,
944 capabilities: Arc::new(Capabilities::new(vec![])),
945 timeout: Arc::new(AtomicU64::new(10)),
946 last_response_likely_bad: false,
947 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
948 };
949
950 let peer_no_range = Peer {
952 state: PeerState::Idle,
953 best_hash: B256::random(),
954 best_number: 100,
955 capabilities: Arc::new(Capabilities::new(vec![])),
956 timeout: Arc::new(AtomicU64::new(10)),
957 last_response_likely_bad: false,
958 range_info: None,
959 };
960
961 assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
963 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
964
965 assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
967 assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
968
969 assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
971 assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
972 }
973
974 #[test]
975 fn test_peer_is_better_full_block_range_requirement() {
976 let range = RangeInclusive::new(40, 60);
977
978 let peer_covers = Peer {
980 state: PeerState::Idle,
981 best_hash: B256::random(),
982 best_number: 100,
983 capabilities: Arc::new(Capabilities::new(vec![])),
984 timeout: Arc::new(AtomicU64::new(10)),
985 last_response_likely_bad: false,
986 range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
987 };
988
989 let peer_no_cover = Peer {
991 state: PeerState::Idle,
992 best_hash: B256::random(),
993 best_number: 100,
994 capabilities: Arc::new(Capabilities::new(vec![])),
995 timeout: Arc::new(AtomicU64::new(10)),
996 last_response_likely_bad: false,
997 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
998 };
999
1000 assert!(peer_covers
1002 .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
1003 assert!(
1004 !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
1005 );
1006 }
1007
1008 #[test]
1009 fn test_peer_is_better_both_cover_range() {
1010 let range = RangeInclusive::new(30, 50);
1011
1012 let peer_full = Peer {
1014 state: PeerState::Idle,
1015 best_hash: B256::random(),
1016 best_number: 100,
1017 capabilities: Arc::new(Capabilities::new(vec![])),
1018 timeout: Arc::new(AtomicU64::new(10)),
1019 last_response_likely_bad: false,
1020 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1021 };
1022
1023 let peer_partial = Peer {
1025 state: PeerState::Idle,
1026 best_hash: B256::random(),
1027 best_number: 100,
1028 capabilities: Arc::new(Capabilities::new(vec![])),
1029 timeout: Arc::new(AtomicU64::new(10)),
1030 last_response_likely_bad: false,
1031 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1032 };
1033
1034 assert!(!peer_full
1036 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1037 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1038 }
1039
1040 #[test]
1041 fn test_peer_is_better_lower_start() {
1042 let range = RangeInclusive::new(30, 60);
1043
1044 let peer_full = Peer {
1046 state: PeerState::Idle,
1047 best_hash: B256::random(),
1048 best_number: 100,
1049 capabilities: Arc::new(Capabilities::new(vec![])),
1050 timeout: Arc::new(AtomicU64::new(10)),
1051 last_response_likely_bad: false,
1052 range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
1053 };
1054
1055 let peer_partial = Peer {
1057 state: PeerState::Idle,
1058 best_hash: B256::random(),
1059 best_number: 100,
1060 capabilities: Arc::new(Capabilities::new(vec![])),
1061 timeout: Arc::new(AtomicU64::new(10)),
1062 last_response_likely_bad: false,
1063 range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1064 };
1065
1066 assert!(peer_full
1068 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1069 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1070 }
1071
1072 #[test]
1073 fn test_peer_is_better_neither_covers_range() {
1074 let range = RangeInclusive::new(40, 60);
1075
1076 let peer_full = Peer {
1078 state: PeerState::Idle,
1079 best_hash: B256::random(),
1080 best_number: 30,
1081 capabilities: Arc::new(Capabilities::new(vec![])),
1082 timeout: Arc::new(AtomicU64::new(10)),
1083 last_response_likely_bad: false,
1084 range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1085 };
1086
1087 let peer_partial = Peer {
1089 state: PeerState::Idle,
1090 best_hash: B256::random(),
1091 best_number: 30,
1092 capabilities: Arc::new(Capabilities::new(vec![])),
1093 timeout: Arc::new(AtomicU64::new(10)),
1094 last_response_likely_bad: false,
1095 range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1096 };
1097
1098 assert!(peer_full
1100 .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1101 assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1102 }
1103
1104 #[test]
1105 fn test_peer_is_better_no_range_info() {
1106 let range = RangeInclusive::new(40, 60);
1107
1108 let peer_with_range = Peer {
1110 state: PeerState::Idle,
1111 best_hash: B256::random(),
1112 best_number: 100,
1113 capabilities: Arc::new(Capabilities::new(vec![])),
1114 timeout: Arc::new(AtomicU64::new(10)),
1115 last_response_likely_bad: false,
1116 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1117 };
1118
1119 let peer_no_range = Peer {
1121 state: PeerState::Idle,
1122 best_hash: B256::random(),
1123 best_number: 100,
1124 capabilities: Arc::new(Capabilities::new(vec![])),
1125 timeout: Arc::new(AtomicU64::new(10)),
1126 last_response_likely_bad: false,
1127 range_info: None,
1128 };
1129
1130 assert!(!peer_no_range
1132 .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1133
1134 assert!(
1136 peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1137 );
1138 }
1139
1140 #[test]
1141 fn test_peer_is_better_one_peer_no_range_covers() {
1142 let range = RangeInclusive::new(40, 60);
1143
1144 let peer_with_range_covers = Peer {
1146 state: PeerState::Idle,
1147 best_hash: B256::random(),
1148 best_number: 100,
1149 capabilities: Arc::new(Capabilities::new(vec![])),
1150 timeout: Arc::new(AtomicU64::new(10)),
1151 last_response_likely_bad: false,
1152 range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1153 };
1154
1155 let peer_no_range = Peer {
1157 state: PeerState::Idle,
1158 best_hash: B256::random(),
1159 best_number: 100,
1160 capabilities: Arc::new(Capabilities::new(vec![])),
1161 timeout: Arc::new(AtomicU64::new(10)),
1162 last_response_likely_bad: false,
1163 range_info: None,
1164 };
1165
1166 assert!(peer_with_range_covers
1168 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1169
1170 assert!(!peer_no_range
1172 .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1173 }
1174
1175 #[test]
1176 fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1177 let range = RangeInclusive::new(40, 60);
1178
1179 let peer_with_range_no_cover = Peer {
1181 state: PeerState::Idle,
1182 best_hash: B256::random(),
1183 best_number: 100,
1184 capabilities: Arc::new(Capabilities::new(vec![])),
1185 timeout: Arc::new(AtomicU64::new(10)),
1186 last_response_likely_bad: false,
1187 range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1188 };
1189
1190 let peer_no_range = Peer {
1192 state: PeerState::Idle,
1193 best_hash: B256::random(),
1194 best_number: 100,
1195 capabilities: Arc::new(Capabilities::new(vec![])),
1196 timeout: Arc::new(AtomicU64::new(10)),
1197 last_response_likely_bad: false,
1198 range_info: None,
1199 };
1200
1201 assert!(!peer_with_range_no_cover
1203 .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1204
1205 assert!(peer_no_range
1207 .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1208 }
1209
1210 #[test]
1211 fn test_peer_is_better_edge_cases() {
1212 let range = RangeInclusive::new(50, 100);
1214
1215 let peer_exact = Peer {
1217 state: PeerState::Idle,
1218 best_hash: B256::random(),
1219 best_number: 100,
1220 capabilities: Arc::new(Capabilities::new(vec![])),
1221 timeout: Arc::new(AtomicU64::new(10)),
1222 last_response_likely_bad: false,
1223 range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1224 };
1225
1226 let peer_short_start = Peer {
1228 state: PeerState::Idle,
1229 best_hash: B256::random(),
1230 best_number: 100,
1231 capabilities: Arc::new(Capabilities::new(vec![])),
1232 timeout: Arc::new(AtomicU64::new(10)),
1233 last_response_likely_bad: false,
1234 range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1235 };
1236
1237 let peer_short_end = Peer {
1239 state: PeerState::Idle,
1240 best_hash: B256::random(),
1241 best_number: 100,
1242 capabilities: Arc::new(Capabilities::new(vec![])),
1243 timeout: Arc::new(AtomicU64::new(10)),
1244 last_response_likely_bad: false,
1245 range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1246 };
1247
1248 assert!(peer_exact
1250 .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1251 assert!(peer_exact
1252 .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1253
1254 assert!(!peer_short_start
1256 .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1257 assert!(
1258 !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1259 );
1260 }
1261
1262 fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1264 let manager = PeersManager::new(PeersConfig::default());
1265 let mut fetcher =
1266 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1267 let peer_id = B512::random();
1268
1269 fetcher.new_active_peer(
1270 peer_id,
1271 Default::default(),
1272 Default::default(),
1273 Arc::new(Capabilities::from(vec![])),
1274 Default::default(),
1275 None,
1276 );
1277 (fetcher, peer_id)
1278 }
1279
1280 fn insert_inflight_receipts(
1283 fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1284 peer_id: PeerId,
1285 ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1286 {
1287 let (tx, rx) = oneshot::channel();
1288 fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1289 fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1290 rx
1291 }
1292
1293 #[tokio::test]
1296 async fn test_poll_dispatches_receipts_to_peer() {
1297 let (mut fetcher, peer_id) = fetcher_with_peer();
1298
1299 poll_fn(move |cx| {
1300 let (tx, _rx) = oneshot::channel();
1301 fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1302 request: vec![B256::ZERO],
1303 response: tx,
1304 priority: Priority::default(),
1305 });
1306
1307 let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1308 fetcher.poll(cx)
1309 else {
1310 panic!("expected Ready(BlockRequest)");
1311 };
1312 assert_eq!(dispatched_peer, peer_id);
1313 assert!(matches!(request, BlockRequest::GetReceipts(_)));
1314
1315 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1317 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1319
1320 Poll::Ready(())
1321 })
1322 .await;
1323 }
1324
1325 #[tokio::test]
1328 async fn test_receipts_complete_response_resolves_and_idles_peer() {
1329 let (mut fetcher, peer_id) = fetcher_with_peer();
1330
1331 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1332
1333 let resp = ReceiptsResponse::new(vec![vec![]]);
1334 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1335
1336 assert!(outcome.is_none());
1338 assert!(fetcher.peers[&peer_id].state.is_idle());
1340 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1342
1343 let result = rx.await.unwrap().unwrap();
1345 assert_eq!(result.1.receipts.len(), 1);
1346 }
1347
1348 #[tokio::test]
1349 async fn test_receipts_empty_response_marks_peer_bad() {
1350 let (mut fetcher, peer_id) = fetcher_with_peer();
1351 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1352
1353 let resp = ReceiptsResponse::new(vec![]);
1354 let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1355
1356 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1357 }
1358
1359 #[tokio::test]
1360 async fn test_receipts_error_forwards_and_marks_peer_bad() {
1361 let (mut fetcher, peer_id) = fetcher_with_peer();
1362 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1363
1364 let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1365
1366 assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1367 let result = rx.await.unwrap();
1369 assert_eq!(result.unwrap_err(), RequestError::Timeout);
1370 }
1371
1372 #[tokio::test]
1373 async fn test_session_closed_cancels_inflight_receipts() {
1374 let (mut fetcher, peer_id) = fetcher_with_peer();
1375 let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1376
1377 fetcher.on_session_closed(&peer_id);
1378
1379 assert!(!fetcher.peers.contains_key(&peer_id));
1380 assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1381
1382 let result = rx.await.unwrap();
1383 assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1384 }
1385
1386 #[tokio::test]
1387 async fn test_receipts_response_triggers_followup() {
1388 let (mut fetcher, peer_id) = fetcher_with_peer();
1389
1390 let (followup_tx, _followup_rx) = oneshot::channel();
1392 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1393 request: vec![B256::random()],
1394 response: followup_tx,
1395 priority: Priority::default(),
1396 range_hint: None,
1397 });
1398
1399 let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1400
1401 let resp = ReceiptsResponse::new(vec![vec![]]);
1402 let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1403
1404 assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1405 }
1406
1407 #[tokio::test]
1408 async fn test_prepare_block_request_creates_inflight_receipts() {
1409 let (mut fetcher, peer_id) = fetcher_with_peer();
1410 let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1411
1412 let (tx, _rx) = oneshot::channel();
1413 let req = DownloadRequest::GetReceipts {
1414 request: hashes.clone(),
1415 response: tx,
1416 priority: Priority::default(),
1417 };
1418
1419 let block_request = fetcher.prepare_block_request(peer_id, req);
1420
1421 match block_request {
1423 BlockRequest::GetReceipts(ref get) => {
1424 assert_eq!(get.0, hashes);
1425 }
1426 other => panic!("expected GetReceipts, got {other:?}"),
1427 }
1428
1429 assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1431
1432 assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1434 }
1435 #[tokio::test]
1436 async fn test_next_best_peer_eth71_no_support() {
1437 let manager = PeersManager::new(PeersConfig::default());
1438 let mut fetcher =
1439 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1440
1441 let peer = B512::random();
1442
1443 let capabilities = Arc::new(Capabilities::new(vec![]));
1445
1446 fetcher.new_active_peer(
1447 peer,
1448 B256::random(),
1449 100,
1450 capabilities,
1451 Arc::new(AtomicU64::new(10)),
1452 None,
1453 );
1454
1455 assert_eq!(
1457 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1458 None
1459 );
1460 }
1461
1462 #[tokio::test]
1463 async fn test_next_best_peer_eth71_supported() {
1464 let manager = PeersManager::new(PeersConfig::default());
1465 let mut fetcher =
1466 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1467
1468 let peer = B512::random();
1469
1470 let capabilities = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1472
1473 fetcher.new_active_peer(
1474 peer,
1475 B256::random(),
1476 100,
1477 capabilities,
1478 Arc::new(AtomicU64::new(10)),
1479 None,
1480 );
1481
1482 assert_eq!(
1483 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1484 Some(peer)
1485 );
1486 }
1487
1488 #[tokio::test]
1489 async fn test_next_best_peer_eth71_filters_correctly() {
1490 let manager = PeersManager::new(PeersConfig::default());
1491 let mut fetcher =
1492 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1493
1494 let peer_no_71 = B512::random();
1495 let peer_with_71 = B512::random();
1496
1497 let caps_old = Arc::new(Capabilities::new(vec![]));
1499
1500 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1502
1503 fetcher.new_active_peer(
1504 peer_no_71,
1505 B256::random(),
1506 100,
1507 caps_old,
1508 Arc::new(AtomicU64::new(5)),
1509 None,
1510 );
1511
1512 fetcher.new_active_peer(
1513 peer_with_71,
1514 B256::random(),
1515 100,
1516 caps_71,
1517 Arc::new(AtomicU64::new(50)),
1518 None,
1519 );
1520
1521 assert_eq!(
1524 fetcher.next_best_peer(BestPeerRequirements::EthVersion(EthVersion::Eth71)),
1525 Some(peer_with_71)
1526 );
1527 }
1528
1529 #[tokio::test]
1530 async fn test_wakes_when_eth71_peer_connects() {
1531 use futures::task::noop_waker;
1532 use std::task::{Context, Poll};
1533
1534 let manager = PeersManager::new(PeersConfig::default());
1535 let mut fetcher =
1536 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1537
1538 let (tx, _rx) = oneshot::channel();
1540 fetcher.queued_requests.push_back(DownloadRequest::GetBlockAccessLists {
1541 request: vec![],
1542 response: tx,
1543 priority: Priority::Normal,
1544 });
1545
1546 let waker = noop_waker();
1547 let mut cx = Context::from_waker(&waker);
1548
1549 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1551
1552 let peer_old = B512::random();
1554 let caps_old = Arc::new(Capabilities::new(vec![]));
1555
1556 fetcher.new_active_peer(
1557 peer_old,
1558 B256::random(),
1559 100,
1560 caps_old,
1561 Arc::new(AtomicU64::new(10)),
1562 None,
1563 );
1564
1565 assert!(matches!(fetcher.poll(&mut cx), Poll::Pending));
1567
1568 let peer_71 = B512::random();
1570 let caps_71 = Arc::new(Capabilities::from(vec![Capability::new("eth".into(), 71)]));
1571
1572 fetcher.new_active_peer(
1573 peer_71,
1574 B256::random(),
1575 100,
1576 caps_71,
1577 Arc::new(AtomicU64::new(10)),
1578 None,
1579 );
1580
1581 if let Poll::Ready(FetchAction::BlockRequest { peer_id, .. }) = fetcher.poll(&mut cx) {
1583 assert_eq!(peer_id, peer_71);
1584 }
1585 }
1586}