1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
/// Maintains the state of all known peers and decides which connections to accept, establish,
/// or drop.
///
/// Decisions are not executed directly: they are buffered as [`PeerAction`]s and handed out
/// through [`PeersManager::poll`].
#[derive(Debug)]
pub struct PeersManager {
    /// All peers known to the manager, keyed by their id.
    peers: HashMap<PeerId, Peer>,
    /// Ids of the peers that are treated as trusted.
    trusted_peer_ids: HashSet<PeerId>,
    /// Re-resolves the configured trusted peers; resolved records are applied in `poll`.
    trusted_peers_resolver: TrustedPeersResolver,
    /// Sender half of the command channel, cloned into every [`PeersHandle`].
    manager_tx: mpsc::UnboundedSender<PeerCommand>,
    /// Receiver half of the command channel, drained in `poll`.
    handle_rx: UnboundedReceiverStream<PeerCommand>,
    /// Actions that are ready to be returned from `poll`.
    queued_actions: VecDeque<PeerAction>,
    /// Interval at which free outbound slots are refilled.
    refill_slots_interval: Interval,
    /// Weights used to translate a [`ReputationChangeKind`] into a numeric reputation change.
    reputation_weights: ReputationChangeWeights,
    /// Counters of active and pending connections together with their configured limits.
    connection_info: ConnectionInfo,
    /// Banned peer ids and IP addresses.
    ban_list: BanList,
    /// Peers that are currently backed off, mapped to the instant the backoff expires.
    backed_off_peers: HashMap<PeerId, std::time::Instant>,
    /// Interval at which expired bans and expired backoffs are released.
    release_interval: Interval,
    /// Default duration of a ban.
    ban_duration: Duration,
    /// Backoff durations applied after connection failures.
    backoff_durations: PeerBackoffDurations,
    /// If set, only trusted peers are dialed and untrusted incoming sessions are disconnected.
    trusted_nodes_only: bool,
    /// Timestamp of the most recent `tick`.
    last_tick: Instant,
    /// Non-trusted, non-static peers whose severe backoff counter exceeds this value are removed.
    max_backoff_count: u8,
    /// Current connection state of the network; outbound slots are only filled while active.
    net_connection_state: NetworkConnectionState,
    /// How long an IP is throttled (temporarily banned) after one of its incoming connections
    /// was accepted as pending.
    incoming_ip_throttle_duration: Duration,
}
94
95impl PeersManager {
/// Creates a new manager from the given [`PeersConfig`].
///
/// Trusted nodes are resolved synchronously via `resolve_blocking`; nodes that fail to resolve
/// are logged and skipped. Trusted nodes are inserted before basic nodes and existing entries
/// are never overwritten, so a node listed as both stays trusted.
///
/// NOTE(review): this creates `tokio` intervals, so it presumably has to be called from within
/// a Tokio runtime — confirm.
pub fn new(config: PeersConfig) -> Self {
    let PeersConfig {
        refill_slots_interval,
        connection_info,
        reputation_weights,
        ban_list,
        ban_duration,
        backoff_durations,
        trusted_nodes,
        trusted_nodes_only,
        trusted_nodes_resolution_interval,
        basic_nodes,
        max_backoff_count,
        incoming_ip_throttle_duration,
    } = config;
    let (manager_tx, handle_rx) = mpsc::unbounded_channel();
    let now = Instant::now();

    // Release expired bans/backoffs at half of the shorter of the two durations.
    let unban_interval = ban_duration.min(backoff_durations.low) / 2;

    let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
    let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());

    for trusted_peer in &trusted_nodes {
        match trusted_peer.resolve_blocking() {
            Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
                trusted_peer_ids.insert(id);
                peers.entry(id).or_insert_with(|| {
                    Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
                });
            }
            Err(err) => {
                // An unresolvable trusted peer is skipped, not treated as fatal.
                warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
            }
        }
    }

    for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
        // Does not downgrade a peer that was already inserted as trusted above.
        peers.entry(id).or_insert_with(|| {
            Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
        });
    }

    Self {
        peers,
        trusted_peer_ids,
        trusted_peers_resolver: TrustedPeersResolver::new(
            trusted_nodes,
            tokio::time::interval(trusted_nodes_resolution_interval),
        ),
        manager_tx,
        handle_rx: UnboundedReceiverStream::new(handle_rx),
        queued_actions: Default::default(),
        reputation_weights,
        refill_slots_interval: tokio::time::interval(refill_slots_interval),
        // The first release tick fires one full `unban_interval` after construction.
        release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
        connection_info: ConnectionInfo::new(connection_info),
        ban_list,
        backed_off_peers: Default::default(),
        ban_duration,
        backoff_durations,
        trusted_nodes_only,
        last_tick: Instant::now(),
        max_backoff_count,
        net_connection_state: NetworkConnectionState::default(),
        incoming_ip_throttle_duration,
    }
}
166
/// Returns a new [`PeersHandle`] that sends commands to this manager through a clone of the
/// command channel sender.
pub(crate) fn handle(&self) -> PeersHandle {
    PeersHandle::new(self.manager_tx.clone())
}
171
/// Returns the number of peers currently tracked in the peer set.
#[inline]
pub(crate) fn num_known_peers(&self) -> usize {
    self.peers.len()
}
177
178 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
180 self.peers.iter().map(|(peer_id, v)| {
181 NodeRecord::new_with_ports(
182 v.addr.tcp().ip(),
183 v.addr.tcp().port(),
184 v.addr.udp().map(|addr| addr.port()),
185 *peer_id,
186 )
187 })
188 }
189
190 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
192 self.peers.get(&peer_id).map(|v| {
193 (
194 NodeRecord::new_with_ports(
195 v.addr.tcp().ip(),
196 v.addr.tcp().port(),
197 v.addr.udp().map(|addr| addr.port()),
198 peer_id,
199 ),
200 v.kind,
201 )
202 })
203 }
204
205 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
207 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
208 }
209
/// Returns the number of currently active inbound connections.
#[inline]
pub(crate) const fn num_inbound_connections(&self) -> usize {
    self.connection_info.num_inbound
}
215
/// Returns the number of currently active outbound connections.
#[inline]
pub(crate) const fn num_outbound_connections(&self) -> usize {
    self.connection_info.num_outbound
}
221
/// Returns the number of outbound connections that are currently pending.
#[inline]
pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
    self.connection_info.num_pending_out
}
227
/// Returns the number of peers that are currently registered as backed off.
#[inline]
pub(crate) fn num_backed_off_peers(&self) -> usize {
    self.backed_off_peers.len()
}
233
234 fn num_idle_trusted_peers(&self) -> usize {
236 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
237 }
238
239 pub(crate) fn on_incoming_pending_session(
243 &mut self,
244 addr: IpAddr,
245 ) -> Result<(), InboundConnectionError> {
246 if self.ban_list.is_banned_ip(&addr) {
247 return Err(InboundConnectionError::IpBanned)
248 }
249
250 if !self.connection_info.has_in_capacity() {
252 if self.trusted_peer_ids.is_empty() {
253 return Err(InboundConnectionError::ExceedsCapacity)
256 }
257
258 let num_idle_trusted_peers = self.num_idle_trusted_peers();
262 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
263 let max_inbound =
265 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
266 if self.connection_info.num_pending_in < max_inbound {
267 self.connection_info.inc_pending_in();
268 return Ok(())
269 }
270 }
271
272 return Err(InboundConnectionError::ExceedsCapacity)
274 }
275
276 if !self.connection_info.has_in_pending_capacity() {
278 return Err(InboundConnectionError::ExceedsCapacity)
279 }
280
281 self.throttle_incoming_ip(addr);
283
284 self.connection_info.inc_pending_in();
285 Ok(())
286 }
287
/// Invoked when a previously accepted pending incoming session was rejected internally.
///
/// Releases the pending-inbound slot that was taken for it.
pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
    self.connection_info.decr_pending_in();
}
293
/// Invoked when a pending incoming session was closed gracefully.
///
/// Releases the pending-inbound slot that was taken for it.
pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
    self.connection_info.decr_pending_in()
}
298
299 pub(crate) fn on_incoming_pending_session_dropped(
301 &mut self,
302 remote_addr: SocketAddr,
303 err: &PendingSessionHandshakeError,
304 ) {
305 if err.is_fatal_protocol_error() {
306 self.ban_ip(remote_addr.ip());
307
308 if err.merits_discovery_ban() {
309 self.queued_actions
310 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
311 }
312 }
313
314 self.connection_info.decr_pending_in();
315 }
316
/// Invoked when a pending incoming session was authenticated and became an active session.
///
/// Moves the connection from pending-inbound to inbound and decides whether the session may
/// stay. Sessions from banned peers, from untrusted peers while `trusted_nodes_only` is set,
/// or from untrusted peers when there was no inbound capacity left get a disconnect action
/// queued.
pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
    // The session is no longer pending, regardless of the outcome below.
    self.connection_info.decr_pending_in();

    // Only the peer id is checked against the ban list here.
    if self.ban_list.is_banned_peer(&peer_id) {
        self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
        return
    }

    let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
    if self.trusted_nodes_only && !is_trusted {
        self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
        return
    }

    // Start a new tick so the time before this connection is not credited to the new peer.
    self.tick();

    match self.peers.entry(peer_id) {
        Entry::Occupied(mut entry) => {
            let peer = entry.get_mut();
            if peer.is_banned() {
                self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
                return
            }
            // We may have been dialing this peer at the same time; release the pending
            // outbound slot before switching the state to inbound.
            if peer.state.is_pending_out() {
                self.connection_info.decr_state(peer.state);
            }

            peer.state = PeerConnectionState::In;

            is_trusted = is_trusted || peer.is_trusted();
        }
        Entry::Vacant(entry) => {
            // Unknown peer: track it, but only the address of the incoming connection is
            // known, so it is flagged for removal once it disconnects.
            let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
            peer.remove_after_disconnect = true;
            entry.insert(peer);
            self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
        }
    }

    // Capacity is sampled before counting this connection.
    let has_in_capacity = self.connection_info.has_in_capacity();
    self.connection_info.inc_in();

    // Untrusted peers are dropped again if there was no free inbound slot for them.
    if !is_trusted && !has_in_capacity {
        self.queued_actions.push_back(PeerAction::Disconnect {
            peer_id,
            reason: Some(DisconnectReason::TooManyPeers),
        });
    }
}
382
383 fn ban_peer(&mut self, peer_id: PeerId) {
385 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
386 (peer.is_trusted() || peer.is_static())
387 {
388 self.backoff_durations.low / 2
391 } else {
392 self.ban_duration
393 };
394
395 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
396 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
397 }
398
399 fn ban_ip(&mut self, ip: IpAddr) {
401 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
402 }
403
404 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
406 self.ban_list
407 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
408 }
409
410 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
412 trace!(target: "net::peers", ?peer_id, "backing off");
413
414 if let Some(peer) = self.peers.get_mut(&peer_id) {
415 peer.backed_off = true;
416 self.backed_off_peers.insert(peer_id, until);
417 }
418 }
419
/// Removes the peer from the ban list and queues a [`PeerAction::UnBanPeer`].
fn unban_peer(&mut self, peer_id: PeerId) {
    self.ban_list.unban_peer(&peer_id);
    self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
425
426 fn tick(&mut self) {
431 let now = Instant::now();
432 let secs_since_last_tick =
436 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
437 self.last_tick = now;
438
439 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
441 if peer.1.reputation < DEFAULT_REPUTATION {
444 peer.1.reputation += secs_since_last_tick;
445 }
446 }
447 }
448
/// Returns the current reputation of the peer, or `None` if the peer is not tracked.
pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
    self.peers.get(peer_id).map(|peer| peer.reputation)
}
453
/// Applies a reputation change to the peer and acts on the resulting outcome.
///
/// Unknown peers are ignored. For trusted and static peers, `Dropped`, `BadAnnouncement`,
/// `Timeout` and `AlreadySeenTransaction` are ignored entirely, and any other change is
/// limited to [`MAX_TRUSTED_PEER_REPUTATION_CHANGE`]. Depending on the outcome the peer is
/// banned, unbanned, or disconnected and banned.
pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
    let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
        if rep.is_reset() {
            peer.reset_reputation()
        } else {
            let mut reputation_change = self.reputation_weights.change(rep).as_i32();
            if peer.is_trusted() || peer.is_static() {
                // These kinds never affect trusted/static peers.
                if matches!(
                    rep,
                    ReputationChangeKind::Dropped |
                        ReputationChangeKind::BadAnnouncement |
                        ReputationChangeKind::Timeout |
                        ReputationChangeKind::AlreadySeenTransaction
                ) {
                    return
                }

                // Changes below the limit are raised to it, so trusted/static peers are
                // penalized less.
                if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
                    reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
                }
            }
            peer.apply_reputation(reputation_change, rep)
        }
    } else {
        // Nothing to do for peers that are not tracked.
        return
    };

    match outcome {
        ReputationChangeOutcome::None => {}
        ReputationChangeOutcome::Ban => {
            self.ban_peer(*peer_id);
        }
        ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
        ReputationChangeOutcome::DisconnectAndBan => {
            // The disconnect is queued before the ban action.
            self.queued_actions.push_back(PeerAction::Disconnect {
                peer_id: *peer_id,
                reason: Some(DisconnectReason::DisconnectRequested),
            });
            self.ban_peer(*peer_id);
        }
    }
}
505
506 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
508 if let Some(peer) = self.peers.get_mut(peer_id) {
509 self.connection_info.decr_state(peer.state);
510 peer.state = PeerConnectionState::Idle;
511 }
512 }
513
/// Invoked when a pending outgoing session failed during the handshake.
///
/// Delegates to `on_connection_failure` with [`ReputationChangeKind::FailedToConnect`].
pub(crate) fn on_outgoing_pending_session_dropped(
    &mut self,
    remote_addr: &SocketAddr,
    peer_id: &PeerId,
    err: &PendingSessionHandshakeError,
) {
    self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
524
525 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
527 match self.peers.entry(peer_id) {
528 Entry::Occupied(mut entry) => {
529 self.connection_info.decr_state(entry.get().state);
530
531 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
532 entry.remove();
534 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
535 } else {
536 entry.get_mut().severe_backoff_counter = 0;
540 entry.get_mut().state = PeerConnectionState::Idle;
541 return
542 }
543 }
544 Entry::Vacant(_) => return,
545 }
546
547 self.fill_outbound_slots();
548 }
549
550 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
552 if let Some(peer) = self.peers.get_mut(&peer_id) {
553 self.connection_info.decr_state(peer.state);
554 self.connection_info.inc_out();
555 peer.state = PeerConnectionState::Out;
556 }
557 }
558
/// Invoked when an active session was dropped because of an error.
///
/// Delegates to `on_connection_failure` with [`ReputationChangeKind::Dropped`].
pub(crate) fn on_active_session_dropped(
    &mut self,
    remote_addr: &SocketAddr,
    peer_id: &PeerId,
    err: &EthStreamError,
) {
    self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
}
571
/// Invoked when an outgoing connection attempt failed with an I/O error.
///
/// The failure is ignored if the peer is in an incoming connection state. Otherwise it is
/// handled by `on_connection_failure` with [`ReputationChangeKind::FailedToConnect`].
pub(crate) fn on_outgoing_connection_failure(
    &mut self,
    remote_addr: &SocketAddr,
    peer_id: &PeerId,
    err: &io::Error,
) {
    if let Some(peer) = self.peers.get(peer_id) {
        // An incoming connection from the same peer already exists, so the failed dial is
        // irrelevant.
        if peer.state.is_incoming() {
            return
        }

        // Re-resolve trusted peers right away once the reputation of one of them indicates
        // failed connections.
        if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
            self.trusted_peers_resolver.interval.reset_immediately();
        }
    }

    self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
598
/// Shared handling for failed or dropped connections.
///
/// - Fatal protocol errors: the peer is banned; untrusted peers are additionally removed and,
///   if the error merits it, a discovery ban is queued.
/// - Errors that ask for a backoff: the peer is backed off. Trusted and static peers always use
///   the low backoff duration; other peers use a duration derived from the backoff kind and
///   their severe backoff counter, and are removed once that counter exceeds
///   `max_backoff_count`.
/// - All other errors: `reputation_change` is applied to the peer's reputation.
///
/// Outbound slots are refilled at the end in every case.
fn on_connection_failure(
    &mut self,
    remote_addr: &SocketAddr,
    peer_id: &PeerId,
    err: impl SessionError,
    reputation_change: ReputationChangeKind,
) {
    trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");

    if err.is_fatal_protocol_error() {
        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
        if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
            // Release the slot the peer occupied before touching the entry.
            self.connection_info.decr_state(entry.get().state);
            if entry.get().is_trusted() {
                // Trusted peers are kept and only reset to idle.
                entry.get_mut().state = PeerConnectionState::Idle;
            } else {
                entry.remove();
                self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
                if err.merits_discovery_ban() {
                    self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
                        peer_id: *peer_id,
                        ip_addr: remote_addr.ip(),
                    })
                }
            }
        }

        // The ban is applied even if the peer was not tracked.
        self.ban_peer(*peer_id);
    } else {
        let mut backoff_until = None;
        let mut remove_peer = false;

        if let Some(peer) = self.peers.get_mut(peer_id) {
            if let Some(kind) = err.should_backoff() {
                if peer.is_trusted() || peer.is_static() {
                    // Trusted/static peers always get the low backoff and their severe
                    // backoff counter is not incremented.
                    let backoff = self.backoff_durations.low;
                    backoff_until = Some(std::time::Instant::now() + backoff);
                } else {
                    if kind.is_severe() {
                        peer.severe_backoff_counter =
                            peer.severe_backoff_counter.saturating_add(1);
                    }

                    let backoff_time =
                        self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);

                    backoff_until = Some(backoff_time);
                }
            } else {
                // Not a backoff error: adjust the reputation instead.
                let reputation_change = self.reputation_weights.change(reputation_change);
                peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
            };

            self.connection_info.decr_state(peer.state);
            peer.state = PeerConnectionState::Idle;

            // Too many severe backoffs: drop the peer unless it is trusted or static.
            if peer.severe_backoff_counter > self.max_backoff_count &&
                !peer.is_trusted() &&
                !peer.is_static()
            {
                remove_peer = true;
            }
        }

        if remove_peer {
            // `remove_peer` is only set inside the `get_mut` branch above, so the entry exists.
            let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
            self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
        } else if let Some(backoff_until) = backoff_until {
            self.backoff_peer_until(*peer_id, backoff_until);
        }
    }

    self.fill_outbound_slots();
}
692
/// Invoked when a session was rejected because the peer is already connected.
///
/// For incoming attempts the pending-inbound slot is released; outgoing attempts leave all
/// counters untouched.
pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
    match direction {
        Direction::Incoming => {
            self.connection_info.decr_pending_in();
        }
        Direction::Outgoing(_) => {
            // No counter is adjusted here.
            // NOTE(review): presumably the pending outbound state is reconciled in
            // `on_incoming_session_established` — confirm.
        }
    }
}
710
711 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
716 if let Some(peer) = self.peers.get_mut(&peer_id) {
717 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
718 peer.fork_id = Some(fork_id);
719 }
720 }
721
/// Adds the peer as a [`PeerKind::Basic`] peer; see `add_peer_kind`.
pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
    self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
}
728
/// Marks the peer id as trusted without adding or modifying an entry in the peer set.
pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
    self.trusted_peer_ids.insert(peer_id);
}
733
/// Adds the peer as a [`PeerKind::Trusted`] peer without a fork id; see `add_peer_kind`.
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
    self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
}
741
/// Adds a peer of the given kind, or updates it if it is already tracked.
///
/// Nothing happens if the peer id or the IP of its TCP address is banned. For an existing peer
/// the kind, fork id and address are overwritten; for a new peer a
/// [`PeerAction::PeerAdded`] is queued. Trusted peers are also added to the trusted id set.
pub(crate) fn add_peer_kind(
    &mut self,
    peer_id: PeerId,
    kind: PeerKind,
    addr: PeerAddr,
    fork_id: Option<ForkId>,
) {
    if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
        return
    }

    match self.peers.entry(peer_id) {
        Entry::Occupied(mut entry) => {
            let peer = entry.get_mut();
            peer.kind = kind;
            peer.fork_id = fork_id;
            peer.addr = addr;

            // The peer now has an explicitly provided address, so an incoming peer no longer
            // needs to be dropped after it disconnects.
            if peer.state.is_incoming() {
                peer.remove_after_disconnect = false;
            }
        }
        Entry::Vacant(entry) => {
            trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
            let mut peer = Peer::with_kind(addr, kind);
            peer.fork_id = fork_id;
            entry.insert(peer);
            self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
        }
    }

    if kind.is_trusted() {
        self.trusted_peer_ids.insert(peer_id);
    }
}
783
/// Removes a peer from the peer set and queues a [`PeerAction::PeerRemoved`].
///
/// Unknown and trusted peers are left untouched. A peer that is still connected is
/// re-inserted in a disconnecting state, flagged for removal after the disconnect, and a
/// [`PeerAction::Disconnect`] is queued for it.
pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
    let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
    if entry.get().is_trusted() {
        return
    }
    let mut peer = entry.remove();

    trace!(target: "net::peers", ?peer_id, "remove discovered node");
    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));

    if peer.state.is_connected() {
        trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
        // Keep tracking the peer until its session is closed; it is dropped for good once
        // the disconnect is reported.
        peer.remove_after_disconnect = true;
        peer.state.disconnect();
        self.peers.insert(peer_id, peer);
        self.queued_actions.push_back(PeerAction::Disconnect {
            peer_id,
            reason: Some(DisconnectReason::DisconnectRequested),
        })
    }
}
810
/// Adds the peer as a [`PeerKind::Basic`] peer and schedules a connection to it; see
/// `add_and_connect_kind`.
#[cfg_attr(not(test), expect(dead_code))]
pub(crate) fn add_and_connect(
    &mut self,
    peer_id: PeerId,
    addr: PeerAddr,
    fork_id: Option<ForkId>,
) {
    self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
}
822
/// Adds or updates a peer of the given kind and schedules an outbound connection to it.
///
/// Nothing happens if the peer id or the IP of its TCP address is banned. An existing peer is
/// only dialed if it is currently idle. The connection is scheduled without checking the
/// outbound capacity. Trusted peers are also added to the trusted id set.
pub(crate) fn add_and_connect_kind(
    &mut self,
    peer_id: PeerId,
    kind: PeerKind,
    addr: PeerAddr,
    fork_id: Option<ForkId>,
) {
    if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
        return
    }

    match self.peers.entry(peer_id) {
        Entry::Occupied(mut entry) => {
            let peer = entry.get_mut();
            peer.kind = kind;
            peer.fork_id = fork_id;
            peer.addr = addr;

            // Peers that are already connected or connecting are not dialed again.
            if peer.state == PeerConnectionState::Idle {
                peer.state = PeerConnectionState::PendingOut;
                self.connection_info.inc_pending_out();
                self.queued_actions
                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
            }
        }
        Entry::Vacant(entry) => {
            trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
            let mut peer = Peer::with_kind(addr, kind);
            peer.state = PeerConnectionState::PendingOut;
            peer.fork_id = fork_id;
            entry.insert(peer);
            // Unlike `add_peer_kind`, no `PeerAdded` action is queued on this path.
            self.connection_info.inc_pending_out();
            self.queued_actions
                .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
        }
    }

    if kind.is_trusted() {
        self.trusted_peer_ids.insert(peer_id);
    }
}
868
869 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
871 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
872 if !entry.get().is_trusted() {
873 return
874 }
875
876 let peer = entry.get_mut();
877 peer.kind = PeerKind::Basic;
878
879 self.trusted_peer_ids.remove(&peer_id);
880 }
881
/// Returns the best candidate for a new outbound connection, if any.
///
/// Candidates are unconnected peers that are neither backed off nor banned and, when
/// `trusted_nodes_only` is set, trusted. The first trusted or static candidate encountered is
/// returned immediately; otherwise the candidate with the highest reputation wins, and on a
/// tie the one encountered first is kept.
fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
    let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
        !peer.is_backed_off() &&
            !peer.is_banned() &&
            peer.state.is_unconnected() &&
            (!self.trusted_nodes_only || peer.is_trusted())
    });

    // Bail out early if there is no candidate at all.
    let mut best_peer = unconnected.next()?;

    if best_peer.1.is_trusted() || best_peer.1.is_static() {
        return Some((*best_peer.0, best_peer.1))
    }

    for maybe_better in unconnected {
        // Trusted and static peers take precedence over any reputation comparison.
        if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
            return Some((*maybe_better.0, maybe_better.1))
        }

        // Strictly greater, so the earlier candidate is kept on equal reputation.
        if maybe_better.1.reputation > best_peer.1.reputation {
            best_peer = maybe_better;
        }
    }
    Some((*best_peer.0, best_peer.1))
}
919
920 fn fill_outbound_slots(&mut self) {
926 self.tick();
927
928 if !self.net_connection_state.is_active() {
929 return
931 }
932
933 while self.connection_info.has_out_capacity() {
935 let action = {
936 let (peer_id, peer) = match self.best_unconnected() {
937 Some(peer) => peer,
938 _ => break,
939 };
940
941 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
942
943 peer.state = PeerConnectionState::PendingOut;
944 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
945 };
946
947 self.connection_info.inc_pending_out();
948
949 self.queued_actions.push_back(action);
950 }
951 }
952
953 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
954 if let Some(peer) = self.peers.get_mut(&peer_id) {
955 let new_addr = PeerAddr::new_with_ports(
956 new_record.address,
957 new_record.tcp_port,
958 Some(new_record.udp_port),
959 );
960
961 if peer.addr != new_addr {
962 peer.addr = new_addr;
963 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
964 }
965 }
966 }
967
/// Replaces the stored network connection state with `state`.
pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
    self.net_connection_state = state;
}
972
/// Returns the current network connection state.
pub const fn connection_state(&self) -> &NetworkConnectionState {
    &self.net_connection_state
}
977
/// Sets the network connection state to [`NetworkConnectionState::ShuttingDown`].
pub const fn on_shutdown(&mut self) {
    self.net_connection_state = NetworkConnectionState::ShuttingDown;
}
982
/// Advances the manager and returns the next [`PeerAction`] to execute.
///
/// Each loop iteration, in order: returns a buffered action if one exists, processes all
/// ready [`PeerCommand`]s, releases expired bans and backoffs when the release interval
/// fires, refills outbound slots when the refill interval fires, and applies a resolved
/// trusted peer record if one is ready. Returns [`Poll::Pending`] once an iteration leaves no
/// action queued.
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
    loop {
        // Hand out buffered actions first, one per call.
        if let Some(action) = self.queued_actions.pop_front() {
            return Poll::Ready(action)
        }

        while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
            match cmd {
                PeerCommand::Add(peer_id, addr) => {
                    self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
                }
                PeerCommand::Remove(peer) => self.remove_peer(peer),
                PeerCommand::ReputationChange(peer_id, rep) => {
                    self.apply_reputation_change(&peer_id, rep)
                }
                PeerCommand::GetPeer(peer, tx) => {
                    // A failed send is deliberately ignored.
                    let _ = tx.send(self.peers.get(&peer).cloned());
                }
                PeerCommand::GetPeers(tx) => {
                    let _ = tx.send(self.iter_peers().collect());
                }
            }
        }

        if self.release_interval.poll_tick(cx).is_ready() {
            let now = std::time::Instant::now();
            let (_, unbanned_peers) = self.ban_list.evict(now);

            for peer_id in unbanned_peers {
                // Only peers that are still tracked produce an `UnBanPeer` action.
                if let Some(peer) = self.peers.get_mut(&peer_id) {
                    peer.unban();
                    self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
                }
            }

            // Drop expired backoffs and clear the flag on the affected peers.
            self.backed_off_peers.retain(|peer_id, until| {
                if now > *until {
                    if let Some(peer) = self.peers.get_mut(peer_id) {
                        peer.backed_off = false;
                    }
                    return false
                }
                true
            })
        }

        while self.refill_slots_interval.poll_tick(cx).is_ready() {
            self.fill_outbound_slots();
        }

        if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
            self.on_resolved_peer(peer_id, new_record);
        }

        // Nothing was produced in this iteration; otherwise loop again to return the action.
        if self.queued_actions.is_empty() {
            return Poll::Pending
        }
    }
}
1049}
1050
1051impl Default for PeersManager {
1052 fn default() -> Self {
1053 Self::new(Default::default())
1054 }
1055}
1056
/// Tracks the number of active and pending connections together with their configured limits.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectionInfo {
    /// Number of active outbound connections.
    num_outbound: usize,
    /// Number of outbound connections that are still pending.
    num_pending_out: usize,
    /// Number of active inbound connections.
    num_inbound: usize,
    /// Number of inbound connections that are still pending.
    num_pending_in: usize,
    /// Limits the counters are compared against.
    config: ConnectionsConfig,
}
1071
impl ConnectionInfo {
    /// Creates a new instance with all counters at zero.
    const fn new(config: ConnectionsConfig) -> Self {
        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
    }

    /// Returns `true` if another outbound connection may be dialed: both the number of
    /// concurrent dials and the number of active outbound connections are below their limits.
    const fn has_out_capacity(&self) -> bool {
        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
            self.num_outbound < self.config.max_outbound
    }

    /// Returns `true` if the number of active inbound connections is below `max_inbound`.
    const fn has_in_capacity(&self) -> bool {
        self.num_inbound < self.config.max_inbound
    }

    /// Returns `true` if the number of pending inbound connections is below `max_inbound`.
    const fn has_in_pending_capacity(&self) -> bool {
        self.num_pending_in < self.config.max_inbound
    }

    /// Decrements the counter that corresponds to the given connection state.
    ///
    /// `Idle` has no counter and is a no-op.
    const fn decr_state(&mut self, state: PeerConnectionState) {
        match state {
            PeerConnectionState::Idle => {}
            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
            PeerConnectionState::PendingOut => self.decr_pending_out(),
        }
    }

    /// Decrements the active outbound counter. The subtraction is unchecked.
    const fn decr_out(&mut self) {
        self.num_outbound -= 1;
    }

    /// Increments the active outbound counter.
    const fn inc_out(&mut self) {
        self.num_outbound += 1;
    }

    /// Increments the pending outbound counter.
    const fn inc_pending_out(&mut self) {
        self.num_pending_out += 1;
    }

    /// Increments the active inbound counter.
    const fn inc_in(&mut self) {
        self.num_inbound += 1;
    }

    /// Increments the pending inbound counter.
    const fn inc_pending_in(&mut self) {
        self.num_pending_in += 1;
    }

    /// Decrements the active inbound counter. The subtraction is unchecked.
    const fn decr_in(&mut self) {
        self.num_inbound -= 1;
    }

    /// Decrements the pending outbound counter. The subtraction is unchecked.
    const fn decr_pending_out(&mut self) {
        self.num_pending_out -= 1;
    }

    /// Decrements the pending inbound counter. The subtraction is unchecked.
    const fn decr_pending_in(&mut self) {
        self.num_pending_in -= 1;
    }
}
1137
/// Actions produced by the [`PeersManager`] and returned from [`PeersManager::poll`].
#[derive(Debug)]
pub enum PeerAction {
    /// Start a new outbound connection to the peer.
    Connect {
        /// Id of the peer to connect to.
        peer_id: PeerId,
        /// TCP address to dial.
        remote_addr: SocketAddr,
    },
    /// Disconnect an existing connection.
    Disconnect {
        /// Id of the peer to disconnect.
        peer_id: PeerId,
        /// Optional reason to attach to the disconnect.
        reason: Option<DisconnectReason>,
    },
    /// Disconnect an incoming session that was established by a banned peer.
    DisconnectBannedIncoming {
        /// Id of the banned peer.
        peer_id: PeerId,
    },
    /// Disconnect an incoming session from an untrusted peer while only trusted peers are
    /// allowed.
    DisconnectUntrustedIncoming {
        /// Id of the untrusted peer.
        peer_id: PeerId,
    },
    /// Ban the peer id and its IP in discovery.
    DiscoveryBanPeerId {
        /// Id of the peer to ban.
        peer_id: PeerId,
        /// IP address of the peer to ban.
        ip_addr: IpAddr,
    },
    /// Ban the IP in discovery.
    DiscoveryBanIp {
        /// IP address to ban.
        ip_addr: IpAddr,
    },
    /// The peer was banned.
    BanPeer {
        /// Id of the banned peer.
        peer_id: PeerId,
    },
    /// The peer was unbanned.
    UnBanPeer {
        /// Id of the unbanned peer.
        peer_id: PeerId,
    },
    /// A new peer was added to the peer set.
    PeerAdded(PeerId),
    /// A peer was removed from the peer set.
    PeerRemoved(PeerId),
}
1193
/// Reasons for rejecting an incoming connection in
/// [`PeersManager::on_incoming_pending_session`].
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
    /// The remote IP address is currently banned.
    IpBanned,
    /// No inbound or pending inbound capacity is left.
    ExceedsCapacity,
}
1202
impl Display for InboundConnectionError {
    /// Formats the error using its `Debug` representation, i.e. the variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{self:?}")
    }
}
1208
1209#[cfg(test)]
1210mod tests {
1211 use alloy_primitives::B512;
1212 use reth_eth_wire::{
1213 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1214 DisconnectReason,
1215 };
1216 use reth_net_banlist::BanList;
1217 use reth_network_api::Direction;
1218 use reth_network_peers::{PeerId, TrustedPeer};
1219 use reth_network_types::{
1220 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1221 };
1222 use std::{
1223 future::{poll_fn, Future},
1224 io,
1225 net::{IpAddr, Ipv4Addr, SocketAddr},
1226 pin::Pin,
1227 task::{Context, Poll},
1228 time::Duration,
1229 };
1230 use url::Host;
1231
1232 use super::PeersManager;
1233 use crate::{
1234 error::SessionError,
1235 peers::{
1236 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1237 PeerConnectionState,
1238 },
1239 session::PendingSessionHandshakeError,
1240 PeersConfig,
1241 };
1242
/// Test helper: a future that resolves to the next [`PeerAction`] of the wrapped manager.
struct PeerActionFuture<'a> {
    /// The manager that is polled for the next action.
    peers: &'a mut PeersManager,
}
1246
impl Future for PeerActionFuture<'_> {
    type Output = PeerAction;

    /// Forwards to [`PeersManager::poll`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.get_mut().peers.poll(cx)
    }
}
1254
/// Awaits the next [`PeerAction`] emitted by the given manager.
macro_rules! event {
    ($peers:expr) => {
        PeerActionFuture { peers: &mut $peers }.await
    };
}
1260
1261 #[tokio::test]
1262 async fn test_insert() {
1263 let peer = PeerId::random();
1264 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1265 let mut peers = PeersManager::default();
1266 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1267
1268 match event!(peers) {
1269 PeerAction::PeerAdded(peer_id) => {
1270 assert_eq!(peer_id, peer);
1271 }
1272 _ => unreachable!(),
1273 }
1274 match event!(peers) {
1275 PeerAction::Connect { peer_id, remote_addr } => {
1276 assert_eq!(peer_id, peer);
1277 assert_eq!(remote_addr, socket_addr);
1278 }
1279 _ => unreachable!(),
1280 }
1281
1282 let (record, _) = peers.peer_by_id(peer).unwrap();
1283 assert_eq!(record.tcp_addr(), socket_addr);
1284 assert_eq!(record.udp_addr(), socket_addr);
1285 }
1286
1287 #[tokio::test]
1288 async fn test_insert_udp() {
1289 let peer = PeerId::random();
1290 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1291 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1292 let mut peers = PeersManager::default();
1293 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1294
1295 match event!(peers) {
1296 PeerAction::PeerAdded(peer_id) => {
1297 assert_eq!(peer_id, peer);
1298 }
1299 _ => unreachable!(),
1300 }
1301 match event!(peers) {
1302 PeerAction::Connect { peer_id, remote_addr } => {
1303 assert_eq!(peer_id, peer);
1304 assert_eq!(remote_addr, tcp_addr);
1305 }
1306 _ => unreachable!(),
1307 }
1308
1309 let (record, _) = peers.peer_by_id(peer).unwrap();
1310 assert_eq!(record.tcp_addr(), tcp_addr);
1311 assert_eq!(record.udp_addr(), udp_addr);
1312 }
1313
1314 #[tokio::test]
1315 async fn test_ban() {
1316 let peer = PeerId::random();
1317 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1318 let mut peers = PeersManager::default();
1319 peers.ban_peer(peer);
1320 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1321
1322 match event!(peers) {
1323 PeerAction::BanPeer { peer_id } => {
1324 assert_eq!(peer_id, peer);
1325 }
1326 _ => unreachable!(),
1327 }
1328
1329 poll_fn(|cx| {
1330 assert!(peers.poll(cx).is_pending());
1331 Poll::Ready(())
1332 })
1333 .await;
1334 }
1335
1336 #[tokio::test]
1337 async fn test_unban() {
1338 let peer = PeerId::random();
1339 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1340 let mut peers = PeersManager::default();
1341 peers.ban_peer(peer);
1342 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1343
1344 match event!(peers) {
1345 PeerAction::BanPeer { peer_id } => {
1346 assert_eq!(peer_id, peer);
1347 }
1348 _ => unreachable!(),
1349 }
1350
1351 poll_fn(|cx| {
1352 assert!(peers.poll(cx).is_pending());
1353 Poll::Ready(())
1354 })
1355 .await;
1356
1357 peers.unban_peer(peer);
1358
1359 match event!(peers) {
1360 PeerAction::UnBanPeer { peer_id } => {
1361 assert_eq!(peer_id, peer);
1362 }
1363 _ => unreachable!(),
1364 }
1365
1366 poll_fn(|cx| {
1367 assert!(peers.poll(cx).is_pending());
1368 Poll::Ready(())
1369 })
1370 .await;
1371 }
1372
1373 #[tokio::test]
1374 async fn test_backoff_on_busy() {
1375 let peer = PeerId::random();
1376 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1377
1378 let mut peers = PeersManager::new(PeersConfig::test());
1379 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1380
1381 match event!(peers) {
1382 PeerAction::PeerAdded(peer_id) => {
1383 assert_eq!(peer_id, peer);
1384 }
1385 _ => unreachable!(),
1386 }
1387 match event!(peers) {
1388 PeerAction::Connect { peer_id, .. } => {
1389 assert_eq!(peer_id, peer);
1390 }
1391 _ => unreachable!(),
1392 }
1393
1394 poll_fn(|cx| {
1395 assert!(peers.poll(cx).is_pending());
1396 Poll::Ready(())
1397 })
1398 .await;
1399
1400 peers.on_active_session_dropped(
1401 &socket_addr,
1402 &peer,
1403 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1404 DisconnectReason::TooManyPeers,
1405 )),
1406 );
1407
1408 poll_fn(|cx| {
1409 assert!(peers.poll(cx).is_pending());
1410 Poll::Ready(())
1411 })
1412 .await;
1413
1414 assert!(peers.backed_off_peers.contains_key(&peer));
1415 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1416
1417 tokio::time::sleep(peers.backoff_durations.low).await;
1418
1419 match event!(peers) {
1420 PeerAction::Connect { peer_id, .. } => {
1421 assert_eq!(peer_id, peer);
1422 }
1423 _ => unreachable!(),
1424 }
1425
1426 assert!(!peers.backed_off_peers.contains_key(&peer));
1427 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1428 }
1429
1430 #[tokio::test]
1431 async fn test_backoff_on_no_response() {
1432 let peer = PeerId::random();
1433 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1434
1435 let backoff_durations = PeerBackoffDurations::test();
1436 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1437 let mut peers = PeersManager::new(config);
1438 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1439
1440 match event!(peers) {
1441 PeerAction::PeerAdded(peer_id) => {
1442 assert_eq!(peer_id, peer);
1443 }
1444 _ => unreachable!(),
1445 }
1446 match event!(peers) {
1447 PeerAction::Connect { peer_id, .. } => {
1448 assert_eq!(peer_id, peer);
1449 }
1450 _ => unreachable!(),
1451 }
1452
1453 poll_fn(|cx| {
1454 assert!(peers.poll(cx).is_pending());
1455 Poll::Ready(())
1456 })
1457 .await;
1458
1459 peers.on_outgoing_pending_session_dropped(
1460 &socket_addr,
1461 &peer,
1462 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1463 EthHandshakeError::NoResponse,
1464 )),
1465 );
1466
1467 poll_fn(|cx| {
1468 assert!(peers.poll(cx).is_pending());
1469 Poll::Ready(())
1470 })
1471 .await;
1472
1473 assert!(peers.backed_off_peers.contains_key(&peer));
1474 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1475
1476 tokio::time::sleep(backoff_durations.high).await;
1477
1478 match event!(peers) {
1479 PeerAction::Connect { peer_id, .. } => {
1480 assert_eq!(peer_id, peer);
1481 }
1482 _ => unreachable!(),
1483 }
1484
1485 assert!(!peers.backed_off_peers.contains_key(&peer));
1486 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1487 }
1488
1489 #[tokio::test]
1490 async fn test_low_backoff() {
1491 let peer = PeerId::random();
1492 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1493 let config = PeersConfig::test();
1494 let mut peers = PeersManager::new(config);
1495 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1496 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1497
1498 let backoff_timestamp = peers
1499 .backoff_durations
1500 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1501
1502 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1503 assert!(backoff_timestamp <= expected);
1504 }
1505
1506 #[tokio::test]
1507 async fn test_multiple_backoff_calculations() {
1508 let peer = PeerId::random();
1509 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1510 let config = PeersConfig::default();
1511 let mut peers = PeersManager::new(config);
1512 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1513 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1514
1515 peer_struct.severe_backoff_counter = 1;
1517
1518 let now = std::time::Instant::now();
1519
1520 peer_struct.severe_backoff_counter += 1;
1522 let backoff_time = peers
1524 .backoff_durations
1525 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1526
1527 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1529
1530 assert!(backoff_time.duration_since(now) > backoff_duration);
1533 }
1534
1535 #[tokio::test]
1536 async fn test_ban_on_active_drop() {
1537 let peer = PeerId::random();
1538 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1539 let mut peers = PeersManager::default();
1540 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1541
1542 match event!(peers) {
1543 PeerAction::PeerAdded(peer_id) => {
1544 assert_eq!(peer_id, peer);
1545 }
1546 _ => unreachable!(),
1547 }
1548 match event!(peers) {
1549 PeerAction::Connect { peer_id, .. } => {
1550 assert_eq!(peer_id, peer);
1551 }
1552 _ => unreachable!(),
1553 }
1554
1555 poll_fn(|cx| {
1556 assert!(peers.poll(cx).is_pending());
1557 Poll::Ready(())
1558 })
1559 .await;
1560
1561 peers.on_active_session_dropped(
1562 &socket_addr,
1563 &peer,
1564 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1565 DisconnectReason::UselessPeer,
1566 )),
1567 );
1568
1569 match event!(peers) {
1570 PeerAction::PeerRemoved(peer_id) => {
1571 assert_eq!(peer_id, peer);
1572 }
1573 _ => unreachable!(),
1574 }
1575 match event!(peers) {
1576 PeerAction::BanPeer { peer_id } => {
1577 assert_eq!(peer_id, peer);
1578 }
1579 _ => unreachable!(),
1580 }
1581
1582 poll_fn(|cx| {
1583 assert!(peers.poll(cx).is_pending());
1584 Poll::Ready(())
1585 })
1586 .await;
1587
1588 assert!(!peers.peers.contains_key(&peer));
1589 }
1590
1591 #[tokio::test]
1592 async fn test_remove_on_max_backoff_count() {
1593 let peer = PeerId::random();
1594 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1595 let config = PeersConfig::test();
1596 let mut peers = PeersManager::new(config.clone());
1597 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1598 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1599
1600 peer_struct.severe_backoff_counter = config.max_backoff_count;
1602
1603 match event!(peers) {
1604 PeerAction::PeerAdded(peer_id) => {
1605 assert_eq!(peer_id, peer);
1606 }
1607 _ => unreachable!(),
1608 }
1609 match event!(peers) {
1610 PeerAction::Connect { peer_id, .. } => {
1611 assert_eq!(peer_id, peer);
1612 }
1613 _ => unreachable!(),
1614 }
1615
1616 poll_fn(|cx| {
1617 assert!(peers.poll(cx).is_pending());
1618 Poll::Ready(())
1619 })
1620 .await;
1621
1622 peers.on_outgoing_pending_session_dropped(
1623 &socket_addr,
1624 &peer,
1625 &PendingSessionHandshakeError::Eth(
1626 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1627 ),
1628 );
1629
1630 match event!(peers) {
1631 PeerAction::PeerRemoved(peer_id) => {
1632 assert_eq!(peer_id, peer);
1633 }
1634 _ => unreachable!(),
1635 }
1636
1637 poll_fn(|cx| {
1638 assert!(peers.poll(cx).is_pending());
1639 Poll::Ready(())
1640 })
1641 .await;
1642
1643 assert!(!peers.peers.contains_key(&peer));
1644 }
1645
1646 #[tokio::test]
1647 async fn test_ban_on_pending_drop() {
1648 let peer = PeerId::random();
1649 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1650 let mut peers = PeersManager::default();
1651 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1652
1653 match event!(peers) {
1654 PeerAction::PeerAdded(peer_id) => {
1655 assert_eq!(peer_id, peer);
1656 }
1657 _ => unreachable!(),
1658 }
1659 match event!(peers) {
1660 PeerAction::Connect { peer_id, .. } => {
1661 assert_eq!(peer_id, peer);
1662 }
1663 _ => unreachable!(),
1664 }
1665
1666 poll_fn(|cx| {
1667 assert!(peers.poll(cx).is_pending());
1668 Poll::Ready(())
1669 })
1670 .await;
1671
1672 peers.on_outgoing_pending_session_dropped(
1673 &socket_addr,
1674 &peer,
1675 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1676 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1677 )),
1678 );
1679
1680 match event!(peers) {
1681 PeerAction::PeerRemoved(peer_id) => {
1682 assert_eq!(peer_id, peer);
1683 }
1684 _ => unreachable!(),
1685 }
1686 match event!(peers) {
1687 PeerAction::BanPeer { peer_id } => {
1688 assert_eq!(peer_id, peer);
1689 }
1690 _ => unreachable!(),
1691 }
1692
1693 poll_fn(|cx| {
1694 assert!(peers.poll(cx).is_pending());
1695 Poll::Ready(())
1696 })
1697 .await;
1698
1699 assert!(!peers.peers.contains_key(&peer));
1700 }
1701
1702 #[tokio::test]
1703 async fn test_internally_closed_incoming() {
1704 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1705 let mut peers = PeersManager::default();
1706
1707 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1708 assert_eq!(peers.connection_info.num_pending_in, 1);
1709 peers.on_incoming_pending_session_rejected_internally();
1710 assert_eq!(peers.connection_info.num_pending_in, 0);
1711 }
1712
1713 #[tokio::test]
1714 async fn test_reject_incoming_at_pending_capacity() {
1715 let mut peers = PeersManager::default();
1716
1717 for count in 1..=peers.connection_info.config.max_inbound {
1718 let socket_addr =
1719 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1720 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1721 assert_eq!(peers.connection_info.num_pending_in, count);
1722 }
1723 assert!(peers.connection_info.has_in_capacity());
1724 assert!(!peers.connection_info.has_in_pending_capacity());
1725
1726 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1727 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1728 }
1729
1730 #[tokio::test]
1731 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1732 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1733 let trusted = PeerId::random();
1734 peers.add_trusted_peer_id(trusted);
1735
1736 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1738 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1739 peers.on_incoming_session_established(trusted, addr);
1740
1741 match event!(peers) {
1742 PeerAction::PeerAdded(id) => {
1743 assert_eq!(id, trusted);
1744 }
1745 _ => unreachable!(),
1746 }
1747
1748 let mut connected_untrusted_peer_ids = Vec::new();
1750 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1751 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1752 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1753 let peer_id = PeerId::random();
1754 peers.on_incoming_session_established(peer_id, addr);
1755 connected_untrusted_peer_ids.push(peer_id);
1756
1757 match event!(peers) {
1758 PeerAction::PeerAdded(id) => {
1759 assert_eq!(id, peer_id);
1760 }
1761 _ => unreachable!(),
1762 }
1763 }
1764
1765 let mut pending_addrs = Vec::new();
1766
1767 for i in 0..2 {
1769 let socket_addr =
1770 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1771 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1772
1773 pending_addrs.push(socket_addr);
1774 }
1775
1776 assert_eq!(peers.connection_info.num_pending_in, 2);
1777
1778 for i in 0..2 {
1780 let socket_addr =
1781 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1782 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1783 }
1784
1785 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1786 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1787 DisconnectReason::UselessPeer,
1788 )),
1789 ));
1790
1791 for pending_addr in pending_addrs {
1793 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1794 }
1795
1796 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1797
1798 println!(
1799 "num_inbound: {}, has_in_capacity: {}",
1800 peers.connection_info.num_inbound,
1801 peers.connection_info.has_in_capacity()
1802 );
1803
1804 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1806
1807 println!(
1808 "num_inbound: {}, has_in_capacity: {}",
1809 peers.connection_info.num_inbound,
1810 peers.connection_info.has_in_capacity()
1811 );
1812
1813 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1814 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1815 }
1816
1817 #[tokio::test]
1818 async fn test_closed_incoming() {
1819 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1820 let mut peers = PeersManager::default();
1821
1822 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1823 assert_eq!(peers.connection_info.num_pending_in, 1);
1824 peers.on_incoming_pending_session_gracefully_closed();
1825 assert_eq!(peers.connection_info.num_pending_in, 0);
1826 }
1827
1828 #[tokio::test]
1829 async fn test_dropped_incoming() {
1830 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1831 let ban_duration = Duration::from_millis(500);
1832 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1833 let mut peers = PeersManager::new(config);
1834
1835 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1836 assert_eq!(peers.connection_info.num_pending_in, 1);
1837 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1838 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1839 DisconnectReason::UselessPeer,
1840 )),
1841 ));
1842
1843 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1844 assert_eq!(peers.connection_info.num_pending_in, 0);
1845 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1846
1847 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1848
1849 tokio::time::sleep(ban_duration).await;
1851
1852 poll_fn(|cx| {
1853 let _ = peers.poll(cx);
1854 Poll::Ready(())
1855 })
1856 .await;
1857
1858 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1859 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1860 }
1861
1862 #[tokio::test]
1863 async fn test_reputation_change_connected() {
1864 let peer = PeerId::random();
1865 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1866 let mut peers = PeersManager::default();
1867 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1868
1869 match event!(peers) {
1870 PeerAction::PeerAdded(peer_id) => {
1871 assert_eq!(peer_id, peer);
1872 }
1873 _ => unreachable!(),
1874 }
1875 match event!(peers) {
1876 PeerAction::Connect { peer_id, remote_addr } => {
1877 assert_eq!(peer_id, peer);
1878 assert_eq!(remote_addr, socket_addr);
1879 }
1880 _ => unreachable!(),
1881 }
1882
1883 let p = peers.peers.get_mut(&peer).unwrap();
1884 assert_eq!(p.state, PeerConnectionState::PendingOut);
1885
1886 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1887
1888 let p = peers.peers.get(&peer).unwrap();
1889 assert_eq!(p.state, PeerConnectionState::PendingOut);
1890 assert!(p.is_banned());
1891
1892 peers.on_active_session_gracefully_closed(peer);
1893
1894 let p = peers.peers.get(&peer).unwrap();
1895 assert_eq!(p.state, PeerConnectionState::Idle);
1896 assert!(p.is_banned());
1897
1898 match event!(peers) {
1899 PeerAction::Disconnect { peer_id, .. } => {
1900 assert_eq!(peer_id, peer);
1901 }
1902 _ => unreachable!(),
1903 }
1904 }
1905
1906 #[tokio::test]
1907 async fn accept_incoming_trusted_unknown_peer_address() {
1908 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1909 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1910 let trusted = PeerId::random();
1912 peers.add_trusted_peer_id(trusted);
1913
1914 for i in 0..peers.connection_info.config.max_inbound {
1916 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1917 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1918 let peer_id = PeerId::random();
1919 peers.on_incoming_session_established(peer_id, addr);
1920
1921 match event!(peers) {
1922 PeerAction::PeerAdded(id) => {
1923 assert_eq!(id, peer_id);
1924 }
1925 _ => unreachable!(),
1926 }
1927 }
1928
1929 let untrusted = PeerId::random();
1931 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1932 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1933 peers.on_incoming_session_established(untrusted, socket_addr);
1934
1935 match event!(peers) {
1936 PeerAction::PeerAdded(id) => {
1937 assert_eq!(id, untrusted);
1938 }
1939 _ => unreachable!(),
1940 }
1941
1942 match event!(peers) {
1943 PeerAction::Disconnect { peer_id, reason } => {
1944 assert_eq!(peer_id, untrusted);
1945 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1946 }
1947 _ => unreachable!(),
1948 }
1949
1950 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1951 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1952 peers.on_incoming_session_established(trusted, socket_addr);
1953
1954 match event!(peers) {
1955 PeerAction::PeerAdded(id) => {
1956 assert_eq!(id, trusted);
1957 }
1958 _ => unreachable!(),
1959 }
1960
1961 poll_fn(|cx| {
1962 assert!(peers.poll(cx).is_pending());
1963 Poll::Ready(())
1964 })
1965 .await;
1966
1967 let peer = peers.peers.get(&trusted).unwrap();
1968 assert_eq!(peer.state, PeerConnectionState::In);
1969 }
1970
1971 #[tokio::test]
1972 async fn test_already_connected() {
1973 let peer = PeerId::random();
1974 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1975 let mut peers = PeersManager::default();
1976
1977 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1979 assert_eq!(peers.connection_info.num_pending_in, 1);
1980
1981 peers.on_incoming_session_established(peer, socket_addr);
1984 let p = peers.peers.get_mut(&peer).expect("peer not found");
1985 assert_eq!(p.addr.tcp(), socket_addr);
1986 assert_eq!(peers.connection_info.num_pending_in, 0);
1987 assert_eq!(peers.connection_info.num_inbound, 1);
1988
1989 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1992 assert_eq!(peers.connection_info.num_pending_in, 1);
1993
1994 peers.on_already_connected(Direction::Incoming);
1998
1999 let p = peers.peers.get_mut(&peer).expect("peer not found");
2000 assert_eq!(p.addr.tcp(), socket_addr);
2001 assert_eq!(peers.connection_info.num_pending_in, 0);
2002 assert_eq!(peers.connection_info.num_inbound, 1);
2003 }
2004
2005 #[tokio::test]
2006 async fn test_reputation_change_trusted_peer() {
2007 let peer = PeerId::random();
2008 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2009 let mut peers = PeersManager::default();
2010 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2011
2012 match event!(peers) {
2013 PeerAction::PeerAdded(peer_id) => {
2014 assert_eq!(peer_id, peer);
2015 }
2016 _ => unreachable!(),
2017 }
2018 match event!(peers) {
2019 PeerAction::Connect { peer_id, remote_addr } => {
2020 assert_eq!(peer_id, peer);
2021 assert_eq!(remote_addr, socket_addr);
2022 }
2023 _ => unreachable!(),
2024 }
2025
2026 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2027 peers.on_active_outgoing_established(peer);
2028 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2029
2030 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2031
2032 {
2033 let p = peers.peers.get(&peer).unwrap();
2034 assert_eq!(p.state, PeerConnectionState::Out);
2035 assert!(!p.is_banned());
2037 }
2038
2039 loop {
2041 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2042
2043 let p = peers.peers.get(&peer).unwrap();
2044 if p.is_banned() {
2045 break
2046 }
2047 }
2048
2049 match event!(peers) {
2050 PeerAction::Disconnect { peer_id, .. } => {
2051 assert_eq!(peer_id, peer);
2052 }
2053 _ => unreachable!(),
2054 }
2055 }
2056
2057 #[tokio::test]
2058 async fn test_reputation_management() {
2059 let peer = PeerId::random();
2060 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2061 let mut peers = PeersManager::default();
2062 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2063 assert_eq!(peers.get_reputation(&peer), Some(0));
2064
2065 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2066 assert_eq!(peers.get_reputation(&peer), Some(1024));
2067
2068 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2069 assert_eq!(peers.get_reputation(&peer), Some(0));
2070 }
2071
2072 #[tokio::test]
2073 async fn test_remove_discovered_active() {
2074 let peer = PeerId::random();
2075 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2076 let mut peers = PeersManager::default();
2077 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2078
2079 match event!(peers) {
2080 PeerAction::PeerAdded(peer_id) => {
2081 assert_eq!(peer_id, peer);
2082 }
2083 _ => unreachable!(),
2084 }
2085 match event!(peers) {
2086 PeerAction::Connect { peer_id, remote_addr } => {
2087 assert_eq!(peer_id, peer);
2088 assert_eq!(remote_addr, socket_addr);
2089 }
2090 _ => unreachable!(),
2091 }
2092
2093 let p = peers.peers.get(&peer).unwrap();
2094 assert_eq!(p.state, PeerConnectionState::PendingOut);
2095
2096 peers.remove_peer(peer);
2097
2098 match event!(peers) {
2099 PeerAction::PeerRemoved(peer_id) => {
2100 assert_eq!(peer_id, peer);
2101 }
2102 _ => unreachable!(),
2103 }
2104 match event!(peers) {
2105 PeerAction::Disconnect { peer_id, .. } => {
2106 assert_eq!(peer_id, peer);
2107 }
2108 _ => unreachable!(),
2109 }
2110
2111 let p = peers.peers.get(&peer).unwrap();
2112 assert_eq!(p.state, PeerConnectionState::PendingOut);
2113
2114 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2115 let p = peers.peers.get(&peer).unwrap();
2116 assert_eq!(p.state, PeerConnectionState::PendingOut);
2117
2118 peers.on_active_session_gracefully_closed(peer);
2119 assert!(!peers.peers.contains_key(&peer));
2120 }
2121
2122 #[tokio::test]
2123 async fn test_fatal_outgoing_connection_error_trusted() {
2124 let peer = PeerId::random();
2125 let config = PeersConfig::test()
2126 .with_trusted_nodes(vec![TrustedPeer {
2127 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2128 tcp_port: 8008,
2129 udp_port: 8008,
2130 id: peer,
2131 }])
2132 .with_trusted_nodes_only(true);
2133 let mut peers = PeersManager::new(config);
2134 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2135
2136 match event!(peers) {
2137 PeerAction::Connect { peer_id, remote_addr } => {
2138 assert_eq!(peer_id, peer);
2139 assert_eq!(remote_addr, socket_addr);
2140 }
2141 _ => unreachable!(),
2142 }
2143
2144 let p = peers.peers.get(&peer).unwrap();
2145 assert_eq!(p.state, PeerConnectionState::PendingOut);
2146
2147 assert_eq!(peers.num_outbound_connections(), 0);
2148
2149 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2150 EthHandshakeError::NonStatusMessageInHandshake,
2151 ));
2152 assert!(err.is_fatal_protocol_error());
2153
2154 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2155 assert_eq!(peers.num_outbound_connections(), 0);
2156
2157 match event!(peers) {
2159 PeerAction::BanPeer { peer_id } => {
2160 assert_eq!(peer_id, peer);
2161 }
2162 err => unreachable!("{err:?}"),
2163 }
2164
2165 assert!(peers.peers.contains_key(&peer));
2167
2168 tokio::time::sleep(peers.backoff_durations.medium).await;
2170
2171 match event!(peers) {
2172 PeerAction::Connect { peer_id, remote_addr } => {
2173 assert_eq!(peer_id, peer);
2174 assert_eq!(remote_addr, socket_addr);
2175 }
2176 err => unreachable!("{err:?}"),
2177 }
2178 }
2179
2180 #[tokio::test]
2181 async fn test_outgoing_connection_error() {
2182 let peer = PeerId::random();
2183 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2184 let mut peers = PeersManager::default();
2185 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2186
2187 match event!(peers) {
2188 PeerAction::PeerAdded(peer_id) => {
2189 assert_eq!(peer_id, peer);
2190 }
2191 _ => unreachable!(),
2192 }
2193 match event!(peers) {
2194 PeerAction::Connect { peer_id, remote_addr } => {
2195 assert_eq!(peer_id, peer);
2196 assert_eq!(remote_addr, socket_addr);
2197 }
2198 _ => unreachable!(),
2199 }
2200
2201 let p = peers.peers.get(&peer).unwrap();
2202 assert_eq!(p.state, PeerConnectionState::PendingOut);
2203
2204 assert_eq!(peers.num_outbound_connections(), 0);
2205
2206 peers.on_outgoing_connection_failure(
2207 &socket_addr,
2208 &peer,
2209 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2210 );
2211
2212 assert_eq!(peers.num_outbound_connections(), 0);
2213 }
2214
2215 #[tokio::test]
2216 async fn test_outgoing_connection_gracefully_closed() {
2217 let peer = PeerId::random();
2218 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2219 let mut peers = PeersManager::default();
2220 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2221
2222 match event!(peers) {
2223 PeerAction::PeerAdded(peer_id) => {
2224 assert_eq!(peer_id, peer);
2225 }
2226 _ => unreachable!(),
2227 }
2228 match event!(peers) {
2229 PeerAction::Connect { peer_id, remote_addr } => {
2230 assert_eq!(peer_id, peer);
2231 assert_eq!(remote_addr, socket_addr);
2232 }
2233 _ => unreachable!(),
2234 }
2235
2236 let p = peers.peers.get(&peer).unwrap();
2237 assert_eq!(p.state, PeerConnectionState::PendingOut);
2238
2239 assert_eq!(peers.num_outbound_connections(), 0);
2240
2241 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2242
2243 assert_eq!(peers.num_outbound_connections(), 0);
2244 assert_eq!(peers.connection_info.num_pending_out, 0);
2245 }
2246
2247 #[tokio::test]
2248 async fn test_discovery_ban_list() {
2249 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2250 let socket_addr = SocketAddr::new(ip, 8008);
2251 let ban_list = BanList::new(vec![], vec![ip]);
2252 let config = PeersConfig::default().with_ban_list(ban_list);
2253 let mut peer_manager = PeersManager::new(config);
2254 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2255
2256 assert!(peer_manager.peers.is_empty());
2257 }
2258
2259 #[tokio::test]
2260 async fn test_on_pending_ban_list() {
2261 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2262 let socket_addr = SocketAddr::new(ip, 8008);
2263 let ban_list = BanList::new(vec![], vec![ip]);
2264 let config = PeersConfig::test().with_ban_list(ban_list);
2265 let mut peer_manager = PeersManager::new(config);
2266 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2267 match a {
2269 Ok(_) => panic!(),
2270 Err(err) => match err {
2271 InboundConnectionError::IpBanned => {
2272 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2273 }
2274 _ => unreachable!(),
2275 },
2276 }
2277 }
2278
2279 #[tokio::test]
2280 async fn test_on_active_inbound_ban_list() {
2281 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2282 let socket_addr = SocketAddr::new(ip, 8008);
2283 let given_peer_id = PeerId::random();
2284 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2285 let config = PeersConfig::test().with_ban_list(ban_list);
2286 let mut peer_manager = PeersManager::new(config);
2287 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2288 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2290 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2291 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2294 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2295
2296 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2297 peer_manager.queued_actions.pop_front()
2298 else {
2299 panic!()
2300 };
2301
2302 assert_eq!(peer_id, given_peer_id)
2303 }
2304
/// Incrementing and decrementing the inbound/outbound counters is symmetric, and a single
/// connection in either direction leaves capacity available.
#[test]
fn test_connection_limits() {
    let mut info = ConnectionInfo::default();

    // Inbound: up by one, then back down.
    info.inc_in();
    assert_eq!((info.num_inbound, info.num_outbound), (1, 0));
    assert!(info.has_in_capacity());

    info.decr_in();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));

    // Outbound: up by one, then back down.
    info.inc_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 1));
    assert!(info.has_out_capacity());

    info.decr_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));
}
2326
/// `decr_state` undoes an `inc_in` for `PeerConnectionState::In` and an `inc_out` for
/// `PeerConnectionState::Out`.
#[test]
fn test_connection_peer_state() {
    let mut info = ConnectionInfo::default();

    info.inc_in();
    info.decr_state(PeerConnectionState::In);
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));

    info.inc_out();
    info.decr_state(PeerConnectionState::Out);
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));
}
2342
2343 #[tokio::test]
2344 async fn test_trusted_peers_are_prioritized() {
2345 let trusted_peer = PeerId::random();
2346 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2347 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2348 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2349 tcp_port: 8008,
2350 udp_port: 8008,
2351 id: trusted_peer,
2352 }]);
2353 let mut peers = PeersManager::new(config);
2354
2355 let basic_peer = PeerId::random();
2356 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2357 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2358
2359 match event!(peers) {
2360 PeerAction::PeerAdded(peer_id) => {
2361 assert_eq!(peer_id, basic_peer);
2362 }
2363 _ => unreachable!(),
2364 }
2365 match event!(peers) {
2366 PeerAction::Connect { peer_id, remote_addr } => {
2367 assert_eq!(peer_id, trusted_peer);
2368 assert_eq!(remote_addr, trusted_sock);
2369 }
2370 _ => unreachable!(),
2371 }
2372 match event!(peers) {
2373 PeerAction::Connect { peer_id, remote_addr } => {
2374 assert_eq!(peer_id, basic_peer);
2375 assert_eq!(remote_addr, basic_sock);
2376 }
2377 _ => unreachable!(),
2378 }
2379 }
2380
2381 #[tokio::test]
2382 async fn test_connect_trusted_nodes_only() {
2383 let trusted_peer = PeerId::random();
2384 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2385 let config = PeersConfig::test()
2386 .with_trusted_nodes(vec![TrustedPeer {
2387 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2388 tcp_port: 8008,
2389 udp_port: 8008,
2390 id: trusted_peer,
2391 }])
2392 .with_trusted_nodes_only(true);
2393 let mut peers = PeersManager::new(config);
2394
2395 let basic_peer = PeerId::random();
2396 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2397 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2398
2399 match event!(peers) {
2400 PeerAction::PeerAdded(peer_id) => {
2401 assert_eq!(peer_id, basic_peer);
2402 }
2403 _ => unreachable!(),
2404 }
2405 match event!(peers) {
2406 PeerAction::Connect { peer_id, remote_addr } => {
2407 assert_eq!(peer_id, trusted_peer);
2408 assert_eq!(remote_addr, trusted_sock);
2409 }
2410 _ => unreachable!(),
2411 }
2412 poll_fn(|cx| {
2413 assert!(peers.poll(cx).is_pending());
2414 Poll::Ready(())
2415 })
2416 .await;
2417 }
2418
2419 #[tokio::test]
2420 async fn test_incoming_with_trusted_nodes_only() {
2421 let trusted_peer = PeerId::random();
2422 let config = PeersConfig::test()
2423 .with_trusted_nodes(vec![TrustedPeer {
2424 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2425 tcp_port: 8008,
2426 udp_port: 8008,
2427 id: trusted_peer,
2428 }])
2429 .with_trusted_nodes_only(true);
2430 let mut peers = PeersManager::new(config);
2431
2432 let basic_peer = PeerId::random();
2433 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2434 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2435 assert_eq!(peers.connection_info.num_pending_in, 1);
2437 peers.on_incoming_session_established(basic_peer, basic_sock);
2438 assert_eq!(peers.connection_info.num_pending_in, 0);
2441 assert_eq!(peers.connection_info.num_inbound, 0);
2442
2443 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2444 peers.queued_actions.pop_front()
2445 else {
2446 panic!()
2447 };
2448 assert_eq!(basic_peer, peer_id);
2449 assert!(!peers.peers.contains_key(&basic_peer));
2450 }
2451
2452 #[tokio::test]
2453 async fn test_incoming_without_trusted_nodes_only() {
2454 let trusted_peer = PeerId::random();
2455 let config = PeersConfig::test()
2456 .with_trusted_nodes(vec![TrustedPeer {
2457 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2458 tcp_port: 8008,
2459 udp_port: 8008,
2460 id: trusted_peer,
2461 }])
2462 .with_trusted_nodes_only(false);
2463 let mut peers = PeersManager::new(config);
2464
2465 let basic_peer = PeerId::random();
2466 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2467 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2468
2469 assert_eq!(peers.connection_info.num_pending_in, 1);
2471 peers.on_incoming_session_established(basic_peer, basic_sock);
2472 assert_eq!(peers.connection_info.num_pending_in, 0);
2475 assert_eq!(peers.connection_info.num_inbound, 1);
2476 assert!(peers.peers.contains_key(&basic_peer));
2477 }
2478
2479 #[tokio::test]
2480 async fn test_incoming_at_capacity() {
2481 let mut config = PeersConfig::test();
2482 config.connection_info.max_inbound = 1;
2483 let mut peers = PeersManager::new(config);
2484
2485 let peer = PeerId::random();
2486 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2487 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2488
2489 peers.on_incoming_session_established(peer, addr);
2490
2491 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2492 assert_eq!(
2493 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2494 InboundConnectionError::ExceedsCapacity
2495 );
2496 }
2497
2498 #[tokio::test]
2499 async fn test_incoming_rate_limit() {
2500 let config = PeersConfig {
2501 incoming_ip_throttle_duration: Duration::from_millis(100),
2502 ..PeersConfig::test()
2503 };
2504 let mut peers = PeersManager::new(config);
2505
2506 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2507 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2508 assert_eq!(
2509 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2510 InboundConnectionError::IpBanned
2511 );
2512
2513 peers.release_interval.reset_immediately();
2514 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2515
2516 poll_fn(|cx| loop {
2518 if peers.poll(cx).is_pending() {
2519 return Poll::Ready(());
2520 }
2521 })
2522 .await;
2523
2524 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2525 assert_eq!(
2526 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2527 InboundConnectionError::IpBanned
2528 );
2529 }
2530
2531 #[tokio::test]
2532 async fn test_tick() {
2533 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2534 let socket_addr = SocketAddr::new(ip, 8008);
2535 let config = PeersConfig::test();
2536 let mut peer_manager = PeersManager::new(config);
2537 let peer_id = PeerId::random();
2538 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2539
2540 tokio::time::sleep(Duration::from_secs(1)).await;
2541 peer_manager.tick();
2542
2543 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2545
2546 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2548
2549 tokio::time::sleep(Duration::from_secs(1)).await;
2550 peer_manager.tick();
2551
2552 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2554
2555 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2556
2557 tokio::time::sleep(Duration::from_secs(1)).await;
2558 peer_manager.tick();
2559
2560 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2562 }
2563
2564 #[tokio::test]
2565 async fn test_remove_incoming_after_disconnect() {
2566 let peer_id = PeerId::random();
2567 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2568 let mut peers = PeersManager::default();
2569
2570 peers.on_incoming_pending_session(addr.ip()).unwrap();
2571 peers.on_incoming_session_established(peer_id, addr);
2572 let peer = peers.peers.get(&peer_id).unwrap();
2573 assert_eq!(peer.state, PeerConnectionState::In);
2574 assert!(peer.remove_after_disconnect);
2575
2576 peers.on_active_session_gracefully_closed(peer_id);
2577 assert!(!peers.peers.contains_key(&peer_id))
2578 }
2579
2580 #[tokio::test]
2581 async fn test_keep_incoming_after_disconnect_if_discovered() {
2582 let peer_id = PeerId::random();
2583 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2584 let mut peers = PeersManager::default();
2585
2586 peers.on_incoming_pending_session(addr.ip()).unwrap();
2587 peers.on_incoming_session_established(peer_id, addr);
2588 let peer = peers.peers.get(&peer_id).unwrap();
2589 assert_eq!(peer.state, PeerConnectionState::In);
2590 assert!(peer.remove_after_disconnect);
2591
2592 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2594
2595 peers.on_active_session_gracefully_closed(peer_id);
2596
2597 let peer = peers.peers.get(&peer_id).unwrap();
2598 assert_eq!(peer.state, PeerConnectionState::Idle);
2599 assert!(!peer.remove_after_disconnect);
2600 }
2601
2602 #[tokio::test]
2603 async fn test_incoming_outgoing_already_connected() {
2604 let peer_id = PeerId::random();
2605 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2606 let mut peers = PeersManager::default();
2607
2608 peers.on_incoming_pending_session(addr.ip()).unwrap();
2609 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2610
2611 match event!(peers) {
2612 PeerAction::PeerAdded(_) => {}
2613 _ => unreachable!(),
2614 }
2615
2616 match event!(peers) {
2617 PeerAction::Connect { .. } => {}
2618 _ => unreachable!(),
2619 }
2620
2621 peers.on_incoming_session_established(peer_id, addr);
2622 peers.on_already_connected(Direction::Outgoing(peer_id));
2623 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2624 assert_eq!(peers.connection_info.num_inbound, 1);
2625 assert_eq!(peers.connection_info.num_pending_out, 0);
2626 assert_eq!(peers.connection_info.num_pending_in, 0);
2627 assert_eq!(peers.connection_info.num_outbound, 0);
2628 }
2629
2630 #[tokio::test]
2631 async fn test_already_connected_incoming_outgoing_connection_error() {
2632 let peer_id = PeerId::random();
2633 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2634 let mut peers = PeersManager::default();
2635
2636 peers.on_incoming_pending_session(addr.ip()).unwrap();
2637 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2638
2639 match event!(peers) {
2640 PeerAction::PeerAdded(_) => {}
2641 _ => unreachable!(),
2642 }
2643
2644 match event!(peers) {
2645 PeerAction::Connect { .. } => {}
2646 _ => unreachable!(),
2647 }
2648
2649 peers.on_incoming_session_established(peer_id, addr);
2650
2651 peers.on_outgoing_connection_failure(
2652 &addr,
2653 &peer_id,
2654 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2655 );
2656 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2657 assert_eq!(peers.connection_info.num_inbound, 1);
2658 assert_eq!(peers.connection_info.num_pending_out, 0);
2659 assert_eq!(peers.connection_info.num_pending_in, 0);
2660 assert_eq!(peers.connection_info.num_outbound, 0);
2661 }
2662
2663 #[tokio::test]
2664 async fn test_max_concurrent_dials() {
2665 let config = PeersConfig::default();
2666 let mut peer_manager = PeersManager::new(config);
2667 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2668 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2669 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2670 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2671 }
2672
2673 peer_manager.fill_outbound_slots();
2674 let dials = peer_manager
2675 .queued_actions
2676 .iter()
2677 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2678 .count();
2679 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2680 }
2681
2682 #[tokio::test]
2683 async fn test_max_num_of_pending_dials() {
2684 let config = PeersConfig::default();
2685 let mut peer_manager = PeersManager::new(config);
2686 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2687 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2688
2689 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2691 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2692 }
2693
2694 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2695 match event!(peer_manager) {
2696 PeerAction::PeerAdded(_) => {}
2697 _ => unreachable!(),
2698 }
2699 }
2700
2701 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2702 match event!(peer_manager) {
2703 PeerAction::Connect { .. } => {}
2704 _ => unreachable!(),
2705 }
2706 }
2707
2708 peer_manager.fill_outbound_slots();
2710
2711 let dials = peer_manager.connection_info.num_pending_out;
2713 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2714
2715 let num_pendingout_states = peer_manager
2716 .peers
2717 .iter()
2718 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2719 .map(|(peer_id, _)| *peer_id)
2720 .collect::<Vec<PeerId>>();
2721 assert_eq!(
2722 num_pendingout_states.len(),
2723 peer_manager.connection_info.config.max_concurrent_outbound_dials
2724 );
2725
2726 for peer_id in &num_pendingout_states {
2728 peer_manager.on_active_outgoing_established(*peer_id);
2729 }
2730
2731 for peer_id in &num_pendingout_states {
2733 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2734 }
2735
2736 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2738 }
2739
2740 #[tokio::test]
2741 async fn test_connect() {
2742 let peer = PeerId::random();
2743 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2744 let mut peers = PeersManager::default();
2745 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2746 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2747
2748 match event!(peers) {
2749 PeerAction::Connect { peer_id, remote_addr } => {
2750 assert_eq!(peer_id, peer);
2751 assert_eq!(remote_addr, socket_addr);
2752 }
2753 _ => unreachable!(),
2754 }
2755
2756 let (record, _) = peers.peer_by_id(peer).unwrap();
2757 assert_eq!(record.tcp_addr(), socket_addr);
2758 assert_eq!(record.udp_addr(), socket_addr);
2759
2760 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2762
2763 let (record, _) = peers.peer_by_id(peer).unwrap();
2764 assert_eq!(record.tcp_addr(), socket_addr);
2765 assert_eq!(record.udp_addr(), socket_addr);
2766 }
2767
2768 #[tokio::test]
2769 async fn test_incoming_connection_from_banned() {
2770 let peer = PeerId::random();
2771 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2772 let config = PeersConfig::test().with_max_inbound(3);
2773 let mut peers = PeersManager::new(config);
2774 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2775
2776 match event!(peers) {
2777 PeerAction::PeerAdded(peer_id) => {
2778 assert_eq!(peer_id, peer);
2779 }
2780 _ => unreachable!(),
2781 }
2782 match event!(peers) {
2783 PeerAction::Connect { peer_id, .. } => {
2784 assert_eq!(peer_id, peer);
2785 }
2786 _ => unreachable!(),
2787 }
2788
2789 poll_fn(|cx| {
2790 assert!(peers.poll(cx).is_pending());
2791 Poll::Ready(())
2792 })
2793 .await;
2794
2795 loop {
2797 peers.on_active_session_dropped(
2798 &socket_addr,
2799 &peer,
2800 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2801 reth_eth_wire::EthVersion::Eth68,
2802 reth_eth_wire::EthMessageID::Status,
2803 )),
2804 );
2805
2806 if peers.peers.get(&peer).unwrap().is_banned() {
2807 break;
2808 }
2809
2810 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2811 peers.on_incoming_session_established(peer, socket_addr);
2812
2813 match event!(peers) {
2814 PeerAction::Connect { peer_id, .. } => {
2815 assert_eq!(peer_id, peer);
2816 }
2817 _ => unreachable!(),
2818 }
2819 }
2820
2821 assert!(peers.peers.get(&peer).unwrap().is_banned());
2822
2823 for _ in 0..peers.connection_info.config.max_inbound {
2825 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2826 peers.on_incoming_session_established(peer, socket_addr);
2827
2828 match event!(peers) {
2829 PeerAction::DisconnectBannedIncoming { peer_id } => {
2830 assert_eq!(peer_id, peer);
2831 }
2832 _ => unreachable!(),
2833 }
2834 }
2835
2836 poll_fn(|cx| {
2837 assert!(peers.poll(cx).is_pending());
2838 Poll::Ready(())
2839 })
2840 .await;
2841
2842 assert_eq!(peers.connection_info.num_inbound, 0);
2843
2844 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2845
2846 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2848 assert_eq!(peers.connection_info.num_pending_in, 1);
2849
2850 peers.on_active_session_gracefully_closed(peer);
2854 assert_eq!(peers.connection_info.num_inbound, 0);
2855 }
2856
2857 #[tokio::test]
2858 async fn test_add_pending_connect() {
2859 let peer = PeerId::random();
2860 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2861 let mut peers = PeersManager::default();
2862 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2863 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2864 assert_eq!(peers.connection_info.num_pending_out, 1);
2865 }
2866
2867 #[tokio::test]
2868 async fn test_dns_updates_peer_address() {
2869 let peer_id = PeerId::random();
2870 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2871 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2872
2873 let trusted = TrustedPeer {
2874 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2875 tcp_port: 8008,
2876 udp_port: 8008,
2877 id: peer_id,
2878 };
2879
2880 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2881 let mut manager = PeersManager::new(config);
2882 manager
2883 .trusted_peers_resolver
2884 .set_interval(tokio::time::interval(Duration::from_millis(1)));
2885
2886 manager.peers.insert(
2887 peer_id,
2888 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2889 );
2890
2891 for _ in 0..100 {
2892 let _ = event!(manager);
2893 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2894 break;
2895 }
2896 tokio::time::sleep(Duration::from_millis(10)).await;
2897 }
2898
2899 let updated_peer = manager.peers.get(&peer_id).unwrap();
2900 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2901 }
2902}