1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use rand::Rng;
12use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
13use reth_ethereum_forks::ForkId;
14use reth_net_banlist::BanList;
15use reth_network_api::test_utils::{PeerCommand, PeersHandle};
16use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
17use reth_network_types::{
18 is_connection_failed_reputation,
19 peers::{
20 config::{PeerBackoffDurations, PEER_ROTATION_MIN_UPTIME},
21 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
22 },
23 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
24 PersistedPeerInfo, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
25};
26use std::{
27 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
28 fmt::Display,
29 io::{self},
30 net::{IpAddr, SocketAddr},
31 pin::Pin,
32 task::{Context, Poll},
33 time::Duration,
34};
35use thiserror::Error;
36use tokio::{
37 sync::mpsc,
38 time::{Instant, Interval, Sleep},
39};
40use tokio_stream::wrappers::UnboundedReceiverStream;
41use tracing::{trace, warn};
42
/// Keeps track of all known peers together with the connection, ban and backoff bookkeeping
/// that decides which peers are dialed and which inbound sessions are accepted.
///
/// Decisions are buffered as [`PeerAction`]s and handed out one at a time by `poll`.
#[derive(Debug)]
pub struct PeersManager {
    /// All known peers, keyed by peer id.
    peers: HashMap<PeerId, Peer>,
    /// Ids of the peers that are treated as trusted.
    trusted_peer_ids: HashSet<PeerId>,
    /// Resolver that is polled for (re-)resolved records of trusted peers.
    trusted_peers_resolver: TrustedPeersResolver,
    /// Sender half of the command channel; cloned into every [`PeersHandle`].
    manager_tx: mpsc::UnboundedSender<PeerCommand>,
    /// Receiver half of the command channel, drained in `poll`.
    handle_rx: UnboundedReceiverStream<PeerCommand>,
    /// Actions waiting to be returned from `poll`, in FIFO order.
    queued_actions: VecDeque<PeerAction>,
    /// Interval at which free outbound slots are refilled.
    refill_slots_interval: Interval,
    /// Weights used to turn a [`ReputationChangeKind`] into a numeric reputation change.
    reputation_weights: ReputationChangeWeights,
    /// Connection counters and the limits they are checked against.
    connection_info: ConnectionInfo,
    /// Banned peer ids and IP addresses.
    ban_list: BanList,
    /// Peers that are currently backed off, with the instant at which the backoff ends.
    backed_off_peers: HashMap<PeerId, std::time::Instant>,
    /// Interval at which expired bans and backoffs are released.
    release_interval: Interval,
    /// Duration of a regular ban.
    ban_duration: Duration,
    /// Backoff durations applied after failed connections.
    backoff_durations: PeerBackoffDurations,
    /// If set, only trusted peers are dialed and untrusted inbound sessions are disconnected.
    trusted_nodes_only: bool,
    /// Instant of the most recent `tick`.
    last_tick: Instant,
    /// A non-trusted, non-static peer whose severe backoff counter exceeds this value is removed.
    max_backoff_count: u8,
    /// Current network connection state; outbound slots are only filled while it is active.
    net_connection_state: NetworkConnectionState,
    /// How long an IP is temporarily banned after an inbound attempt was accepted. Also used as
    /// the backoff applied after a gracefully closed session.
    incoming_ip_throttle_duration: Duration,
    /// Filter deciding which IP addresses are allowed at all.
    ip_filter: reth_net_banlist::IpFilter,
    /// If set, persisted peers without a fork id are skipped when the manager is created.
    enforce_enr_fork_id: bool,
    /// Timer for the next peer rotation; `None` if no rotation interval was configured.
    peer_rotation_sleep: Option<Pin<Box<Sleep>>>,
    /// Value passed to `jitter_rotation_interval` whenever the rotation timer is re-armed.
    peer_rotation_mean: Option<Duration>,
}
106
107impl PeersManager {
108 pub fn new(config: PeersConfig) -> Self {
110 let PeersConfig {
111 refill_slots_interval,
112 connection_info,
113 reputation_weights,
114 ban_list,
115 ban_duration,
116 backoff_durations,
117 trusted_nodes,
118 trusted_nodes_only,
119 trusted_nodes_resolution_interval,
120 basic_nodes,
121 persisted_peers,
122 max_backoff_count,
123 incoming_ip_throttle_duration,
124 ip_filter,
125 enforce_enr_fork_id,
126 peer_rotation_interval,
127 } = config;
128 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
129 let now = Instant::now();
130
131 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
133
134 let mut peers =
135 HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len() + persisted_peers.len());
136 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
137
138 for trusted_peer in &trusted_nodes {
139 match trusted_peer.resolve_blocking() {
140 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
141 trusted_peer_ids.insert(id);
142 peers.entry(id).or_insert_with(|| {
143 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
144 });
145 }
146 Err(err) => {
147 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
148 }
149 }
150 }
151
152 for PersistedPeerInfo { record, kind, fork_id, reputation } in persisted_peers {
153 if enforce_enr_fork_id && fork_id.is_none() {
157 continue
158 }
159 let NodeRecord { address, tcp_port, udp_port, id } = record;
160 peers.entry(id).or_insert_with(|| {
161 let mut peer = Peer::with_kind(
162 PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)),
163 kind,
164 );
165 peer.fork_id = fork_id.map(Box::new);
166 peer.reputation = reputation;
167 peer
168 });
169 }
170
171 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
172 peers.entry(id).or_insert_with(|| {
173 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
174 });
175 }
176
177 trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
178
179 Self {
180 peers,
181 trusted_peer_ids,
182 trusted_peers_resolver: TrustedPeersResolver::new(
183 trusted_nodes,
184 tokio::time::interval(trusted_nodes_resolution_interval), ),
186 manager_tx,
187 handle_rx: UnboundedReceiverStream::new(handle_rx),
188 queued_actions: Default::default(),
189 reputation_weights,
190 refill_slots_interval: tokio::time::interval(refill_slots_interval),
191 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
192 connection_info: ConnectionInfo::new(connection_info),
193 ban_list,
194 backed_off_peers: Default::default(),
195 ban_duration,
196 backoff_durations,
197 trusted_nodes_only,
198 last_tick: Instant::now(),
199 max_backoff_count,
200 net_connection_state: NetworkConnectionState::default(),
201 incoming_ip_throttle_duration,
202 ip_filter,
203 enforce_enr_fork_id,
204 peer_rotation_sleep: peer_rotation_interval
205 .map(|mean| Box::pin(tokio::time::sleep(jitter_rotation_interval(mean)))),
206 peer_rotation_mean: peer_rotation_interval,
207 }
208 }
209
210 pub(crate) fn handle(&self) -> PeersHandle {
212 PeersHandle::new(self.manager_tx.clone())
213 }
214
215 pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
217 self.enforce_enr_fork_id
218 }
219
220 #[inline]
222 pub(crate) fn num_known_peers(&self) -> usize {
223 self.peers.len()
224 }
225
226 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
228 self.peers.iter().map(|(peer_id, v)| {
229 NodeRecord::new_with_ports(
230 v.addr.tcp().ip(),
231 v.addr.tcp().port(),
232 v.addr.udp().map(|addr| addr.port()),
233 *peer_id,
234 )
235 })
236 }
237
238 pub(crate) fn persistable_peers(&self) -> impl Iterator<Item = PersistedPeerInfo> + '_ {
243 self.peers.iter().filter(|(_, peer)| !peer.is_backed_off() && !peer.is_banned()).map(
244 |(peer_id, peer)| PersistedPeerInfo {
245 record: NodeRecord::new_with_ports(
246 peer.addr.tcp().ip(),
247 peer.addr.tcp().port(),
248 peer.addr.udp().map(|addr| addr.port()),
249 *peer_id,
250 ),
251 kind: peer.kind,
252 fork_id: peer.fork_id.as_deref().copied(),
253 reputation: peer.reputation,
254 },
255 )
256 }
257
258 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
260 self.peers.get(&peer_id).map(|v| {
261 (
262 NodeRecord::new_with_ports(
263 v.addr.tcp().ip(),
264 v.addr.tcp().port(),
265 v.addr.udp().map(|addr| addr.port()),
266 peer_id,
267 ),
268 v.kind,
269 )
270 })
271 }
272
273 pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
275 self.peers.get(peer_id).is_some_and(|p| {
276 matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
277 })
278 }
279
280 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
282 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
283 }
284
285 #[inline]
287 pub(crate) const fn num_inbound_connections(&self) -> usize {
288 self.connection_info.num_inbound
289 }
290
291 #[inline]
293 pub(crate) const fn num_outbound_connections(&self) -> usize {
294 self.connection_info.num_outbound
295 }
296
297 #[inline]
299 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
300 self.connection_info.num_pending_out
301 }
302
303 #[inline]
305 pub(crate) fn num_backed_off_peers(&self) -> usize {
306 self.backed_off_peers.len()
307 }
308
309 fn num_idle_trusted_peers(&self) -> usize {
311 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
312 }
313
314 pub(crate) fn on_incoming_pending_session(
318 &mut self,
319 addr: IpAddr,
320 ) -> Result<(), InboundConnectionError> {
321 if !self.ip_filter.is_allowed(&addr) {
323 trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
324 return Err(InboundConnectionError::IpBanned)
325 }
326
327 if self.ban_list.is_banned_ip(&addr) {
328 return Err(InboundConnectionError::IpBanned)
329 }
330
331 if !self.connection_info.has_in_capacity() {
333 if self.trusted_peer_ids.is_empty() {
334 return Err(InboundConnectionError::ExceedsCapacity)
337 }
338
339 let num_idle_trusted_peers = self.num_idle_trusted_peers();
343 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
344 let max_inbound =
346 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
347 if self.connection_info.num_pending_in < max_inbound {
348 self.connection_info.inc_pending_in();
349 return Ok(())
350 }
351 }
352
353 return Err(InboundConnectionError::ExceedsCapacity)
355 }
356
357 if !self.connection_info.has_in_pending_capacity() {
359 return Err(InboundConnectionError::ExceedsCapacity)
360 }
361
362 self.throttle_incoming_ip(addr);
364
365 self.connection_info.inc_pending_in();
366 Ok(())
367 }
368
369 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
372 self.connection_info.decr_pending_in();
373 }
374
375 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
377 self.connection_info.decr_pending_in()
378 }
379
380 pub(crate) fn on_incoming_pending_session_dropped(
382 &mut self,
383 remote_addr: SocketAddr,
384 err: &PendingSessionHandshakeError,
385 ) {
386 if err.is_fatal_protocol_error() {
387 self.ban_ip(remote_addr.ip());
388
389 if err.merits_discovery_ban() {
390 self.queued_actions
391 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
392 }
393 }
394
395 self.connection_info.decr_pending_in();
396 }
397
398 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
405 self.connection_info.decr_pending_in();
406
407 if self.ban_list.is_banned_peer(&peer_id) {
410 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
411 return
412 }
413
414 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
416 if self.trusted_nodes_only && !is_trusted {
417 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
418 return
419 }
420
421 self.tick();
423
424 match self.peers.entry(peer_id) {
425 Entry::Occupied(mut entry) => {
426 let peer = entry.get_mut();
427 if peer.is_banned() {
428 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
429 return
430 }
431 if peer.state.is_pending_out() {
434 self.connection_info.decr_state(peer.state);
435 }
436
437 peer.state = PeerConnectionState::In;
438 peer.mark_connected();
439
440 is_trusted = is_trusted || peer.is_trusted();
441 }
442 Entry::Vacant(entry) => {
443 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
446 peer.mark_connected();
447 peer.remove_after_disconnect = true;
448 entry.insert(peer);
449 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
450 }
451 }
452
453 let has_in_capacity = self.connection_info.has_in_capacity();
454 self.connection_info.inc_in();
456
457 if !is_trusted && !has_in_capacity {
459 self.queued_actions.push_back(PeerAction::Disconnect {
460 peer_id,
461 reason: Some(DisconnectReason::TooManyPeers),
462 });
463 }
464 }
465
466 fn ban_peer(&mut self, peer_id: PeerId) {
468 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
469 (peer.is_trusted() || peer.is_static())
470 {
471 self.backoff_durations.low / 2
474 } else {
475 self.ban_duration
476 };
477
478 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
479 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
480 }
481
482 fn ban_ip(&mut self, ip: IpAddr) {
484 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
485 }
486
487 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
489 self.ban_list
490 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
491 }
492
493 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
495 trace!(target: "net::peers", ?peer_id, "backing off");
496
497 if let Some(peer) = self.peers.get_mut(&peer_id) {
498 peer.backed_off = true;
499 self.backed_off_peers.insert(peer_id, until);
500 }
501 }
502
503 fn unban_peer(&mut self, peer_id: PeerId) {
505 self.ban_list.unban_peer(&peer_id);
506 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
507 }
508
509 fn tick(&mut self) {
514 let now = Instant::now();
515 let secs_since_last_tick =
519 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
520 self.last_tick = now;
521
522 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
524 if peer.1.reputation < DEFAULT_REPUTATION {
527 peer.1.reputation += secs_since_last_tick;
528 }
529 }
530 }
531
532 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
534 self.peers.get(peer_id).map(|peer| peer.reputation)
535 }
536
537 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
543 trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
544
545 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
546 if rep.is_reset() {
548 peer.reset_reputation()
549 } else {
550 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
551 if peer.is_trusted() || peer.is_static() {
552 if matches!(
554 rep,
555 ReputationChangeKind::Dropped |
556 ReputationChangeKind::BadAnnouncement |
557 ReputationChangeKind::Timeout |
558 ReputationChangeKind::AlreadySeenTransaction
559 ) {
560 return
561 }
562
563 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
565 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
567 }
568 }
569 peer.apply_reputation(reputation_change, rep)
570 }
571 } else {
572 return
573 };
574
575 match outcome {
576 ReputationChangeOutcome::None => {}
577 ReputationChangeOutcome::Ban => {
578 self.ban_peer(*peer_id);
579 }
580 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
581 ReputationChangeOutcome::DisconnectAndBan => {
582 self.queued_actions.push_back(PeerAction::Disconnect {
583 peer_id: *peer_id,
584 reason: Some(DisconnectReason::DisconnectRequested),
585 });
586 self.ban_peer(*peer_id);
587 }
588 }
589 }
590
591 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
593 if let Some(peer) = self.peers.get_mut(peer_id) {
594 self.connection_info.decr_state(peer.state);
595 peer.state = PeerConnectionState::Idle;
596 peer.mark_disconnected();
597 }
598 }
599
600 pub(crate) fn on_outgoing_pending_session_dropped(
603 &mut self,
604 remote_addr: &SocketAddr,
605 peer_id: &PeerId,
606 err: &PendingSessionHandshakeError,
607 ) {
608 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
609 }
610
611 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
613 match self.peers.entry(peer_id) {
614 Entry::Occupied(mut entry) => {
615 trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
616 self.connection_info.decr_state(entry.get().state);
617
618 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
619 entry.remove();
621 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
622 } else {
623 let peer = entry.get_mut();
624 peer.severe_backoff_counter = 0;
628 peer.state = PeerConnectionState::Idle;
629 peer.mark_disconnected();
630
631 peer.backed_off = true;
636 self.backed_off_peers.insert(
637 peer_id,
638 std::time::Instant::now() + self.incoming_ip_throttle_duration,
639 );
640 trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
641 }
642 }
643 Entry::Vacant(_) => return,
644 }
645
646 self.fill_outbound_slots();
647 }
648
649 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
651 if let Some(peer) = self.peers.get_mut(&peer_id) {
652 trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
653 self.connection_info.decr_state(peer.state);
654 self.connection_info.inc_out();
655 peer.state = PeerConnectionState::Out;
656 peer.mark_connected();
657 }
658 }
659
660 pub(crate) fn on_active_session_dropped(
665 &mut self,
666 remote_addr: &SocketAddr,
667 peer_id: &PeerId,
668 err: &EthStreamError,
669 ) {
670 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
671 }
672
673 pub(crate) fn on_outgoing_connection_failure(
676 &mut self,
677 remote_addr: &SocketAddr,
678 peer_id: &PeerId,
679 err: &io::Error,
680 ) {
681 if let Some(peer) = self.peers.get(peer_id) {
685 if peer.state.is_incoming() {
686 return
688 }
689
690 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
691 self.trusted_peers_resolver.interval.reset_immediately();
694 }
695 }
696
697 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
698 }
699
700 fn on_connection_failure(
701 &mut self,
702 remote_addr: &SocketAddr,
703 peer_id: &PeerId,
704 err: impl SessionError,
705 reputation_change: ReputationChangeKind,
706 ) {
707 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
708
709 if err.is_fatal_protocol_error() {
710 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
711 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
714 self.connection_info.decr_state(entry.get().state);
715 if entry.get().is_trusted() {
717 let peer = entry.get_mut();
718 peer.state = PeerConnectionState::Idle;
719 peer.mark_disconnected();
720 } else {
721 entry.remove();
722 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
723 if err.merits_discovery_ban() {
725 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
726 peer_id: *peer_id,
727 ip_addr: remote_addr.ip(),
728 })
729 }
730 }
731 }
732
733 self.ban_peer(*peer_id);
735 } else {
736 let mut backoff_until = None;
737 let mut remove_peer = false;
738
739 if let Some(peer) = self.peers.get_mut(peer_id) {
740 if let Some(kind) = err.should_backoff() {
741 if peer.is_trusted() || peer.is_static() {
742 let backoff = self.backoff_durations.low;
748 backoff_until = Some(std::time::Instant::now() + backoff);
749 trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
750 } else {
751 if kind.is_severe() {
753 peer.severe_backoff_counter =
754 peer.severe_backoff_counter.saturating_add(1);
755 }
756 trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
757
758 let backoff_time =
759 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
760
761 backoff_until = Some(backoff_time);
765 }
766 } else {
767 let reputation_change = self.reputation_weights.change(reputation_change);
769 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
770 };
771
772 self.connection_info.decr_state(peer.state);
773 peer.state = PeerConnectionState::Idle;
774 peer.mark_disconnected();
775
776 if peer.severe_backoff_counter > self.max_backoff_count &&
777 !peer.is_trusted() &&
778 !peer.is_static()
779 {
780 remove_peer = true;
783 }
784 }
785
786 if remove_peer {
788 trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
789 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
790 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
791 } else if let Some(backoff_until) = backoff_until {
792 self.backoff_peer_until(*peer_id, backoff_until);
794 }
795 }
796
797 self.fill_outbound_slots();
798 }
799
800 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
806 match direction {
807 Direction::Incoming => {
808 self.connection_info.decr_pending_in();
810 }
811 Direction::Outgoing(_) => {
812 }
815 }
816 }
817
818 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
822 self.add_peer_kind(peer_id, None, addr, fork_id)
823 }
824
825 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
827 self.trusted_peer_ids.insert(peer_id);
828 if let Some(peer) = self.peers.get_mut(&peer_id) {
829 peer.kind = PeerKind::Trusted;
830 }
831 }
832
833 #[cfg_attr(not(test), expect(dead_code))]
837 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
838 self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
839 }
840
841 pub(crate) fn add_trusted_peer_node(&mut self, trusted: TrustedPeer) {
843 let peer_id = trusted.id;
844 self.trusted_peer_ids.insert(peer_id);
845 self.trusted_peers_resolver.remove(peer_id);
846 self.trusted_peers_resolver.trusted_peers.push(trusted);
847 self.trusted_peers_resolver.interval.reset_immediately();
848 }
849
850 pub(crate) fn add_peer_kind(
855 &mut self,
856 peer_id: PeerId,
857 kind: Option<PeerKind>,
858 addr: PeerAddr,
859 fork_id: Option<ForkId>,
860 ) {
861 let ip_addr = addr.tcp().ip();
862
863 if !self.ip_filter.is_allowed(&ip_addr) {
865 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
866 return
867 }
868
869 if self.ban_list.is_banned(&peer_id, &ip_addr) {
870 return
871 }
872
873 match self.peers.entry(peer_id) {
874 Entry::Occupied(mut entry) => {
875 let peer = entry.get_mut();
876 peer.fork_id = fork_id.map(Box::new);
877 peer.addr = addr;
878
879 if let Some(kind) = kind {
880 peer.kind = kind;
881 }
882
883 if peer.state.is_incoming() {
884 peer.remove_after_disconnect = false;
888 }
889 }
890 Entry::Vacant(entry) => {
891 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
892 let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
893 peer.fork_id = fork_id.map(Box::new);
894 entry.insert(peer);
895 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
896 }
897 }
898
899 if kind.is_some_and(|kind| kind.is_trusted()) {
900 self.trusted_peer_ids.insert(peer_id);
902 }
903 }
904
905 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
907 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
908 if entry.get().is_trusted() {
909 return
910 }
911 let mut peer = entry.remove();
912
913 trace!(target: "net::peers", ?peer_id, "remove discovered node");
914 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
915
916 if peer.state.is_connected() {
917 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
918 peer.remove_after_disconnect = true;
923 peer.state.disconnect();
924 self.peers.insert(peer_id, peer);
925 self.queued_actions.push_back(PeerAction::Disconnect {
926 peer_id,
927 reason: Some(DisconnectReason::DisconnectRequested),
928 })
929 }
930 }
931
932 pub(crate) fn ban_peer_by_admin(&mut self, peer_id: PeerId) {
937 if self.trusted_peer_ids.contains(&peer_id) ||
938 self.peers.get(&peer_id).is_some_and(Peer::is_trusted)
939 {
940 return
941 }
942
943 self.remove_peer(peer_id);
944 self.ban_list.ban_peer(peer_id);
945 }
946
947 pub(crate) fn unban_peer_by_admin(&mut self, peer_id: PeerId) {
949 self.ban_list.unban_peer(&peer_id);
950 }
951
952 #[cfg_attr(not(test), expect(dead_code))]
955 pub(crate) fn add_and_connect(
956 &mut self,
957 peer_id: PeerId,
958 addr: PeerAddr,
959 fork_id: Option<ForkId>,
960 ) {
961 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
962 }
963
964 pub(crate) fn add_and_connect_kind(
968 &mut self,
969 peer_id: PeerId,
970 kind: PeerKind,
971 addr: PeerAddr,
972 fork_id: Option<ForkId>,
973 ) {
974 let ip_addr = addr.tcp().ip();
975
976 if !self.ip_filter.is_allowed(&ip_addr) {
978 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
979 return
980 }
981
982 if self.ban_list.is_banned(&peer_id, &ip_addr) {
983 return
984 }
985
986 match self.peers.entry(peer_id) {
987 Entry::Occupied(mut entry) => {
988 let peer = entry.get_mut();
989 peer.kind = kind;
990 peer.fork_id = fork_id.map(Box::new);
991 peer.addr = addr;
992
993 if peer.state == PeerConnectionState::Idle {
994 peer.state = PeerConnectionState::PendingOut;
996 self.connection_info.inc_pending_out();
997 self.queued_actions
998 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
999 }
1000 }
1001 Entry::Vacant(entry) => {
1002 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
1003 let mut peer = Peer::with_kind(addr, kind);
1004 peer.state = PeerConnectionState::PendingOut;
1005 peer.fork_id = fork_id.map(Box::new);
1006 entry.insert(peer);
1007 self.connection_info.inc_pending_out();
1008 self.queued_actions
1009 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
1010 }
1011 }
1012
1013 if kind.is_trusted() {
1014 self.trusted_peer_ids.insert(peer_id);
1015 }
1016 }
1017
1018 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
1020 self.trusted_peers_resolver.remove(peer_id);
1021
1022 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else {
1023 self.trusted_peer_ids.remove(&peer_id);
1024 return
1025 };
1026 if !entry.get().is_trusted() {
1027 return
1028 }
1029
1030 let peer = entry.get_mut();
1031 peer.kind = PeerKind::Basic;
1032
1033 self.trusted_peer_ids.remove(&peer_id);
1034 }
1035
1036 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
1049 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
1050 !peer.is_backed_off() &&
1051 !peer.is_banned() &&
1052 peer.state.is_unconnected() &&
1053 (!self.trusted_nodes_only || peer.is_trusted())
1054 });
1055
1056 let mut best_peer = unconnected.next()?;
1058
1059 if best_peer.1.is_trusted() || best_peer.1.is_static() {
1060 return Some((*best_peer.0, best_peer.1))
1061 }
1062
1063 for maybe_better in unconnected {
1064 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
1066 return Some((*maybe_better.0, maybe_better.1))
1067 }
1068
1069 match maybe_better.1.reputation.cmp(&best_peer.1.reputation) {
1071 std::cmp::Ordering::Greater => best_peer = maybe_better,
1072 std::cmp::Ordering::Equal
1073 if maybe_better.1.fork_id.is_some() && best_peer.1.fork_id.is_none() =>
1074 {
1075 best_peer = maybe_better
1076 }
1077 _ => {}
1078 }
1079 }
1080 Some((*best_peer.0, best_peer.1))
1081 }
1082
1083 fn try_rotate_peer(&mut self) {
1090 let outbound_at_capacity = self.connection_info.is_outbound_at_capacity();
1091 let inbound_at_capacity = self.connection_info.is_inbound_at_capacity();
1092
1093 if !outbound_at_capacity && !inbound_at_capacity {
1094 return
1095 }
1096
1097 let now = std::time::Instant::now();
1098
1099 let candidates = self
1100 .peers
1101 .iter()
1102 .filter_map(|(peer_id, peer)| {
1103 let eligible = match peer.state {
1104 PeerConnectionState::Out => outbound_at_capacity,
1105 PeerConnectionState::In => inbound_at_capacity,
1106 _ => false,
1107 };
1108 (eligible &&
1109 !peer.is_trusted() &&
1110 !peer.is_static() &&
1111 !self.trusted_peer_ids.contains(peer_id) &&
1112 peer.connected_for_at_least(now, PEER_ROTATION_MIN_UPTIME))
1113 .then_some(*peer_id)
1114 })
1115 .collect::<Vec<_>>();
1116
1117 if candidates.is_empty() {
1118 return
1119 }
1120 let peer_id = candidates[rand::rng().random_range(0..candidates.len())];
1121
1122 trace!(target: "net::peers", ?peer_id, "rotating peer to open slot for new nodes");
1123
1124 self.queued_actions.push_back(PeerAction::Disconnect {
1125 peer_id,
1126 reason: Some(DisconnectReason::UselessPeer),
1127 });
1128 }
1129
1130 fn fill_outbound_slots(&mut self) {
1136 self.tick();
1137
1138 if !self.net_connection_state.is_active() {
1139 return
1141 }
1142
1143 while self.connection_info.has_out_capacity() {
1145 let action = {
1146 let (peer_id, peer) = match self.best_unconnected() {
1147 Some(peer) => peer,
1148 _ => break,
1149 };
1150
1151 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1152
1153 peer.state = PeerConnectionState::PendingOut;
1154 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1155 };
1156
1157 self.connection_info.inc_pending_out();
1158
1159 self.queued_actions.push_back(action);
1160 }
1161 }
1162
1163 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1164 if !self.trusted_peer_ids.contains(&peer_id) {
1165 trace!(target: "net::peers", ?peer_id, "Ignoring resolved trusted peer after removal");
1166 return
1167 }
1168
1169 let new_addr = PeerAddr::new_with_ports(
1170 new_record.address,
1171 new_record.tcp_port,
1172 Some(new_record.udp_port),
1173 );
1174
1175 if let Some(peer) = self.peers.get_mut(&peer_id) {
1176 if peer.addr != new_addr {
1177 peer.addr = new_addr;
1178 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1179 }
1180 } else {
1181 trace!(target: "net::peers", ?peer_id, ?new_addr, "Adding trusted peer after first successful resolution");
1182 self.add_peer_kind(peer_id, Some(PeerKind::Trusted), new_addr, None);
1183 }
1184 }
1185
1186 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1188 self.net_connection_state = state;
1189 }
1190
1191 pub const fn connection_state(&self) -> &NetworkConnectionState {
1193 &self.net_connection_state
1194 }
1195
1196 pub const fn on_shutdown(&mut self) {
1198 self.net_connection_state = NetworkConnectionState::ShuttingDown;
1199 }
1200
1201 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1206 loop {
1207 if let Some(action) = self.queued_actions.pop_front() {
1209 return Poll::Ready(action)
1210 }
1211
1212 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1213 match cmd {
1214 PeerCommand::Add(peer_id, addr) => {
1215 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1216 }
1217 PeerCommand::Remove(peer) => self.remove_peer(peer),
1218 PeerCommand::ReputationChange(peer_id, rep) => {
1219 self.apply_reputation_change(&peer_id, rep)
1220 }
1221 PeerCommand::GetPeer(peer, tx) => {
1222 let _ = tx.send(self.peers.get(&peer).cloned());
1223 }
1224 PeerCommand::GetPeers(tx) => {
1225 let _ = tx.send(self.iter_peers().collect());
1226 }
1227 }
1228 }
1229
1230 if self.release_interval.poll_tick(cx).is_ready() {
1231 let now = std::time::Instant::now();
1232 let (_, unbanned_peers) = self.ban_list.evict(now);
1233
1234 for peer_id in unbanned_peers {
1235 if let Some(peer) = self.peers.get_mut(&peer_id) {
1236 peer.unban();
1237 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1238 }
1239 }
1240
1241 self.backed_off_peers.retain(|peer_id, until| {
1244 if now > *until {
1245 if let Some(peer) = self.peers.get_mut(peer_id) {
1246 peer.backed_off = false;
1247 }
1248 return false
1249 }
1250 true
1251 })
1252 }
1253
1254 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1255 self.fill_outbound_slots();
1256 }
1257
1258 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1259 self.on_resolved_peer(peer_id, new_record);
1260 }
1261
1262 let rotation_ready = self
1264 .peer_rotation_sleep
1265 .as_mut()
1266 .is_some_and(|sleep| sleep.as_mut().poll(cx).is_ready());
1267 if rotation_ready {
1268 self.try_rotate_peer();
1269 if let Some((mean, sleep)) =
1270 self.peer_rotation_mean.zip(self.peer_rotation_sleep.as_mut())
1271 {
1272 sleep
1273 .as_mut()
1274 .reset(tokio::time::Instant::now() + jitter_rotation_interval(mean));
1275 }
1276 }
1277
1278 if self.queued_actions.is_empty() {
1279 return Poll::Pending
1280 }
1281 }
1282 }
1283}
1284
1285impl Default for PeersManager {
1286 fn default() -> Self {
1287 Self::new(Default::default())
1288 }
1289}
1290
/// Connection counters together with the limits they are checked against.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectionInfo {
    /// Number of established outbound connections.
    num_outbound: usize,
    /// Number of outbound connections that are still pending.
    num_pending_out: usize,
    /// Number of established inbound connections.
    num_inbound: usize,
    /// Number of inbound connections that are still pending.
    num_pending_in: usize,
    /// Configured limits (`max_outbound`, `max_inbound`, `max_concurrent_outbound_dials`).
    config: ConnectionsConfig,
}
1305
1306impl ConnectionInfo {
1309 const fn new(config: ConnectionsConfig) -> Self {
1311 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1312 }
1313
1314 const fn has_out_capacity(&self) -> bool {
1316 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1317 self.num_outbound < self.config.max_outbound
1318 }
1319
1320 const fn is_outbound_at_capacity(&self) -> bool {
1322 self.num_outbound >= self.config.max_outbound
1323 }
1324
1325 const fn is_inbound_at_capacity(&self) -> bool {
1327 self.num_inbound >= self.config.max_inbound
1328 }
1329
1330 const fn has_in_capacity(&self) -> bool {
1332 self.num_inbound < self.config.max_inbound
1333 }
1334
1335 const fn has_in_pending_capacity(&self) -> bool {
1337 self.num_pending_in < self.config.max_inbound
1338 }
1339
1340 const fn decr_state(&mut self, state: PeerConnectionState) {
1341 match state {
1342 PeerConnectionState::Idle => {}
1343 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1344 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1345 PeerConnectionState::PendingOut => self.decr_pending_out(),
1346 }
1347 }
1348
1349 const fn decr_out(&mut self) {
1350 self.num_outbound -= 1;
1351 }
1352
1353 const fn inc_out(&mut self) {
1354 self.num_outbound += 1;
1355 }
1356
1357 const fn inc_pending_out(&mut self) {
1358 self.num_pending_out += 1;
1359 }
1360
1361 const fn inc_in(&mut self) {
1362 self.num_inbound += 1;
1363 }
1364
1365 const fn inc_pending_in(&mut self) {
1366 self.num_pending_in += 1;
1367 }
1368
1369 const fn decr_in(&mut self) {
1370 self.num_inbound -= 1;
1371 }
1372
1373 const fn decr_pending_out(&mut self) {
1374 self.num_pending_out -= 1;
1375 }
1376
1377 const fn decr_pending_in(&mut self) {
1378 self.num_pending_in -= 1;
1379 }
1380}
1381
/// Actions queued by the [`PeersManager`] and handed out through its `poll` method.
#[derive(Debug)]
pub enum PeerAction {
    /// Open an outbound connection to a peer.
    Connect {
        /// The peer to connect to.
        peer_id: PeerId,
        /// The socket address to dial.
        remote_addr: SocketAddr,
    },
    /// Disconnect a peer.
    Disconnect {
        /// The peer to disconnect.
        peer_id: PeerId,
        /// Optional reason to communicate to the peer.
        reason: Option<DisconnectReason>,
    },
    /// Disconnect an incoming peer that is banned.
    // NOTE(review): inferred from the variant name; the emitting code is outside this excerpt.
    DisconnectBannedIncoming {
        /// The peer to disconnect.
        peer_id: PeerId,
    },
    /// Disconnect an incoming peer that is not trusted.
    // NOTE(review): inferred from the variant name; the emitting code is outside this excerpt.
    DisconnectUntrustedIncoming {
        /// The peer to disconnect.
        peer_id: PeerId,
    },
    /// Ban a peer id together with its IP address in discovery.
    // NOTE(review): inferred from the variant name; confirm against the consumer of this action.
    DiscoveryBanPeerId {
        /// The peer to ban.
        peer_id: PeerId,
        /// The IP address associated with the peer.
        ip_addr: IpAddr,
    },
    /// Ban an IP address in discovery.
    // NOTE(review): inferred from the variant name; confirm against the consumer of this action.
    DiscoveryBanIp {
        /// The IP address to ban.
        ip_addr: IpAddr,
    },
    /// A peer was banned.
    BanPeer {
        /// The banned peer.
        peer_id: PeerId,
    },
    /// A peer's ban was lifted, e.g. after the ban list evicted its expired entry.
    UnBanPeer {
        /// The unbanned peer.
        peer_id: PeerId,
    },
    /// A peer was added to the peer set.
    PeerAdded(PeerId),
    /// A peer was removed from the peer set.
    PeerRemoved(PeerId),
}
1437
/// Reasons for rejecting an incoming connection.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
    /// The remote IP address is banned.
    // NOTE(review): inferred from the variant name; the producing code is outside this excerpt.
    IpBanned,
    /// No capacity is left for another incoming connection.
    // NOTE(review): inferred from the variant name; the producing code is outside this excerpt.
    ExceedsCapacity,
}
1446
1447impl Display for InboundConnectionError {
1448 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1449 write!(f, "{self:?}")
1450 }
1451}
1452
/// Why a peer is being backed off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
    /// The remote disconnected with `DisconnectReason::TooManyPeers`.
    TooManyPeers,
    /// The session was closed gracefully.
    // NOTE(review): inferred from the variant name; no code in this excerpt produces it.
    GracefulClose,
    /// Any other failure; this is the fallback chosen by `BackoffReason::from_disconnect`.
    ConnectionError,
}
1463
1464impl BackoffReason {
1465 pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1467 match reason {
1468 Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1469 _ => Self::ConnectionError,
1470 }
1471 }
1472}
1473
1474fn jitter_rotation_interval(mean: Duration) -> Duration {
1479 let min_nanos = (mean * 3 / 5).as_nanos() as u64;
1480 let max_nanos = (mean * 7 / 5).as_nanos() as u64;
1481 Duration::from_nanos(rand::rng().random_range(min_nanos..=max_nanos))
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486 use alloy_primitives::B512;
1487 use reth_eth_wire::{
1488 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1489 DisconnectReason,
1490 };
1491 use reth_ethereum_forks::{ForkHash, ForkId};
1492 use reth_net_banlist::BanList;
1493 use reth_network_api::Direction;
1494 use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
1495 use reth_network_types::{
1496 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1497 };
1498 use std::{
1499 future::{poll_fn, Future},
1500 io,
1501 net::{IpAddr, Ipv4Addr, SocketAddr},
1502 pin::Pin,
1503 task::{Context, Poll},
1504 time::Duration,
1505 };
1506 use url::Host;
1507
1508 use super::PeersManager;
1509 use crate::{
1510 error::SessionError,
1511 peers::{
1512 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1513 PeerConnectionState,
1514 },
1515 session::PendingSessionHandshakeError,
1516 PeersConfig,
1517 };
1518
    /// Test helper future that resolves to the next [`PeerAction`] produced by a
    /// [`PeersManager`].
    struct PeerActionFuture<'a> {
        /// The manager that is polled for the next action.
        peers: &'a mut PeersManager,
    }
1522
1523 impl Future for PeerActionFuture<'_> {
1524 type Output = PeerAction;
1525
1526 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1527 self.get_mut().peers.poll(cx)
1528 }
1529 }
1530
    // Awaits the next `PeerAction` of the given `PeersManager` by wrapping it in a
    // `PeerActionFuture`. Must be used inside an async context.
    macro_rules! event {
        ($peers:expr) => {
            PeerActionFuture { peers: &mut $peers }.await
        };
    }
1536
1537 fn set_connected_at(
1538 peers: &mut PeersManager,
1539 peer_id: PeerId,
1540 connected_at: std::time::Instant,
1541 ) {
1542 peers.peers.get_mut(&peer_id).expect("peer exists").connected_at = Some(connected_at);
1543 }
1544
1545 #[tokio::test]
1546 async fn test_insert() {
1547 let peer = PeerId::random();
1548 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1549 let mut peers = PeersManager::default();
1550 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1551
1552 match event!(peers) {
1553 PeerAction::PeerAdded(peer_id) => {
1554 assert_eq!(peer_id, peer);
1555 }
1556 _ => unreachable!(),
1557 }
1558 match event!(peers) {
1559 PeerAction::Connect { peer_id, remote_addr } => {
1560 assert_eq!(peer_id, peer);
1561 assert_eq!(remote_addr, socket_addr);
1562 }
1563 _ => unreachable!(),
1564 }
1565
1566 let (record, _) = peers.peer_by_id(peer).unwrap();
1567 assert_eq!(record.tcp_addr(), socket_addr);
1568 assert_eq!(record.udp_addr(), socket_addr);
1569 }
1570
1571 #[tokio::test]
1572 async fn test_insert_udp() {
1573 let peer = PeerId::random();
1574 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1575 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1576 let mut peers = PeersManager::default();
1577 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1578
1579 match event!(peers) {
1580 PeerAction::PeerAdded(peer_id) => {
1581 assert_eq!(peer_id, peer);
1582 }
1583 _ => unreachable!(),
1584 }
1585 match event!(peers) {
1586 PeerAction::Connect { peer_id, remote_addr } => {
1587 assert_eq!(peer_id, peer);
1588 assert_eq!(remote_addr, tcp_addr);
1589 }
1590 _ => unreachable!(),
1591 }
1592
1593 let (record, _) = peers.peer_by_id(peer).unwrap();
1594 assert_eq!(record.tcp_addr(), tcp_addr);
1595 assert_eq!(record.udp_addr(), udp_addr);
1596 }
1597
1598 #[tokio::test]
1599 async fn test_ban() {
1600 let peer = PeerId::random();
1601 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1602 let mut peers = PeersManager::default();
1603 peers.ban_peer(peer);
1604 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1605
1606 match event!(peers) {
1607 PeerAction::BanPeer { peer_id } => {
1608 assert_eq!(peer_id, peer);
1609 }
1610 _ => unreachable!(),
1611 }
1612
1613 poll_fn(|cx| {
1614 assert!(peers.poll(cx).is_pending());
1615 Poll::Ready(())
1616 })
1617 .await;
1618 }
1619
1620 #[tokio::test]
1621 async fn test_unban() {
1622 let peer = PeerId::random();
1623 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1624 let mut peers = PeersManager::default();
1625 peers.ban_peer(peer);
1626 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1627
1628 match event!(peers) {
1629 PeerAction::BanPeer { peer_id } => {
1630 assert_eq!(peer_id, peer);
1631 }
1632 _ => unreachable!(),
1633 }
1634
1635 poll_fn(|cx| {
1636 assert!(peers.poll(cx).is_pending());
1637 Poll::Ready(())
1638 })
1639 .await;
1640
1641 peers.unban_peer(peer);
1642
1643 match event!(peers) {
1644 PeerAction::UnBanPeer { peer_id } => {
1645 assert_eq!(peer_id, peer);
1646 }
1647 _ => unreachable!(),
1648 }
1649
1650 poll_fn(|cx| {
1651 assert!(peers.poll(cx).is_pending());
1652 Poll::Ready(())
1653 })
1654 .await;
1655 }
1656
1657 #[tokio::test]
1658 async fn test_admin_ban_removes_peer_and_bans_indefinitely() {
1659 let peer = PeerId::random();
1660 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1661 let mut peers = PeersManager::default();
1662 peers.peers.insert(peer, Peer::new(PeerAddr::from_tcp(socket_addr)));
1663
1664 peers.ban_peer_by_admin(peer);
1665
1666 assert!(peers.ban_list.is_banned_peer(&peer));
1667 assert!(peers.peer_by_id(peer).is_none());
1668
1669 match peers.queued_actions.pop_front() {
1670 Some(PeerAction::PeerRemoved(peer_id)) => assert_eq!(peer_id, peer),
1671 other => panic!("unexpected action: {other:?}"),
1672 }
1673
1674 let (_, unbanned_peers) =
1675 peers.ban_list.evict(std::time::Instant::now() + Duration::from_secs(1));
1676 assert!(unbanned_peers.is_empty());
1677 }
1678
1679 #[tokio::test]
1680 async fn test_admin_ban_does_not_override_trusted_peer() {
1681 let peer = PeerId::random();
1682 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1683 let mut peers = PeersManager::default();
1684 peers.peers.insert(peer, Peer::trusted(PeerAddr::from_tcp(socket_addr)));
1685
1686 peers.ban_peer_by_admin(peer);
1687
1688 assert!(!peers.ban_list.is_banned_peer(&peer));
1689 assert!(peers.peer_by_id(peer).is_some());
1690 assert!(peers.queued_actions.is_empty());
1691 }
1692
1693 #[tokio::test]
1694 async fn test_admin_unban_only_removes_banlist_entry() {
1695 let peer = PeerId::random();
1696 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1697 let mut peers = PeersManager::default();
1698 let mut peer_info = Peer::new(PeerAddr::from_tcp(socket_addr));
1699 peer_info.reputation = i32::MIN;
1700 peers.peers.insert(peer, peer_info);
1701 peers.ban_list.ban_peer(peer);
1702 assert!(peers.peers.get(&peer).is_some_and(Peer::is_banned));
1703
1704 peers.unban_peer_by_admin(peer);
1705
1706 assert!(!peers.ban_list.is_banned_peer(&peer));
1707 assert!(peers.peers.get(&peer).is_some_and(Peer::is_banned));
1708 }
1709
1710 #[tokio::test]
1711 async fn test_backoff_on_busy() {
1712 let peer = PeerId::random();
1713 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1714
1715 let mut peers = PeersManager::new(PeersConfig::test());
1716 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1717
1718 match event!(peers) {
1719 PeerAction::PeerAdded(peer_id) => {
1720 assert_eq!(peer_id, peer);
1721 }
1722 _ => unreachable!(),
1723 }
1724 match event!(peers) {
1725 PeerAction::Connect { peer_id, .. } => {
1726 assert_eq!(peer_id, peer);
1727 }
1728 _ => unreachable!(),
1729 }
1730
1731 poll_fn(|cx| {
1732 assert!(peers.poll(cx).is_pending());
1733 Poll::Ready(())
1734 })
1735 .await;
1736
1737 peers.on_active_session_dropped(
1738 &socket_addr,
1739 &peer,
1740 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1741 DisconnectReason::TooManyPeers,
1742 )),
1743 );
1744
1745 poll_fn(|cx| {
1746 assert!(peers.poll(cx).is_pending());
1747 Poll::Ready(())
1748 })
1749 .await;
1750
1751 assert!(peers.backed_off_peers.contains_key(&peer));
1752 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1753
1754 tokio::time::sleep(peers.backoff_durations.low).await;
1755
1756 match event!(peers) {
1757 PeerAction::Connect { peer_id, .. } => {
1758 assert_eq!(peer_id, peer);
1759 }
1760 _ => unreachable!(),
1761 }
1762
1763 assert!(!peers.backed_off_peers.contains_key(&peer));
1764 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1765 }
1766
1767 #[tokio::test]
1768 async fn test_backoff_on_no_response() {
1769 let peer = PeerId::random();
1770 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1771
1772 let backoff_durations = PeerBackoffDurations::test();
1773 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1774 let mut peers = PeersManager::new(config);
1775 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1776
1777 match event!(peers) {
1778 PeerAction::PeerAdded(peer_id) => {
1779 assert_eq!(peer_id, peer);
1780 }
1781 _ => unreachable!(),
1782 }
1783 match event!(peers) {
1784 PeerAction::Connect { peer_id, .. } => {
1785 assert_eq!(peer_id, peer);
1786 }
1787 _ => unreachable!(),
1788 }
1789
1790 poll_fn(|cx| {
1791 assert!(peers.poll(cx).is_pending());
1792 Poll::Ready(())
1793 })
1794 .await;
1795
1796 peers.on_outgoing_pending_session_dropped(
1797 &socket_addr,
1798 &peer,
1799 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1800 EthHandshakeError::NoResponse,
1801 )),
1802 );
1803
1804 poll_fn(|cx| {
1805 assert!(peers.poll(cx).is_pending());
1806 Poll::Ready(())
1807 })
1808 .await;
1809
1810 assert!(peers.backed_off_peers.contains_key(&peer));
1811 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1812
1813 tokio::time::sleep(backoff_durations.high).await;
1814
1815 match event!(peers) {
1816 PeerAction::Connect { peer_id, .. } => {
1817 assert_eq!(peer_id, peer);
1818 }
1819 _ => unreachable!(),
1820 }
1821
1822 assert!(!peers.backed_off_peers.contains_key(&peer));
1823 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1824 }
1825
1826 #[tokio::test]
1827 async fn test_low_backoff() {
1828 let peer = PeerId::random();
1829 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1830 let config = PeersConfig::test();
1831 let mut peers = PeersManager::new(config);
1832 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1833 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1834
1835 let backoff_timestamp = peers
1836 .backoff_durations
1837 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1838
1839 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1840 assert!(backoff_timestamp <= expected);
1841 }
1842
1843 #[tokio::test]
1844 async fn test_multiple_backoff_calculations() {
1845 let peer = PeerId::random();
1846 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1847 let config = PeersConfig::default();
1848 let mut peers = PeersManager::new(config);
1849 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1850 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1851
1852 peer_struct.severe_backoff_counter = 1;
1854
1855 let now = std::time::Instant::now();
1856
1857 peer_struct.severe_backoff_counter += 1;
1859 let backoff_time = peers
1861 .backoff_durations
1862 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1863
1864 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1866
1867 assert!(backoff_time.duration_since(now) > backoff_duration);
1870 }
1871
1872 #[tokio::test]
1873 async fn test_ban_on_active_drop() {
1874 let peer = PeerId::random();
1875 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1876 let mut peers = PeersManager::default();
1877 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1878
1879 match event!(peers) {
1880 PeerAction::PeerAdded(peer_id) => {
1881 assert_eq!(peer_id, peer);
1882 }
1883 _ => unreachable!(),
1884 }
1885 match event!(peers) {
1886 PeerAction::Connect { peer_id, .. } => {
1887 assert_eq!(peer_id, peer);
1888 }
1889 _ => unreachable!(),
1890 }
1891
1892 poll_fn(|cx| {
1893 assert!(peers.poll(cx).is_pending());
1894 Poll::Ready(())
1895 })
1896 .await;
1897
1898 peers.on_active_session_dropped(
1899 &socket_addr,
1900 &peer,
1901 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1902 DisconnectReason::UselessPeer,
1903 )),
1904 );
1905
1906 match event!(peers) {
1907 PeerAction::PeerRemoved(peer_id) => {
1908 assert_eq!(peer_id, peer);
1909 }
1910 _ => unreachable!(),
1911 }
1912 match event!(peers) {
1913 PeerAction::BanPeer { peer_id } => {
1914 assert_eq!(peer_id, peer);
1915 }
1916 _ => unreachable!(),
1917 }
1918
1919 poll_fn(|cx| {
1920 assert!(peers.poll(cx).is_pending());
1921 Poll::Ready(())
1922 })
1923 .await;
1924
1925 assert!(!peers.peers.contains_key(&peer));
1926 }
1927
1928 #[tokio::test]
1929 async fn test_remove_on_max_backoff_count() {
1930 let peer = PeerId::random();
1931 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1932 let config = PeersConfig::test();
1933 let mut peers = PeersManager::new(config.clone());
1934 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1935 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1936
1937 peer_struct.severe_backoff_counter = config.max_backoff_count;
1939
1940 match event!(peers) {
1941 PeerAction::PeerAdded(peer_id) => {
1942 assert_eq!(peer_id, peer);
1943 }
1944 _ => unreachable!(),
1945 }
1946 match event!(peers) {
1947 PeerAction::Connect { peer_id, .. } => {
1948 assert_eq!(peer_id, peer);
1949 }
1950 _ => unreachable!(),
1951 }
1952
1953 poll_fn(|cx| {
1954 assert!(peers.poll(cx).is_pending());
1955 Poll::Ready(())
1956 })
1957 .await;
1958
1959 peers.on_outgoing_pending_session_dropped(
1960 &socket_addr,
1961 &peer,
1962 &PendingSessionHandshakeError::Eth(
1963 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1964 ),
1965 );
1966
1967 match event!(peers) {
1968 PeerAction::PeerRemoved(peer_id) => {
1969 assert_eq!(peer_id, peer);
1970 }
1971 _ => unreachable!(),
1972 }
1973
1974 poll_fn(|cx| {
1975 assert!(peers.poll(cx).is_pending());
1976 Poll::Ready(())
1977 })
1978 .await;
1979
1980 assert!(!peers.peers.contains_key(&peer));
1981 }
1982
1983 #[tokio::test]
1984 async fn test_ban_on_pending_drop() {
1985 let peer = PeerId::random();
1986 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1987 let mut peers = PeersManager::default();
1988 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1989
1990 match event!(peers) {
1991 PeerAction::PeerAdded(peer_id) => {
1992 assert_eq!(peer_id, peer);
1993 }
1994 _ => unreachable!(),
1995 }
1996 match event!(peers) {
1997 PeerAction::Connect { peer_id, .. } => {
1998 assert_eq!(peer_id, peer);
1999 }
2000 _ => unreachable!(),
2001 }
2002
2003 poll_fn(|cx| {
2004 assert!(peers.poll(cx).is_pending());
2005 Poll::Ready(())
2006 })
2007 .await;
2008
2009 peers.on_outgoing_pending_session_dropped(
2010 &socket_addr,
2011 &peer,
2012 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2013 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
2014 )),
2015 );
2016
2017 match event!(peers) {
2018 PeerAction::PeerRemoved(peer_id) => {
2019 assert_eq!(peer_id, peer);
2020 }
2021 _ => unreachable!(),
2022 }
2023 match event!(peers) {
2024 PeerAction::BanPeer { peer_id } => {
2025 assert_eq!(peer_id, peer);
2026 }
2027 _ => unreachable!(),
2028 }
2029
2030 poll_fn(|cx| {
2031 assert!(peers.poll(cx).is_pending());
2032 Poll::Ready(())
2033 })
2034 .await;
2035
2036 assert!(!peers.peers.contains_key(&peer));
2037 }
2038
2039 #[tokio::test]
2040 async fn test_internally_closed_incoming() {
2041 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2042 let mut peers = PeersManager::default();
2043
2044 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2045 assert_eq!(peers.connection_info.num_pending_in, 1);
2046 peers.on_incoming_pending_session_rejected_internally();
2047 assert_eq!(peers.connection_info.num_pending_in, 0);
2048 }
2049
2050 #[tokio::test]
2051 async fn test_reject_incoming_at_pending_capacity() {
2052 let mut peers = PeersManager::default();
2053
2054 for count in 1..=peers.connection_info.config.max_inbound {
2055 let socket_addr =
2056 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
2057 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2058 assert_eq!(peers.connection_info.num_pending_in, count);
2059 }
2060 assert!(peers.connection_info.has_in_capacity());
2061 assert!(!peers.connection_info.has_in_pending_capacity());
2062
2063 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2064 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2065 }
2066
2067 #[tokio::test]
2068 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
2069 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2070 let trusted = PeerId::random();
2071 peers.add_trusted_peer_id(trusted);
2072
2073 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
2075 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2076 peers.on_incoming_session_established(trusted, addr);
2077
2078 match event!(peers) {
2079 PeerAction::PeerAdded(id) => {
2080 assert_eq!(id, trusted);
2081 }
2082 _ => unreachable!(),
2083 }
2084
2085 let mut connected_untrusted_peer_ids = Vec::new();
2087 for i in 0..(peers.connection_info.config.max_inbound - 1) {
2088 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
2089 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2090 let peer_id = PeerId::random();
2091 peers.on_incoming_session_established(peer_id, addr);
2092 connected_untrusted_peer_ids.push(peer_id);
2093
2094 match event!(peers) {
2095 PeerAction::PeerAdded(id) => {
2096 assert_eq!(id, peer_id);
2097 }
2098 _ => unreachable!(),
2099 }
2100 }
2101
2102 let mut pending_addrs = Vec::new();
2103
2104 for i in 0..2 {
2106 let socket_addr =
2107 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
2108 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2109
2110 pending_addrs.push(socket_addr);
2111 }
2112
2113 assert_eq!(peers.connection_info.num_pending_in, 2);
2114
2115 for i in 0..2 {
2117 let socket_addr =
2118 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
2119 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2120 }
2121
2122 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2123 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
2124 DisconnectReason::UselessPeer,
2125 )),
2126 ));
2127
2128 for pending_addr in pending_addrs {
2130 peers.on_incoming_pending_session_dropped(pending_addr, &err);
2131 }
2132
2133 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
2134
2135 println!(
2136 "num_inbound: {}, has_in_capacity: {}",
2137 peers.connection_info.num_inbound,
2138 peers.connection_info.has_in_capacity()
2139 );
2140
2141 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
2143
2144 println!(
2145 "num_inbound: {}, has_in_capacity: {}",
2146 peers.connection_info.num_inbound,
2147 peers.connection_info.has_in_capacity()
2148 );
2149
2150 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2151 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2152 }
2153
2154 #[tokio::test]
2155 async fn test_closed_incoming() {
2156 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2157 let mut peers = PeersManager::default();
2158
2159 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2160 assert_eq!(peers.connection_info.num_pending_in, 1);
2161 peers.on_incoming_pending_session_gracefully_closed();
2162 assert_eq!(peers.connection_info.num_pending_in, 0);
2163 }
2164
2165 #[tokio::test]
2166 async fn test_dropped_incoming() {
2167 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
2168 let ban_duration = Duration::from_millis(500);
2169 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
2170 let mut peers = PeersManager::new(config);
2171
2172 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2173 assert_eq!(peers.connection_info.num_pending_in, 1);
2174 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2175 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
2176 DisconnectReason::UselessPeer,
2177 )),
2178 ));
2179
2180 peers.on_incoming_pending_session_dropped(socket_addr, &err);
2181 assert_eq!(peers.connection_info.num_pending_in, 0);
2182 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
2183
2184 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2185
2186 tokio::time::sleep(ban_duration).await;
2188
2189 poll_fn(|cx| {
2190 let _ = peers.poll(cx);
2191 Poll::Ready(())
2192 })
2193 .await;
2194
2195 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
2196 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2197 }
2198
2199 #[tokio::test]
2200 async fn test_reputation_change_connected() {
2201 let peer = PeerId::random();
2202 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2203 let mut peers = PeersManager::default();
2204 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2205
2206 match event!(peers) {
2207 PeerAction::PeerAdded(peer_id) => {
2208 assert_eq!(peer_id, peer);
2209 }
2210 _ => unreachable!(),
2211 }
2212 match event!(peers) {
2213 PeerAction::Connect { peer_id, remote_addr } => {
2214 assert_eq!(peer_id, peer);
2215 assert_eq!(remote_addr, socket_addr);
2216 }
2217 _ => unreachable!(),
2218 }
2219
2220 let p = peers.peers.get_mut(&peer).unwrap();
2221 assert_eq!(p.state, PeerConnectionState::PendingOut);
2222
2223 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
2224
2225 let p = peers.peers.get(&peer).unwrap();
2226 assert_eq!(p.state, PeerConnectionState::PendingOut);
2227 assert!(p.is_banned());
2228
2229 peers.on_active_session_gracefully_closed(peer);
2230
2231 let p = peers.peers.get(&peer).unwrap();
2232 assert_eq!(p.state, PeerConnectionState::Idle);
2233 assert!(p.is_banned());
2234
2235 match event!(peers) {
2236 PeerAction::Disconnect { peer_id, .. } => {
2237 assert_eq!(peer_id, peer);
2238 }
2239 _ => unreachable!(),
2240 }
2241 }
2242
2243 #[tokio::test]
2244 async fn retain_trusted_status() {
2245 let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2246 let trusted = PeerId::random();
2247 let mut peers =
2248 PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2249 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2250 tcp_port: 8008,
2251 udp_port: 8008,
2252 id: trusted,
2253 }]));
2254 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2255 peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2256 assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2257 }
2258
2259 #[tokio::test]
2260 async fn accept_incoming_trusted_unknown_peer_address() {
2261 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2262 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2263 let trusted = PeerId::random();
2265 peers.add_trusted_peer_id(trusted);
2266
2267 for i in 0..peers.connection_info.config.max_inbound {
2269 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2270 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2271 let peer_id = PeerId::random();
2272 peers.on_incoming_session_established(peer_id, addr);
2273
2274 match event!(peers) {
2275 PeerAction::PeerAdded(id) => {
2276 assert_eq!(id, peer_id);
2277 }
2278 _ => unreachable!(),
2279 }
2280 }
2281
2282 let untrusted = PeerId::random();
2284 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2285 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2286 peers.on_incoming_session_established(untrusted, socket_addr);
2287
2288 match event!(peers) {
2289 PeerAction::PeerAdded(id) => {
2290 assert_eq!(id, untrusted);
2291 }
2292 _ => unreachable!(),
2293 }
2294
2295 match event!(peers) {
2296 PeerAction::Disconnect { peer_id, reason } => {
2297 assert_eq!(peer_id, untrusted);
2298 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2299 }
2300 _ => unreachable!(),
2301 }
2302
2303 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2304 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2305 peers.on_incoming_session_established(trusted, socket_addr);
2306
2307 match event!(peers) {
2308 PeerAction::PeerAdded(id) => {
2309 assert_eq!(id, trusted);
2310 }
2311 _ => unreachable!(),
2312 }
2313
2314 poll_fn(|cx| {
2315 assert!(peers.poll(cx).is_pending());
2316 Poll::Ready(())
2317 })
2318 .await;
2319
2320 let peer = peers.peers.get(&trusted).unwrap();
2321 assert_eq!(peer.state, PeerConnectionState::In);
2322 }
2323
2324 #[tokio::test]
2325 async fn test_already_connected() {
2326 let peer = PeerId::random();
2327 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2328 let mut peers = PeersManager::default();
2329
2330 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2332 assert_eq!(peers.connection_info.num_pending_in, 1);
2333
2334 peers.on_incoming_session_established(peer, socket_addr);
2337 let p = peers.peers.get_mut(&peer).expect("peer not found");
2338 assert_eq!(p.addr.tcp(), socket_addr);
2339 assert_eq!(peers.connection_info.num_pending_in, 0);
2340 assert_eq!(peers.connection_info.num_inbound, 1);
2341
2342 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2345 assert_eq!(peers.connection_info.num_pending_in, 1);
2346
2347 peers.on_already_connected(Direction::Incoming);
2351
2352 let p = peers.peers.get_mut(&peer).expect("peer not found");
2353 assert_eq!(p.addr.tcp(), socket_addr);
2354 assert_eq!(peers.connection_info.num_pending_in, 0);
2355 assert_eq!(peers.connection_info.num_inbound, 1);
2356 }
2357
2358 #[tokio::test]
2359 async fn test_reputation_change_trusted_peer() {
2360 let peer = PeerId::random();
2361 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2362 let mut peers = PeersManager::default();
2363 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2364
2365 match event!(peers) {
2366 PeerAction::PeerAdded(peer_id) => {
2367 assert_eq!(peer_id, peer);
2368 }
2369 _ => unreachable!(),
2370 }
2371 match event!(peers) {
2372 PeerAction::Connect { peer_id, remote_addr } => {
2373 assert_eq!(peer_id, peer);
2374 assert_eq!(remote_addr, socket_addr);
2375 }
2376 _ => unreachable!(),
2377 }
2378
2379 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2380 peers.on_active_outgoing_established(peer);
2381 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2382
2383 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2384
2385 {
2386 let p = peers.peers.get(&peer).unwrap();
2387 assert_eq!(p.state, PeerConnectionState::Out);
2388 assert!(!p.is_banned());
2390 }
2391
2392 loop {
2394 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2395
2396 let p = peers.peers.get(&peer).unwrap();
2397 if p.is_banned() {
2398 break
2399 }
2400 }
2401
2402 match event!(peers) {
2403 PeerAction::Disconnect { peer_id, .. } => {
2404 assert_eq!(peer_id, peer);
2405 }
2406 _ => unreachable!(),
2407 }
2408 }
2409
2410 #[tokio::test]
2411 async fn test_reputation_management() {
2412 let peer = PeerId::random();
2413 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2414 let mut peers = PeersManager::default();
2415 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2416 assert_eq!(peers.get_reputation(&peer), Some(0));
2417
2418 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2419 assert_eq!(peers.get_reputation(&peer), Some(1024));
2420
2421 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2422 assert_eq!(peers.get_reputation(&peer), Some(0));
2423 }
2424
2425 #[tokio::test]
2426 async fn test_remove_discovered_active() {
2427 let peer = PeerId::random();
2428 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2429 let mut peers = PeersManager::default();
2430 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2431
2432 match event!(peers) {
2433 PeerAction::PeerAdded(peer_id) => {
2434 assert_eq!(peer_id, peer);
2435 }
2436 _ => unreachable!(),
2437 }
2438 match event!(peers) {
2439 PeerAction::Connect { peer_id, remote_addr } => {
2440 assert_eq!(peer_id, peer);
2441 assert_eq!(remote_addr, socket_addr);
2442 }
2443 _ => unreachable!(),
2444 }
2445
2446 let p = peers.peers.get(&peer).unwrap();
2447 assert_eq!(p.state, PeerConnectionState::PendingOut);
2448
2449 peers.remove_peer(peer);
2450
2451 match event!(peers) {
2452 PeerAction::PeerRemoved(peer_id) => {
2453 assert_eq!(peer_id, peer);
2454 }
2455 _ => unreachable!(),
2456 }
2457 match event!(peers) {
2458 PeerAction::Disconnect { peer_id, .. } => {
2459 assert_eq!(peer_id, peer);
2460 }
2461 _ => unreachable!(),
2462 }
2463
2464 let p = peers.peers.get(&peer).unwrap();
2465 assert_eq!(p.state, PeerConnectionState::PendingOut);
2466
2467 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2468 let p = peers.peers.get(&peer).unwrap();
2469 assert_eq!(p.state, PeerConnectionState::PendingOut);
2470
2471 peers.on_active_session_gracefully_closed(peer);
2472 assert!(!peers.peers.contains_key(&peer));
2473 }
2474
2475 #[tokio::test]
2476 async fn test_fatal_outgoing_connection_error_trusted() {
2477 let peer = PeerId::random();
2478 let config = PeersConfig::test()
2479 .with_trusted_nodes(vec![TrustedPeer {
2480 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2481 tcp_port: 8008,
2482 udp_port: 8008,
2483 id: peer,
2484 }])
2485 .with_trusted_nodes_only(true);
2486 let mut peers = PeersManager::new(config);
2487 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2488
2489 match event!(peers) {
2490 PeerAction::Connect { peer_id, remote_addr } => {
2491 assert_eq!(peer_id, peer);
2492 assert_eq!(remote_addr, socket_addr);
2493 }
2494 _ => unreachable!(),
2495 }
2496
2497 let p = peers.peers.get(&peer).unwrap();
2498 assert_eq!(p.state, PeerConnectionState::PendingOut);
2499
2500 assert_eq!(peers.num_outbound_connections(), 0);
2501
2502 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2503 EthHandshakeError::NonStatusMessageInHandshake,
2504 ));
2505 assert!(err.is_fatal_protocol_error());
2506
2507 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2508 assert_eq!(peers.num_outbound_connections(), 0);
2509
2510 match event!(peers) {
2512 PeerAction::BanPeer { peer_id } => {
2513 assert_eq!(peer_id, peer);
2514 }
2515 err => unreachable!("{err:?}"),
2516 }
2517
2518 assert!(peers.peers.contains_key(&peer));
2520
2521 tokio::time::sleep(peers.backoff_durations.medium).await;
2523
2524 match event!(peers) {
2525 PeerAction::Connect { peer_id, remote_addr } => {
2526 assert_eq!(peer_id, peer);
2527 assert_eq!(remote_addr, socket_addr);
2528 }
2529 err => unreachable!("{err:?}"),
2530 }
2531 }
2532
2533 #[tokio::test]
2534 async fn test_outgoing_connection_error() {
2535 let peer = PeerId::random();
2536 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2537 let mut peers = PeersManager::default();
2538 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2539
2540 match event!(peers) {
2541 PeerAction::PeerAdded(peer_id) => {
2542 assert_eq!(peer_id, peer);
2543 }
2544 _ => unreachable!(),
2545 }
2546 match event!(peers) {
2547 PeerAction::Connect { peer_id, remote_addr } => {
2548 assert_eq!(peer_id, peer);
2549 assert_eq!(remote_addr, socket_addr);
2550 }
2551 _ => unreachable!(),
2552 }
2553
2554 let p = peers.peers.get(&peer).unwrap();
2555 assert_eq!(p.state, PeerConnectionState::PendingOut);
2556
2557 assert_eq!(peers.num_outbound_connections(), 0);
2558
2559 peers.on_outgoing_connection_failure(
2560 &socket_addr,
2561 &peer,
2562 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2563 );
2564
2565 assert_eq!(peers.num_outbound_connections(), 0);
2566 }
2567
2568 #[tokio::test]
2569 async fn test_outgoing_connection_gracefully_closed() {
2570 let peer = PeerId::random();
2571 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2572 let mut peers = PeersManager::default();
2573 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2574
2575 match event!(peers) {
2576 PeerAction::PeerAdded(peer_id) => {
2577 assert_eq!(peer_id, peer);
2578 }
2579 _ => unreachable!(),
2580 }
2581 match event!(peers) {
2582 PeerAction::Connect { peer_id, remote_addr } => {
2583 assert_eq!(peer_id, peer);
2584 assert_eq!(remote_addr, socket_addr);
2585 }
2586 _ => unreachable!(),
2587 }
2588
2589 let p = peers.peers.get(&peer).unwrap();
2590 assert_eq!(p.state, PeerConnectionState::PendingOut);
2591
2592 assert_eq!(peers.num_outbound_connections(), 0);
2593
2594 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2595
2596 assert_eq!(peers.num_outbound_connections(), 0);
2597 assert_eq!(peers.connection_info.num_pending_out, 0);
2598 }
2599
2600 #[tokio::test]
2601 async fn test_discovery_ban_list() {
2602 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2603 let socket_addr = SocketAddr::new(ip, 8008);
2604 let ban_list = BanList::new(vec![], vec![ip]);
2605 let config = PeersConfig::default().with_ban_list(ban_list);
2606 let mut peer_manager = PeersManager::new(config);
2607 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2608
2609 assert!(peer_manager.peers.is_empty());
2610 }
2611
2612 #[tokio::test]
2613 async fn test_on_pending_ban_list() {
2614 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2615 let socket_addr = SocketAddr::new(ip, 8008);
2616 let ban_list = BanList::new(vec![], vec![ip]);
2617 let config = PeersConfig::test().with_ban_list(ban_list);
2618 let mut peer_manager = PeersManager::new(config);
2619 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2620 match a {
2622 Ok(_) => panic!(),
2623 Err(err) => match err {
2624 InboundConnectionError::IpBanned => {
2625 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2626 }
2627 _ => unreachable!(),
2628 },
2629 }
2630 }
2631
2632 #[tokio::test]
2633 async fn test_on_active_inbound_ban_list() {
2634 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2635 let socket_addr = SocketAddr::new(ip, 8008);
2636 let given_peer_id = PeerId::random();
2637 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2638 let config = PeersConfig::test().with_ban_list(ban_list);
2639 let mut peer_manager = PeersManager::new(config);
2640 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2641 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2643 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2644 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2647 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2648
2649 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2650 peer_manager.queued_actions.pop_front()
2651 else {
2652 panic!()
2653 };
2654
2655 assert_eq!(peer_id, given_peer_id)
2656 }
2657
2658 #[test]
2659 fn test_connection_limits() {
2660 let mut info = ConnectionInfo::default();
2661 info.inc_in();
2662 assert_eq!(info.num_inbound, 1);
2663 assert_eq!(info.num_outbound, 0);
2664 assert!(info.has_in_capacity());
2665
2666 info.decr_in();
2667 assert_eq!(info.num_inbound, 0);
2668 assert_eq!(info.num_outbound, 0);
2669
2670 info.inc_out();
2671 assert_eq!(info.num_inbound, 0);
2672 assert_eq!(info.num_outbound, 1);
2673 assert!(info.has_out_capacity());
2674
2675 info.decr_out();
2676 assert_eq!(info.num_inbound, 0);
2677 assert_eq!(info.num_outbound, 0);
2678 }
2679
2680 #[test]
2681 fn test_connection_peer_state() {
2682 let mut info = ConnectionInfo::default();
2683 info.inc_in();
2684
2685 info.decr_state(PeerConnectionState::In);
2686 assert_eq!(info.num_inbound, 0);
2687 assert_eq!(info.num_outbound, 0);
2688
2689 info.inc_out();
2690
2691 info.decr_state(PeerConnectionState::Out);
2692 assert_eq!(info.num_inbound, 0);
2693 assert_eq!(info.num_outbound, 0);
2694 }
2695
2696 #[tokio::test]
2697 async fn test_trusted_peers_are_prioritized() {
2698 let trusted_peer = PeerId::random();
2699 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2700 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2701 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2702 tcp_port: 8008,
2703 udp_port: 8008,
2704 id: trusted_peer,
2705 }]);
2706 let mut peers = PeersManager::new(config);
2707
2708 let basic_peer = PeerId::random();
2709 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2710 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2711
2712 match event!(peers) {
2713 PeerAction::PeerAdded(peer_id) => {
2714 assert_eq!(peer_id, basic_peer);
2715 }
2716 _ => unreachable!(),
2717 }
2718 match event!(peers) {
2719 PeerAction::Connect { peer_id, remote_addr } => {
2720 assert_eq!(peer_id, trusted_peer);
2721 assert_eq!(remote_addr, trusted_sock);
2722 }
2723 _ => unreachable!(),
2724 }
2725 match event!(peers) {
2726 PeerAction::Connect { peer_id, remote_addr } => {
2727 assert_eq!(peer_id, basic_peer);
2728 assert_eq!(remote_addr, basic_sock);
2729 }
2730 _ => unreachable!(),
2731 }
2732 }
2733
2734 #[tokio::test]
2735 async fn test_connect_trusted_nodes_only() {
2736 let trusted_peer = PeerId::random();
2737 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2738 let config = PeersConfig::test()
2739 .with_trusted_nodes(vec![TrustedPeer {
2740 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2741 tcp_port: 8008,
2742 udp_port: 8008,
2743 id: trusted_peer,
2744 }])
2745 .with_trusted_nodes_only(true);
2746 let mut peers = PeersManager::new(config);
2747
2748 let basic_peer = PeerId::random();
2749 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2750 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2751
2752 match event!(peers) {
2753 PeerAction::PeerAdded(peer_id) => {
2754 assert_eq!(peer_id, basic_peer);
2755 }
2756 _ => unreachable!(),
2757 }
2758 match event!(peers) {
2759 PeerAction::Connect { peer_id, remote_addr } => {
2760 assert_eq!(peer_id, trusted_peer);
2761 assert_eq!(remote_addr, trusted_sock);
2762 }
2763 _ => unreachable!(),
2764 }
2765 poll_fn(|cx| {
2766 assert!(peers.poll(cx).is_pending());
2767 Poll::Ready(())
2768 })
2769 .await;
2770 }
2771
2772 #[tokio::test]
2773 async fn test_incoming_with_trusted_nodes_only() {
2774 let trusted_peer = PeerId::random();
2775 let config = PeersConfig::test()
2776 .with_trusted_nodes(vec![TrustedPeer {
2777 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2778 tcp_port: 8008,
2779 udp_port: 8008,
2780 id: trusted_peer,
2781 }])
2782 .with_trusted_nodes_only(true);
2783 let mut peers = PeersManager::new(config);
2784
2785 let basic_peer = PeerId::random();
2786 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2787 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2788 assert_eq!(peers.connection_info.num_pending_in, 1);
2790 peers.on_incoming_session_established(basic_peer, basic_sock);
2791 assert_eq!(peers.connection_info.num_pending_in, 0);
2794 assert_eq!(peers.connection_info.num_inbound, 0);
2795
2796 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2797 peers.queued_actions.pop_front()
2798 else {
2799 panic!()
2800 };
2801 assert_eq!(basic_peer, peer_id);
2802 assert!(!peers.peers.contains_key(&basic_peer));
2803 }
2804
2805 #[tokio::test]
2806 async fn test_incoming_without_trusted_nodes_only() {
2807 let trusted_peer = PeerId::random();
2808 let config = PeersConfig::test()
2809 .with_trusted_nodes(vec![TrustedPeer {
2810 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2811 tcp_port: 8008,
2812 udp_port: 8008,
2813 id: trusted_peer,
2814 }])
2815 .with_trusted_nodes_only(false);
2816 let mut peers = PeersManager::new(config);
2817
2818 let basic_peer = PeerId::random();
2819 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2820 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2821
2822 assert_eq!(peers.connection_info.num_pending_in, 1);
2824 peers.on_incoming_session_established(basic_peer, basic_sock);
2825 assert_eq!(peers.connection_info.num_pending_in, 0);
2828 assert_eq!(peers.connection_info.num_inbound, 1);
2829 assert!(peers.peers.contains_key(&basic_peer));
2830 }
2831
2832 #[tokio::test]
2833 async fn test_incoming_at_capacity() {
2834 let mut config = PeersConfig::test();
2835 config.connection_info.max_inbound = 1;
2836 let mut peers = PeersManager::new(config);
2837
2838 let peer = PeerId::random();
2839 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2840 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2841
2842 peers.on_incoming_session_established(peer, addr);
2843
2844 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2845 assert_eq!(
2846 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2847 InboundConnectionError::ExceedsCapacity
2848 );
2849 }
2850
2851 #[tokio::test]
2852 async fn test_incoming_rate_limit() {
2853 let config = PeersConfig {
2854 incoming_ip_throttle_duration: Duration::from_millis(100),
2855 ..PeersConfig::test()
2856 };
2857 let mut peers = PeersManager::new(config);
2858
2859 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2860 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2861 assert_eq!(
2862 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2863 InboundConnectionError::IpBanned
2864 );
2865
2866 peers.release_interval.reset_immediately();
2867 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2868
2869 poll_fn(|cx| loop {
2871 if peers.poll(cx).is_pending() {
2872 return Poll::Ready(());
2873 }
2874 })
2875 .await;
2876
2877 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2878 assert_eq!(
2879 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2880 InboundConnectionError::IpBanned
2881 );
2882 }
2883
2884 #[tokio::test]
2885 async fn test_tick() {
2886 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2887 let socket_addr = SocketAddr::new(ip, 8008);
2888 let config = PeersConfig::test();
2889 let mut peer_manager = PeersManager::new(config);
2890 let peer_id = PeerId::random();
2891 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2892
2893 tokio::time::sleep(Duration::from_secs(1)).await;
2894 peer_manager.tick();
2895
2896 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2898
2899 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2901
2902 tokio::time::sleep(Duration::from_secs(1)).await;
2903 peer_manager.tick();
2904
2905 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2907
2908 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2909
2910 tokio::time::sleep(Duration::from_secs(1)).await;
2911 peer_manager.tick();
2912
2913 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2915 }
2916
2917 #[tokio::test]
2918 async fn test_remove_incoming_after_disconnect() {
2919 let peer_id = PeerId::random();
2920 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2921 let mut peers = PeersManager::default();
2922
2923 peers.on_incoming_pending_session(addr.ip()).unwrap();
2924 peers.on_incoming_session_established(peer_id, addr);
2925 let peer = peers.peers.get(&peer_id).unwrap();
2926 assert_eq!(peer.state, PeerConnectionState::In);
2927 assert!(peer.remove_after_disconnect);
2928
2929 peers.on_active_session_gracefully_closed(peer_id);
2930 assert!(!peers.peers.contains_key(&peer_id))
2931 }
2932
2933 #[tokio::test]
2934 async fn test_keep_incoming_after_disconnect_if_discovered() {
2935 let peer_id = PeerId::random();
2936 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2937 let mut peers = PeersManager::default();
2938
2939 peers.on_incoming_pending_session(addr.ip()).unwrap();
2940 peers.on_incoming_session_established(peer_id, addr);
2941 let peer = peers.peers.get(&peer_id).unwrap();
2942 assert_eq!(peer.state, PeerConnectionState::In);
2943 assert!(peer.remove_after_disconnect);
2944
2945 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2947
2948 peers.on_active_session_gracefully_closed(peer_id);
2949
2950 let peer = peers.peers.get(&peer_id).unwrap();
2951 assert_eq!(peer.state, PeerConnectionState::Idle);
2952 assert!(!peer.remove_after_disconnect);
2953 }
2954
2955 #[tokio::test]
2956 async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2957 let throttle_duration = Duration::from_millis(100);
2958 let config =
2959 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2960 let mut peers = PeersManager::new(config);
2961
2962 let peer_id = PeerId::random();
2963 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2964
2965 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2967
2968 match event!(peers) {
2969 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2970 _ => unreachable!(),
2971 }
2972
2973 match event!(peers) {
2974 PeerAction::Connect { .. } => {}
2975 _ => unreachable!(),
2976 }
2977
2978 peers.on_active_outgoing_established(peer_id);
2980 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2981
2982 peers.on_active_session_gracefully_closed(peer_id);
2984
2985 let peer = peers.peers.get(&peer_id).unwrap();
2986 assert_eq!(peer.state, PeerConnectionState::Idle);
2987 assert!(peer.backed_off);
2988
2989 assert!(peers.backed_off_peers.contains_key(&peer_id));
2991
2992 poll_fn(|cx| {
2994 assert!(peers.poll(cx).is_pending());
2995 Poll::Ready(())
2996 })
2997 .await;
2998
2999 assert!(peers.backed_off_peers.contains_key(&peer_id));
3001 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
3002
3003 tokio::time::sleep(throttle_duration).await;
3005
3006 match event!(peers) {
3008 PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
3009 _ => unreachable!(),
3010 }
3011
3012 assert!(!peers.backed_off_peers.contains_key(&peer_id));
3014 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
3015 }
3016
3017 #[tokio::test]
3018 async fn test_backed_off_peer_can_accept_incoming_connection() {
3019 let throttle_duration = Duration::from_millis(100);
3020 let config =
3021 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
3022 let mut peers = PeersManager::new(config);
3023
3024 let peer_id = PeerId::random();
3025 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3026
3027 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3029
3030 match event!(peers) {
3031 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
3032 _ => unreachable!(),
3033 }
3034
3035 match event!(peers) {
3036 PeerAction::Connect { .. } => {}
3037 _ => unreachable!(),
3038 }
3039
3040 peers.on_active_outgoing_established(peer_id);
3042 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
3043
3044 peers.on_active_session_gracefully_closed(peer_id);
3046
3047 let peer = peers.peers.get(&peer_id).unwrap();
3048 assert_eq!(peer.state, PeerConnectionState::Idle);
3049 assert!(peer.backed_off);
3050 assert!(peers.backed_off_peers.contains_key(&peer_id));
3051
3052 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
3055 assert_eq!(peers.connection_info.num_pending_in, 1);
3056
3057 peers.on_incoming_session_established(peer_id, addr);
3059
3060 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3062 assert_eq!(peers.connection_info.num_inbound, 1);
3063
3064 assert!(peers.backed_off_peers.contains_key(&peer_id));
3066 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
3067
3068 poll_fn(|cx| {
3070 assert!(peers.poll(cx).is_pending());
3071 Poll::Ready(())
3072 })
3073 .await;
3074
3075 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3077
3078 tokio::time::sleep(throttle_duration).await;
3080
3081 poll_fn(|cx| {
3082 let _ = peers.poll(cx);
3083 Poll::Ready(())
3084 })
3085 .await;
3086
3087 assert!(!peers.backed_off_peers.contains_key(&peer_id));
3089 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
3090
3091 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3093 }
3094
3095 #[tokio::test]
3096 async fn test_incoming_outgoing_already_connected() {
3097 let peer_id = PeerId::random();
3098 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3099 let mut peers = PeersManager::default();
3100
3101 peers.on_incoming_pending_session(addr.ip()).unwrap();
3102 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3103
3104 match event!(peers) {
3105 PeerAction::PeerAdded(_) => {}
3106 _ => unreachable!(),
3107 }
3108
3109 match event!(peers) {
3110 PeerAction::Connect { .. } => {}
3111 _ => unreachable!(),
3112 }
3113
3114 peers.on_incoming_session_established(peer_id, addr);
3115 peers.on_already_connected(Direction::Outgoing(peer_id));
3116 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3117 assert_eq!(peers.connection_info.num_inbound, 1);
3118 assert_eq!(peers.connection_info.num_pending_out, 0);
3119 assert_eq!(peers.connection_info.num_pending_in, 0);
3120 assert_eq!(peers.connection_info.num_outbound, 0);
3121 }
3122
3123 #[tokio::test]
3124 async fn test_already_connected_incoming_outgoing_connection_error() {
3125 let peer_id = PeerId::random();
3126 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3127 let mut peers = PeersManager::default();
3128
3129 peers.on_incoming_pending_session(addr.ip()).unwrap();
3130 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3131
3132 match event!(peers) {
3133 PeerAction::PeerAdded(_) => {}
3134 _ => unreachable!(),
3135 }
3136
3137 match event!(peers) {
3138 PeerAction::Connect { .. } => {}
3139 _ => unreachable!(),
3140 }
3141
3142 peers.on_incoming_session_established(peer_id, addr);
3143
3144 peers.on_outgoing_connection_failure(
3145 &addr,
3146 &peer_id,
3147 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
3148 );
3149 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3150 assert_eq!(peers.connection_info.num_inbound, 1);
3151 assert_eq!(peers.connection_info.num_pending_out, 0);
3152 assert_eq!(peers.connection_info.num_pending_in, 0);
3153 assert_eq!(peers.connection_info.num_outbound, 0);
3154 }
3155
3156 #[tokio::test]
3157 async fn test_max_concurrent_dials() {
3158 let config = PeersConfig::default();
3159 let mut peer_manager = PeersManager::new(config);
3160 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
3161 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
3162 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3163 peer_manager.add_peer(PeerId::random(), peer_addr, None);
3164 }
3165
3166 peer_manager.fill_outbound_slots();
3167 let dials = peer_manager
3168 .queued_actions
3169 .iter()
3170 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
3171 .count();
3172 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3173 }
3174
3175 #[tokio::test]
3176 async fn test_max_num_of_pending_dials() {
3177 let config = PeersConfig::default();
3178 let mut peer_manager = PeersManager::new(config);
3179 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
3180 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
3181
3182 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3184 peer_manager.add_peer(PeerId::random(), peer_addr, None);
3185 }
3186
3187 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3188 match event!(peer_manager) {
3189 PeerAction::PeerAdded(_) => {}
3190 _ => unreachable!(),
3191 }
3192 }
3193
3194 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
3195 match event!(peer_manager) {
3196 PeerAction::Connect { .. } => {}
3197 _ => unreachable!(),
3198 }
3199 }
3200
3201 peer_manager.fill_outbound_slots();
3203
3204 let dials = peer_manager.connection_info.num_pending_out;
3206 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3207
3208 let num_pendingout_states = peer_manager
3209 .peers
3210 .iter()
3211 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
3212 .map(|(peer_id, _)| *peer_id)
3213 .collect::<Vec<PeerId>>();
3214 assert_eq!(
3215 num_pendingout_states.len(),
3216 peer_manager.connection_info.config.max_concurrent_outbound_dials
3217 );
3218
3219 for peer_id in &num_pendingout_states {
3221 peer_manager.on_active_outgoing_established(*peer_id);
3222 }
3223
3224 for peer_id in &num_pendingout_states {
3226 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
3227 }
3228
3229 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
3231 }
3232
3233 #[tokio::test]
3234 async fn test_connect() {
3235 let peer = PeerId::random();
3236 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3237 let mut peers = PeersManager::default();
3238 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3239 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3240
3241 match event!(peers) {
3242 PeerAction::Connect { peer_id, remote_addr } => {
3243 assert_eq!(peer_id, peer);
3244 assert_eq!(remote_addr, socket_addr);
3245 }
3246 _ => unreachable!(),
3247 }
3248
3249 let (record, _) = peers.peer_by_id(peer).unwrap();
3250 assert_eq!(record.tcp_addr(), socket_addr);
3251 assert_eq!(record.udp_addr(), socket_addr);
3252
3253 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3255
3256 let (record, _) = peers.peer_by_id(peer).unwrap();
3257 assert_eq!(record.tcp_addr(), socket_addr);
3258 assert_eq!(record.udp_addr(), socket_addr);
3259 }
3260
3261 #[tokio::test]
3262 async fn test_incoming_connection_from_banned() {
3263 let peer = PeerId::random();
3264 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3265 let config = PeersConfig::test().with_max_inbound(3);
3266 let mut peers = PeersManager::new(config);
3267 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3268
3269 match event!(peers) {
3270 PeerAction::PeerAdded(peer_id) => {
3271 assert_eq!(peer_id, peer);
3272 }
3273 _ => unreachable!(),
3274 }
3275 match event!(peers) {
3276 PeerAction::Connect { peer_id, .. } => {
3277 assert_eq!(peer_id, peer);
3278 }
3279 _ => unreachable!(),
3280 }
3281
3282 poll_fn(|cx| {
3283 assert!(peers.poll(cx).is_pending());
3284 Poll::Ready(())
3285 })
3286 .await;
3287
3288 loop {
3290 peers.on_active_session_dropped(
3291 &socket_addr,
3292 &peer,
3293 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3294 reth_eth_wire::EthVersion::Eth68,
3295 reth_eth_wire::EthMessageID::Status,
3296 )),
3297 );
3298
3299 if peers.peers.get(&peer).unwrap().is_banned() {
3300 break;
3301 }
3302
3303 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3304 peers.on_incoming_session_established(peer, socket_addr);
3305
3306 match event!(peers) {
3307 PeerAction::Connect { peer_id, .. } => {
3308 assert_eq!(peer_id, peer);
3309 }
3310 _ => unreachable!(),
3311 }
3312 }
3313
3314 assert!(peers.peers.get(&peer).unwrap().is_banned());
3315
3316 for _ in 0..peers.connection_info.config.max_inbound {
3318 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3319 peers.on_incoming_session_established(peer, socket_addr);
3320
3321 match event!(peers) {
3322 PeerAction::DisconnectBannedIncoming { peer_id } => {
3323 assert_eq!(peer_id, peer);
3324 }
3325 _ => unreachable!(),
3326 }
3327 }
3328
3329 poll_fn(|cx| {
3330 assert!(peers.poll(cx).is_pending());
3331 Poll::Ready(())
3332 })
3333 .await;
3334
3335 assert_eq!(peers.connection_info.num_inbound, 0);
3336
3337 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3338
3339 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3341 assert_eq!(peers.connection_info.num_pending_in, 1);
3342
3343 peers.on_active_session_gracefully_closed(peer);
3347 assert_eq!(peers.connection_info.num_inbound, 0);
3348 }
3349
3350 #[tokio::test]
3351 async fn test_add_pending_connect() {
3352 let peer = PeerId::random();
3353 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3354 let mut peers = PeersManager::default();
3355 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3356 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3357 assert_eq!(peers.connection_info.num_pending_out, 1);
3358 }
3359
3360 #[tokio::test]
3361 async fn test_dns_updates_peer_address() {
3362 let peer_id = PeerId::random();
3363 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3364 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3365
3366 let trusted = TrustedPeer {
3367 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3368 tcp_port: 8008,
3369 udp_port: 8008,
3370 id: peer_id,
3371 };
3372
3373 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3374 let mut manager = PeersManager::new(config);
3375 manager
3376 .trusted_peers_resolver
3377 .set_interval(tokio::time::interval(Duration::from_millis(1)));
3378
3379 manager.peers.insert(
3380 peer_id,
3381 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3382 );
3383
3384 for _ in 0..100 {
3385 let _ = event!(manager);
3386 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3387 break;
3388 }
3389 tokio::time::sleep(Duration::from_millis(10)).await;
3390 }
3391
3392 let updated_peer = manager.peers.get(&peer_id).unwrap();
3393 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3394 }
3395
3396 #[tokio::test]
3397 async fn test_ip_filter_blocks_inbound_connection() {
3398 use reth_net_banlist::IpFilter;
3399 use std::net::IpAddr;
3400
3401 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3403 let config = PeersConfig::test().with_ip_filter(ip_filter);
3404 let mut peers = PeersManager::new(config);
3405
3406 let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3408 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3409
3410 let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3412 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3413 }
3414
3415 #[tokio::test]
3416 async fn test_ip_filter_blocks_outbound_connection() {
3417 use reth_net_banlist::IpFilter;
3418 use std::net::SocketAddr;
3419
3420 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3422 let config = PeersConfig::test().with_ip_filter(ip_filter);
3423 let mut peers = PeersManager::new(config);
3424
3425 let peer_id = PeerId::new([1; 64]);
3426
3427 let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3429 peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3430 assert!(peers.peers.contains_key(&peer_id));
3431
3432 let peer_id2 = PeerId::new([2; 64]);
3434 let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3435 peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3436 assert!(!peers.peers.contains_key(&peer_id2));
3437 }
3438
3439 #[tokio::test]
3440 async fn test_ip_filter_ipv6() {
3441 use reth_net_banlist::IpFilter;
3442 use std::net::IpAddr;
3443
3444 let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3446 let config = PeersConfig::test().with_ip_filter(ip_filter);
3447 let mut peers = PeersManager::new(config);
3448
3449 let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3451 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3452
3453 let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3455 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3456 }
3457
3458 #[tokio::test]
3459 async fn test_ip_filter_multiple_ranges() {
3460 use reth_net_banlist::IpFilter;
3461 use std::net::IpAddr;
3462
3463 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3465 let config = PeersConfig::test().with_ip_filter(ip_filter);
3466 let mut peers = PeersManager::new(config);
3467
3468 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3470 let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3471 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3472 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3473
3474 let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3476 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3477 }
3478
3479 #[tokio::test]
3480 async fn test_ip_filter_no_restriction() {
3481 use reth_net_banlist::IpFilter;
3482 use std::net::IpAddr;
3483
3484 let ip_filter = IpFilter::allow_all();
3486 let config = PeersConfig::test().with_ip_filter(ip_filter);
3487 let mut peers = PeersManager::new(config);
3488
3489 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3491 let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3492 let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3493 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3494 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3495 assert!(peers.on_incoming_pending_session(ip3).is_ok());
3496 }
3497
3498 #[tokio::test]
3499 async fn test_best_unconnected_prefers_fork_id_as_tiebreaker() {
3500 let mut peers = PeersManager::default();
3501 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8008);
3502
3503 let fork_id = ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 };
3504
3505 let no_fork = PeerId::random();
3507 peers.add_peer(no_fork, PeerAddr::from_tcp(addr), None);
3508
3509 let with_fork = PeerId::random();
3510 peers.add_peer(with_fork, PeerAddr::from_tcp(addr), None);
3511 peers.peers.get_mut(&with_fork).unwrap().fork_id = Some(Box::new(fork_id));
3512
3513 let (best_id, _) = peers.best_unconnected().unwrap();
3514 assert_eq!(best_id, with_fork, "fork_id should break tie when reputation is equal");
3515 }
3516
3517 #[tokio::test]
3518 async fn test_add_trusted_peer_node_resolves_absent_peer() {
3519 let peer_id = PeerId::random();
3520 let trusted = TrustedPeer {
3521 host: url::Host::Domain("example.invalid".to_string()),
3522 tcp_port: 30303,
3523 udp_port: 30303,
3524 id: peer_id,
3525 };
3526
3527 let mut manager = PeersManager::default();
3528 manager.add_trusted_peer_node(trusted);
3529
3530 assert!(manager.trusted_peer_ids.contains(&peer_id));
3531 assert_eq!(manager.trusted_peers_resolver.trusted_peers.len(), 1);
3532 assert!(!manager.peers.contains_key(&peer_id));
3533
3534 let resolved = NodeRecord {
3535 address: "10.0.0.1".parse::<IpAddr>().unwrap(),
3536 tcp_port: 30303,
3537 udp_port: 30303,
3538 id: peer_id,
3539 };
3540 manager.on_resolved_peer(peer_id, resolved);
3541
3542 let peer = manager.peers.get(&peer_id).expect("peer should be added after resolution");
3543 assert!(peer.kind.is_trusted());
3544 assert_eq!(peer.addr.tcp().ip(), "10.0.0.1".parse::<IpAddr>().unwrap());
3545 }
3546
3547 #[tokio::test]
3548 async fn test_removed_trusted_peer_ignores_late_resolution() {
3549 let peer_id = PeerId::random();
3550 let trusted = TrustedPeer {
3551 host: url::Host::Domain("example.invalid".to_string()),
3552 tcp_port: 30303,
3553 udp_port: 30303,
3554 id: peer_id,
3555 };
3556
3557 let mut manager = PeersManager::default();
3558 manager.add_trusted_peer_node(trusted);
3559 manager.remove_peer_from_trusted_set(peer_id);
3560
3561 let resolved = NodeRecord {
3562 address: "10.0.0.1".parse::<IpAddr>().unwrap(),
3563 tcp_port: 30303,
3564 udp_port: 30303,
3565 id: peer_id,
3566 };
3567 manager.on_resolved_peer(peer_id, resolved);
3568
3569 assert!(!manager.peers.contains_key(&peer_id));
3570 assert!(!manager.trusted_peer_ids.contains(&peer_id));
3571 }
3572
3573 #[tokio::test]
3574 async fn test_rotation_disconnects_eligible_peer() {
3575 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3576 let peer = PeerId::random();
3577 let mut peers = PeersManager::new(
3578 PeersConfig::test()
3579 .with_max_outbound(1)
3580 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3581 );
3582
3583 peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3584 match event!(peers) {
3585 PeerAction::PeerAdded(_) => {}
3586 _ => unreachable!(),
3587 }
3588 match event!(peers) {
3589 PeerAction::Connect { .. } => {}
3590 _ => unreachable!(),
3591 }
3592 peers.on_active_outgoing_established(peer);
3593
3594 set_connected_at(
3595 &mut peers,
3596 peer,
3597 std::time::Instant::now() - Duration::from_secs(11 * 60),
3598 );
3599
3600 tokio::time::sleep(Duration::from_millis(100)).await;
3601
3602 match event!(peers) {
3603 PeerAction::Disconnect { peer_id, reason } => {
3604 assert_eq!(peer_id, peer);
3605 assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3606 }
3607 _ => unreachable!(),
3608 }
3609 }
3610
3611 #[tokio::test]
3612 async fn test_rotation_then_remove_uses_active_session_removal() {
3613 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 7)), 8008);
3614 let peer = PeerId::random();
3615 let mut peers = PeersManager::new(
3616 PeersConfig::test()
3617 .with_max_outbound(1)
3618 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3619 );
3620
3621 peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3622 match event!(peers) {
3623 PeerAction::PeerAdded(_) => {}
3624 _ => unreachable!(),
3625 }
3626 match event!(peers) {
3627 PeerAction::Connect { .. } => {}
3628 _ => unreachable!(),
3629 }
3630 peers.on_active_outgoing_established(peer);
3631 set_connected_at(
3632 &mut peers,
3633 peer,
3634 std::time::Instant::now() - Duration::from_secs(11 * 60),
3635 );
3636
3637 tokio::time::sleep(Duration::from_millis(100)).await;
3638
3639 match event!(peers) {
3640 PeerAction::Disconnect { peer_id, reason } => {
3641 assert_eq!(peer_id, peer);
3642 assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3643 }
3644 _ => unreachable!(),
3645 }
3646 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::Out);
3647 assert_eq!(peers.connection_info.num_outbound, 1);
3648
3649 peers.remove_peer(peer);
3650 match event!(peers) {
3651 PeerAction::PeerRemoved(peer_id) => assert_eq!(peer_id, peer),
3652 _ => unreachable!(),
3653 }
3654 match event!(peers) {
3655 PeerAction::Disconnect { peer_id, reason } => {
3656 assert_eq!(peer_id, peer);
3657 assert_eq!(reason, Some(DisconnectReason::DisconnectRequested));
3658 }
3659 _ => unreachable!(),
3660 }
3661
3662 let p = peers.peers.get(&peer).unwrap();
3663 assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
3664 assert!(p.remove_after_disconnect);
3665 assert_eq!(peers.connection_info.num_outbound, 1);
3666
3667 peers.on_active_session_gracefully_closed(peer);
3668 assert_eq!(peers.connection_info.num_outbound, 0);
3669 assert!(!peers.peers.contains_key(&peer));
3670 }
3671
3672 #[tokio::test]
3673 async fn test_rotation_skips_trusted_peers() {
3674 let _addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3675 let peer = PeerId::random();
3676 let trusted = TrustedPeer {
3677 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 3)),
3678 tcp_port: 8008,
3679 udp_port: 8008,
3680 id: peer,
3681 };
3682 let mut peers = PeersManager::new(
3683 PeersConfig::test()
3684 .with_max_outbound(1)
3685 .with_trusted_nodes(vec![trusted])
3686 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3687 );
3688
3689 match event!(peers) {
3690 PeerAction::Connect { .. } => {}
3691 _ => unreachable!(),
3692 }
3693 peers.on_active_outgoing_established(peer);
3694 set_connected_at(
3695 &mut peers,
3696 peer,
3697 std::time::Instant::now() - Duration::from_secs(11 * 60),
3698 );
3699
3700 tokio::time::sleep(Duration::from_millis(100)).await;
3701
3702 poll_fn(|cx| {
3703 assert!(peers.poll(cx).is_pending(), "trusted peer must not be rotated");
3704 Poll::Ready(())
3705 })
3706 .await;
3707 }
3708
3709 #[tokio::test]
3710 async fn test_rotation_skips_recently_connected_peers() {
3711 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 4)), 8008);
3712 let peer = PeerId::random();
3713 let mut peers = PeersManager::new(
3714 PeersConfig::test()
3715 .with_max_outbound(1)
3716 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3717 );
3718
3719 peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3720 match event!(peers) {
3721 PeerAction::PeerAdded(_) => {}
3722 _ => unreachable!(),
3723 }
3724 match event!(peers) {
3725 PeerAction::Connect { .. } => {}
3726 _ => unreachable!(),
3727 }
3728 peers.on_active_outgoing_established(peer);
3729
3730 tokio::time::sleep(Duration::from_millis(100)).await;
3731
3732 poll_fn(|cx| {
3733 assert!(peers.poll(cx).is_pending(), "recently connected peer must not be rotated");
3734 Poll::Ready(())
3735 })
3736 .await;
3737 }
3738
3739 #[tokio::test]
3740 async fn test_rotation_skips_when_slots_available() {
3741 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 5)), 8008);
3742 let peer = PeerId::random();
3743 let mut peers = PeersManager::new(
3744 PeersConfig::test()
3745 .with_max_outbound(2)
3746 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3747 );
3748
3749 peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3750 match event!(peers) {
3751 PeerAction::PeerAdded(_) => {}
3752 _ => unreachable!(),
3753 }
3754 match event!(peers) {
3755 PeerAction::Connect { .. } => {}
3756 _ => unreachable!(),
3757 }
3758 peers.on_active_outgoing_established(peer);
3759 set_connected_at(
3760 &mut peers,
3761 peer,
3762 std::time::Instant::now() - Duration::from_secs(11 * 60),
3763 );
3764
3765 tokio::time::sleep(Duration::from_millis(100)).await;
3766
3767 poll_fn(|cx| {
3768 assert!(
3769 peers.poll(cx).is_pending(),
3770 "rotation must not fire when outbound slots are available"
3771 );
3772 Poll::Ready(())
3773 })
3774 .await;
3775 }
3776
3777 #[tokio::test]
3778 async fn test_rotation_disconnects_inbound_peer() {
3779 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 6)), 8008);
3780 let peer = PeerId::random();
3781 let mut peers = PeersManager::new(
3782 PeersConfig::test()
3783 .with_max_inbound(1)
3784 .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3785 );
3786
3787 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
3788 peers.on_incoming_session_established(peer, addr);
3789
3790 match event!(peers) {
3791 PeerAction::PeerAdded(_) => {}
3792 _ => unreachable!(),
3793 }
3794
3795 set_connected_at(
3796 &mut peers,
3797 peer,
3798 std::time::Instant::now() - Duration::from_secs(11 * 60),
3799 );
3800
3801 tokio::time::sleep(Duration::from_millis(100)).await;
3802
3803 match event!(peers) {
3804 PeerAction::Disconnect { peer_id, reason } => {
3805 assert_eq!(peer_id, peer);
3806 assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3807 }
3808 _ => unreachable!(),
3809 }
3810 }
3811}