1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41#[derive(Debug)]
48pub struct PeersManager {
49 peers: HashMap<PeerId, Peer>,
51 trusted_peer_ids: HashSet<PeerId>,
56 trusted_peers_resolver: TrustedPeersResolver,
59 manager_tx: mpsc::UnboundedSender<PeerCommand>,
61 handle_rx: UnboundedReceiverStream<PeerCommand>,
63 queued_actions: VecDeque<PeerAction>,
65 refill_slots_interval: Interval,
67 reputation_weights: ReputationChangeWeights,
69 connection_info: ConnectionInfo,
71 ban_list: BanList,
73 backed_off_peers: HashMap<PeerId, std::time::Instant>,
75 release_interval: Interval,
77 ban_duration: Duration,
79 backoff_durations: PeerBackoffDurations,
82 trusted_nodes_only: bool,
85 last_tick: Instant,
87 max_backoff_count: u8,
89 net_connection_state: NetworkConnectionState,
91 incoming_ip_throttle_duration: Duration,
93}
94
95impl PeersManager {
96 pub fn new(config: PeersConfig) -> Self {
98 let PeersConfig {
99 refill_slots_interval,
100 connection_info,
101 reputation_weights,
102 ban_list,
103 ban_duration,
104 backoff_durations,
105 trusted_nodes,
106 trusted_nodes_only,
107 trusted_nodes_resolution_interval,
108 basic_nodes,
109 max_backoff_count,
110 incoming_ip_throttle_duration,
111 } = config;
112 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
113 let now = Instant::now();
114
115 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
117
118 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
119 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
120
121 for trusted_peer in &trusted_nodes {
122 match trusted_peer.resolve_blocking() {
123 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
124 trusted_peer_ids.insert(id);
125 peers.entry(id).or_insert_with(|| {
126 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
127 });
128 }
129 Err(err) => {
130 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
131 }
132 }
133 }
134
135 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
136 peers.entry(id).or_insert_with(|| {
137 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
138 });
139 }
140
141 Self {
142 peers,
143 trusted_peer_ids,
144 trusted_peers_resolver: TrustedPeersResolver::new(
145 trusted_nodes,
146 tokio::time::interval(trusted_nodes_resolution_interval), ),
148 manager_tx,
149 handle_rx: UnboundedReceiverStream::new(handle_rx),
150 queued_actions: Default::default(),
151 reputation_weights,
152 refill_slots_interval: tokio::time::interval(refill_slots_interval),
153 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
154 connection_info: ConnectionInfo::new(connection_info),
155 ban_list,
156 backed_off_peers: Default::default(),
157 ban_duration,
158 backoff_durations,
159 trusted_nodes_only,
160 last_tick: Instant::now(),
161 max_backoff_count,
162 net_connection_state: NetworkConnectionState::default(),
163 incoming_ip_throttle_duration,
164 }
165 }
166
167 pub(crate) fn handle(&self) -> PeersHandle {
169 PeersHandle::new(self.manager_tx.clone())
170 }
171
172 #[inline]
174 pub(crate) fn num_known_peers(&self) -> usize {
175 self.peers.len()
176 }
177
178 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
180 self.peers.iter().map(|(peer_id, v)| {
181 NodeRecord::new_with_ports(
182 v.addr.tcp().ip(),
183 v.addr.tcp().port(),
184 v.addr.udp().map(|addr| addr.port()),
185 *peer_id,
186 )
187 })
188 }
189
190 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
192 self.peers.get(&peer_id).map(|v| {
193 (
194 NodeRecord::new_with_ports(
195 v.addr.tcp().ip(),
196 v.addr.tcp().port(),
197 v.addr.udp().map(|addr| addr.port()),
198 peer_id,
199 ),
200 v.kind,
201 )
202 })
203 }
204
205 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
207 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
208 }
209
210 #[inline]
212 pub(crate) const fn num_inbound_connections(&self) -> usize {
213 self.connection_info.num_inbound
214 }
215
216 #[inline]
218 pub(crate) const fn num_outbound_connections(&self) -> usize {
219 self.connection_info.num_outbound
220 }
221
222 #[inline]
224 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
225 self.connection_info.num_pending_out
226 }
227
228 #[inline]
230 pub(crate) fn num_backed_off_peers(&self) -> usize {
231 self.backed_off_peers.len()
232 }
233
234 fn num_idle_trusted_peers(&self) -> usize {
236 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
237 }
238
239 pub(crate) fn on_incoming_pending_session(
243 &mut self,
244 addr: IpAddr,
245 ) -> Result<(), InboundConnectionError> {
246 if self.ban_list.is_banned_ip(&addr) {
247 return Err(InboundConnectionError::IpBanned)
248 }
249
250 if !self.connection_info.has_in_capacity() {
252 if self.trusted_peer_ids.is_empty() {
253 return Err(InboundConnectionError::ExceedsCapacity)
256 }
257
258 let num_idle_trusted_peers = self.num_idle_trusted_peers();
262 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
263 let max_inbound =
265 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
266 if self.connection_info.num_pending_in < max_inbound {
267 self.connection_info.inc_pending_in();
268 return Ok(())
269 }
270 }
271
272 return Err(InboundConnectionError::ExceedsCapacity)
274 }
275
276 if !self.connection_info.has_in_pending_capacity() {
278 return Err(InboundConnectionError::ExceedsCapacity)
279 }
280
281 self.throttle_incoming_ip(addr);
283
284 self.connection_info.inc_pending_in();
285 Ok(())
286 }
287
288 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
291 self.connection_info.decr_pending_in();
292 }
293
294 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
296 self.connection_info.decr_pending_in()
297 }
298
299 pub(crate) fn on_incoming_pending_session_dropped(
301 &mut self,
302 remote_addr: SocketAddr,
303 err: &PendingSessionHandshakeError,
304 ) {
305 if err.is_fatal_protocol_error() {
306 self.ban_ip(remote_addr.ip());
307
308 if err.merits_discovery_ban() {
309 self.queued_actions
310 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
311 }
312 }
313
314 self.connection_info.decr_pending_in();
315 }
316
317 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
324 self.connection_info.decr_pending_in();
325
326 if self.ban_list.is_banned_peer(&peer_id) {
329 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
330 return
331 }
332
333 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
335 if self.trusted_nodes_only && !is_trusted {
336 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
337 return
338 }
339
340 self.tick();
342
343 match self.peers.entry(peer_id) {
344 Entry::Occupied(mut entry) => {
345 let peer = entry.get_mut();
346 if peer.is_banned() {
347 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
348 return
349 }
350 if peer.state.is_pending_out() {
353 self.connection_info.decr_state(peer.state);
354 }
355
356 peer.state = PeerConnectionState::In;
357
358 is_trusted = is_trusted || peer.is_trusted();
359 }
360 Entry::Vacant(entry) => {
361 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
364 peer.remove_after_disconnect = true;
365 entry.insert(peer);
366 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
367 }
368 }
369
370 let has_in_capacity = self.connection_info.has_in_capacity();
371 self.connection_info.inc_in();
373
374 if !is_trusted && !has_in_capacity {
376 self.queued_actions.push_back(PeerAction::Disconnect {
377 peer_id,
378 reason: Some(DisconnectReason::TooManyPeers),
379 });
380 }
381 }
382
383 fn ban_peer(&mut self, peer_id: PeerId) {
385 let mut ban_duration = self.ban_duration;
386 if let Some(peer) = self.peers.get(&peer_id) {
387 if peer.is_trusted() || peer.is_static() {
388 ban_duration = self.backoff_durations.low / 2;
391 }
392 }
393
394 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
395 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
396 }
397
398 fn ban_ip(&mut self, ip: IpAddr) {
400 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
401 }
402
403 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
405 self.ban_list
406 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
407 }
408
409 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
411 trace!(target: "net::peers", ?peer_id, "backing off");
412
413 if let Some(peer) = self.peers.get_mut(&peer_id) {
414 peer.backed_off = true;
415 self.backed_off_peers.insert(peer_id, until);
416 }
417 }
418
419 fn unban_peer(&mut self, peer_id: PeerId) {
421 self.ban_list.unban_peer(&peer_id);
422 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
423 }
424
425 fn tick(&mut self) {
430 let now = Instant::now();
431 let secs_since_last_tick =
435 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
436 self.last_tick = now;
437
438 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
440 if peer.1.reputation < DEFAULT_REPUTATION {
443 peer.1.reputation += secs_since_last_tick;
444 }
445 }
446 }
447
448 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
450 self.peers.get(peer_id).map(|peer| peer.reputation)
451 }
452
453 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
459 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
460 if rep.is_reset() {
462 peer.reset_reputation()
463 } else {
464 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
465 if peer.is_trusted() || peer.is_static() {
466 if matches!(
468 rep,
469 ReputationChangeKind::Dropped |
470 ReputationChangeKind::BadAnnouncement |
471 ReputationChangeKind::Timeout |
472 ReputationChangeKind::AlreadySeenTransaction
473 ) {
474 return
475 }
476
477 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
479 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
481 }
482 }
483 peer.apply_reputation(reputation_change, rep)
484 }
485 } else {
486 return
487 };
488
489 match outcome {
490 ReputationChangeOutcome::None => {}
491 ReputationChangeOutcome::Ban => {
492 self.ban_peer(*peer_id);
493 }
494 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
495 ReputationChangeOutcome::DisconnectAndBan => {
496 self.queued_actions.push_back(PeerAction::Disconnect {
497 peer_id: *peer_id,
498 reason: Some(DisconnectReason::DisconnectRequested),
499 });
500 self.ban_peer(*peer_id);
501 }
502 }
503 }
504
505 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
507 if let Some(peer) = self.peers.get_mut(peer_id) {
508 self.connection_info.decr_state(peer.state);
509 peer.state = PeerConnectionState::Idle;
510 }
511 }
512
513 pub(crate) fn on_outgoing_pending_session_dropped(
516 &mut self,
517 remote_addr: &SocketAddr,
518 peer_id: &PeerId,
519 err: &PendingSessionHandshakeError,
520 ) {
521 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
522 }
523
524 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
526 match self.peers.entry(peer_id) {
527 Entry::Occupied(mut entry) => {
528 self.connection_info.decr_state(entry.get().state);
529
530 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
531 entry.remove();
533 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
534 } else {
535 entry.get_mut().severe_backoff_counter = 0;
539 entry.get_mut().state = PeerConnectionState::Idle;
540 return
541 }
542 }
543 Entry::Vacant(_) => return,
544 }
545
546 self.fill_outbound_slots();
547 }
548
549 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
551 if let Some(peer) = self.peers.get_mut(&peer_id) {
552 self.connection_info.decr_state(peer.state);
553 self.connection_info.inc_out();
554 peer.state = PeerConnectionState::Out;
555 }
556 }
557
558 pub(crate) fn on_active_session_dropped(
563 &mut self,
564 remote_addr: &SocketAddr,
565 peer_id: &PeerId,
566 err: &EthStreamError,
567 ) {
568 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
569 }
570
571 pub(crate) fn on_outgoing_connection_failure(
574 &mut self,
575 remote_addr: &SocketAddr,
576 peer_id: &PeerId,
577 err: &io::Error,
578 ) {
579 if let Some(peer) = self.peers.get(peer_id) {
583 if peer.state.is_incoming() {
584 return
586 }
587
588 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
589 self.trusted_peers_resolver.interval.reset_immediately();
592 }
593 }
594
595 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
596 }
597
598 fn on_connection_failure(
599 &mut self,
600 remote_addr: &SocketAddr,
601 peer_id: &PeerId,
602 err: impl SessionError,
603 reputation_change: ReputationChangeKind,
604 ) {
605 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
606
607 if err.is_fatal_protocol_error() {
608 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
609 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
612 self.connection_info.decr_state(entry.get().state);
613 if entry.get().is_trusted() {
615 entry.get_mut().state = PeerConnectionState::Idle;
616 } else {
617 entry.remove();
618 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
619 if err.merits_discovery_ban() {
621 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
622 peer_id: *peer_id,
623 ip_addr: remote_addr.ip(),
624 })
625 }
626 }
627 }
628
629 self.ban_peer(*peer_id);
631 } else {
632 let mut backoff_until = None;
633 let mut remove_peer = false;
634
635 if let Some(peer) = self.peers.get_mut(peer_id) {
636 if let Some(kind) = err.should_backoff() {
637 if peer.is_trusted() || peer.is_static() {
638 let backoff = self.backoff_durations.low / 2;
641 backoff_until = Some(std::time::Instant::now() + backoff);
642 } else {
643 if kind.is_severe() {
645 peer.severe_backoff_counter =
646 peer.severe_backoff_counter.saturating_add(1);
647 }
648
649 let backoff_time =
650 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
651
652 backoff_until = Some(backoff_time);
656 }
657 } else {
658 let reputation_change = self.reputation_weights.change(reputation_change);
660 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
661 };
662
663 self.connection_info.decr_state(peer.state);
664 peer.state = PeerConnectionState::Idle;
665
666 if peer.severe_backoff_counter > self.max_backoff_count &&
667 !peer.is_trusted() &&
668 !peer.is_static()
669 {
670 remove_peer = true;
673 }
674 }
675
676 if remove_peer {
678 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
679 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
680 } else if let Some(backoff_until) = backoff_until {
681 self.backoff_peer_until(*peer_id, backoff_until);
683 }
684 }
685
686 self.fill_outbound_slots();
687 }
688
689 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
695 match direction {
696 Direction::Incoming => {
697 self.connection_info.decr_pending_in();
699 }
700 Direction::Outgoing(_) => {
701 }
704 }
705 }
706
707 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
712 if let Some(peer) = self.peers.get_mut(&peer_id) {
713 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
714 peer.fork_id = Some(fork_id);
715 }
716 }
717
718 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
722 self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
723 }
724
725 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
727 self.trusted_peer_ids.insert(peer_id);
728 }
729
730 #[cfg_attr(not(test), expect(dead_code))]
734 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
735 self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
736 }
737
738 pub(crate) fn add_peer_kind(
742 &mut self,
743 peer_id: PeerId,
744 kind: PeerKind,
745 addr: PeerAddr,
746 fork_id: Option<ForkId>,
747 ) {
748 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
749 return
750 }
751
752 match self.peers.entry(peer_id) {
753 Entry::Occupied(mut entry) => {
754 let peer = entry.get_mut();
755 peer.kind = kind;
756 peer.fork_id = fork_id;
757 peer.addr = addr;
758
759 if peer.state.is_incoming() {
760 peer.remove_after_disconnect = false;
764 }
765 }
766 Entry::Vacant(entry) => {
767 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
768 let mut peer = Peer::with_kind(addr, kind);
769 peer.fork_id = fork_id;
770 entry.insert(peer);
771 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
772 }
773 }
774
775 if kind.is_trusted() {
776 self.trusted_peer_ids.insert(peer_id);
777 }
778 }
779
780 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
782 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
783 if entry.get().is_trusted() {
784 return
785 }
786 let mut peer = entry.remove();
787
788 trace!(target: "net::peers", ?peer_id, "remove discovered node");
789 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
790
791 if peer.state.is_connected() {
792 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
793 peer.remove_after_disconnect = true;
798 peer.state.disconnect();
799 self.peers.insert(peer_id, peer);
800 self.queued_actions.push_back(PeerAction::Disconnect {
801 peer_id,
802 reason: Some(DisconnectReason::DisconnectRequested),
803 })
804 }
805 }
806
807 #[cfg_attr(not(test), expect(dead_code))]
810 pub(crate) fn add_and_connect(
811 &mut self,
812 peer_id: PeerId,
813 addr: PeerAddr,
814 fork_id: Option<ForkId>,
815 ) {
816 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
817 }
818
819 pub(crate) fn add_and_connect_kind(
823 &mut self,
824 peer_id: PeerId,
825 kind: PeerKind,
826 addr: PeerAddr,
827 fork_id: Option<ForkId>,
828 ) {
829 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
830 return
831 }
832
833 match self.peers.entry(peer_id) {
834 Entry::Occupied(mut entry) => {
835 let peer = entry.get_mut();
836 peer.kind = kind;
837 peer.fork_id = fork_id;
838 peer.addr = addr;
839
840 if peer.state == PeerConnectionState::Idle {
841 peer.state = PeerConnectionState::PendingOut;
843 self.connection_info.inc_pending_out();
844 self.queued_actions
845 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
846 }
847 }
848 Entry::Vacant(entry) => {
849 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
850 let mut peer = Peer::with_kind(addr, kind);
851 peer.state = PeerConnectionState::PendingOut;
852 peer.fork_id = fork_id;
853 entry.insert(peer);
854 self.connection_info.inc_pending_out();
855 self.queued_actions
856 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
857 }
858 }
859
860 if kind.is_trusted() {
861 self.trusted_peer_ids.insert(peer_id);
862 }
863 }
864
865 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
867 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
868 if !entry.get().is_trusted() {
869 return
870 }
871
872 let peer = entry.get_mut();
873 peer.kind = PeerKind::Basic;
874
875 self.trusted_peer_ids.remove(&peer_id);
876 }
877
878 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
888 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
889 !peer.is_backed_off() &&
890 !peer.is_banned() &&
891 peer.state.is_unconnected() &&
892 (!self.trusted_nodes_only || peer.is_trusted())
893 });
894
895 let mut best_peer = unconnected.next()?;
897
898 if best_peer.1.is_trusted() || best_peer.1.is_static() {
899 return Some((*best_peer.0, best_peer.1))
900 }
901
902 for maybe_better in unconnected {
903 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
905 return Some((*maybe_better.0, maybe_better.1))
906 }
907
908 if maybe_better.1.reputation > best_peer.1.reputation {
910 best_peer = maybe_better;
911 }
912 }
913 Some((*best_peer.0, best_peer.1))
914 }
915
916 fn fill_outbound_slots(&mut self) {
922 self.tick();
923
924 if !self.net_connection_state.is_active() {
925 return
927 }
928
929 while self.connection_info.has_out_capacity() {
931 let action = {
932 let (peer_id, peer) = match self.best_unconnected() {
933 Some(peer) => peer,
934 _ => break,
935 };
936
937 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
938
939 peer.state = PeerConnectionState::PendingOut;
940 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
941 };
942
943 self.connection_info.inc_pending_out();
944
945 self.queued_actions.push_back(action);
946 }
947 }
948
949 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
950 if let Some(peer) = self.peers.get_mut(&peer_id) {
951 let new_addr = PeerAddr::new_with_ports(
952 new_record.address,
953 new_record.tcp_port,
954 Some(new_record.udp_port),
955 );
956
957 if peer.addr != new_addr {
958 peer.addr = new_addr;
959 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
960 }
961 }
962 }
963
964 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
966 self.net_connection_state = state;
967 }
968
969 pub const fn connection_state(&self) -> &NetworkConnectionState {
971 &self.net_connection_state
972 }
973
974 pub const fn on_shutdown(&mut self) {
976 self.net_connection_state = NetworkConnectionState::ShuttingDown;
977 }
978
979 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
984 loop {
985 if let Some(action) = self.queued_actions.pop_front() {
987 return Poll::Ready(action)
988 }
989
990 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
991 match cmd {
992 PeerCommand::Add(peer_id, addr) => {
993 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
994 }
995 PeerCommand::Remove(peer) => self.remove_peer(peer),
996 PeerCommand::ReputationChange(peer_id, rep) => {
997 self.apply_reputation_change(&peer_id, rep)
998 }
999 PeerCommand::GetPeer(peer, tx) => {
1000 let _ = tx.send(self.peers.get(&peer).cloned());
1001 }
1002 PeerCommand::GetPeers(tx) => {
1003 let _ = tx.send(self.iter_peers().collect());
1004 }
1005 }
1006 }
1007
1008 if self.release_interval.poll_tick(cx).is_ready() {
1009 let now = std::time::Instant::now();
1010 let (_, unbanned_peers) = self.ban_list.evict(now);
1011
1012 for peer_id in unbanned_peers {
1013 if let Some(peer) = self.peers.get_mut(&peer_id) {
1014 peer.unban();
1015 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1016 }
1017 }
1018
1019 self.backed_off_peers.retain(|peer_id, until| {
1022 if now > *until {
1023 if let Some(peer) = self.peers.get_mut(peer_id) {
1024 peer.backed_off = false;
1025 }
1026 return false
1027 }
1028 true
1029 })
1030 }
1031
1032 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1033 self.fill_outbound_slots();
1034 }
1035
1036 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1037 self.on_resolved_peer(peer_id, new_record);
1038 }
1039
1040 if self.queued_actions.is_empty() {
1041 return Poll::Pending
1042 }
1043 }
1044 }
1045}
1046
1047impl Default for PeersManager {
1048 fn default() -> Self {
1049 Self::new(Default::default())
1050 }
1051}
1052
1053#[derive(Debug, Clone, PartialEq, Eq, Default)]
1055pub struct ConnectionInfo {
1056 num_outbound: usize,
1058 num_pending_out: usize,
1060 num_inbound: usize,
1062 num_pending_in: usize,
1064 config: ConnectionsConfig,
1066}
1067
1068impl ConnectionInfo {
1071 const fn new(config: ConnectionsConfig) -> Self {
1073 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1074 }
1075
1076 const fn has_out_capacity(&self) -> bool {
1078 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1079 self.num_outbound < self.config.max_outbound
1080 }
1081
1082 const fn has_in_capacity(&self) -> bool {
1084 self.num_inbound < self.config.max_inbound
1085 }
1086
1087 const fn has_in_pending_capacity(&self) -> bool {
1089 self.num_pending_in < self.config.max_inbound
1090 }
1091
1092 const fn decr_state(&mut self, state: PeerConnectionState) {
1093 match state {
1094 PeerConnectionState::Idle => {}
1095 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1096 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1097 PeerConnectionState::PendingOut => self.decr_pending_out(),
1098 }
1099 }
1100
1101 const fn decr_out(&mut self) {
1102 self.num_outbound -= 1;
1103 }
1104
1105 const fn inc_out(&mut self) {
1106 self.num_outbound += 1;
1107 }
1108
1109 const fn inc_pending_out(&mut self) {
1110 self.num_pending_out += 1;
1111 }
1112
1113 const fn inc_in(&mut self) {
1114 self.num_inbound += 1;
1115 }
1116
1117 const fn inc_pending_in(&mut self) {
1118 self.num_pending_in += 1;
1119 }
1120
1121 const fn decr_in(&mut self) {
1122 self.num_inbound -= 1;
1123 }
1124
1125 const fn decr_pending_out(&mut self) {
1126 self.num_pending_out -= 1;
1127 }
1128
1129 const fn decr_pending_in(&mut self) {
1130 self.num_pending_in -= 1;
1131 }
1132}
1133
1134#[derive(Debug)]
1136pub enum PeerAction {
1137 Connect {
1139 peer_id: PeerId,
1141 remote_addr: SocketAddr,
1143 },
1144 Disconnect {
1146 peer_id: PeerId,
1148 reason: Option<DisconnectReason>,
1150 },
1151 DisconnectBannedIncoming {
1154 peer_id: PeerId,
1156 },
1157 DisconnectUntrustedIncoming {
1159 peer_id: PeerId,
1161 },
1162 DiscoveryBanPeerId {
1164 peer_id: PeerId,
1166 ip_addr: IpAddr,
1168 },
1169 DiscoveryBanIp {
1171 ip_addr: IpAddr,
1173 },
1174 BanPeer {
1176 peer_id: PeerId,
1178 },
1179 UnBanPeer {
1181 peer_id: PeerId,
1183 },
1184 PeerAdded(PeerId),
1186 PeerRemoved(PeerId),
1188}
1189
1190#[derive(Debug, Error, PartialEq, Eq)]
1192pub enum InboundConnectionError {
1193 IpBanned,
1195 ExceedsCapacity,
1197}
1198
1199impl Display for InboundConnectionError {
1200 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1201 write!(f, "{self:?}")
1202 }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207 use alloy_primitives::B512;
1208 use reth_eth_wire::{
1209 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1210 DisconnectReason,
1211 };
1212 use reth_net_banlist::BanList;
1213 use reth_network_api::Direction;
1214 use reth_network_peers::{PeerId, TrustedPeer};
1215 use reth_network_types::{
1216 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1217 };
1218 use std::{
1219 future::{poll_fn, Future},
1220 io,
1221 net::{IpAddr, Ipv4Addr, SocketAddr},
1222 pin::Pin,
1223 task::{Context, Poll},
1224 time::Duration,
1225 };
1226 use url::Host;
1227
1228 use super::PeersManager;
1229 use crate::{
1230 error::SessionError,
1231 peers::{
1232 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1233 PeerConnectionState,
1234 },
1235 session::PendingSessionHandshakeError,
1236 PeersConfig,
1237 };
1238
1239 struct PeerActionFuture<'a> {
1240 peers: &'a mut PeersManager,
1241 }
1242
1243 impl Future for PeerActionFuture<'_> {
1244 type Output = PeerAction;
1245
1246 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1247 self.get_mut().peers.poll(cx)
1248 }
1249 }
1250
1251 macro_rules! event {
1252 ($peers:expr) => {
1253 PeerActionFuture { peers: &mut $peers }.await
1254 };
1255 }
1256
1257 #[tokio::test]
1258 async fn test_insert() {
1259 let peer = PeerId::random();
1260 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1261 let mut peers = PeersManager::default();
1262 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1263
1264 match event!(peers) {
1265 PeerAction::PeerAdded(peer_id) => {
1266 assert_eq!(peer_id, peer);
1267 }
1268 _ => unreachable!(),
1269 }
1270 match event!(peers) {
1271 PeerAction::Connect { peer_id, remote_addr } => {
1272 assert_eq!(peer_id, peer);
1273 assert_eq!(remote_addr, socket_addr);
1274 }
1275 _ => unreachable!(),
1276 }
1277
1278 let (record, _) = peers.peer_by_id(peer).unwrap();
1279 assert_eq!(record.tcp_addr(), socket_addr);
1280 assert_eq!(record.udp_addr(), socket_addr);
1281 }
1282
1283 #[tokio::test]
1284 async fn test_insert_udp() {
1285 let peer = PeerId::random();
1286 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1287 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1288 let mut peers = PeersManager::default();
1289 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1290
1291 match event!(peers) {
1292 PeerAction::PeerAdded(peer_id) => {
1293 assert_eq!(peer_id, peer);
1294 }
1295 _ => unreachable!(),
1296 }
1297 match event!(peers) {
1298 PeerAction::Connect { peer_id, remote_addr } => {
1299 assert_eq!(peer_id, peer);
1300 assert_eq!(remote_addr, tcp_addr);
1301 }
1302 _ => unreachable!(),
1303 }
1304
1305 let (record, _) = peers.peer_by_id(peer).unwrap();
1306 assert_eq!(record.tcp_addr(), tcp_addr);
1307 assert_eq!(record.udp_addr(), udp_addr);
1308 }
1309
1310 #[tokio::test]
1311 async fn test_ban() {
1312 let peer = PeerId::random();
1313 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1314 let mut peers = PeersManager::default();
1315 peers.ban_peer(peer);
1316 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1317
1318 match event!(peers) {
1319 PeerAction::BanPeer { peer_id } => {
1320 assert_eq!(peer_id, peer);
1321 }
1322 _ => unreachable!(),
1323 }
1324
1325 poll_fn(|cx| {
1326 assert!(peers.poll(cx).is_pending());
1327 Poll::Ready(())
1328 })
1329 .await;
1330 }
1331
1332 #[tokio::test]
1333 async fn test_unban() {
1334 let peer = PeerId::random();
1335 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1336 let mut peers = PeersManager::default();
1337 peers.ban_peer(peer);
1338 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1339
1340 match event!(peers) {
1341 PeerAction::BanPeer { peer_id } => {
1342 assert_eq!(peer_id, peer);
1343 }
1344 _ => unreachable!(),
1345 }
1346
1347 poll_fn(|cx| {
1348 assert!(peers.poll(cx).is_pending());
1349 Poll::Ready(())
1350 })
1351 .await;
1352
1353 peers.unban_peer(peer);
1354
1355 match event!(peers) {
1356 PeerAction::UnBanPeer { peer_id } => {
1357 assert_eq!(peer_id, peer);
1358 }
1359 _ => unreachable!(),
1360 }
1361
1362 poll_fn(|cx| {
1363 assert!(peers.poll(cx).is_pending());
1364 Poll::Ready(())
1365 })
1366 .await;
1367 }
1368
1369 #[tokio::test]
1370 async fn test_backoff_on_busy() {
1371 let peer = PeerId::random();
1372 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1373
1374 let mut peers = PeersManager::new(PeersConfig::test());
1375 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1376
1377 match event!(peers) {
1378 PeerAction::PeerAdded(peer_id) => {
1379 assert_eq!(peer_id, peer);
1380 }
1381 _ => unreachable!(),
1382 }
1383 match event!(peers) {
1384 PeerAction::Connect { peer_id, .. } => {
1385 assert_eq!(peer_id, peer);
1386 }
1387 _ => unreachable!(),
1388 }
1389
1390 poll_fn(|cx| {
1391 assert!(peers.poll(cx).is_pending());
1392 Poll::Ready(())
1393 })
1394 .await;
1395
1396 peers.on_active_session_dropped(
1397 &socket_addr,
1398 &peer,
1399 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1400 DisconnectReason::TooManyPeers,
1401 )),
1402 );
1403
1404 poll_fn(|cx| {
1405 assert!(peers.poll(cx).is_pending());
1406 Poll::Ready(())
1407 })
1408 .await;
1409
1410 assert!(peers.backed_off_peers.contains_key(&peer));
1411 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1412
1413 tokio::time::sleep(peers.backoff_durations.low).await;
1414
1415 match event!(peers) {
1416 PeerAction::Connect { peer_id, .. } => {
1417 assert_eq!(peer_id, peer);
1418 }
1419 _ => unreachable!(),
1420 }
1421
1422 assert!(!peers.backed_off_peers.contains_key(&peer));
1423 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1424 }
1425
1426 #[tokio::test]
1427 async fn test_backoff_on_no_response() {
1428 let peer = PeerId::random();
1429 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1430
1431 let backoff_durations = PeerBackoffDurations::test();
1432 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1433 let mut peers = PeersManager::new(config);
1434 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1435
1436 match event!(peers) {
1437 PeerAction::PeerAdded(peer_id) => {
1438 assert_eq!(peer_id, peer);
1439 }
1440 _ => unreachable!(),
1441 }
1442 match event!(peers) {
1443 PeerAction::Connect { peer_id, .. } => {
1444 assert_eq!(peer_id, peer);
1445 }
1446 _ => unreachable!(),
1447 }
1448
1449 poll_fn(|cx| {
1450 assert!(peers.poll(cx).is_pending());
1451 Poll::Ready(())
1452 })
1453 .await;
1454
1455 peers.on_outgoing_pending_session_dropped(
1456 &socket_addr,
1457 &peer,
1458 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1459 EthHandshakeError::NoResponse,
1460 )),
1461 );
1462
1463 poll_fn(|cx| {
1464 assert!(peers.poll(cx).is_pending());
1465 Poll::Ready(())
1466 })
1467 .await;
1468
1469 assert!(peers.backed_off_peers.contains_key(&peer));
1470 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1471
1472 tokio::time::sleep(backoff_durations.high).await;
1473
1474 match event!(peers) {
1475 PeerAction::Connect { peer_id, .. } => {
1476 assert_eq!(peer_id, peer);
1477 }
1478 _ => unreachable!(),
1479 }
1480
1481 assert!(!peers.backed_off_peers.contains_key(&peer));
1482 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1483 }
1484
1485 #[tokio::test]
1486 async fn test_low_backoff() {
1487 let peer = PeerId::random();
1488 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1489 let config = PeersConfig::test();
1490 let mut peers = PeersManager::new(config);
1491 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1492 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1493
1494 let backoff_timestamp = peers
1495 .backoff_durations
1496 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1497
1498 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1499 assert!(backoff_timestamp <= expected);
1500 }
1501
1502 #[tokio::test]
1503 async fn test_multiple_backoff_calculations() {
1504 let peer = PeerId::random();
1505 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1506 let config = PeersConfig::default();
1507 let mut peers = PeersManager::new(config);
1508 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1509 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1510
1511 peer_struct.severe_backoff_counter = 1;
1513
1514 let now = std::time::Instant::now();
1515
1516 peer_struct.severe_backoff_counter += 1;
1518 let backoff_time = peers
1520 .backoff_durations
1521 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1522
1523 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1525
1526 assert!(backoff_time.duration_since(now) > backoff_duration);
1529 }
1530
1531 #[tokio::test]
1532 async fn test_ban_on_active_drop() {
1533 let peer = PeerId::random();
1534 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1535 let mut peers = PeersManager::default();
1536 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1537
1538 match event!(peers) {
1539 PeerAction::PeerAdded(peer_id) => {
1540 assert_eq!(peer_id, peer);
1541 }
1542 _ => unreachable!(),
1543 }
1544 match event!(peers) {
1545 PeerAction::Connect { peer_id, .. } => {
1546 assert_eq!(peer_id, peer);
1547 }
1548 _ => unreachable!(),
1549 }
1550
1551 poll_fn(|cx| {
1552 assert!(peers.poll(cx).is_pending());
1553 Poll::Ready(())
1554 })
1555 .await;
1556
1557 peers.on_active_session_dropped(
1558 &socket_addr,
1559 &peer,
1560 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1561 DisconnectReason::UselessPeer,
1562 )),
1563 );
1564
1565 match event!(peers) {
1566 PeerAction::PeerRemoved(peer_id) => {
1567 assert_eq!(peer_id, peer);
1568 }
1569 _ => unreachable!(),
1570 }
1571 match event!(peers) {
1572 PeerAction::BanPeer { peer_id } => {
1573 assert_eq!(peer_id, peer);
1574 }
1575 _ => unreachable!(),
1576 }
1577
1578 poll_fn(|cx| {
1579 assert!(peers.poll(cx).is_pending());
1580 Poll::Ready(())
1581 })
1582 .await;
1583
1584 assert!(!peers.peers.contains_key(&peer));
1585 }
1586
1587 #[tokio::test]
1588 async fn test_remove_on_max_backoff_count() {
1589 let peer = PeerId::random();
1590 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1591 let config = PeersConfig::test();
1592 let mut peers = PeersManager::new(config.clone());
1593 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1594 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1595
1596 peer_struct.severe_backoff_counter = config.max_backoff_count;
1598
1599 match event!(peers) {
1600 PeerAction::PeerAdded(peer_id) => {
1601 assert_eq!(peer_id, peer);
1602 }
1603 _ => unreachable!(),
1604 }
1605 match event!(peers) {
1606 PeerAction::Connect { peer_id, .. } => {
1607 assert_eq!(peer_id, peer);
1608 }
1609 _ => unreachable!(),
1610 }
1611
1612 poll_fn(|cx| {
1613 assert!(peers.poll(cx).is_pending());
1614 Poll::Ready(())
1615 })
1616 .await;
1617
1618 peers.on_outgoing_pending_session_dropped(
1619 &socket_addr,
1620 &peer,
1621 &PendingSessionHandshakeError::Eth(
1622 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1623 ),
1624 );
1625
1626 match event!(peers) {
1627 PeerAction::PeerRemoved(peer_id) => {
1628 assert_eq!(peer_id, peer);
1629 }
1630 _ => unreachable!(),
1631 }
1632
1633 poll_fn(|cx| {
1634 assert!(peers.poll(cx).is_pending());
1635 Poll::Ready(())
1636 })
1637 .await;
1638
1639 assert!(!peers.peers.contains_key(&peer));
1640 }
1641
1642 #[tokio::test]
1643 async fn test_ban_on_pending_drop() {
1644 let peer = PeerId::random();
1645 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1646 let mut peers = PeersManager::default();
1647 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1648
1649 match event!(peers) {
1650 PeerAction::PeerAdded(peer_id) => {
1651 assert_eq!(peer_id, peer);
1652 }
1653 _ => unreachable!(),
1654 }
1655 match event!(peers) {
1656 PeerAction::Connect { peer_id, .. } => {
1657 assert_eq!(peer_id, peer);
1658 }
1659 _ => unreachable!(),
1660 }
1661
1662 poll_fn(|cx| {
1663 assert!(peers.poll(cx).is_pending());
1664 Poll::Ready(())
1665 })
1666 .await;
1667
1668 peers.on_outgoing_pending_session_dropped(
1669 &socket_addr,
1670 &peer,
1671 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1672 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1673 )),
1674 );
1675
1676 match event!(peers) {
1677 PeerAction::PeerRemoved(peer_id) => {
1678 assert_eq!(peer_id, peer);
1679 }
1680 _ => unreachable!(),
1681 }
1682 match event!(peers) {
1683 PeerAction::BanPeer { peer_id } => {
1684 assert_eq!(peer_id, peer);
1685 }
1686 _ => unreachable!(),
1687 }
1688
1689 poll_fn(|cx| {
1690 assert!(peers.poll(cx).is_pending());
1691 Poll::Ready(())
1692 })
1693 .await;
1694
1695 assert!(!peers.peers.contains_key(&peer));
1696 }
1697
1698 #[tokio::test]
1699 async fn test_internally_closed_incoming() {
1700 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1701 let mut peers = PeersManager::default();
1702
1703 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1704 assert_eq!(peers.connection_info.num_pending_in, 1);
1705 peers.on_incoming_pending_session_rejected_internally();
1706 assert_eq!(peers.connection_info.num_pending_in, 0);
1707 }
1708
1709 #[tokio::test]
1710 async fn test_reject_incoming_at_pending_capacity() {
1711 let mut peers = PeersManager::default();
1712
1713 for count in 1..=peers.connection_info.config.max_inbound {
1714 let socket_addr =
1715 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1716 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1717 assert_eq!(peers.connection_info.num_pending_in, count);
1718 }
1719 assert!(peers.connection_info.has_in_capacity());
1720 assert!(!peers.connection_info.has_in_pending_capacity());
1721
1722 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1723 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1724 }
1725
1726 #[tokio::test]
1727 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1728 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1729 let trusted = PeerId::random();
1730 peers.add_trusted_peer_id(trusted);
1731
1732 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1734 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1735 peers.on_incoming_session_established(trusted, addr);
1736
1737 match event!(peers) {
1738 PeerAction::PeerAdded(id) => {
1739 assert_eq!(id, trusted);
1740 }
1741 _ => unreachable!(),
1742 }
1743
1744 let mut connected_untrusted_peer_ids = Vec::new();
1746 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1747 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1748 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1749 let peer_id = PeerId::random();
1750 peers.on_incoming_session_established(peer_id, addr);
1751 connected_untrusted_peer_ids.push(peer_id);
1752
1753 match event!(peers) {
1754 PeerAction::PeerAdded(id) => {
1755 assert_eq!(id, peer_id);
1756 }
1757 _ => unreachable!(),
1758 }
1759 }
1760
1761 let mut pending_addrs = Vec::new();
1762
1763 for i in 0..2 {
1765 let socket_addr =
1766 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1767 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1768
1769 pending_addrs.push(socket_addr);
1770 }
1771
1772 assert_eq!(peers.connection_info.num_pending_in, 2);
1773
1774 for i in 0..2 {
1776 let socket_addr =
1777 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1778 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1779 }
1780
1781 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1782 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1783 DisconnectReason::UselessPeer,
1784 )),
1785 ));
1786
1787 for pending_addr in pending_addrs {
1789 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1790 }
1791
1792 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1793
1794 println!(
1795 "num_inbound: {}, has_in_capacity: {}",
1796 peers.connection_info.num_inbound,
1797 peers.connection_info.has_in_capacity()
1798 );
1799
1800 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1802
1803 println!(
1804 "num_inbound: {}, has_in_capacity: {}",
1805 peers.connection_info.num_inbound,
1806 peers.connection_info.has_in_capacity()
1807 );
1808
1809 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1810 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1811 }
1812
1813 #[tokio::test]
1814 async fn test_closed_incoming() {
1815 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1816 let mut peers = PeersManager::default();
1817
1818 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1819 assert_eq!(peers.connection_info.num_pending_in, 1);
1820 peers.on_incoming_pending_session_gracefully_closed();
1821 assert_eq!(peers.connection_info.num_pending_in, 0);
1822 }
1823
1824 #[tokio::test]
1825 async fn test_dropped_incoming() {
1826 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1827 let ban_duration = Duration::from_millis(500);
1828 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1829 let mut peers = PeersManager::new(config);
1830
1831 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1832 assert_eq!(peers.connection_info.num_pending_in, 1);
1833 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1834 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1835 DisconnectReason::UselessPeer,
1836 )),
1837 ));
1838
1839 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1840 assert_eq!(peers.connection_info.num_pending_in, 0);
1841 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1842
1843 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1844
1845 tokio::time::sleep(ban_duration).await;
1847
1848 poll_fn(|cx| {
1849 let _ = peers.poll(cx);
1850 Poll::Ready(())
1851 })
1852 .await;
1853
1854 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1855 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1856 }
1857
1858 #[tokio::test]
1859 async fn test_reputation_change_connected() {
1860 let peer = PeerId::random();
1861 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1862 let mut peers = PeersManager::default();
1863 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1864
1865 match event!(peers) {
1866 PeerAction::PeerAdded(peer_id) => {
1867 assert_eq!(peer_id, peer);
1868 }
1869 _ => unreachable!(),
1870 }
1871 match event!(peers) {
1872 PeerAction::Connect { peer_id, remote_addr } => {
1873 assert_eq!(peer_id, peer);
1874 assert_eq!(remote_addr, socket_addr);
1875 }
1876 _ => unreachable!(),
1877 }
1878
1879 let p = peers.peers.get_mut(&peer).unwrap();
1880 assert_eq!(p.state, PeerConnectionState::PendingOut);
1881
1882 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1883
1884 let p = peers.peers.get(&peer).unwrap();
1885 assert_eq!(p.state, PeerConnectionState::PendingOut);
1886 assert!(p.is_banned());
1887
1888 peers.on_active_session_gracefully_closed(peer);
1889
1890 let p = peers.peers.get(&peer).unwrap();
1891 assert_eq!(p.state, PeerConnectionState::Idle);
1892 assert!(p.is_banned());
1893
1894 match event!(peers) {
1895 PeerAction::Disconnect { peer_id, .. } => {
1896 assert_eq!(peer_id, peer);
1897 }
1898 _ => unreachable!(),
1899 }
1900 }
1901
1902 #[tokio::test]
1903 async fn accept_incoming_trusted_unknown_peer_address() {
1904 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1905 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1906 let trusted = PeerId::random();
1908 peers.add_trusted_peer_id(trusted);
1909
1910 for i in 0..peers.connection_info.config.max_inbound {
1912 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1913 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1914 let peer_id = PeerId::random();
1915 peers.on_incoming_session_established(peer_id, addr);
1916
1917 match event!(peers) {
1918 PeerAction::PeerAdded(id) => {
1919 assert_eq!(id, peer_id);
1920 }
1921 _ => unreachable!(),
1922 }
1923 }
1924
1925 let untrusted = PeerId::random();
1927 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1928 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1929 peers.on_incoming_session_established(untrusted, socket_addr);
1930
1931 match event!(peers) {
1932 PeerAction::PeerAdded(id) => {
1933 assert_eq!(id, untrusted);
1934 }
1935 _ => unreachable!(),
1936 }
1937
1938 match event!(peers) {
1939 PeerAction::Disconnect { peer_id, reason } => {
1940 assert_eq!(peer_id, untrusted);
1941 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1942 }
1943 _ => unreachable!(),
1944 }
1945
1946 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1947 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1948 peers.on_incoming_session_established(trusted, socket_addr);
1949
1950 match event!(peers) {
1951 PeerAction::PeerAdded(id) => {
1952 assert_eq!(id, trusted);
1953 }
1954 _ => unreachable!(),
1955 }
1956
1957 poll_fn(|cx| {
1958 assert!(peers.poll(cx).is_pending());
1959 Poll::Ready(())
1960 })
1961 .await;
1962
1963 let peer = peers.peers.get(&trusted).unwrap();
1964 assert_eq!(peer.state, PeerConnectionState::In);
1965 }
1966
1967 #[tokio::test]
1968 async fn test_already_connected() {
1969 let peer = PeerId::random();
1970 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1971 let mut peers = PeersManager::default();
1972
1973 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1975 assert_eq!(peers.connection_info.num_pending_in, 1);
1976
1977 peers.on_incoming_session_established(peer, socket_addr);
1980 let p = peers.peers.get_mut(&peer).expect("peer not found");
1981 assert_eq!(p.addr.tcp(), socket_addr);
1982 assert_eq!(peers.connection_info.num_pending_in, 0);
1983 assert_eq!(peers.connection_info.num_inbound, 1);
1984
1985 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1988 assert_eq!(peers.connection_info.num_pending_in, 1);
1989
1990 peers.on_already_connected(Direction::Incoming);
1994
1995 let p = peers.peers.get_mut(&peer).expect("peer not found");
1996 assert_eq!(p.addr.tcp(), socket_addr);
1997 assert_eq!(peers.connection_info.num_pending_in, 0);
1998 assert_eq!(peers.connection_info.num_inbound, 1);
1999 }
2000
2001 #[tokio::test]
2002 async fn test_reputation_change_trusted_peer() {
2003 let peer = PeerId::random();
2004 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2005 let mut peers = PeersManager::default();
2006 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2007
2008 match event!(peers) {
2009 PeerAction::PeerAdded(peer_id) => {
2010 assert_eq!(peer_id, peer);
2011 }
2012 _ => unreachable!(),
2013 }
2014 match event!(peers) {
2015 PeerAction::Connect { peer_id, remote_addr } => {
2016 assert_eq!(peer_id, peer);
2017 assert_eq!(remote_addr, socket_addr);
2018 }
2019 _ => unreachable!(),
2020 }
2021
2022 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2023 peers.on_active_outgoing_established(peer);
2024 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2025
2026 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2027
2028 {
2029 let p = peers.peers.get(&peer).unwrap();
2030 assert_eq!(p.state, PeerConnectionState::Out);
2031 assert!(!p.is_banned());
2033 }
2034
2035 loop {
2037 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2038
2039 let p = peers.peers.get(&peer).unwrap();
2040 if p.is_banned() {
2041 break
2042 }
2043 }
2044
2045 match event!(peers) {
2046 PeerAction::Disconnect { peer_id, .. } => {
2047 assert_eq!(peer_id, peer);
2048 }
2049 _ => unreachable!(),
2050 }
2051 }
2052
2053 #[tokio::test]
2054 async fn test_reputation_management() {
2055 let peer = PeerId::random();
2056 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2057 let mut peers = PeersManager::default();
2058 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2059 assert_eq!(peers.get_reputation(&peer), Some(0));
2060
2061 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2062 assert_eq!(peers.get_reputation(&peer), Some(1024));
2063
2064 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2065 assert_eq!(peers.get_reputation(&peer), Some(0));
2066 }
2067
2068 #[tokio::test]
2069 async fn test_remove_discovered_active() {
2070 let peer = PeerId::random();
2071 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2072 let mut peers = PeersManager::default();
2073 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2074
2075 match event!(peers) {
2076 PeerAction::PeerAdded(peer_id) => {
2077 assert_eq!(peer_id, peer);
2078 }
2079 _ => unreachable!(),
2080 }
2081 match event!(peers) {
2082 PeerAction::Connect { peer_id, remote_addr } => {
2083 assert_eq!(peer_id, peer);
2084 assert_eq!(remote_addr, socket_addr);
2085 }
2086 _ => unreachable!(),
2087 }
2088
2089 let p = peers.peers.get(&peer).unwrap();
2090 assert_eq!(p.state, PeerConnectionState::PendingOut);
2091
2092 peers.remove_peer(peer);
2093
2094 match event!(peers) {
2095 PeerAction::PeerRemoved(peer_id) => {
2096 assert_eq!(peer_id, peer);
2097 }
2098 _ => unreachable!(),
2099 }
2100 match event!(peers) {
2101 PeerAction::Disconnect { peer_id, .. } => {
2102 assert_eq!(peer_id, peer);
2103 }
2104 _ => unreachable!(),
2105 }
2106
2107 let p = peers.peers.get(&peer).unwrap();
2108 assert_eq!(p.state, PeerConnectionState::PendingOut);
2109
2110 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2111 let p = peers.peers.get(&peer).unwrap();
2112 assert_eq!(p.state, PeerConnectionState::PendingOut);
2113
2114 peers.on_active_session_gracefully_closed(peer);
2115 assert!(!peers.peers.contains_key(&peer));
2116 }
2117
2118 #[tokio::test]
2119 async fn test_fatal_outgoing_connection_error_trusted() {
2120 let peer = PeerId::random();
2121 let config = PeersConfig::test()
2122 .with_trusted_nodes(vec![TrustedPeer {
2123 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2124 tcp_port: 8008,
2125 udp_port: 8008,
2126 id: peer,
2127 }])
2128 .with_trusted_nodes_only(true);
2129 let mut peers = PeersManager::new(config);
2130 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2131
2132 match event!(peers) {
2133 PeerAction::Connect { peer_id, remote_addr } => {
2134 assert_eq!(peer_id, peer);
2135 assert_eq!(remote_addr, socket_addr);
2136 }
2137 _ => unreachable!(),
2138 }
2139
2140 let p = peers.peers.get(&peer).unwrap();
2141 assert_eq!(p.state, PeerConnectionState::PendingOut);
2142
2143 assert_eq!(peers.num_outbound_connections(), 0);
2144
2145 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2146 EthHandshakeError::NonStatusMessageInHandshake,
2147 ));
2148 assert!(err.is_fatal_protocol_error());
2149
2150 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2151 assert_eq!(peers.num_outbound_connections(), 0);
2152
2153 match event!(peers) {
2155 PeerAction::BanPeer { peer_id } => {
2156 assert_eq!(peer_id, peer);
2157 }
2158 err => unreachable!("{err:?}"),
2159 }
2160
2161 assert!(peers.peers.contains_key(&peer));
2163
2164 tokio::time::sleep(peers.backoff_durations.medium).await;
2166
2167 match event!(peers) {
2168 PeerAction::Connect { peer_id, remote_addr } => {
2169 assert_eq!(peer_id, peer);
2170 assert_eq!(remote_addr, socket_addr);
2171 }
2172 err => unreachable!("{err:?}"),
2173 }
2174 }
2175
2176 #[tokio::test]
2177 async fn test_outgoing_connection_error() {
2178 let peer = PeerId::random();
2179 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2180 let mut peers = PeersManager::default();
2181 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2182
2183 match event!(peers) {
2184 PeerAction::PeerAdded(peer_id) => {
2185 assert_eq!(peer_id, peer);
2186 }
2187 _ => unreachable!(),
2188 }
2189 match event!(peers) {
2190 PeerAction::Connect { peer_id, remote_addr } => {
2191 assert_eq!(peer_id, peer);
2192 assert_eq!(remote_addr, socket_addr);
2193 }
2194 _ => unreachable!(),
2195 }
2196
2197 let p = peers.peers.get(&peer).unwrap();
2198 assert_eq!(p.state, PeerConnectionState::PendingOut);
2199
2200 assert_eq!(peers.num_outbound_connections(), 0);
2201
2202 peers.on_outgoing_connection_failure(
2203 &socket_addr,
2204 &peer,
2205 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2206 );
2207
2208 assert_eq!(peers.num_outbound_connections(), 0);
2209 }
2210
2211 #[tokio::test]
2212 async fn test_outgoing_connection_gracefully_closed() {
2213 let peer = PeerId::random();
2214 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2215 let mut peers = PeersManager::default();
2216 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2217
2218 match event!(peers) {
2219 PeerAction::PeerAdded(peer_id) => {
2220 assert_eq!(peer_id, peer);
2221 }
2222 _ => unreachable!(),
2223 }
2224 match event!(peers) {
2225 PeerAction::Connect { peer_id, remote_addr } => {
2226 assert_eq!(peer_id, peer);
2227 assert_eq!(remote_addr, socket_addr);
2228 }
2229 _ => unreachable!(),
2230 }
2231
2232 let p = peers.peers.get(&peer).unwrap();
2233 assert_eq!(p.state, PeerConnectionState::PendingOut);
2234
2235 assert_eq!(peers.num_outbound_connections(), 0);
2236
2237 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2238
2239 assert_eq!(peers.num_outbound_connections(), 0);
2240 assert_eq!(peers.connection_info.num_pending_out, 0);
2241 }
2242
2243 #[tokio::test]
2244 async fn test_discovery_ban_list() {
2245 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2246 let socket_addr = SocketAddr::new(ip, 8008);
2247 let ban_list = BanList::new(vec![], vec![ip]);
2248 let config = PeersConfig::default().with_ban_list(ban_list);
2249 let mut peer_manager = PeersManager::new(config);
2250 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2251
2252 assert!(peer_manager.peers.is_empty());
2253 }
2254
2255 #[tokio::test]
2256 async fn test_on_pending_ban_list() {
2257 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2258 let socket_addr = SocketAddr::new(ip, 8008);
2259 let ban_list = BanList::new(vec![], vec![ip]);
2260 let config = PeersConfig::test().with_ban_list(ban_list);
2261 let mut peer_manager = PeersManager::new(config);
2262 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2263 match a {
2265 Ok(_) => panic!(),
2266 Err(err) => match err {
2267 InboundConnectionError::IpBanned => {
2268 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2269 }
2270 _ => unreachable!(),
2271 },
2272 }
2273 }
2274
2275 #[tokio::test]
2276 async fn test_on_active_inbound_ban_list() {
2277 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2278 let socket_addr = SocketAddr::new(ip, 8008);
2279 let given_peer_id = PeerId::random();
2280 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2281 let config = PeersConfig::test().with_ban_list(ban_list);
2282 let mut peer_manager = PeersManager::new(config);
2283 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2284 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2286 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2287 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2290 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2291
2292 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2293 peer_manager.queued_actions.pop_front()
2294 else {
2295 panic!()
2296 };
2297
2298 assert_eq!(peer_id, given_peer_id)
2299 }
2300
/// `inc_in`/`decr_in` and `inc_out`/`decr_out` adjust only their own counter, and a default
/// `ConnectionInfo` with a single connection still reports capacity in that direction.
#[test]
fn test_connection_limits() {
    let mut info = ConnectionInfo::default();

    // inbound: up ...
    info.inc_in();
    assert_eq!((info.num_inbound, info.num_outbound), (1, 0));
    assert!(info.has_in_capacity());

    // ... and down again
    info.decr_in();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));

    // outbound: up ...
    info.inc_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 1));
    assert!(info.has_out_capacity());

    // ... and down again
    info.decr_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));
}
2322
/// `decr_state` undoes `inc_in` for `PeerConnectionState::In` and `inc_out` for
/// `PeerConnectionState::Out`.
#[test]
fn test_connection_peer_state() {
    let mut info = ConnectionInfo::default();

    // inbound
    info.inc_in();
    info.decr_state(PeerConnectionState::In);
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));

    // outbound
    info.inc_out();
    info.decr_state(PeerConnectionState::Out);
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));
}
2338
2339 #[tokio::test]
2340 async fn test_trusted_peers_are_prioritized() {
2341 let trusted_peer = PeerId::random();
2342 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2343 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2344 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2345 tcp_port: 8008,
2346 udp_port: 8008,
2347 id: trusted_peer,
2348 }]);
2349 let mut peers = PeersManager::new(config);
2350
2351 let basic_peer = PeerId::random();
2352 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2353 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2354
2355 match event!(peers) {
2356 PeerAction::PeerAdded(peer_id) => {
2357 assert_eq!(peer_id, basic_peer);
2358 }
2359 _ => unreachable!(),
2360 }
2361 match event!(peers) {
2362 PeerAction::Connect { peer_id, remote_addr } => {
2363 assert_eq!(peer_id, trusted_peer);
2364 assert_eq!(remote_addr, trusted_sock);
2365 }
2366 _ => unreachable!(),
2367 }
2368 match event!(peers) {
2369 PeerAction::Connect { peer_id, remote_addr } => {
2370 assert_eq!(peer_id, basic_peer);
2371 assert_eq!(remote_addr, basic_sock);
2372 }
2373 _ => unreachable!(),
2374 }
2375 }
2376
2377 #[tokio::test]
2378 async fn test_connect_trusted_nodes_only() {
2379 let trusted_peer = PeerId::random();
2380 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2381 let config = PeersConfig::test()
2382 .with_trusted_nodes(vec![TrustedPeer {
2383 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2384 tcp_port: 8008,
2385 udp_port: 8008,
2386 id: trusted_peer,
2387 }])
2388 .with_trusted_nodes_only(true);
2389 let mut peers = PeersManager::new(config);
2390
2391 let basic_peer = PeerId::random();
2392 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2393 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2394
2395 match event!(peers) {
2396 PeerAction::PeerAdded(peer_id) => {
2397 assert_eq!(peer_id, basic_peer);
2398 }
2399 _ => unreachable!(),
2400 }
2401 match event!(peers) {
2402 PeerAction::Connect { peer_id, remote_addr } => {
2403 assert_eq!(peer_id, trusted_peer);
2404 assert_eq!(remote_addr, trusted_sock);
2405 }
2406 _ => unreachable!(),
2407 }
2408 poll_fn(|cx| {
2409 assert!(peers.poll(cx).is_pending());
2410 Poll::Ready(())
2411 })
2412 .await;
2413 }
2414
2415 #[tokio::test]
2416 async fn test_incoming_with_trusted_nodes_only() {
2417 let trusted_peer = PeerId::random();
2418 let config = PeersConfig::test()
2419 .with_trusted_nodes(vec![TrustedPeer {
2420 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2421 tcp_port: 8008,
2422 udp_port: 8008,
2423 id: trusted_peer,
2424 }])
2425 .with_trusted_nodes_only(true);
2426 let mut peers = PeersManager::new(config);
2427
2428 let basic_peer = PeerId::random();
2429 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2430 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2431 assert_eq!(peers.connection_info.num_pending_in, 1);
2433 peers.on_incoming_session_established(basic_peer, basic_sock);
2434 assert_eq!(peers.connection_info.num_pending_in, 0);
2437 assert_eq!(peers.connection_info.num_inbound, 0);
2438
2439 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2440 peers.queued_actions.pop_front()
2441 else {
2442 panic!()
2443 };
2444 assert_eq!(basic_peer, peer_id);
2445 assert!(!peers.peers.contains_key(&basic_peer));
2446 }
2447
2448 #[tokio::test]
2449 async fn test_incoming_without_trusted_nodes_only() {
2450 let trusted_peer = PeerId::random();
2451 let config = PeersConfig::test()
2452 .with_trusted_nodes(vec![TrustedPeer {
2453 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2454 tcp_port: 8008,
2455 udp_port: 8008,
2456 id: trusted_peer,
2457 }])
2458 .with_trusted_nodes_only(false);
2459 let mut peers = PeersManager::new(config);
2460
2461 let basic_peer = PeerId::random();
2462 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2463 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2464
2465 assert_eq!(peers.connection_info.num_pending_in, 1);
2467 peers.on_incoming_session_established(basic_peer, basic_sock);
2468 assert_eq!(peers.connection_info.num_pending_in, 0);
2471 assert_eq!(peers.connection_info.num_inbound, 1);
2472 assert!(peers.peers.contains_key(&basic_peer));
2473 }
2474
2475 #[tokio::test]
2476 async fn test_incoming_at_capacity() {
2477 let mut config = PeersConfig::test();
2478 config.connection_info.max_inbound = 1;
2479 let mut peers = PeersManager::new(config);
2480
2481 let peer = PeerId::random();
2482 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2483 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2484
2485 peers.on_incoming_session_established(peer, addr);
2486
2487 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2488 assert_eq!(
2489 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2490 InboundConnectionError::ExceedsCapacity
2491 );
2492 }
2493
2494 #[tokio::test]
2495 async fn test_incoming_rate_limit() {
2496 let config = PeersConfig {
2497 incoming_ip_throttle_duration: Duration::from_millis(100),
2498 ..PeersConfig::test()
2499 };
2500 let mut peers = PeersManager::new(config);
2501
2502 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2503 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2504 assert_eq!(
2505 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2506 InboundConnectionError::IpBanned
2507 );
2508
2509 peers.release_interval.reset_immediately();
2510 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2511
2512 poll_fn(|cx| loop {
2514 if peers.poll(cx).is_pending() {
2515 return Poll::Ready(());
2516 }
2517 })
2518 .await;
2519
2520 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2521 assert_eq!(
2522 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2523 InboundConnectionError::IpBanned
2524 );
2525 }
2526
2527 #[tokio::test]
2528 async fn test_tick() {
2529 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2530 let socket_addr = SocketAddr::new(ip, 8008);
2531 let config = PeersConfig::test();
2532 let mut peer_manager = PeersManager::new(config);
2533 let peer_id = PeerId::random();
2534 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2535
2536 tokio::time::sleep(Duration::from_secs(1)).await;
2537 peer_manager.tick();
2538
2539 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2541
2542 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2544
2545 tokio::time::sleep(Duration::from_secs(1)).await;
2546 peer_manager.tick();
2547
2548 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2550
2551 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2552
2553 tokio::time::sleep(Duration::from_secs(1)).await;
2554 peer_manager.tick();
2555
2556 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2558 }
2559
2560 #[tokio::test]
2561 async fn test_remove_incoming_after_disconnect() {
2562 let peer_id = PeerId::random();
2563 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2564 let mut peers = PeersManager::default();
2565
2566 peers.on_incoming_pending_session(addr.ip()).unwrap();
2567 peers.on_incoming_session_established(peer_id, addr);
2568 let peer = peers.peers.get(&peer_id).unwrap();
2569 assert_eq!(peer.state, PeerConnectionState::In);
2570 assert!(peer.remove_after_disconnect);
2571
2572 peers.on_active_session_gracefully_closed(peer_id);
2573 assert!(!peers.peers.contains_key(&peer_id))
2574 }
2575
2576 #[tokio::test]
2577 async fn test_keep_incoming_after_disconnect_if_discovered() {
2578 let peer_id = PeerId::random();
2579 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2580 let mut peers = PeersManager::default();
2581
2582 peers.on_incoming_pending_session(addr.ip()).unwrap();
2583 peers.on_incoming_session_established(peer_id, addr);
2584 let peer = peers.peers.get(&peer_id).unwrap();
2585 assert_eq!(peer.state, PeerConnectionState::In);
2586 assert!(peer.remove_after_disconnect);
2587
2588 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2590
2591 peers.on_active_session_gracefully_closed(peer_id);
2592
2593 let peer = peers.peers.get(&peer_id).unwrap();
2594 assert_eq!(peer.state, PeerConnectionState::Idle);
2595 assert!(!peer.remove_after_disconnect);
2596 }
2597
2598 #[tokio::test]
2599 async fn test_incoming_outgoing_already_connected() {
2600 let peer_id = PeerId::random();
2601 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2602 let mut peers = PeersManager::default();
2603
2604 peers.on_incoming_pending_session(addr.ip()).unwrap();
2605 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2606
2607 match event!(peers) {
2608 PeerAction::PeerAdded(_) => {}
2609 _ => unreachable!(),
2610 }
2611
2612 match event!(peers) {
2613 PeerAction::Connect { .. } => {}
2614 _ => unreachable!(),
2615 }
2616
2617 peers.on_incoming_session_established(peer_id, addr);
2618 peers.on_already_connected(Direction::Outgoing(peer_id));
2619 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2620 assert_eq!(peers.connection_info.num_inbound, 1);
2621 assert_eq!(peers.connection_info.num_pending_out, 0);
2622 assert_eq!(peers.connection_info.num_pending_in, 0);
2623 assert_eq!(peers.connection_info.num_outbound, 0);
2624 }
2625
2626 #[tokio::test]
2627 async fn test_already_connected_incoming_outgoing_connection_error() {
2628 let peer_id = PeerId::random();
2629 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2630 let mut peers = PeersManager::default();
2631
2632 peers.on_incoming_pending_session(addr.ip()).unwrap();
2633 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2634
2635 match event!(peers) {
2636 PeerAction::PeerAdded(_) => {}
2637 _ => unreachable!(),
2638 }
2639
2640 match event!(peers) {
2641 PeerAction::Connect { .. } => {}
2642 _ => unreachable!(),
2643 }
2644
2645 peers.on_incoming_session_established(peer_id, addr);
2646
2647 peers.on_outgoing_connection_failure(
2648 &addr,
2649 &peer_id,
2650 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2651 );
2652 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2653 assert_eq!(peers.connection_info.num_inbound, 1);
2654 assert_eq!(peers.connection_info.num_pending_out, 0);
2655 assert_eq!(peers.connection_info.num_pending_in, 0);
2656 assert_eq!(peers.connection_info.num_outbound, 0);
2657 }
2658
2659 #[tokio::test]
2660 async fn test_max_concurrent_dials() {
2661 let config = PeersConfig::default();
2662 let mut peer_manager = PeersManager::new(config);
2663 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2664 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2665 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2666 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2667 }
2668
2669 peer_manager.fill_outbound_slots();
2670 let dials = peer_manager
2671 .queued_actions
2672 .iter()
2673 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2674 .count();
2675 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2676 }
2677
2678 #[tokio::test]
2679 async fn test_max_num_of_pending_dials() {
2680 let config = PeersConfig::default();
2681 let mut peer_manager = PeersManager::new(config);
2682 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2683 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2684
2685 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2687 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2688 }
2689
2690 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2691 match event!(peer_manager) {
2692 PeerAction::PeerAdded(_) => {}
2693 _ => unreachable!(),
2694 }
2695 }
2696
2697 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2698 match event!(peer_manager) {
2699 PeerAction::Connect { .. } => {}
2700 _ => unreachable!(),
2701 }
2702 }
2703
2704 peer_manager.fill_outbound_slots();
2706
2707 let dials = peer_manager.connection_info.num_pending_out;
2709 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2710
2711 let num_pendingout_states = peer_manager
2712 .peers
2713 .iter()
2714 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2715 .map(|(peer_id, _)| *peer_id)
2716 .collect::<Vec<PeerId>>();
2717 assert_eq!(
2718 num_pendingout_states.len(),
2719 peer_manager.connection_info.config.max_concurrent_outbound_dials
2720 );
2721
2722 for peer_id in &num_pendingout_states {
2724 peer_manager.on_active_outgoing_established(*peer_id);
2725 }
2726
2727 for peer_id in &num_pendingout_states {
2729 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2730 }
2731
2732 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2734 }
2735
2736 #[tokio::test]
2737 async fn test_connect() {
2738 let peer = PeerId::random();
2739 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2740 let mut peers = PeersManager::default();
2741 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2742 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2743
2744 match event!(peers) {
2745 PeerAction::Connect { peer_id, remote_addr } => {
2746 assert_eq!(peer_id, peer);
2747 assert_eq!(remote_addr, socket_addr);
2748 }
2749 _ => unreachable!(),
2750 }
2751
2752 let (record, _) = peers.peer_by_id(peer).unwrap();
2753 assert_eq!(record.tcp_addr(), socket_addr);
2754 assert_eq!(record.udp_addr(), socket_addr);
2755
2756 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2758
2759 let (record, _) = peers.peer_by_id(peer).unwrap();
2760 assert_eq!(record.tcp_addr(), socket_addr);
2761 assert_eq!(record.udp_addr(), socket_addr);
2762 }
2763
2764 #[tokio::test]
2765 async fn test_incoming_connection_from_banned() {
2766 let peer = PeerId::random();
2767 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2768 let config = PeersConfig::test().with_max_inbound(3);
2769 let mut peers = PeersManager::new(config);
2770 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2771
2772 match event!(peers) {
2773 PeerAction::PeerAdded(peer_id) => {
2774 assert_eq!(peer_id, peer);
2775 }
2776 _ => unreachable!(),
2777 }
2778 match event!(peers) {
2779 PeerAction::Connect { peer_id, .. } => {
2780 assert_eq!(peer_id, peer);
2781 }
2782 _ => unreachable!(),
2783 }
2784
2785 poll_fn(|cx| {
2786 assert!(peers.poll(cx).is_pending());
2787 Poll::Ready(())
2788 })
2789 .await;
2790
2791 loop {
2793 peers.on_active_session_dropped(
2794 &socket_addr,
2795 &peer,
2796 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2797 reth_eth_wire::EthVersion::Eth68,
2798 reth_eth_wire::EthMessageID::Status,
2799 )),
2800 );
2801
2802 if peers.peers.get(&peer).unwrap().is_banned() {
2803 break;
2804 }
2805
2806 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2807 peers.on_incoming_session_established(peer, socket_addr);
2808
2809 match event!(peers) {
2810 PeerAction::Connect { peer_id, .. } => {
2811 assert_eq!(peer_id, peer);
2812 }
2813 _ => unreachable!(),
2814 }
2815 }
2816
2817 assert!(peers.peers.get(&peer).unwrap().is_banned());
2818
2819 for _ in 0..peers.connection_info.config.max_inbound {
2821 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2822 peers.on_incoming_session_established(peer, socket_addr);
2823
2824 match event!(peers) {
2825 PeerAction::DisconnectBannedIncoming { peer_id } => {
2826 assert_eq!(peer_id, peer);
2827 }
2828 _ => unreachable!(),
2829 }
2830 }
2831
2832 poll_fn(|cx| {
2833 assert!(peers.poll(cx).is_pending());
2834 Poll::Ready(())
2835 })
2836 .await;
2837
2838 assert_eq!(peers.connection_info.num_inbound, 0);
2839
2840 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2841
2842 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2844 assert_eq!(peers.connection_info.num_pending_in, 1);
2845
2846 peers.on_active_session_gracefully_closed(peer);
2850 assert_eq!(peers.connection_info.num_inbound, 0);
2851 }
2852
2853 #[tokio::test]
2854 async fn test_add_pending_connect() {
2855 let peer = PeerId::random();
2856 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2857 let mut peers = PeersManager::default();
2858 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2859 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2860 assert_eq!(peers.connection_info.num_pending_out, 1);
2861 }
2862
2863 #[tokio::test]
2864 async fn test_dns_updates_peer_address() {
2865 let peer_id = PeerId::random();
2866 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2867 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2868
2869 let trusted = TrustedPeer {
2870 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2871 tcp_port: 8008,
2872 udp_port: 8008,
2873 id: peer_id,
2874 };
2875
2876 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2877 let mut manager = PeersManager::new(config);
2878 manager
2879 .trusted_peers_resolver
2880 .set_interval(tokio::time::interval(Duration::from_millis(1)));
2881
2882 manager.peers.insert(
2883 peer_id,
2884 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2885 );
2886
2887 for _ in 0..100 {
2888 let _ = event!(manager);
2889 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2890 break;
2891 }
2892 tokio::time::sleep(Duration::from_millis(10)).await;
2893 }
2894
2895 let updated_peer = manager.peers.get(&peer_id).unwrap();
2896 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2897 }
2898}