1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 peers::{
18 config::PeerBackoffDurations,
19 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
20 },
21 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
22 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
23};
24use std::{
25 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
26 fmt::Display,
27 io::{self},
28 net::{IpAddr, SocketAddr},
29 task::{Context, Poll},
30 time::Duration,
31};
32use thiserror::Error;
33use tokio::{
34 sync::mpsc,
35 time::{Instant, Interval},
36};
37use tokio_stream::wrappers::UnboundedReceiverStream;
38use tracing::{trace, warn};
39
40#[derive(Debug)]
47pub struct PeersManager {
48 peers: HashMap<PeerId, Peer>,
50 trusted_peer_ids: HashSet<PeerId>,
55 trusted_peers_resolver: TrustedPeersResolver,
58 manager_tx: mpsc::UnboundedSender<PeerCommand>,
60 handle_rx: UnboundedReceiverStream<PeerCommand>,
62 queued_actions: VecDeque<PeerAction>,
64 refill_slots_interval: Interval,
66 reputation_weights: ReputationChangeWeights,
68 connection_info: ConnectionInfo,
70 ban_list: BanList,
72 backed_off_peers: HashMap<PeerId, std::time::Instant>,
74 release_interval: Interval,
76 ban_duration: Duration,
78 backoff_durations: PeerBackoffDurations,
81 trusted_nodes_only: bool,
84 last_tick: Instant,
86 max_backoff_count: u8,
88 net_connection_state: NetworkConnectionState,
90 incoming_ip_throttle_duration: Duration,
92}
93
94impl PeersManager {
95 pub fn new(config: PeersConfig) -> Self {
97 let PeersConfig {
98 refill_slots_interval,
99 connection_info,
100 reputation_weights,
101 ban_list,
102 ban_duration,
103 backoff_durations,
104 trusted_nodes,
105 trusted_nodes_only,
106 basic_nodes,
107 max_backoff_count,
108 incoming_ip_throttle_duration,
109 } = config;
110 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
111 let now = Instant::now();
112
113 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
115
116 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
117 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
118
119 for trusted_peer in &trusted_nodes {
120 match trusted_peer.resolve_blocking() {
121 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
122 trusted_peer_ids.insert(id);
123 peers.entry(id).or_insert_with(|| {
124 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
125 });
126 }
127 Err(err) => {
128 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
129 }
130 }
131 }
132
133 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
134 peers.entry(id).or_insert_with(|| {
135 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
136 });
137 }
138
139 Self {
140 peers,
141 trusted_peer_ids,
142 trusted_peers_resolver: TrustedPeersResolver::new(
143 trusted_nodes,
144 tokio::time::interval(Duration::from_secs(60 * 60)), ),
146 manager_tx,
147 handle_rx: UnboundedReceiverStream::new(handle_rx),
148 queued_actions: Default::default(),
149 reputation_weights,
150 refill_slots_interval: tokio::time::interval(refill_slots_interval),
151 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
152 connection_info: ConnectionInfo::new(connection_info),
153 ban_list,
154 backed_off_peers: Default::default(),
155 ban_duration,
156 backoff_durations,
157 trusted_nodes_only,
158 last_tick: Instant::now(),
159 max_backoff_count,
160 net_connection_state: NetworkConnectionState::default(),
161 incoming_ip_throttle_duration,
162 }
163 }
164
165 pub(crate) fn handle(&self) -> PeersHandle {
167 PeersHandle::new(self.manager_tx.clone())
168 }
169
170 #[inline]
172 pub(crate) fn num_known_peers(&self) -> usize {
173 self.peers.len()
174 }
175
176 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
178 self.peers.iter().map(|(peer_id, v)| {
179 NodeRecord::new_with_ports(
180 v.addr.tcp().ip(),
181 v.addr.tcp().port(),
182 v.addr.udp().map(|addr| addr.port()),
183 *peer_id,
184 )
185 })
186 }
187
188 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
190 self.peers.get(&peer_id).map(|v| {
191 (
192 NodeRecord::new_with_ports(
193 v.addr.tcp().ip(),
194 v.addr.tcp().port(),
195 v.addr.udp().map(|addr| addr.port()),
196 peer_id,
197 ),
198 v.kind,
199 )
200 })
201 }
202
203 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
205 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
206 }
207
208 #[inline]
210 pub(crate) const fn num_inbound_connections(&self) -> usize {
211 self.connection_info.num_inbound
212 }
213
214 #[inline]
216 pub(crate) const fn num_outbound_connections(&self) -> usize {
217 self.connection_info.num_outbound
218 }
219
220 #[inline]
222 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
223 self.connection_info.num_pending_out
224 }
225
226 #[inline]
228 pub(crate) fn num_backed_off_peers(&self) -> usize {
229 self.backed_off_peers.len()
230 }
231
232 fn num_idle_trusted_peers(&self) -> usize {
234 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
235 }
236
237 pub(crate) fn on_incoming_pending_session(
241 &mut self,
242 addr: IpAddr,
243 ) -> Result<(), InboundConnectionError> {
244 if self.ban_list.is_banned_ip(&addr) {
245 return Err(InboundConnectionError::IpBanned)
246 }
247
248 if !self.connection_info.has_in_capacity() {
250 if self.trusted_peer_ids.is_empty() {
251 return Err(InboundConnectionError::ExceedsCapacity)
254 }
255
256 let num_idle_trusted_peers = self.num_idle_trusted_peers();
260 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
261 let max_inbound =
263 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
264 if self.connection_info.num_pending_in < max_inbound {
265 self.connection_info.inc_pending_in();
266 return Ok(())
267 }
268 }
269
270 return Err(InboundConnectionError::ExceedsCapacity)
272 }
273
274 if !self.connection_info.has_in_pending_capacity() {
276 return Err(InboundConnectionError::ExceedsCapacity)
277 }
278
279 self.throttle_incoming_ip(addr);
281
282 self.connection_info.inc_pending_in();
283 Ok(())
284 }
285
286 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
289 self.connection_info.decr_pending_in();
290 }
291
292 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
294 self.connection_info.decr_pending_in()
295 }
296
297 pub(crate) fn on_incoming_pending_session_dropped(
299 &mut self,
300 remote_addr: SocketAddr,
301 err: &PendingSessionHandshakeError,
302 ) {
303 if err.is_fatal_protocol_error() {
304 self.ban_ip(remote_addr.ip());
305
306 if err.merits_discovery_ban() {
307 self.queued_actions
308 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
309 }
310 }
311
312 self.connection_info.decr_pending_in();
313 }
314
315 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
322 self.connection_info.decr_pending_in();
323
324 if self.ban_list.is_banned_peer(&peer_id) {
327 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
328 return
329 }
330
331 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
333 if self.trusted_nodes_only && !is_trusted {
334 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
335 return
336 }
337
338 self.tick();
340
341 match self.peers.entry(peer_id) {
342 Entry::Occupied(mut entry) => {
343 let peer = entry.get_mut();
344 if peer.is_banned() {
345 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
346 return
347 }
348 if peer.state.is_pending_out() {
351 self.connection_info.decr_state(peer.state);
352 }
353
354 peer.state = PeerConnectionState::In;
355
356 is_trusted = is_trusted || peer.is_trusted();
357 }
358 Entry::Vacant(entry) => {
359 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
362 peer.remove_after_disconnect = true;
363 entry.insert(peer);
364 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
365 }
366 }
367
368 let has_in_capacity = self.connection_info.has_in_capacity();
369 self.connection_info.inc_in();
371
372 if !is_trusted && !has_in_capacity {
374 self.queued_actions.push_back(PeerAction::Disconnect {
375 peer_id,
376 reason: Some(DisconnectReason::TooManyPeers),
377 });
378 }
379 }
380
381 fn ban_peer(&mut self, peer_id: PeerId) {
383 let mut ban_duration = self.ban_duration;
384 if let Some(peer) = self.peers.get(&peer_id) {
385 if peer.is_trusted() || peer.is_static() {
386 ban_duration = self.backoff_durations.low / 2;
389 }
390 }
391
392 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
393 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
394 }
395
396 fn ban_ip(&mut self, ip: IpAddr) {
398 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
399 }
400
401 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
403 self.ban_list
404 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
405 }
406
407 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
409 trace!(target: "net::peers", ?peer_id, "backing off");
410
411 if let Some(peer) = self.peers.get_mut(&peer_id) {
412 peer.backed_off = true;
413 self.backed_off_peers.insert(peer_id, until);
414 }
415 }
416
417 fn unban_peer(&mut self, peer_id: PeerId) {
419 self.ban_list.unban_peer(&peer_id);
420 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
421 }
422
423 fn tick(&mut self) {
428 let now = Instant::now();
429 let secs_since_last_tick =
433 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
434 self.last_tick = now;
435
436 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
438 if peer.1.reputation < DEFAULT_REPUTATION {
441 peer.1.reputation += secs_since_last_tick;
442 }
443 }
444 }
445
446 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
448 self.peers.get(peer_id).map(|peer| peer.reputation)
449 }
450
451 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
457 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
458 if rep.is_reset() {
460 peer.reset_reputation()
461 } else {
462 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
463 if peer.is_trusted() || peer.is_static() {
464 if matches!(
466 rep,
467 ReputationChangeKind::Dropped |
468 ReputationChangeKind::BadAnnouncement |
469 ReputationChangeKind::Timeout |
470 ReputationChangeKind::AlreadySeenTransaction
471 ) {
472 return
473 }
474
475 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
477 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
479 }
480 }
481 peer.apply_reputation(reputation_change)
482 }
483 } else {
484 return
485 };
486
487 match outcome {
488 ReputationChangeOutcome::None => {}
489 ReputationChangeOutcome::Ban => {
490 self.ban_peer(*peer_id);
491 }
492 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
493 ReputationChangeOutcome::DisconnectAndBan => {
494 self.queued_actions.push_back(PeerAction::Disconnect {
495 peer_id: *peer_id,
496 reason: Some(DisconnectReason::DisconnectRequested),
497 });
498 self.ban_peer(*peer_id);
499 }
500 }
501 }
502
503 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
505 if let Some(peer) = self.peers.get_mut(peer_id) {
506 self.connection_info.decr_state(peer.state);
507 peer.state = PeerConnectionState::Idle;
508 }
509 }
510
511 pub(crate) fn on_outgoing_pending_session_dropped(
514 &mut self,
515 remote_addr: &SocketAddr,
516 peer_id: &PeerId,
517 err: &PendingSessionHandshakeError,
518 ) {
519 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
520 }
521
522 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
524 match self.peers.entry(peer_id) {
525 Entry::Occupied(mut entry) => {
526 self.connection_info.decr_state(entry.get().state);
527
528 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
529 entry.remove();
531 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
532 } else {
533 entry.get_mut().severe_backoff_counter = 0;
537 entry.get_mut().state = PeerConnectionState::Idle;
538 return
539 }
540 }
541 Entry::Vacant(_) => return,
542 }
543
544 self.fill_outbound_slots();
545 }
546
547 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
549 if let Some(peer) = self.peers.get_mut(&peer_id) {
550 self.connection_info.decr_state(peer.state);
551 self.connection_info.inc_out();
552 peer.state = PeerConnectionState::Out;
553 }
554 }
555
556 pub(crate) fn on_active_session_dropped(
561 &mut self,
562 remote_addr: &SocketAddr,
563 peer_id: &PeerId,
564 err: &EthStreamError,
565 ) {
566 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
567 }
568
569 pub(crate) fn on_outgoing_connection_failure(
572 &mut self,
573 remote_addr: &SocketAddr,
574 peer_id: &PeerId,
575 err: &io::Error,
576 ) {
577 if let Some(peer) = self.peers.get(peer_id) {
581 if peer.state.is_incoming() {
582 return
584 }
585 }
586
587 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
588 }
589
590 fn on_connection_failure(
591 &mut self,
592 remote_addr: &SocketAddr,
593 peer_id: &PeerId,
594 err: impl SessionError,
595 reputation_change: ReputationChangeKind,
596 ) {
597 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
598
599 if err.is_fatal_protocol_error() {
600 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
601 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
604 self.connection_info.decr_state(entry.get().state);
605 if entry.get().is_trusted() {
607 entry.get_mut().state = PeerConnectionState::Idle;
608 } else {
609 entry.remove();
610 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
611 if err.merits_discovery_ban() {
613 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
614 peer_id: *peer_id,
615 ip_addr: remote_addr.ip(),
616 })
617 }
618 }
619 }
620
621 self.ban_peer(*peer_id);
623 } else {
624 let mut backoff_until = None;
625 let mut remove_peer = false;
626
627 if let Some(peer) = self.peers.get_mut(peer_id) {
628 if let Some(kind) = err.should_backoff() {
629 if peer.is_trusted() || peer.is_static() {
630 let backoff = self.backoff_durations.low / 2;
633 backoff_until = Some(std::time::Instant::now() + backoff);
634 } else {
635 if kind.is_severe() {
637 peer.severe_backoff_counter =
638 peer.severe_backoff_counter.saturating_add(1);
639 }
640
641 let backoff_time =
642 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
643
644 backoff_until = Some(backoff_time);
648 }
649 } else {
650 let reputation_change = self.reputation_weights.change(reputation_change);
652 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
653 };
654
655 self.connection_info.decr_state(peer.state);
656 peer.state = PeerConnectionState::Idle;
657
658 if peer.severe_backoff_counter > self.max_backoff_count &&
659 !peer.is_trusted() &&
660 !peer.is_static()
661 {
662 remove_peer = true;
665 }
666 }
667
668 if remove_peer {
670 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
671 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
672 } else if let Some(backoff_until) = backoff_until {
673 self.backoff_peer_until(*peer_id, backoff_until);
675 }
676 }
677
678 self.fill_outbound_slots();
679 }
680
681 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
687 match direction {
688 Direction::Incoming => {
689 self.connection_info.decr_pending_in();
691 }
692 Direction::Outgoing(_) => {
693 }
696 }
697 }
698
699 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
704 if let Some(peer) = self.peers.get_mut(&peer_id) {
705 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
706 peer.fork_id = Some(fork_id);
707 }
708 }
709
710 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
714 self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
715 }
716
717 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
719 self.trusted_peer_ids.insert(peer_id);
720 }
721
722 #[cfg_attr(not(test), expect(dead_code))]
726 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
727 self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
728 }
729
730 pub(crate) fn add_peer_kind(
734 &mut self,
735 peer_id: PeerId,
736 kind: PeerKind,
737 addr: PeerAddr,
738 fork_id: Option<ForkId>,
739 ) {
740 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
741 return
742 }
743
744 match self.peers.entry(peer_id) {
745 Entry::Occupied(mut entry) => {
746 let peer = entry.get_mut();
747 peer.kind = kind;
748 peer.fork_id = fork_id;
749 peer.addr = addr;
750
751 if peer.state.is_incoming() {
752 peer.remove_after_disconnect = false;
756 }
757 }
758 Entry::Vacant(entry) => {
759 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
760 let mut peer = Peer::with_kind(addr, kind);
761 peer.fork_id = fork_id;
762 entry.insert(peer);
763 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
764 }
765 }
766
767 if kind.is_trusted() {
768 self.trusted_peer_ids.insert(peer_id);
769 }
770 }
771
772 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
774 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
775 if entry.get().is_trusted() {
776 return
777 }
778 let mut peer = entry.remove();
779
780 trace!(target: "net::peers", ?peer_id, "remove discovered node");
781 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
782
783 if peer.state.is_connected() {
784 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
785 peer.remove_after_disconnect = true;
790 peer.state.disconnect();
791 self.peers.insert(peer_id, peer);
792 self.queued_actions.push_back(PeerAction::Disconnect {
793 peer_id,
794 reason: Some(DisconnectReason::DisconnectRequested),
795 })
796 }
797 }
798
799 #[cfg_attr(not(test), expect(dead_code))]
802 pub(crate) fn add_and_connect(
803 &mut self,
804 peer_id: PeerId,
805 addr: PeerAddr,
806 fork_id: Option<ForkId>,
807 ) {
808 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
809 }
810
811 pub(crate) fn add_and_connect_kind(
815 &mut self,
816 peer_id: PeerId,
817 kind: PeerKind,
818 addr: PeerAddr,
819 fork_id: Option<ForkId>,
820 ) {
821 if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
822 return
823 }
824
825 match self.peers.entry(peer_id) {
826 Entry::Occupied(mut entry) => {
827 let peer = entry.get_mut();
828 peer.kind = kind;
829 peer.fork_id = fork_id;
830 peer.addr = addr;
831
832 if peer.state == PeerConnectionState::Idle {
833 peer.state = PeerConnectionState::PendingOut;
835 self.connection_info.inc_pending_out();
836 self.queued_actions
837 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
838 }
839 }
840 Entry::Vacant(entry) => {
841 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
842 let mut peer = Peer::with_kind(addr, kind);
843 peer.state = PeerConnectionState::PendingOut;
844 peer.fork_id = fork_id;
845 entry.insert(peer);
846 self.connection_info.inc_pending_out();
847 self.queued_actions
848 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
849 }
850 }
851
852 if kind.is_trusted() {
853 self.trusted_peer_ids.insert(peer_id);
854 }
855 }
856
857 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
859 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
860 if !entry.get().is_trusted() {
861 return
862 }
863
864 let peer = entry.get_mut();
865 peer.kind = PeerKind::Basic;
866
867 self.trusted_peer_ids.remove(&peer_id);
868 }
869
870 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
880 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
881 !peer.is_backed_off() &&
882 !peer.is_banned() &&
883 peer.state.is_unconnected() &&
884 (!self.trusted_nodes_only || peer.is_trusted())
885 });
886
887 let mut best_peer = unconnected.next()?;
889
890 if best_peer.1.is_trusted() || best_peer.1.is_static() {
891 return Some((*best_peer.0, best_peer.1))
892 }
893
894 for maybe_better in unconnected {
895 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
897 return Some((*maybe_better.0, maybe_better.1))
898 }
899
900 if maybe_better.1.reputation > best_peer.1.reputation {
902 best_peer = maybe_better;
903 }
904 }
905 Some((*best_peer.0, best_peer.1))
906 }
907
908 fn fill_outbound_slots(&mut self) {
914 self.tick();
915
916 if !self.net_connection_state.is_active() {
917 return
919 }
920
921 while self.connection_info.has_out_capacity() {
923 let action = {
924 let (peer_id, peer) = match self.best_unconnected() {
925 Some(peer) => peer,
926 _ => break,
927 };
928
929 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
930
931 peer.state = PeerConnectionState::PendingOut;
932 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
933 };
934
935 self.connection_info.inc_pending_out();
936
937 self.queued_actions.push_back(action);
938 }
939 }
940
941 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
942 if let Some(peer) = self.peers.get_mut(&peer_id) {
943 let new_addr = PeerAddr::new_with_ports(
944 new_record.address,
945 new_record.tcp_port,
946 Some(new_record.udp_port),
947 );
948
949 if peer.addr != new_addr {
950 peer.addr = new_addr;
951 trace!(target: "net::peers", ?peer_id, addre=?peer.addr, "Updated resolved trusted peer address");
952 }
953 }
954 }
955
956 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
958 self.net_connection_state = state;
959 }
960
961 pub const fn connection_state(&self) -> &NetworkConnectionState {
963 &self.net_connection_state
964 }
965
966 pub const fn on_shutdown(&mut self) {
968 self.net_connection_state = NetworkConnectionState::ShuttingDown;
969 }
970
971 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
976 loop {
977 if let Some(action) = self.queued_actions.pop_front() {
979 return Poll::Ready(action)
980 }
981
982 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
983 match cmd {
984 PeerCommand::Add(peer_id, addr) => {
985 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
986 }
987 PeerCommand::Remove(peer) => self.remove_peer(peer),
988 PeerCommand::ReputationChange(peer_id, rep) => {
989 self.apply_reputation_change(&peer_id, rep)
990 }
991 PeerCommand::GetPeer(peer, tx) => {
992 let _ = tx.send(self.peers.get(&peer).cloned());
993 }
994 PeerCommand::GetPeers(tx) => {
995 let _ = tx.send(self.iter_peers().collect());
996 }
997 }
998 }
999
1000 if self.release_interval.poll_tick(cx).is_ready() {
1001 let now = std::time::Instant::now();
1002 let (_, unbanned_peers) = self.ban_list.evict(now);
1003
1004 for peer_id in unbanned_peers {
1005 if let Some(peer) = self.peers.get_mut(&peer_id) {
1006 peer.unban();
1007 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1008 }
1009 }
1010
1011 self.backed_off_peers.retain(|peer_id, until| {
1014 if now > *until {
1015 if let Some(peer) = self.peers.get_mut(peer_id) {
1016 peer.backed_off = false;
1017 }
1018 return false
1019 }
1020 true
1021 })
1022 }
1023
1024 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1025 self.fill_outbound_slots();
1026 }
1027
1028 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1029 self.on_resolved_peer(peer_id, new_record);
1030 }
1031
1032 if self.queued_actions.is_empty() {
1033 return Poll::Pending
1034 }
1035 }
1036 }
1037}
1038
1039impl Default for PeersManager {
1040 fn default() -> Self {
1041 Self::new(Default::default())
1042 }
1043}
1044
1045#[derive(Debug, Clone, PartialEq, Eq, Default)]
1047pub struct ConnectionInfo {
1048 num_outbound: usize,
1050 num_pending_out: usize,
1052 num_inbound: usize,
1054 num_pending_in: usize,
1056 config: ConnectionsConfig,
1058}
1059
1060impl ConnectionInfo {
1063 const fn new(config: ConnectionsConfig) -> Self {
1065 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1066 }
1067
1068 const fn has_out_capacity(&self) -> bool {
1070 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1071 self.num_outbound < self.config.max_outbound
1072 }
1073
1074 const fn has_in_capacity(&self) -> bool {
1076 self.num_inbound < self.config.max_inbound
1077 }
1078
1079 const fn has_in_pending_capacity(&self) -> bool {
1081 self.num_pending_in < self.config.max_inbound
1082 }
1083
1084 const fn decr_state(&mut self, state: PeerConnectionState) {
1085 match state {
1086 PeerConnectionState::Idle => {}
1087 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1088 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1089 PeerConnectionState::PendingOut => self.decr_pending_out(),
1090 }
1091 }
1092
1093 const fn decr_out(&mut self) {
1094 self.num_outbound -= 1;
1095 }
1096
1097 const fn inc_out(&mut self) {
1098 self.num_outbound += 1;
1099 }
1100
1101 const fn inc_pending_out(&mut self) {
1102 self.num_pending_out += 1;
1103 }
1104
1105 const fn inc_in(&mut self) {
1106 self.num_inbound += 1;
1107 }
1108
1109 const fn inc_pending_in(&mut self) {
1110 self.num_pending_in += 1;
1111 }
1112
1113 const fn decr_in(&mut self) {
1114 self.num_inbound -= 1;
1115 }
1116
1117 const fn decr_pending_out(&mut self) {
1118 self.num_pending_out -= 1;
1119 }
1120
1121 const fn decr_pending_in(&mut self) {
1122 self.num_pending_in -= 1;
1123 }
1124}
1125
1126#[derive(Debug)]
1128pub enum PeerAction {
1129 Connect {
1131 peer_id: PeerId,
1133 remote_addr: SocketAddr,
1135 },
1136 Disconnect {
1138 peer_id: PeerId,
1140 reason: Option<DisconnectReason>,
1142 },
1143 DisconnectBannedIncoming {
1146 peer_id: PeerId,
1148 },
1149 DisconnectUntrustedIncoming {
1151 peer_id: PeerId,
1153 },
1154 DiscoveryBanPeerId {
1156 peer_id: PeerId,
1158 ip_addr: IpAddr,
1160 },
1161 DiscoveryBanIp {
1163 ip_addr: IpAddr,
1165 },
1166 BanPeer {
1168 peer_id: PeerId,
1170 },
1171 UnBanPeer {
1173 peer_id: PeerId,
1175 },
1176 PeerAdded(PeerId),
1178 PeerRemoved(PeerId),
1180}
1181
1182#[derive(Debug, Error, PartialEq, Eq)]
1184pub enum InboundConnectionError {
1185 IpBanned,
1187 ExceedsCapacity,
1189}
1190
1191impl Display for InboundConnectionError {
1192 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193 write!(f, "{self:?}")
1194 }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use alloy_primitives::B512;
1200 use reth_eth_wire::{
1201 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1202 DisconnectReason,
1203 };
1204 use reth_net_banlist::BanList;
1205 use reth_network_api::Direction;
1206 use reth_network_peers::{PeerId, TrustedPeer};
1207 use reth_network_types::{
1208 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1209 };
1210 use std::{
1211 future::{poll_fn, Future},
1212 io,
1213 net::{IpAddr, Ipv4Addr, SocketAddr},
1214 pin::Pin,
1215 task::{Context, Poll},
1216 time::Duration,
1217 };
1218 use url::Host;
1219
1220 use super::PeersManager;
1221 use crate::{
1222 error::SessionError,
1223 peers::{
1224 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1225 PeerConnectionState,
1226 },
1227 session::PendingSessionHandshakeError,
1228 PeersConfig,
1229 };
1230
1231 struct PeerActionFuture<'a> {
1232 peers: &'a mut PeersManager,
1233 }
1234
1235 impl Future for PeerActionFuture<'_> {
1236 type Output = PeerAction;
1237
1238 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1239 self.get_mut().peers.poll(cx)
1240 }
1241 }
1242
1243 macro_rules! event {
1244 ($peers:expr) => {
1245 PeerActionFuture { peers: &mut $peers }.await
1246 };
1247 }
1248
1249 #[tokio::test]
1250 async fn test_insert() {
1251 let peer = PeerId::random();
1252 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1253 let mut peers = PeersManager::default();
1254 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1255
1256 match event!(peers) {
1257 PeerAction::PeerAdded(peer_id) => {
1258 assert_eq!(peer_id, peer);
1259 }
1260 _ => unreachable!(),
1261 }
1262 match event!(peers) {
1263 PeerAction::Connect { peer_id, remote_addr } => {
1264 assert_eq!(peer_id, peer);
1265 assert_eq!(remote_addr, socket_addr);
1266 }
1267 _ => unreachable!(),
1268 }
1269
1270 let (record, _) = peers.peer_by_id(peer).unwrap();
1271 assert_eq!(record.tcp_addr(), socket_addr);
1272 assert_eq!(record.udp_addr(), socket_addr);
1273 }
1274
1275 #[tokio::test]
1276 async fn test_insert_udp() {
1277 let peer = PeerId::random();
1278 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1279 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1280 let mut peers = PeersManager::default();
1281 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1282
1283 match event!(peers) {
1284 PeerAction::PeerAdded(peer_id) => {
1285 assert_eq!(peer_id, peer);
1286 }
1287 _ => unreachable!(),
1288 }
1289 match event!(peers) {
1290 PeerAction::Connect { peer_id, remote_addr } => {
1291 assert_eq!(peer_id, peer);
1292 assert_eq!(remote_addr, tcp_addr);
1293 }
1294 _ => unreachable!(),
1295 }
1296
1297 let (record, _) = peers.peer_by_id(peer).unwrap();
1298 assert_eq!(record.tcp_addr(), tcp_addr);
1299 assert_eq!(record.udp_addr(), udp_addr);
1300 }
1301
1302 #[tokio::test]
1303 async fn test_ban() {
1304 let peer = PeerId::random();
1305 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1306 let mut peers = PeersManager::default();
1307 peers.ban_peer(peer);
1308 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1309
1310 match event!(peers) {
1311 PeerAction::BanPeer { peer_id } => {
1312 assert_eq!(peer_id, peer);
1313 }
1314 _ => unreachable!(),
1315 }
1316
1317 poll_fn(|cx| {
1318 assert!(peers.poll(cx).is_pending());
1319 Poll::Ready(())
1320 })
1321 .await;
1322 }
1323
1324 #[tokio::test]
1325 async fn test_unban() {
1326 let peer = PeerId::random();
1327 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1328 let mut peers = PeersManager::default();
1329 peers.ban_peer(peer);
1330 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1331
1332 match event!(peers) {
1333 PeerAction::BanPeer { peer_id } => {
1334 assert_eq!(peer_id, peer);
1335 }
1336 _ => unreachable!(),
1337 }
1338
1339 poll_fn(|cx| {
1340 assert!(peers.poll(cx).is_pending());
1341 Poll::Ready(())
1342 })
1343 .await;
1344
1345 peers.unban_peer(peer);
1346
1347 match event!(peers) {
1348 PeerAction::UnBanPeer { peer_id } => {
1349 assert_eq!(peer_id, peer);
1350 }
1351 _ => unreachable!(),
1352 }
1353
1354 poll_fn(|cx| {
1355 assert!(peers.poll(cx).is_pending());
1356 Poll::Ready(())
1357 })
1358 .await;
1359 }
1360
1361 #[tokio::test]
1362 async fn test_backoff_on_busy() {
1363 let peer = PeerId::random();
1364 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1365
1366 let mut peers = PeersManager::new(PeersConfig::test());
1367 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1368
1369 match event!(peers) {
1370 PeerAction::PeerAdded(peer_id) => {
1371 assert_eq!(peer_id, peer);
1372 }
1373 _ => unreachable!(),
1374 }
1375 match event!(peers) {
1376 PeerAction::Connect { peer_id, .. } => {
1377 assert_eq!(peer_id, peer);
1378 }
1379 _ => unreachable!(),
1380 }
1381
1382 poll_fn(|cx| {
1383 assert!(peers.poll(cx).is_pending());
1384 Poll::Ready(())
1385 })
1386 .await;
1387
1388 peers.on_active_session_dropped(
1389 &socket_addr,
1390 &peer,
1391 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1392 DisconnectReason::TooManyPeers,
1393 )),
1394 );
1395
1396 poll_fn(|cx| {
1397 assert!(peers.poll(cx).is_pending());
1398 Poll::Ready(())
1399 })
1400 .await;
1401
1402 assert!(peers.backed_off_peers.contains_key(&peer));
1403 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1404
1405 tokio::time::sleep(peers.backoff_durations.low).await;
1406
1407 match event!(peers) {
1408 PeerAction::Connect { peer_id, .. } => {
1409 assert_eq!(peer_id, peer);
1410 }
1411 _ => unreachable!(),
1412 }
1413
1414 assert!(!peers.backed_off_peers.contains_key(&peer));
1415 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1416 }
1417
1418 #[tokio::test]
1419 async fn test_backoff_on_no_response() {
1420 let peer = PeerId::random();
1421 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1422
1423 let backoff_durations = PeerBackoffDurations::test();
1424 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1425 let mut peers = PeersManager::new(config);
1426 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1427
1428 match event!(peers) {
1429 PeerAction::PeerAdded(peer_id) => {
1430 assert_eq!(peer_id, peer);
1431 }
1432 _ => unreachable!(),
1433 }
1434 match event!(peers) {
1435 PeerAction::Connect { peer_id, .. } => {
1436 assert_eq!(peer_id, peer);
1437 }
1438 _ => unreachable!(),
1439 }
1440
1441 poll_fn(|cx| {
1442 assert!(peers.poll(cx).is_pending());
1443 Poll::Ready(())
1444 })
1445 .await;
1446
1447 peers.on_outgoing_pending_session_dropped(
1448 &socket_addr,
1449 &peer,
1450 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1451 EthHandshakeError::NoResponse,
1452 )),
1453 );
1454
1455 poll_fn(|cx| {
1456 assert!(peers.poll(cx).is_pending());
1457 Poll::Ready(())
1458 })
1459 .await;
1460
1461 assert!(peers.backed_off_peers.contains_key(&peer));
1462 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1463
1464 tokio::time::sleep(backoff_durations.high).await;
1465
1466 match event!(peers) {
1467 PeerAction::Connect { peer_id, .. } => {
1468 assert_eq!(peer_id, peer);
1469 }
1470 _ => unreachable!(),
1471 }
1472
1473 assert!(!peers.backed_off_peers.contains_key(&peer));
1474 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1475 }
1476
1477 #[tokio::test]
1478 async fn test_low_backoff() {
1479 let peer = PeerId::random();
1480 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1481 let config = PeersConfig::test();
1482 let mut peers = PeersManager::new(config);
1483 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1484 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1485
1486 let backoff_timestamp = peers
1487 .backoff_durations
1488 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1489
1490 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1491 assert!(backoff_timestamp <= expected);
1492 }
1493
1494 #[tokio::test]
1495 async fn test_multiple_backoff_calculations() {
1496 let peer = PeerId::random();
1497 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1498 let config = PeersConfig::default();
1499 let mut peers = PeersManager::new(config);
1500 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1501 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1502
1503 peer_struct.severe_backoff_counter = 1;
1505
1506 let now = std::time::Instant::now();
1507
1508 peer_struct.severe_backoff_counter += 1;
1510 let backoff_time = peers
1512 .backoff_durations
1513 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1514
1515 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1517
1518 assert!(backoff_time.duration_since(now) > backoff_duration);
1521 }
1522
1523 #[tokio::test]
1524 async fn test_ban_on_active_drop() {
1525 let peer = PeerId::random();
1526 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1527 let mut peers = PeersManager::default();
1528 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1529
1530 match event!(peers) {
1531 PeerAction::PeerAdded(peer_id) => {
1532 assert_eq!(peer_id, peer);
1533 }
1534 _ => unreachable!(),
1535 }
1536 match event!(peers) {
1537 PeerAction::Connect { peer_id, .. } => {
1538 assert_eq!(peer_id, peer);
1539 }
1540 _ => unreachable!(),
1541 }
1542
1543 poll_fn(|cx| {
1544 assert!(peers.poll(cx).is_pending());
1545 Poll::Ready(())
1546 })
1547 .await;
1548
1549 peers.on_active_session_dropped(
1550 &socket_addr,
1551 &peer,
1552 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1553 DisconnectReason::UselessPeer,
1554 )),
1555 );
1556
1557 match event!(peers) {
1558 PeerAction::PeerRemoved(peer_id) => {
1559 assert_eq!(peer_id, peer);
1560 }
1561 _ => unreachable!(),
1562 }
1563 match event!(peers) {
1564 PeerAction::BanPeer { peer_id } => {
1565 assert_eq!(peer_id, peer);
1566 }
1567 _ => unreachable!(),
1568 }
1569
1570 poll_fn(|cx| {
1571 assert!(peers.poll(cx).is_pending());
1572 Poll::Ready(())
1573 })
1574 .await;
1575
1576 assert!(!peers.peers.contains_key(&peer));
1577 }
1578
1579 #[tokio::test]
1580 async fn test_remove_on_max_backoff_count() {
1581 let peer = PeerId::random();
1582 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1583 let config = PeersConfig::test();
1584 let mut peers = PeersManager::new(config.clone());
1585 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1586 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1587
1588 peer_struct.severe_backoff_counter = config.max_backoff_count;
1590
1591 match event!(peers) {
1592 PeerAction::PeerAdded(peer_id) => {
1593 assert_eq!(peer_id, peer);
1594 }
1595 _ => unreachable!(),
1596 }
1597 match event!(peers) {
1598 PeerAction::Connect { peer_id, .. } => {
1599 assert_eq!(peer_id, peer);
1600 }
1601 _ => unreachable!(),
1602 }
1603
1604 poll_fn(|cx| {
1605 assert!(peers.poll(cx).is_pending());
1606 Poll::Ready(())
1607 })
1608 .await;
1609
1610 peers.on_outgoing_pending_session_dropped(
1611 &socket_addr,
1612 &peer,
1613 &PendingSessionHandshakeError::Eth(
1614 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1615 ),
1616 );
1617
1618 match event!(peers) {
1619 PeerAction::PeerRemoved(peer_id) => {
1620 assert_eq!(peer_id, peer);
1621 }
1622 _ => unreachable!(),
1623 }
1624
1625 poll_fn(|cx| {
1626 assert!(peers.poll(cx).is_pending());
1627 Poll::Ready(())
1628 })
1629 .await;
1630
1631 assert!(!peers.peers.contains_key(&peer));
1632 }
1633
1634 #[tokio::test]
1635 async fn test_ban_on_pending_drop() {
1636 let peer = PeerId::random();
1637 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1638 let mut peers = PeersManager::default();
1639 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1640
1641 match event!(peers) {
1642 PeerAction::PeerAdded(peer_id) => {
1643 assert_eq!(peer_id, peer);
1644 }
1645 _ => unreachable!(),
1646 }
1647 match event!(peers) {
1648 PeerAction::Connect { peer_id, .. } => {
1649 assert_eq!(peer_id, peer);
1650 }
1651 _ => unreachable!(),
1652 }
1653
1654 poll_fn(|cx| {
1655 assert!(peers.poll(cx).is_pending());
1656 Poll::Ready(())
1657 })
1658 .await;
1659
1660 peers.on_outgoing_pending_session_dropped(
1661 &socket_addr,
1662 &peer,
1663 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1664 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1665 )),
1666 );
1667
1668 match event!(peers) {
1669 PeerAction::PeerRemoved(peer_id) => {
1670 assert_eq!(peer_id, peer);
1671 }
1672 _ => unreachable!(),
1673 }
1674 match event!(peers) {
1675 PeerAction::BanPeer { peer_id } => {
1676 assert_eq!(peer_id, peer);
1677 }
1678 _ => unreachable!(),
1679 }
1680
1681 poll_fn(|cx| {
1682 assert!(peers.poll(cx).is_pending());
1683 Poll::Ready(())
1684 })
1685 .await;
1686
1687 assert!(!peers.peers.contains_key(&peer));
1688 }
1689
1690 #[tokio::test]
1691 async fn test_internally_closed_incoming() {
1692 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1693 let mut peers = PeersManager::default();
1694
1695 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1696 assert_eq!(peers.connection_info.num_pending_in, 1);
1697 peers.on_incoming_pending_session_rejected_internally();
1698 assert_eq!(peers.connection_info.num_pending_in, 0);
1699 }
1700
1701 #[tokio::test]
1702 async fn test_reject_incoming_at_pending_capacity() {
1703 let mut peers = PeersManager::default();
1704
1705 for count in 1..=peers.connection_info.config.max_inbound {
1706 let socket_addr =
1707 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1708 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1709 assert_eq!(peers.connection_info.num_pending_in, count);
1710 }
1711 assert!(peers.connection_info.has_in_capacity());
1712 assert!(!peers.connection_info.has_in_pending_capacity());
1713
1714 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1715 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1716 }
1717
1718 #[tokio::test]
1719 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1720 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1721 let trusted = PeerId::random();
1722 peers.add_trusted_peer_id(trusted);
1723
1724 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1726 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1727 peers.on_incoming_session_established(trusted, addr);
1728
1729 match event!(peers) {
1730 PeerAction::PeerAdded(id) => {
1731 assert_eq!(id, trusted);
1732 }
1733 _ => unreachable!(),
1734 }
1735
1736 let mut connected_untrusted_peer_ids = Vec::new();
1738 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1739 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1740 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1741 let peer_id = PeerId::random();
1742 peers.on_incoming_session_established(peer_id, addr);
1743 connected_untrusted_peer_ids.push(peer_id);
1744
1745 match event!(peers) {
1746 PeerAction::PeerAdded(id) => {
1747 assert_eq!(id, peer_id);
1748 }
1749 _ => unreachable!(),
1750 }
1751 }
1752
1753 let mut pending_addrs = Vec::new();
1754
1755 for i in 0..2 {
1757 let socket_addr =
1758 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1759 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1760
1761 pending_addrs.push(socket_addr);
1762 }
1763
1764 assert_eq!(peers.connection_info.num_pending_in, 2);
1765
1766 for i in 0..2 {
1768 let socket_addr =
1769 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1770 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1771 }
1772
1773 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1774 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1775 DisconnectReason::UselessPeer,
1776 )),
1777 ));
1778
1779 for pending_addr in pending_addrs {
1781 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1782 }
1783
1784 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1785
1786 println!(
1787 "num_inbound: {}, has_in_capacity: {}",
1788 peers.connection_info.num_inbound,
1789 peers.connection_info.has_in_capacity()
1790 );
1791
1792 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1794
1795 println!(
1796 "num_inbound: {}, has_in_capacity: {}",
1797 peers.connection_info.num_inbound,
1798 peers.connection_info.has_in_capacity()
1799 );
1800
1801 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1802 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1803 }
1804
1805 #[tokio::test]
1806 async fn test_closed_incoming() {
1807 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1808 let mut peers = PeersManager::default();
1809
1810 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1811 assert_eq!(peers.connection_info.num_pending_in, 1);
1812 peers.on_incoming_pending_session_gracefully_closed();
1813 assert_eq!(peers.connection_info.num_pending_in, 0);
1814 }
1815
1816 #[tokio::test]
1817 async fn test_dropped_incoming() {
1818 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1819 let ban_duration = Duration::from_millis(500);
1820 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1821 let mut peers = PeersManager::new(config);
1822
1823 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1824 assert_eq!(peers.connection_info.num_pending_in, 1);
1825 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1826 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1827 DisconnectReason::UselessPeer,
1828 )),
1829 ));
1830
1831 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1832 assert_eq!(peers.connection_info.num_pending_in, 0);
1833 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1834
1835 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1836
1837 tokio::time::sleep(ban_duration).await;
1839
1840 poll_fn(|cx| {
1841 let _ = peers.poll(cx);
1842 Poll::Ready(())
1843 })
1844 .await;
1845
1846 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1847 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1848 }
1849
1850 #[tokio::test]
1851 async fn test_reputation_change_connected() {
1852 let peer = PeerId::random();
1853 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1854 let mut peers = PeersManager::default();
1855 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1856
1857 match event!(peers) {
1858 PeerAction::PeerAdded(peer_id) => {
1859 assert_eq!(peer_id, peer);
1860 }
1861 _ => unreachable!(),
1862 }
1863 match event!(peers) {
1864 PeerAction::Connect { peer_id, remote_addr } => {
1865 assert_eq!(peer_id, peer);
1866 assert_eq!(remote_addr, socket_addr);
1867 }
1868 _ => unreachable!(),
1869 }
1870
1871 let p = peers.peers.get_mut(&peer).unwrap();
1872 assert_eq!(p.state, PeerConnectionState::PendingOut);
1873
1874 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1875
1876 let p = peers.peers.get(&peer).unwrap();
1877 assert_eq!(p.state, PeerConnectionState::PendingOut);
1878 assert!(p.is_banned());
1879
1880 peers.on_active_session_gracefully_closed(peer);
1881
1882 let p = peers.peers.get(&peer).unwrap();
1883 assert_eq!(p.state, PeerConnectionState::Idle);
1884 assert!(p.is_banned());
1885
1886 match event!(peers) {
1887 PeerAction::Disconnect { peer_id, .. } => {
1888 assert_eq!(peer_id, peer);
1889 }
1890 _ => unreachable!(),
1891 }
1892 }
1893
1894 #[tokio::test]
1895 async fn accept_incoming_trusted_unknown_peer_address() {
1896 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1897 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1898 let trusted = PeerId::random();
1900 peers.add_trusted_peer_id(trusted);
1901
1902 for i in 0..peers.connection_info.config.max_inbound {
1904 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1905 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1906 let peer_id = PeerId::random();
1907 peers.on_incoming_session_established(peer_id, addr);
1908
1909 match event!(peers) {
1910 PeerAction::PeerAdded(id) => {
1911 assert_eq!(id, peer_id);
1912 }
1913 _ => unreachable!(),
1914 }
1915 }
1916
1917 let untrusted = PeerId::random();
1919 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1920 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1921 peers.on_incoming_session_established(untrusted, socket_addr);
1922
1923 match event!(peers) {
1924 PeerAction::PeerAdded(id) => {
1925 assert_eq!(id, untrusted);
1926 }
1927 _ => unreachable!(),
1928 }
1929
1930 match event!(peers) {
1931 PeerAction::Disconnect { peer_id, reason } => {
1932 assert_eq!(peer_id, untrusted);
1933 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1934 }
1935 _ => unreachable!(),
1936 }
1937
1938 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1939 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1940 peers.on_incoming_session_established(trusted, socket_addr);
1941
1942 match event!(peers) {
1943 PeerAction::PeerAdded(id) => {
1944 assert_eq!(id, trusted);
1945 }
1946 _ => unreachable!(),
1947 }
1948
1949 poll_fn(|cx| {
1950 assert!(peers.poll(cx).is_pending());
1951 Poll::Ready(())
1952 })
1953 .await;
1954
1955 let peer = peers.peers.get(&trusted).unwrap();
1956 assert_eq!(peer.state, PeerConnectionState::In);
1957 }
1958
1959 #[tokio::test]
1960 async fn test_already_connected() {
1961 let peer = PeerId::random();
1962 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1963 let mut peers = PeersManager::default();
1964
1965 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1967 assert_eq!(peers.connection_info.num_pending_in, 1);
1968
1969 peers.on_incoming_session_established(peer, socket_addr);
1972 let p = peers.peers.get_mut(&peer).expect("peer not found");
1973 assert_eq!(p.addr.tcp(), socket_addr);
1974 assert_eq!(peers.connection_info.num_pending_in, 0);
1975 assert_eq!(peers.connection_info.num_inbound, 1);
1976
1977 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1980 assert_eq!(peers.connection_info.num_pending_in, 1);
1981
1982 peers.on_already_connected(Direction::Incoming);
1986
1987 let p = peers.peers.get_mut(&peer).expect("peer not found");
1988 assert_eq!(p.addr.tcp(), socket_addr);
1989 assert_eq!(peers.connection_info.num_pending_in, 0);
1990 assert_eq!(peers.connection_info.num_inbound, 1);
1991 }
1992
1993 #[tokio::test]
1994 async fn test_reputation_change_trusted_peer() {
1995 let peer = PeerId::random();
1996 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1997 let mut peers = PeersManager::default();
1998 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
1999
2000 match event!(peers) {
2001 PeerAction::PeerAdded(peer_id) => {
2002 assert_eq!(peer_id, peer);
2003 }
2004 _ => unreachable!(),
2005 }
2006 match event!(peers) {
2007 PeerAction::Connect { peer_id, remote_addr } => {
2008 assert_eq!(peer_id, peer);
2009 assert_eq!(remote_addr, socket_addr);
2010 }
2011 _ => unreachable!(),
2012 }
2013
2014 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2015 peers.on_active_outgoing_established(peer);
2016 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2017
2018 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2019
2020 {
2021 let p = peers.peers.get(&peer).unwrap();
2022 assert_eq!(p.state, PeerConnectionState::Out);
2023 assert!(!p.is_banned());
2025 }
2026
2027 loop {
2029 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2030
2031 let p = peers.peers.get(&peer).unwrap();
2032 if p.is_banned() {
2033 break
2034 }
2035 }
2036
2037 match event!(peers) {
2038 PeerAction::Disconnect { peer_id, .. } => {
2039 assert_eq!(peer_id, peer);
2040 }
2041 _ => unreachable!(),
2042 }
2043 }
2044
2045 #[tokio::test]
2046 async fn test_reputation_management() {
2047 let peer = PeerId::random();
2048 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2049 let mut peers = PeersManager::default();
2050 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2051 assert_eq!(peers.get_reputation(&peer), Some(0));
2052
2053 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2054 assert_eq!(peers.get_reputation(&peer), Some(1024));
2055
2056 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2057 assert_eq!(peers.get_reputation(&peer), Some(0));
2058 }
2059
2060 #[tokio::test]
2061 async fn test_remove_discovered_active() {
2062 let peer = PeerId::random();
2063 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2064 let mut peers = PeersManager::default();
2065 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2066
2067 match event!(peers) {
2068 PeerAction::PeerAdded(peer_id) => {
2069 assert_eq!(peer_id, peer);
2070 }
2071 _ => unreachable!(),
2072 }
2073 match event!(peers) {
2074 PeerAction::Connect { peer_id, remote_addr } => {
2075 assert_eq!(peer_id, peer);
2076 assert_eq!(remote_addr, socket_addr);
2077 }
2078 _ => unreachable!(),
2079 }
2080
2081 let p = peers.peers.get(&peer).unwrap();
2082 assert_eq!(p.state, PeerConnectionState::PendingOut);
2083
2084 peers.remove_peer(peer);
2085
2086 match event!(peers) {
2087 PeerAction::PeerRemoved(peer_id) => {
2088 assert_eq!(peer_id, peer);
2089 }
2090 _ => unreachable!(),
2091 }
2092 match event!(peers) {
2093 PeerAction::Disconnect { peer_id, .. } => {
2094 assert_eq!(peer_id, peer);
2095 }
2096 _ => unreachable!(),
2097 }
2098
2099 let p = peers.peers.get(&peer).unwrap();
2100 assert_eq!(p.state, PeerConnectionState::PendingOut);
2101
2102 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2103 let p = peers.peers.get(&peer).unwrap();
2104 assert_eq!(p.state, PeerConnectionState::PendingOut);
2105
2106 peers.on_active_session_gracefully_closed(peer);
2107 assert!(!peers.peers.contains_key(&peer));
2108 }
2109
2110 #[tokio::test]
2111 async fn test_fatal_outgoing_connection_error_trusted() {
2112 let peer = PeerId::random();
2113 let config = PeersConfig::test()
2114 .with_trusted_nodes(vec![TrustedPeer {
2115 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2116 tcp_port: 8008,
2117 udp_port: 8008,
2118 id: peer,
2119 }])
2120 .with_trusted_nodes_only(true);
2121 let mut peers = PeersManager::new(config);
2122 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2123
2124 match event!(peers) {
2125 PeerAction::Connect { peer_id, remote_addr } => {
2126 assert_eq!(peer_id, peer);
2127 assert_eq!(remote_addr, socket_addr);
2128 }
2129 _ => unreachable!(),
2130 }
2131
2132 let p = peers.peers.get(&peer).unwrap();
2133 assert_eq!(p.state, PeerConnectionState::PendingOut);
2134
2135 assert_eq!(peers.num_outbound_connections(), 0);
2136
2137 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2138 EthHandshakeError::NonStatusMessageInHandshake,
2139 ));
2140 assert!(err.is_fatal_protocol_error());
2141
2142 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2143 assert_eq!(peers.num_outbound_connections(), 0);
2144
2145 match event!(peers) {
2147 PeerAction::BanPeer { peer_id } => {
2148 assert_eq!(peer_id, peer);
2149 }
2150 err => unreachable!("{err:?}"),
2151 }
2152
2153 assert!(peers.peers.contains_key(&peer));
2155
2156 tokio::time::sleep(peers.backoff_durations.medium).await;
2158
2159 match event!(peers) {
2160 PeerAction::Connect { peer_id, remote_addr } => {
2161 assert_eq!(peer_id, peer);
2162 assert_eq!(remote_addr, socket_addr);
2163 }
2164 err => unreachable!("{err:?}"),
2165 }
2166 }
2167
2168 #[tokio::test]
2169 async fn test_outgoing_connection_error() {
2170 let peer = PeerId::random();
2171 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2172 let mut peers = PeersManager::default();
2173 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2174
2175 match event!(peers) {
2176 PeerAction::PeerAdded(peer_id) => {
2177 assert_eq!(peer_id, peer);
2178 }
2179 _ => unreachable!(),
2180 }
2181 match event!(peers) {
2182 PeerAction::Connect { peer_id, remote_addr } => {
2183 assert_eq!(peer_id, peer);
2184 assert_eq!(remote_addr, socket_addr);
2185 }
2186 _ => unreachable!(),
2187 }
2188
2189 let p = peers.peers.get(&peer).unwrap();
2190 assert_eq!(p.state, PeerConnectionState::PendingOut);
2191
2192 assert_eq!(peers.num_outbound_connections(), 0);
2193
2194 peers.on_outgoing_connection_failure(
2195 &socket_addr,
2196 &peer,
2197 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2198 );
2199
2200 assert_eq!(peers.num_outbound_connections(), 0);
2201 }
2202
2203 #[tokio::test]
2204 async fn test_outgoing_connection_gracefully_closed() {
2205 let peer = PeerId::random();
2206 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2207 let mut peers = PeersManager::default();
2208 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2209
2210 match event!(peers) {
2211 PeerAction::PeerAdded(peer_id) => {
2212 assert_eq!(peer_id, peer);
2213 }
2214 _ => unreachable!(),
2215 }
2216 match event!(peers) {
2217 PeerAction::Connect { peer_id, remote_addr } => {
2218 assert_eq!(peer_id, peer);
2219 assert_eq!(remote_addr, socket_addr);
2220 }
2221 _ => unreachable!(),
2222 }
2223
2224 let p = peers.peers.get(&peer).unwrap();
2225 assert_eq!(p.state, PeerConnectionState::PendingOut);
2226
2227 assert_eq!(peers.num_outbound_connections(), 0);
2228
2229 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2230
2231 assert_eq!(peers.num_outbound_connections(), 0);
2232 assert_eq!(peers.connection_info.num_pending_out, 0);
2233 }
2234
2235 #[tokio::test]
2236 async fn test_discovery_ban_list() {
2237 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2238 let socket_addr = SocketAddr::new(ip, 8008);
2239 let ban_list = BanList::new(vec![], vec![ip]);
2240 let config = PeersConfig::default().with_ban_list(ban_list);
2241 let mut peer_manager = PeersManager::new(config);
2242 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2243
2244 assert!(peer_manager.peers.is_empty());
2245 }
2246
2247 #[tokio::test]
2248 async fn test_on_pending_ban_list() {
2249 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2250 let socket_addr = SocketAddr::new(ip, 8008);
2251 let ban_list = BanList::new(vec![], vec![ip]);
2252 let config = PeersConfig::test().with_ban_list(ban_list);
2253 let mut peer_manager = PeersManager::new(config);
2254 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2255 match a {
2257 Ok(_) => panic!(),
2258 Err(err) => match err {
2259 InboundConnectionError::IpBanned => {
2260 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2261 }
2262 _ => unreachable!(),
2263 },
2264 }
2265 }
2266
2267 #[tokio::test]
2268 async fn test_on_active_inbound_ban_list() {
2269 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2270 let socket_addr = SocketAddr::new(ip, 8008);
2271 let given_peer_id = PeerId::random();
2272 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2273 let config = PeersConfig::test().with_ban_list(ban_list);
2274 let mut peer_manager = PeersManager::new(config);
2275 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2276 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2278 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2279 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2282 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2283
2284 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2285 peer_manager.queued_actions.pop_front()
2286 else {
2287 panic!()
2288 };
2289
2290 assert_eq!(peer_id, given_peer_id)
2291 }
2292
2293 #[test]
2294 fn test_connection_limits() {
2295 let mut info = ConnectionInfo::default();
2296 info.inc_in();
2297 assert_eq!(info.num_inbound, 1);
2298 assert_eq!(info.num_outbound, 0);
2299 assert!(info.has_in_capacity());
2300
2301 info.decr_in();
2302 assert_eq!(info.num_inbound, 0);
2303 assert_eq!(info.num_outbound, 0);
2304
2305 info.inc_out();
2306 assert_eq!(info.num_inbound, 0);
2307 assert_eq!(info.num_outbound, 1);
2308 assert!(info.has_out_capacity());
2309
2310 info.decr_out();
2311 assert_eq!(info.num_inbound, 0);
2312 assert_eq!(info.num_outbound, 0);
2313 }
2314
2315 #[test]
2316 fn test_connection_peer_state() {
2317 let mut info = ConnectionInfo::default();
2318 info.inc_in();
2319
2320 info.decr_state(PeerConnectionState::In);
2321 assert_eq!(info.num_inbound, 0);
2322 assert_eq!(info.num_outbound, 0);
2323
2324 info.inc_out();
2325
2326 info.decr_state(PeerConnectionState::Out);
2327 assert_eq!(info.num_inbound, 0);
2328 assert_eq!(info.num_outbound, 0);
2329 }
2330
2331 #[tokio::test]
2332 async fn test_trusted_peers_are_prioritized() {
2333 let trusted_peer = PeerId::random();
2334 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2335 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2336 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2337 tcp_port: 8008,
2338 udp_port: 8008,
2339 id: trusted_peer,
2340 }]);
2341 let mut peers = PeersManager::new(config);
2342
2343 let basic_peer = PeerId::random();
2344 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2345 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2346
2347 match event!(peers) {
2348 PeerAction::PeerAdded(peer_id) => {
2349 assert_eq!(peer_id, basic_peer);
2350 }
2351 _ => unreachable!(),
2352 }
2353 match event!(peers) {
2354 PeerAction::Connect { peer_id, remote_addr } => {
2355 assert_eq!(peer_id, trusted_peer);
2356 assert_eq!(remote_addr, trusted_sock);
2357 }
2358 _ => unreachable!(),
2359 }
2360 match event!(peers) {
2361 PeerAction::Connect { peer_id, remote_addr } => {
2362 assert_eq!(peer_id, basic_peer);
2363 assert_eq!(remote_addr, basic_sock);
2364 }
2365 _ => unreachable!(),
2366 }
2367 }
2368
2369 #[tokio::test]
2370 async fn test_connect_trusted_nodes_only() {
2371 let trusted_peer = PeerId::random();
2372 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2373 let config = PeersConfig::test()
2374 .with_trusted_nodes(vec![TrustedPeer {
2375 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2376 tcp_port: 8008,
2377 udp_port: 8008,
2378 id: trusted_peer,
2379 }])
2380 .with_trusted_nodes_only(true);
2381 let mut peers = PeersManager::new(config);
2382
2383 let basic_peer = PeerId::random();
2384 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2385 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2386
2387 match event!(peers) {
2388 PeerAction::PeerAdded(peer_id) => {
2389 assert_eq!(peer_id, basic_peer);
2390 }
2391 _ => unreachable!(),
2392 }
2393 match event!(peers) {
2394 PeerAction::Connect { peer_id, remote_addr } => {
2395 assert_eq!(peer_id, trusted_peer);
2396 assert_eq!(remote_addr, trusted_sock);
2397 }
2398 _ => unreachable!(),
2399 }
2400 poll_fn(|cx| {
2401 assert!(peers.poll(cx).is_pending());
2402 Poll::Ready(())
2403 })
2404 .await;
2405 }
2406
2407 #[tokio::test]
2408 async fn test_incoming_with_trusted_nodes_only() {
2409 let trusted_peer = PeerId::random();
2410 let config = PeersConfig::test()
2411 .with_trusted_nodes(vec![TrustedPeer {
2412 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2413 tcp_port: 8008,
2414 udp_port: 8008,
2415 id: trusted_peer,
2416 }])
2417 .with_trusted_nodes_only(true);
2418 let mut peers = PeersManager::new(config);
2419
2420 let basic_peer = PeerId::random();
2421 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2422 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2423 assert_eq!(peers.connection_info.num_pending_in, 1);
2425 peers.on_incoming_session_established(basic_peer, basic_sock);
2426 assert_eq!(peers.connection_info.num_pending_in, 0);
2429 assert_eq!(peers.connection_info.num_inbound, 0);
2430
2431 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2432 peers.queued_actions.pop_front()
2433 else {
2434 panic!()
2435 };
2436 assert_eq!(basic_peer, peer_id);
2437 assert!(!peers.peers.contains_key(&basic_peer));
2438 }
2439
2440 #[tokio::test]
2441 async fn test_incoming_without_trusted_nodes_only() {
2442 let trusted_peer = PeerId::random();
2443 let config = PeersConfig::test()
2444 .with_trusted_nodes(vec![TrustedPeer {
2445 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2446 tcp_port: 8008,
2447 udp_port: 8008,
2448 id: trusted_peer,
2449 }])
2450 .with_trusted_nodes_only(false);
2451 let mut peers = PeersManager::new(config);
2452
2453 let basic_peer = PeerId::random();
2454 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2455 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2456
2457 assert_eq!(peers.connection_info.num_pending_in, 1);
2459 peers.on_incoming_session_established(basic_peer, basic_sock);
2460 assert_eq!(peers.connection_info.num_pending_in, 0);
2463 assert_eq!(peers.connection_info.num_inbound, 1);
2464 assert!(peers.peers.contains_key(&basic_peer));
2465 }
2466
2467 #[tokio::test]
2468 async fn test_incoming_at_capacity() {
2469 let mut config = PeersConfig::test();
2470 config.connection_info.max_inbound = 1;
2471 let mut peers = PeersManager::new(config);
2472
2473 let peer = PeerId::random();
2474 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2475 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2476
2477 peers.on_incoming_session_established(peer, addr);
2478
2479 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2480 assert_eq!(
2481 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2482 InboundConnectionError::ExceedsCapacity
2483 );
2484 }
2485
2486 #[tokio::test]
2487 async fn test_incoming_rate_limit() {
2488 let config = PeersConfig {
2489 incoming_ip_throttle_duration: Duration::from_millis(100),
2490 ..PeersConfig::test()
2491 };
2492 let mut peers = PeersManager::new(config);
2493
2494 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2495 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2496 assert_eq!(
2497 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2498 InboundConnectionError::IpBanned
2499 );
2500
2501 peers.release_interval.reset_immediately();
2502 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2503
2504 poll_fn(|cx| loop {
2506 if peers.poll(cx).is_pending() {
2507 return Poll::Ready(());
2508 }
2509 })
2510 .await;
2511
2512 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2513 assert_eq!(
2514 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2515 InboundConnectionError::IpBanned
2516 );
2517 }
2518
2519 #[tokio::test]
2520 async fn test_tick() {
2521 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2522 let socket_addr = SocketAddr::new(ip, 8008);
2523 let config = PeersConfig::test();
2524 let mut peer_manager = PeersManager::new(config);
2525 let peer_id = PeerId::random();
2526 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2527
2528 tokio::time::sleep(Duration::from_secs(1)).await;
2529 peer_manager.tick();
2530
2531 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2533
2534 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2536
2537 tokio::time::sleep(Duration::from_secs(1)).await;
2538 peer_manager.tick();
2539
2540 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2542
2543 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2544
2545 tokio::time::sleep(Duration::from_secs(1)).await;
2546 peer_manager.tick();
2547
2548 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2550 }
2551
2552 #[tokio::test]
2553 async fn test_remove_incoming_after_disconnect() {
2554 let peer_id = PeerId::random();
2555 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2556 let mut peers = PeersManager::default();
2557
2558 peers.on_incoming_pending_session(addr.ip()).unwrap();
2559 peers.on_incoming_session_established(peer_id, addr);
2560 let peer = peers.peers.get(&peer_id).unwrap();
2561 assert_eq!(peer.state, PeerConnectionState::In);
2562 assert!(peer.remove_after_disconnect);
2563
2564 peers.on_active_session_gracefully_closed(peer_id);
2565 assert!(!peers.peers.contains_key(&peer_id))
2566 }
2567
2568 #[tokio::test]
2569 async fn test_keep_incoming_after_disconnect_if_discovered() {
2570 let peer_id = PeerId::random();
2571 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2572 let mut peers = PeersManager::default();
2573
2574 peers.on_incoming_pending_session(addr.ip()).unwrap();
2575 peers.on_incoming_session_established(peer_id, addr);
2576 let peer = peers.peers.get(&peer_id).unwrap();
2577 assert_eq!(peer.state, PeerConnectionState::In);
2578 assert!(peer.remove_after_disconnect);
2579
2580 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2582
2583 peers.on_active_session_gracefully_closed(peer_id);
2584
2585 let peer = peers.peers.get(&peer_id).unwrap();
2586 assert_eq!(peer.state, PeerConnectionState::Idle);
2587 assert!(!peer.remove_after_disconnect);
2588 }
2589
2590 #[tokio::test]
2591 async fn test_incoming_outgoing_already_connected() {
2592 let peer_id = PeerId::random();
2593 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2594 let mut peers = PeersManager::default();
2595
2596 peers.on_incoming_pending_session(addr.ip()).unwrap();
2597 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2598
2599 match event!(peers) {
2600 PeerAction::PeerAdded(_) => {}
2601 _ => unreachable!(),
2602 }
2603
2604 match event!(peers) {
2605 PeerAction::Connect { .. } => {}
2606 _ => unreachable!(),
2607 }
2608
2609 peers.on_incoming_session_established(peer_id, addr);
2610 peers.on_already_connected(Direction::Outgoing(peer_id));
2611 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2612 assert_eq!(peers.connection_info.num_inbound, 1);
2613 assert_eq!(peers.connection_info.num_pending_out, 0);
2614 assert_eq!(peers.connection_info.num_pending_in, 0);
2615 assert_eq!(peers.connection_info.num_outbound, 0);
2616 }
2617
2618 #[tokio::test]
2619 async fn test_already_connected_incoming_outgoing_connection_error() {
2620 let peer_id = PeerId::random();
2621 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2622 let mut peers = PeersManager::default();
2623
2624 peers.on_incoming_pending_session(addr.ip()).unwrap();
2625 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2626
2627 match event!(peers) {
2628 PeerAction::PeerAdded(_) => {}
2629 _ => unreachable!(),
2630 }
2631
2632 match event!(peers) {
2633 PeerAction::Connect { .. } => {}
2634 _ => unreachable!(),
2635 }
2636
2637 peers.on_incoming_session_established(peer_id, addr);
2638
2639 peers.on_outgoing_connection_failure(
2640 &addr,
2641 &peer_id,
2642 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2643 );
2644 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2645 assert_eq!(peers.connection_info.num_inbound, 1);
2646 assert_eq!(peers.connection_info.num_pending_out, 0);
2647 assert_eq!(peers.connection_info.num_pending_in, 0);
2648 assert_eq!(peers.connection_info.num_outbound, 0);
2649 }
2650
2651 #[tokio::test]
2652 async fn test_max_concurrent_dials() {
2653 let config = PeersConfig::default();
2654 let mut peer_manager = PeersManager::new(config);
2655 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2656 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2657 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2658 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2659 }
2660
2661 peer_manager.fill_outbound_slots();
2662 let dials = peer_manager
2663 .queued_actions
2664 .iter()
2665 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2666 .count();
2667 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2668 }
2669
2670 #[tokio::test]
2671 async fn test_max_num_of_pending_dials() {
2672 let config = PeersConfig::default();
2673 let mut peer_manager = PeersManager::new(config);
2674 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2675 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2676
2677 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2679 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2680 }
2681
2682 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2683 match event!(peer_manager) {
2684 PeerAction::PeerAdded(_) => {}
2685 _ => unreachable!(),
2686 }
2687 }
2688
2689 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2690 match event!(peer_manager) {
2691 PeerAction::Connect { .. } => {}
2692 _ => unreachable!(),
2693 }
2694 }
2695
2696 peer_manager.fill_outbound_slots();
2698
2699 let dials = peer_manager.connection_info.num_pending_out;
2701 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2702
2703 let num_pendingout_states = peer_manager
2704 .peers
2705 .iter()
2706 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2707 .map(|(peer_id, _)| *peer_id)
2708 .collect::<Vec<PeerId>>();
2709 assert_eq!(
2710 num_pendingout_states.len(),
2711 peer_manager.connection_info.config.max_concurrent_outbound_dials
2712 );
2713
2714 for peer_id in &num_pendingout_states {
2716 peer_manager.on_active_outgoing_established(*peer_id);
2717 }
2718
2719 for peer_id in &num_pendingout_states {
2721 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2722 }
2723
2724 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2726 }
2727
2728 #[tokio::test]
2729 async fn test_connect() {
2730 let peer = PeerId::random();
2731 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2732 let mut peers = PeersManager::default();
2733 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2734 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2735
2736 match event!(peers) {
2737 PeerAction::Connect { peer_id, remote_addr } => {
2738 assert_eq!(peer_id, peer);
2739 assert_eq!(remote_addr, socket_addr);
2740 }
2741 _ => unreachable!(),
2742 }
2743
2744 let (record, _) = peers.peer_by_id(peer).unwrap();
2745 assert_eq!(record.tcp_addr(), socket_addr);
2746 assert_eq!(record.udp_addr(), socket_addr);
2747
2748 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2750
2751 let (record, _) = peers.peer_by_id(peer).unwrap();
2752 assert_eq!(record.tcp_addr(), socket_addr);
2753 assert_eq!(record.udp_addr(), socket_addr);
2754 }
2755
2756 #[tokio::test]
2757 async fn test_incoming_connection_from_banned() {
2758 let peer = PeerId::random();
2759 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2760 let config = PeersConfig::test().with_max_inbound(3);
2761 let mut peers = PeersManager::new(config);
2762 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2763
2764 match event!(peers) {
2765 PeerAction::PeerAdded(peer_id) => {
2766 assert_eq!(peer_id, peer);
2767 }
2768 _ => unreachable!(),
2769 }
2770 match event!(peers) {
2771 PeerAction::Connect { peer_id, .. } => {
2772 assert_eq!(peer_id, peer);
2773 }
2774 _ => unreachable!(),
2775 }
2776
2777 poll_fn(|cx| {
2778 assert!(peers.poll(cx).is_pending());
2779 Poll::Ready(())
2780 })
2781 .await;
2782
2783 loop {
2785 peers.on_active_session_dropped(
2786 &socket_addr,
2787 &peer,
2788 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2789 reth_eth_wire::EthVersion::Eth68,
2790 reth_eth_wire::EthMessageID::Status,
2791 )),
2792 );
2793
2794 if peers.peers.get(&peer).unwrap().is_banned() {
2795 break;
2796 }
2797
2798 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2799 peers.on_incoming_session_established(peer, socket_addr);
2800
2801 match event!(peers) {
2802 PeerAction::Connect { peer_id, .. } => {
2803 assert_eq!(peer_id, peer);
2804 }
2805 _ => unreachable!(),
2806 }
2807 }
2808
2809 assert!(peers.peers.get(&peer).unwrap().is_banned());
2810
2811 for _ in 0..peers.connection_info.config.max_inbound {
2813 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2814 peers.on_incoming_session_established(peer, socket_addr);
2815
2816 match event!(peers) {
2817 PeerAction::DisconnectBannedIncoming { peer_id } => {
2818 assert_eq!(peer_id, peer);
2819 }
2820 _ => unreachable!(),
2821 }
2822 }
2823
2824 poll_fn(|cx| {
2825 assert!(peers.poll(cx).is_pending());
2826 Poll::Ready(())
2827 })
2828 .await;
2829
2830 assert_eq!(peers.connection_info.num_inbound, 0);
2831
2832 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2833
2834 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2836 assert_eq!(peers.connection_info.num_pending_in, 1);
2837
2838 peers.on_active_session_gracefully_closed(peer);
2842 assert_eq!(peers.connection_info.num_inbound, 0);
2843 }
2844
2845 #[tokio::test]
2846 async fn test_add_pending_connect() {
2847 let peer = PeerId::random();
2848 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2849 let mut peers = PeersManager::default();
2850 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2851 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2852 assert_eq!(peers.connection_info.num_pending_out, 1);
2853 }
2854
2855 #[tokio::test]
2856 async fn test_dns_updates_peer_address() {
2857 let peer_id = PeerId::random();
2858 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2859 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2860
2861 let trusted = TrustedPeer {
2862 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2863 tcp_port: 8008,
2864 udp_port: 8008,
2865 id: peer_id,
2866 };
2867
2868 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2869 let mut manager = PeersManager::new(config);
2870 manager
2871 .trusted_peers_resolver
2872 .set_interval(tokio::time::interval(Duration::from_millis(1)));
2873
2874 manager.peers.insert(
2875 peer_id,
2876 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2877 );
2878
2879 for _ in 0..100 {
2880 let _ = event!(manager);
2881 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2882 break;
2883 }
2884 tokio::time::sleep(Duration::from_millis(10)).await;
2885 }
2886
2887 let updated_peer = manager.peers.get(&peer_id).unwrap();
2888 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2889 }
2890}