1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41#[derive(Debug)]
48pub struct PeersManager {
49 peers: HashMap<PeerId, Peer>,
51 trusted_peer_ids: HashSet<PeerId>,
56 trusted_peers_resolver: TrustedPeersResolver,
59 manager_tx: mpsc::UnboundedSender<PeerCommand>,
61 handle_rx: UnboundedReceiverStream<PeerCommand>,
63 queued_actions: VecDeque<PeerAction>,
65 refill_slots_interval: Interval,
67 reputation_weights: ReputationChangeWeights,
69 connection_info: ConnectionInfo,
71 ban_list: BanList,
73 backed_off_peers: HashMap<PeerId, std::time::Instant>,
75 release_interval: Interval,
77 ban_duration: Duration,
79 backoff_durations: PeerBackoffDurations,
82 trusted_nodes_only: bool,
85 last_tick: Instant,
87 max_backoff_count: u8,
89 net_connection_state: NetworkConnectionState,
91 incoming_ip_throttle_duration: Duration,
93 ip_filter: reth_net_banlist::IpFilter,
95 enforce_enr_fork_id: bool,
98}
99
100impl PeersManager {
101 pub fn new(config: PeersConfig) -> Self {
103 let PeersConfig {
104 refill_slots_interval,
105 connection_info,
106 reputation_weights,
107 ban_list,
108 ban_duration,
109 backoff_durations,
110 trusted_nodes,
111 trusted_nodes_only,
112 trusted_nodes_resolution_interval,
113 basic_nodes,
114 max_backoff_count,
115 incoming_ip_throttle_duration,
116 ip_filter,
117 enforce_enr_fork_id,
118 } = config;
119 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
120 let now = Instant::now();
121
122 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
124
125 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
126 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
127
128 for trusted_peer in &trusted_nodes {
129 match trusted_peer.resolve_blocking() {
130 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
131 trusted_peer_ids.insert(id);
132 peers.entry(id).or_insert_with(|| {
133 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
134 });
135 }
136 Err(err) => {
137 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
138 }
139 }
140 }
141
142 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
143 peers.entry(id).or_insert_with(|| {
144 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
145 });
146 }
147
148 trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
149
150 Self {
151 peers,
152 trusted_peer_ids,
153 trusted_peers_resolver: TrustedPeersResolver::new(
154 trusted_nodes,
155 tokio::time::interval(trusted_nodes_resolution_interval), ),
157 manager_tx,
158 handle_rx: UnboundedReceiverStream::new(handle_rx),
159 queued_actions: Default::default(),
160 reputation_weights,
161 refill_slots_interval: tokio::time::interval(refill_slots_interval),
162 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
163 connection_info: ConnectionInfo::new(connection_info),
164 ban_list,
165 backed_off_peers: Default::default(),
166 ban_duration,
167 backoff_durations,
168 trusted_nodes_only,
169 last_tick: Instant::now(),
170 max_backoff_count,
171 net_connection_state: NetworkConnectionState::default(),
172 incoming_ip_throttle_duration,
173 ip_filter,
174 enforce_enr_fork_id,
175 }
176 }
177
178 pub(crate) fn handle(&self) -> PeersHandle {
180 PeersHandle::new(self.manager_tx.clone())
181 }
182
183 pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
185 self.enforce_enr_fork_id
186 }
187
188 #[inline]
190 pub(crate) fn num_known_peers(&self) -> usize {
191 self.peers.len()
192 }
193
194 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
196 self.peers.iter().map(|(peer_id, v)| {
197 NodeRecord::new_with_ports(
198 v.addr.tcp().ip(),
199 v.addr.tcp().port(),
200 v.addr.udp().map(|addr| addr.port()),
201 *peer_id,
202 )
203 })
204 }
205
206 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
208 self.peers.get(&peer_id).map(|v| {
209 (
210 NodeRecord::new_with_ports(
211 v.addr.tcp().ip(),
212 v.addr.tcp().port(),
213 v.addr.udp().map(|addr| addr.port()),
214 peer_id,
215 ),
216 v.kind,
217 )
218 })
219 }
220
221 pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
223 self.peers.get(peer_id).is_some_and(|p| {
224 matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
225 })
226 }
227
228 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
230 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
231 }
232
233 #[inline]
235 pub(crate) const fn num_inbound_connections(&self) -> usize {
236 self.connection_info.num_inbound
237 }
238
239 #[inline]
241 pub(crate) const fn num_outbound_connections(&self) -> usize {
242 self.connection_info.num_outbound
243 }
244
245 #[inline]
247 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
248 self.connection_info.num_pending_out
249 }
250
251 #[inline]
253 pub(crate) fn num_backed_off_peers(&self) -> usize {
254 self.backed_off_peers.len()
255 }
256
257 fn num_idle_trusted_peers(&self) -> usize {
259 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
260 }
261
262 pub(crate) fn on_incoming_pending_session(
266 &mut self,
267 addr: IpAddr,
268 ) -> Result<(), InboundConnectionError> {
269 if !self.ip_filter.is_allowed(&addr) {
271 trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
272 return Err(InboundConnectionError::IpBanned)
273 }
274
275 if self.ban_list.is_banned_ip(&addr) {
276 return Err(InboundConnectionError::IpBanned)
277 }
278
279 if !self.connection_info.has_in_capacity() {
281 if self.trusted_peer_ids.is_empty() {
282 return Err(InboundConnectionError::ExceedsCapacity)
285 }
286
287 let num_idle_trusted_peers = self.num_idle_trusted_peers();
291 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
292 let max_inbound =
294 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
295 if self.connection_info.num_pending_in < max_inbound {
296 self.connection_info.inc_pending_in();
297 return Ok(())
298 }
299 }
300
301 return Err(InboundConnectionError::ExceedsCapacity)
303 }
304
305 if !self.connection_info.has_in_pending_capacity() {
307 return Err(InboundConnectionError::ExceedsCapacity)
308 }
309
310 self.throttle_incoming_ip(addr);
312
313 self.connection_info.inc_pending_in();
314 Ok(())
315 }
316
317 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
320 self.connection_info.decr_pending_in();
321 }
322
323 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
325 self.connection_info.decr_pending_in()
326 }
327
328 pub(crate) fn on_incoming_pending_session_dropped(
330 &mut self,
331 remote_addr: SocketAddr,
332 err: &PendingSessionHandshakeError,
333 ) {
334 if err.is_fatal_protocol_error() {
335 self.ban_ip(remote_addr.ip());
336
337 if err.merits_discovery_ban() {
338 self.queued_actions
339 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
340 }
341 }
342
343 self.connection_info.decr_pending_in();
344 }
345
346 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
353 self.connection_info.decr_pending_in();
354
355 if self.ban_list.is_banned_peer(&peer_id) {
358 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
359 return
360 }
361
362 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
364 if self.trusted_nodes_only && !is_trusted {
365 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
366 return
367 }
368
369 self.tick();
371
372 match self.peers.entry(peer_id) {
373 Entry::Occupied(mut entry) => {
374 let peer = entry.get_mut();
375 if peer.is_banned() {
376 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
377 return
378 }
379 if peer.state.is_pending_out() {
382 self.connection_info.decr_state(peer.state);
383 }
384
385 peer.state = PeerConnectionState::In;
386
387 is_trusted = is_trusted || peer.is_trusted();
388 }
389 Entry::Vacant(entry) => {
390 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
393 peer.remove_after_disconnect = true;
394 entry.insert(peer);
395 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
396 }
397 }
398
399 let has_in_capacity = self.connection_info.has_in_capacity();
400 self.connection_info.inc_in();
402
403 if !is_trusted && !has_in_capacity {
405 self.queued_actions.push_back(PeerAction::Disconnect {
406 peer_id,
407 reason: Some(DisconnectReason::TooManyPeers),
408 });
409 }
410 }
411
412 fn ban_peer(&mut self, peer_id: PeerId) {
414 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
415 (peer.is_trusted() || peer.is_static())
416 {
417 self.backoff_durations.low / 2
420 } else {
421 self.ban_duration
422 };
423
424 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
425 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
426 }
427
428 fn ban_ip(&mut self, ip: IpAddr) {
430 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
431 }
432
433 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
435 self.ban_list
436 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
437 }
438
439 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
441 trace!(target: "net::peers", ?peer_id, "backing off");
442
443 if let Some(peer) = self.peers.get_mut(&peer_id) {
444 peer.backed_off = true;
445 self.backed_off_peers.insert(peer_id, until);
446 }
447 }
448
449 fn unban_peer(&mut self, peer_id: PeerId) {
451 self.ban_list.unban_peer(&peer_id);
452 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
453 }
454
455 fn tick(&mut self) {
460 let now = Instant::now();
461 let secs_since_last_tick =
465 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
466 self.last_tick = now;
467
468 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
470 if peer.1.reputation < DEFAULT_REPUTATION {
473 peer.1.reputation += secs_since_last_tick;
474 }
475 }
476 }
477
478 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
480 self.peers.get(peer_id).map(|peer| peer.reputation)
481 }
482
483 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
489 trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
490
491 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
492 if rep.is_reset() {
494 peer.reset_reputation()
495 } else {
496 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
497 if peer.is_trusted() || peer.is_static() {
498 if matches!(
500 rep,
501 ReputationChangeKind::Dropped |
502 ReputationChangeKind::BadAnnouncement |
503 ReputationChangeKind::Timeout |
504 ReputationChangeKind::AlreadySeenTransaction
505 ) {
506 return
507 }
508
509 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
511 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
513 }
514 }
515 peer.apply_reputation(reputation_change, rep)
516 }
517 } else {
518 return
519 };
520
521 match outcome {
522 ReputationChangeOutcome::None => {}
523 ReputationChangeOutcome::Ban => {
524 self.ban_peer(*peer_id);
525 }
526 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
527 ReputationChangeOutcome::DisconnectAndBan => {
528 self.queued_actions.push_back(PeerAction::Disconnect {
529 peer_id: *peer_id,
530 reason: Some(DisconnectReason::DisconnectRequested),
531 });
532 self.ban_peer(*peer_id);
533 }
534 }
535 }
536
537 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
539 if let Some(peer) = self.peers.get_mut(peer_id) {
540 self.connection_info.decr_state(peer.state);
541 peer.state = PeerConnectionState::Idle;
542 }
543 }
544
545 pub(crate) fn on_outgoing_pending_session_dropped(
548 &mut self,
549 remote_addr: &SocketAddr,
550 peer_id: &PeerId,
551 err: &PendingSessionHandshakeError,
552 ) {
553 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
554 }
555
556 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
558 match self.peers.entry(peer_id) {
559 Entry::Occupied(mut entry) => {
560 trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
561 self.connection_info.decr_state(entry.get().state);
562
563 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
564 entry.remove();
566 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
567 } else {
568 let peer = entry.get_mut();
569 peer.severe_backoff_counter = 0;
573 peer.state = PeerConnectionState::Idle;
574
575 peer.backed_off = true;
580 self.backed_off_peers.insert(
581 peer_id,
582 std::time::Instant::now() + self.incoming_ip_throttle_duration,
583 );
584 trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
585 }
586 }
587 Entry::Vacant(_) => return,
588 }
589
590 self.fill_outbound_slots();
591 }
592
593 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
595 if let Some(peer) = self.peers.get_mut(&peer_id) {
596 trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
597 self.connection_info.decr_state(peer.state);
598 self.connection_info.inc_out();
599 peer.state = PeerConnectionState::Out;
600 }
601 }
602
603 pub(crate) fn on_active_session_dropped(
608 &mut self,
609 remote_addr: &SocketAddr,
610 peer_id: &PeerId,
611 err: &EthStreamError,
612 ) {
613 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
614 }
615
616 pub(crate) fn on_outgoing_connection_failure(
619 &mut self,
620 remote_addr: &SocketAddr,
621 peer_id: &PeerId,
622 err: &io::Error,
623 ) {
624 if let Some(peer) = self.peers.get(peer_id) {
628 if peer.state.is_incoming() {
629 return
631 }
632
633 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
634 self.trusted_peers_resolver.interval.reset_immediately();
637 }
638 }
639
640 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
641 }
642
643 fn on_connection_failure(
644 &mut self,
645 remote_addr: &SocketAddr,
646 peer_id: &PeerId,
647 err: impl SessionError,
648 reputation_change: ReputationChangeKind,
649 ) {
650 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
651
652 if err.is_fatal_protocol_error() {
653 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
654 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
657 self.connection_info.decr_state(entry.get().state);
658 if entry.get().is_trusted() {
660 entry.get_mut().state = PeerConnectionState::Idle;
661 } else {
662 entry.remove();
663 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
664 if err.merits_discovery_ban() {
666 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
667 peer_id: *peer_id,
668 ip_addr: remote_addr.ip(),
669 })
670 }
671 }
672 }
673
674 self.ban_peer(*peer_id);
676 } else {
677 let mut backoff_until = None;
678 let mut remove_peer = false;
679
680 if let Some(peer) = self.peers.get_mut(peer_id) {
681 if let Some(kind) = err.should_backoff() {
682 if peer.is_trusted() || peer.is_static() {
683 let backoff = self.backoff_durations.low;
689 backoff_until = Some(std::time::Instant::now() + backoff);
690 trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
691 } else {
692 if kind.is_severe() {
694 peer.severe_backoff_counter =
695 peer.severe_backoff_counter.saturating_add(1);
696 }
697 trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
698
699 let backoff_time =
700 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
701
702 backoff_until = Some(backoff_time);
706 }
707 } else {
708 let reputation_change = self.reputation_weights.change(reputation_change);
710 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
711 };
712
713 self.connection_info.decr_state(peer.state);
714 peer.state = PeerConnectionState::Idle;
715
716 if peer.severe_backoff_counter > self.max_backoff_count &&
717 !peer.is_trusted() &&
718 !peer.is_static()
719 {
720 remove_peer = true;
723 }
724 }
725
726 if remove_peer {
728 trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
729 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
730 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
731 } else if let Some(backoff_until) = backoff_until {
732 self.backoff_peer_until(*peer_id, backoff_until);
734 }
735 }
736
737 self.fill_outbound_slots();
738 }
739
740 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
746 match direction {
747 Direction::Incoming => {
748 self.connection_info.decr_pending_in();
750 }
751 Direction::Outgoing(_) => {
752 }
755 }
756 }
757
758 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
762 self.add_peer_kind(peer_id, None, addr, fork_id)
763 }
764
765 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
767 self.trusted_peer_ids.insert(peer_id);
768 if let Some(peer) = self.peers.get_mut(&peer_id) {
769 peer.kind = PeerKind::Trusted;
770 }
771 }
772
773 #[cfg_attr(not(test), expect(dead_code))]
777 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
778 self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
779 }
780
781 pub(crate) fn add_peer_kind(
786 &mut self,
787 peer_id: PeerId,
788 kind: Option<PeerKind>,
789 addr: PeerAddr,
790 fork_id: Option<ForkId>,
791 ) {
792 let ip_addr = addr.tcp().ip();
793
794 if !self.ip_filter.is_allowed(&ip_addr) {
796 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
797 return
798 }
799
800 if self.ban_list.is_banned(&peer_id, &ip_addr) {
801 return
802 }
803
804 match self.peers.entry(peer_id) {
805 Entry::Occupied(mut entry) => {
806 let peer = entry.get_mut();
807 peer.fork_id = fork_id.map(Box::new);
808 peer.addr = addr;
809
810 if let Some(kind) = kind {
811 peer.kind = kind;
812 }
813
814 if peer.state.is_incoming() {
815 peer.remove_after_disconnect = false;
819 }
820 }
821 Entry::Vacant(entry) => {
822 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
823 let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
824 peer.fork_id = fork_id.map(Box::new);
825 entry.insert(peer);
826 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
827 }
828 }
829
830 if kind.filter(|kind| kind.is_trusted()).is_some() {
831 self.trusted_peer_ids.insert(peer_id);
833 }
834 }
835
836 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
838 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
839 if entry.get().is_trusted() {
840 return
841 }
842 let mut peer = entry.remove();
843
844 trace!(target: "net::peers", ?peer_id, "remove discovered node");
845 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
846
847 if peer.state.is_connected() {
848 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
849 peer.remove_after_disconnect = true;
854 peer.state.disconnect();
855 self.peers.insert(peer_id, peer);
856 self.queued_actions.push_back(PeerAction::Disconnect {
857 peer_id,
858 reason: Some(DisconnectReason::DisconnectRequested),
859 })
860 }
861 }
862
863 #[cfg_attr(not(test), expect(dead_code))]
866 pub(crate) fn add_and_connect(
867 &mut self,
868 peer_id: PeerId,
869 addr: PeerAddr,
870 fork_id: Option<ForkId>,
871 ) {
872 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
873 }
874
875 pub(crate) fn add_and_connect_kind(
879 &mut self,
880 peer_id: PeerId,
881 kind: PeerKind,
882 addr: PeerAddr,
883 fork_id: Option<ForkId>,
884 ) {
885 let ip_addr = addr.tcp().ip();
886
887 if !self.ip_filter.is_allowed(&ip_addr) {
889 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
890 return
891 }
892
893 if self.ban_list.is_banned(&peer_id, &ip_addr) {
894 return
895 }
896
897 match self.peers.entry(peer_id) {
898 Entry::Occupied(mut entry) => {
899 let peer = entry.get_mut();
900 peer.kind = kind;
901 peer.fork_id = fork_id.map(Box::new);
902 peer.addr = addr;
903
904 if peer.state == PeerConnectionState::Idle {
905 peer.state = PeerConnectionState::PendingOut;
907 self.connection_info.inc_pending_out();
908 self.queued_actions
909 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
910 }
911 }
912 Entry::Vacant(entry) => {
913 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
914 let mut peer = Peer::with_kind(addr, kind);
915 peer.state = PeerConnectionState::PendingOut;
916 peer.fork_id = fork_id.map(Box::new);
917 entry.insert(peer);
918 self.connection_info.inc_pending_out();
919 self.queued_actions
920 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
921 }
922 }
923
924 if kind.is_trusted() {
925 self.trusted_peer_ids.insert(peer_id);
926 }
927 }
928
929 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
931 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
932 if !entry.get().is_trusted() {
933 return
934 }
935
936 let peer = entry.get_mut();
937 peer.kind = PeerKind::Basic;
938
939 self.trusted_peer_ids.remove(&peer_id);
940 }
941
942 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
952 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
953 !peer.is_backed_off() &&
954 !peer.is_banned() &&
955 peer.state.is_unconnected() &&
956 (!self.trusted_nodes_only || peer.is_trusted())
957 });
958
959 let mut best_peer = unconnected.next()?;
961
962 if best_peer.1.is_trusted() || best_peer.1.is_static() {
963 return Some((*best_peer.0, best_peer.1))
964 }
965
966 for maybe_better in unconnected {
967 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
969 return Some((*maybe_better.0, maybe_better.1))
970 }
971
972 if maybe_better.1.reputation > best_peer.1.reputation {
974 best_peer = maybe_better;
975 }
976 }
977 Some((*best_peer.0, best_peer.1))
978 }
979
980 fn fill_outbound_slots(&mut self) {
986 self.tick();
987
988 if !self.net_connection_state.is_active() {
989 return
991 }
992
993 while self.connection_info.has_out_capacity() {
995 let action = {
996 let (peer_id, peer) = match self.best_unconnected() {
997 Some(peer) => peer,
998 _ => break,
999 };
1000
1001 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1002
1003 peer.state = PeerConnectionState::PendingOut;
1004 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1005 };
1006
1007 self.connection_info.inc_pending_out();
1008
1009 self.queued_actions.push_back(action);
1010 }
1011 }
1012
1013 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1014 if let Some(peer) = self.peers.get_mut(&peer_id) {
1015 let new_addr = PeerAddr::new_with_ports(
1016 new_record.address,
1017 new_record.tcp_port,
1018 Some(new_record.udp_port),
1019 );
1020
1021 if peer.addr != new_addr {
1022 peer.addr = new_addr;
1023 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1024 }
1025 }
1026 }
1027
1028 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1030 self.net_connection_state = state;
1031 }
1032
1033 pub const fn connection_state(&self) -> &NetworkConnectionState {
1035 &self.net_connection_state
1036 }
1037
1038 pub const fn on_shutdown(&mut self) {
1040 self.net_connection_state = NetworkConnectionState::ShuttingDown;
1041 }
1042
1043 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1048 loop {
1049 if let Some(action) = self.queued_actions.pop_front() {
1051 return Poll::Ready(action)
1052 }
1053
1054 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1055 match cmd {
1056 PeerCommand::Add(peer_id, addr) => {
1057 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1058 }
1059 PeerCommand::Remove(peer) => self.remove_peer(peer),
1060 PeerCommand::ReputationChange(peer_id, rep) => {
1061 self.apply_reputation_change(&peer_id, rep)
1062 }
1063 PeerCommand::GetPeer(peer, tx) => {
1064 let _ = tx.send(self.peers.get(&peer).cloned());
1065 }
1066 PeerCommand::GetPeers(tx) => {
1067 let _ = tx.send(self.iter_peers().collect());
1068 }
1069 }
1070 }
1071
1072 if self.release_interval.poll_tick(cx).is_ready() {
1073 let now = std::time::Instant::now();
1074 let (_, unbanned_peers) = self.ban_list.evict(now);
1075
1076 for peer_id in unbanned_peers {
1077 if let Some(peer) = self.peers.get_mut(&peer_id) {
1078 peer.unban();
1079 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1080 }
1081 }
1082
1083 self.backed_off_peers.retain(|peer_id, until| {
1086 if now > *until {
1087 if let Some(peer) = self.peers.get_mut(peer_id) {
1088 peer.backed_off = false;
1089 }
1090 return false
1091 }
1092 true
1093 })
1094 }
1095
1096 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1097 self.fill_outbound_slots();
1098 }
1099
1100 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1101 self.on_resolved_peer(peer_id, new_record);
1102 }
1103
1104 if self.queued_actions.is_empty() {
1105 return Poll::Pending
1106 }
1107 }
1108 }
1109}
1110
1111impl Default for PeersManager {
1112 fn default() -> Self {
1113 Self::new(Default::default())
1114 }
1115}
1116
1117#[derive(Debug, Clone, PartialEq, Eq, Default)]
1119pub struct ConnectionInfo {
1120 num_outbound: usize,
1122 num_pending_out: usize,
1124 num_inbound: usize,
1126 num_pending_in: usize,
1128 config: ConnectionsConfig,
1130}
1131
1132impl ConnectionInfo {
1135 const fn new(config: ConnectionsConfig) -> Self {
1137 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1138 }
1139
1140 const fn has_out_capacity(&self) -> bool {
1142 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1143 self.num_outbound < self.config.max_outbound
1144 }
1145
1146 const fn has_in_capacity(&self) -> bool {
1148 self.num_inbound < self.config.max_inbound
1149 }
1150
1151 const fn has_in_pending_capacity(&self) -> bool {
1153 self.num_pending_in < self.config.max_inbound
1154 }
1155
1156 const fn decr_state(&mut self, state: PeerConnectionState) {
1157 match state {
1158 PeerConnectionState::Idle => {}
1159 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1160 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1161 PeerConnectionState::PendingOut => self.decr_pending_out(),
1162 }
1163 }
1164
1165 const fn decr_out(&mut self) {
1166 self.num_outbound -= 1;
1167 }
1168
1169 const fn inc_out(&mut self) {
1170 self.num_outbound += 1;
1171 }
1172
1173 const fn inc_pending_out(&mut self) {
1174 self.num_pending_out += 1;
1175 }
1176
1177 const fn inc_in(&mut self) {
1178 self.num_inbound += 1;
1179 }
1180
1181 const fn inc_pending_in(&mut self) {
1182 self.num_pending_in += 1;
1183 }
1184
1185 const fn decr_in(&mut self) {
1186 self.num_inbound -= 1;
1187 }
1188
1189 const fn decr_pending_out(&mut self) {
1190 self.num_pending_out -= 1;
1191 }
1192
1193 const fn decr_pending_in(&mut self) {
1194 self.num_pending_in -= 1;
1195 }
1196}
1197
1198#[derive(Debug)]
1200pub enum PeerAction {
1201 Connect {
1203 peer_id: PeerId,
1205 remote_addr: SocketAddr,
1207 },
1208 Disconnect {
1210 peer_id: PeerId,
1212 reason: Option<DisconnectReason>,
1214 },
1215 DisconnectBannedIncoming {
1218 peer_id: PeerId,
1220 },
1221 DisconnectUntrustedIncoming {
1223 peer_id: PeerId,
1225 },
1226 DiscoveryBanPeerId {
1228 peer_id: PeerId,
1230 ip_addr: IpAddr,
1232 },
1233 DiscoveryBanIp {
1235 ip_addr: IpAddr,
1237 },
1238 BanPeer {
1240 peer_id: PeerId,
1242 },
1243 UnBanPeer {
1245 peer_id: PeerId,
1247 },
1248 PeerAdded(PeerId),
1250 PeerRemoved(PeerId),
1252}
1253
1254#[derive(Debug, Error, PartialEq, Eq)]
1256pub enum InboundConnectionError {
1257 IpBanned,
1259 ExceedsCapacity,
1261}
1262
1263impl Display for InboundConnectionError {
1264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1265 write!(f, "{self:?}")
1266 }
1267}
1268
/// Why a peer is being backed off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
    /// The remote reported that it has too many peers.
    TooManyPeers,
    /// The session was closed gracefully.
    GracefulClose,
    /// Any other connection problem.
    ConnectionError,
}
1279
1280impl BackoffReason {
1281 pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1283 match reason {
1284 Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1285 _ => Self::ConnectionError,
1286 }
1287 }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292 use alloy_primitives::B512;
1293 use reth_eth_wire::{
1294 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1295 DisconnectReason,
1296 };
1297 use reth_net_banlist::BanList;
1298 use reth_network_api::Direction;
1299 use reth_network_peers::{PeerId, TrustedPeer};
1300 use reth_network_types::{
1301 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1302 };
1303 use std::{
1304 future::{poll_fn, Future},
1305 io,
1306 net::{IpAddr, Ipv4Addr, SocketAddr},
1307 pin::Pin,
1308 task::{Context, Poll},
1309 time::Duration,
1310 };
1311 use url::Host;
1312
1313 use super::PeersManager;
1314 use crate::{
1315 error::SessionError,
1316 peers::{
1317 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1318 PeerConnectionState,
1319 },
1320 session::PendingSessionHandshakeError,
1321 PeersConfig,
1322 };
1323
1324 struct PeerActionFuture<'a> {
1325 peers: &'a mut PeersManager,
1326 }
1327
1328 impl Future for PeerActionFuture<'_> {
1329 type Output = PeerAction;
1330
1331 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1332 self.get_mut().peers.poll(cx)
1333 }
1334 }
1335
/// Awaits the next [`PeerAction`] of the given manager.
macro_rules! event {
    ($manager:expr) => {
        PeerActionFuture { peers: &mut $manager }.await
    };
}
1341
1342 #[tokio::test]
1343 async fn test_insert() {
1344 let peer = PeerId::random();
1345 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1346 let mut peers = PeersManager::default();
1347 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1348
1349 match event!(peers) {
1350 PeerAction::PeerAdded(peer_id) => {
1351 assert_eq!(peer_id, peer);
1352 }
1353 _ => unreachable!(),
1354 }
1355 match event!(peers) {
1356 PeerAction::Connect { peer_id, remote_addr } => {
1357 assert_eq!(peer_id, peer);
1358 assert_eq!(remote_addr, socket_addr);
1359 }
1360 _ => unreachable!(),
1361 }
1362
1363 let (record, _) = peers.peer_by_id(peer).unwrap();
1364 assert_eq!(record.tcp_addr(), socket_addr);
1365 assert_eq!(record.udp_addr(), socket_addr);
1366 }
1367
1368 #[tokio::test]
1369 async fn test_insert_udp() {
1370 let peer = PeerId::random();
1371 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1372 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1373 let mut peers = PeersManager::default();
1374 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1375
1376 match event!(peers) {
1377 PeerAction::PeerAdded(peer_id) => {
1378 assert_eq!(peer_id, peer);
1379 }
1380 _ => unreachable!(),
1381 }
1382 match event!(peers) {
1383 PeerAction::Connect { peer_id, remote_addr } => {
1384 assert_eq!(peer_id, peer);
1385 assert_eq!(remote_addr, tcp_addr);
1386 }
1387 _ => unreachable!(),
1388 }
1389
1390 let (record, _) = peers.peer_by_id(peer).unwrap();
1391 assert_eq!(record.tcp_addr(), tcp_addr);
1392 assert_eq!(record.udp_addr(), udp_addr);
1393 }
1394
1395 #[tokio::test]
1396 async fn test_ban() {
1397 let peer = PeerId::random();
1398 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1399 let mut peers = PeersManager::default();
1400 peers.ban_peer(peer);
1401 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1402
1403 match event!(peers) {
1404 PeerAction::BanPeer { peer_id } => {
1405 assert_eq!(peer_id, peer);
1406 }
1407 _ => unreachable!(),
1408 }
1409
1410 poll_fn(|cx| {
1411 assert!(peers.poll(cx).is_pending());
1412 Poll::Ready(())
1413 })
1414 .await;
1415 }
1416
1417 #[tokio::test]
1418 async fn test_unban() {
1419 let peer = PeerId::random();
1420 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1421 let mut peers = PeersManager::default();
1422 peers.ban_peer(peer);
1423 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1424
1425 match event!(peers) {
1426 PeerAction::BanPeer { peer_id } => {
1427 assert_eq!(peer_id, peer);
1428 }
1429 _ => unreachable!(),
1430 }
1431
1432 poll_fn(|cx| {
1433 assert!(peers.poll(cx).is_pending());
1434 Poll::Ready(())
1435 })
1436 .await;
1437
1438 peers.unban_peer(peer);
1439
1440 match event!(peers) {
1441 PeerAction::UnBanPeer { peer_id } => {
1442 assert_eq!(peer_id, peer);
1443 }
1444 _ => unreachable!(),
1445 }
1446
1447 poll_fn(|cx| {
1448 assert!(peers.poll(cx).is_pending());
1449 Poll::Ready(())
1450 })
1451 .await;
1452 }
1453
1454 #[tokio::test]
1455 async fn test_backoff_on_busy() {
1456 let peer = PeerId::random();
1457 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1458
1459 let mut peers = PeersManager::new(PeersConfig::test());
1460 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1461
1462 match event!(peers) {
1463 PeerAction::PeerAdded(peer_id) => {
1464 assert_eq!(peer_id, peer);
1465 }
1466 _ => unreachable!(),
1467 }
1468 match event!(peers) {
1469 PeerAction::Connect { peer_id, .. } => {
1470 assert_eq!(peer_id, peer);
1471 }
1472 _ => unreachable!(),
1473 }
1474
1475 poll_fn(|cx| {
1476 assert!(peers.poll(cx).is_pending());
1477 Poll::Ready(())
1478 })
1479 .await;
1480
1481 peers.on_active_session_dropped(
1482 &socket_addr,
1483 &peer,
1484 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1485 DisconnectReason::TooManyPeers,
1486 )),
1487 );
1488
1489 poll_fn(|cx| {
1490 assert!(peers.poll(cx).is_pending());
1491 Poll::Ready(())
1492 })
1493 .await;
1494
1495 assert!(peers.backed_off_peers.contains_key(&peer));
1496 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1497
1498 tokio::time::sleep(peers.backoff_durations.low).await;
1499
1500 match event!(peers) {
1501 PeerAction::Connect { peer_id, .. } => {
1502 assert_eq!(peer_id, peer);
1503 }
1504 _ => unreachable!(),
1505 }
1506
1507 assert!(!peers.backed_off_peers.contains_key(&peer));
1508 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1509 }
1510
1511 #[tokio::test]
1512 async fn test_backoff_on_no_response() {
1513 let peer = PeerId::random();
1514 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1515
1516 let backoff_durations = PeerBackoffDurations::test();
1517 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1518 let mut peers = PeersManager::new(config);
1519 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1520
1521 match event!(peers) {
1522 PeerAction::PeerAdded(peer_id) => {
1523 assert_eq!(peer_id, peer);
1524 }
1525 _ => unreachable!(),
1526 }
1527 match event!(peers) {
1528 PeerAction::Connect { peer_id, .. } => {
1529 assert_eq!(peer_id, peer);
1530 }
1531 _ => unreachable!(),
1532 }
1533
1534 poll_fn(|cx| {
1535 assert!(peers.poll(cx).is_pending());
1536 Poll::Ready(())
1537 })
1538 .await;
1539
1540 peers.on_outgoing_pending_session_dropped(
1541 &socket_addr,
1542 &peer,
1543 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1544 EthHandshakeError::NoResponse,
1545 )),
1546 );
1547
1548 poll_fn(|cx| {
1549 assert!(peers.poll(cx).is_pending());
1550 Poll::Ready(())
1551 })
1552 .await;
1553
1554 assert!(peers.backed_off_peers.contains_key(&peer));
1555 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1556
1557 tokio::time::sleep(backoff_durations.high).await;
1558
1559 match event!(peers) {
1560 PeerAction::Connect { peer_id, .. } => {
1561 assert_eq!(peer_id, peer);
1562 }
1563 _ => unreachable!(),
1564 }
1565
1566 assert!(!peers.backed_off_peers.contains_key(&peer));
1567 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1568 }
1569
1570 #[tokio::test]
1571 async fn test_low_backoff() {
1572 let peer = PeerId::random();
1573 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1574 let config = PeersConfig::test();
1575 let mut peers = PeersManager::new(config);
1576 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1577 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1578
1579 let backoff_timestamp = peers
1580 .backoff_durations
1581 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1582
1583 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1584 assert!(backoff_timestamp <= expected);
1585 }
1586
1587 #[tokio::test]
1588 async fn test_multiple_backoff_calculations() {
1589 let peer = PeerId::random();
1590 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1591 let config = PeersConfig::default();
1592 let mut peers = PeersManager::new(config);
1593 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1594 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1595
1596 peer_struct.severe_backoff_counter = 1;
1598
1599 let now = std::time::Instant::now();
1600
1601 peer_struct.severe_backoff_counter += 1;
1603 let backoff_time = peers
1605 .backoff_durations
1606 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1607
1608 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1610
1611 assert!(backoff_time.duration_since(now) > backoff_duration);
1614 }
1615
1616 #[tokio::test]
1617 async fn test_ban_on_active_drop() {
1618 let peer = PeerId::random();
1619 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1620 let mut peers = PeersManager::default();
1621 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1622
1623 match event!(peers) {
1624 PeerAction::PeerAdded(peer_id) => {
1625 assert_eq!(peer_id, peer);
1626 }
1627 _ => unreachable!(),
1628 }
1629 match event!(peers) {
1630 PeerAction::Connect { peer_id, .. } => {
1631 assert_eq!(peer_id, peer);
1632 }
1633 _ => unreachable!(),
1634 }
1635
1636 poll_fn(|cx| {
1637 assert!(peers.poll(cx).is_pending());
1638 Poll::Ready(())
1639 })
1640 .await;
1641
1642 peers.on_active_session_dropped(
1643 &socket_addr,
1644 &peer,
1645 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1646 DisconnectReason::UselessPeer,
1647 )),
1648 );
1649
1650 match event!(peers) {
1651 PeerAction::PeerRemoved(peer_id) => {
1652 assert_eq!(peer_id, peer);
1653 }
1654 _ => unreachable!(),
1655 }
1656 match event!(peers) {
1657 PeerAction::BanPeer { peer_id } => {
1658 assert_eq!(peer_id, peer);
1659 }
1660 _ => unreachable!(),
1661 }
1662
1663 poll_fn(|cx| {
1664 assert!(peers.poll(cx).is_pending());
1665 Poll::Ready(())
1666 })
1667 .await;
1668
1669 assert!(!peers.peers.contains_key(&peer));
1670 }
1671
1672 #[tokio::test]
1673 async fn test_remove_on_max_backoff_count() {
1674 let peer = PeerId::random();
1675 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1676 let config = PeersConfig::test();
1677 let mut peers = PeersManager::new(config.clone());
1678 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1679 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1680
1681 peer_struct.severe_backoff_counter = config.max_backoff_count;
1683
1684 match event!(peers) {
1685 PeerAction::PeerAdded(peer_id) => {
1686 assert_eq!(peer_id, peer);
1687 }
1688 _ => unreachable!(),
1689 }
1690 match event!(peers) {
1691 PeerAction::Connect { peer_id, .. } => {
1692 assert_eq!(peer_id, peer);
1693 }
1694 _ => unreachable!(),
1695 }
1696
1697 poll_fn(|cx| {
1698 assert!(peers.poll(cx).is_pending());
1699 Poll::Ready(())
1700 })
1701 .await;
1702
1703 peers.on_outgoing_pending_session_dropped(
1704 &socket_addr,
1705 &peer,
1706 &PendingSessionHandshakeError::Eth(
1707 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1708 ),
1709 );
1710
1711 match event!(peers) {
1712 PeerAction::PeerRemoved(peer_id) => {
1713 assert_eq!(peer_id, peer);
1714 }
1715 _ => unreachable!(),
1716 }
1717
1718 poll_fn(|cx| {
1719 assert!(peers.poll(cx).is_pending());
1720 Poll::Ready(())
1721 })
1722 .await;
1723
1724 assert!(!peers.peers.contains_key(&peer));
1725 }
1726
1727 #[tokio::test]
1728 async fn test_ban_on_pending_drop() {
1729 let peer = PeerId::random();
1730 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1731 let mut peers = PeersManager::default();
1732 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1733
1734 match event!(peers) {
1735 PeerAction::PeerAdded(peer_id) => {
1736 assert_eq!(peer_id, peer);
1737 }
1738 _ => unreachable!(),
1739 }
1740 match event!(peers) {
1741 PeerAction::Connect { peer_id, .. } => {
1742 assert_eq!(peer_id, peer);
1743 }
1744 _ => unreachable!(),
1745 }
1746
1747 poll_fn(|cx| {
1748 assert!(peers.poll(cx).is_pending());
1749 Poll::Ready(())
1750 })
1751 .await;
1752
1753 peers.on_outgoing_pending_session_dropped(
1754 &socket_addr,
1755 &peer,
1756 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1757 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1758 )),
1759 );
1760
1761 match event!(peers) {
1762 PeerAction::PeerRemoved(peer_id) => {
1763 assert_eq!(peer_id, peer);
1764 }
1765 _ => unreachable!(),
1766 }
1767 match event!(peers) {
1768 PeerAction::BanPeer { peer_id } => {
1769 assert_eq!(peer_id, peer);
1770 }
1771 _ => unreachable!(),
1772 }
1773
1774 poll_fn(|cx| {
1775 assert!(peers.poll(cx).is_pending());
1776 Poll::Ready(())
1777 })
1778 .await;
1779
1780 assert!(!peers.peers.contains_key(&peer));
1781 }
1782
1783 #[tokio::test]
1784 async fn test_internally_closed_incoming() {
1785 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1786 let mut peers = PeersManager::default();
1787
1788 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1789 assert_eq!(peers.connection_info.num_pending_in, 1);
1790 peers.on_incoming_pending_session_rejected_internally();
1791 assert_eq!(peers.connection_info.num_pending_in, 0);
1792 }
1793
1794 #[tokio::test]
1795 async fn test_reject_incoming_at_pending_capacity() {
1796 let mut peers = PeersManager::default();
1797
1798 for count in 1..=peers.connection_info.config.max_inbound {
1799 let socket_addr =
1800 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1801 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1802 assert_eq!(peers.connection_info.num_pending_in, count);
1803 }
1804 assert!(peers.connection_info.has_in_capacity());
1805 assert!(!peers.connection_info.has_in_pending_capacity());
1806
1807 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1808 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1809 }
1810
1811 #[tokio::test]
1812 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1813 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1814 let trusted = PeerId::random();
1815 peers.add_trusted_peer_id(trusted);
1816
1817 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1819 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1820 peers.on_incoming_session_established(trusted, addr);
1821
1822 match event!(peers) {
1823 PeerAction::PeerAdded(id) => {
1824 assert_eq!(id, trusted);
1825 }
1826 _ => unreachable!(),
1827 }
1828
1829 let mut connected_untrusted_peer_ids = Vec::new();
1831 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1832 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1833 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1834 let peer_id = PeerId::random();
1835 peers.on_incoming_session_established(peer_id, addr);
1836 connected_untrusted_peer_ids.push(peer_id);
1837
1838 match event!(peers) {
1839 PeerAction::PeerAdded(id) => {
1840 assert_eq!(id, peer_id);
1841 }
1842 _ => unreachable!(),
1843 }
1844 }
1845
1846 let mut pending_addrs = Vec::new();
1847
1848 for i in 0..2 {
1850 let socket_addr =
1851 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1852 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1853
1854 pending_addrs.push(socket_addr);
1855 }
1856
1857 assert_eq!(peers.connection_info.num_pending_in, 2);
1858
1859 for i in 0..2 {
1861 let socket_addr =
1862 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1863 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1864 }
1865
1866 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1867 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1868 DisconnectReason::UselessPeer,
1869 )),
1870 ));
1871
1872 for pending_addr in pending_addrs {
1874 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1875 }
1876
1877 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1878
1879 println!(
1880 "num_inbound: {}, has_in_capacity: {}",
1881 peers.connection_info.num_inbound,
1882 peers.connection_info.has_in_capacity()
1883 );
1884
1885 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1887
1888 println!(
1889 "num_inbound: {}, has_in_capacity: {}",
1890 peers.connection_info.num_inbound,
1891 peers.connection_info.has_in_capacity()
1892 );
1893
1894 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1895 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1896 }
1897
1898 #[tokio::test]
1899 async fn test_closed_incoming() {
1900 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1901 let mut peers = PeersManager::default();
1902
1903 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1904 assert_eq!(peers.connection_info.num_pending_in, 1);
1905 peers.on_incoming_pending_session_gracefully_closed();
1906 assert_eq!(peers.connection_info.num_pending_in, 0);
1907 }
1908
1909 #[tokio::test]
1910 async fn test_dropped_incoming() {
1911 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1912 let ban_duration = Duration::from_millis(500);
1913 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1914 let mut peers = PeersManager::new(config);
1915
1916 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1917 assert_eq!(peers.connection_info.num_pending_in, 1);
1918 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1919 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1920 DisconnectReason::UselessPeer,
1921 )),
1922 ));
1923
1924 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1925 assert_eq!(peers.connection_info.num_pending_in, 0);
1926 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1927
1928 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1929
1930 tokio::time::sleep(ban_duration).await;
1932
1933 poll_fn(|cx| {
1934 let _ = peers.poll(cx);
1935 Poll::Ready(())
1936 })
1937 .await;
1938
1939 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1940 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1941 }
1942
1943 #[tokio::test]
1944 async fn test_reputation_change_connected() {
1945 let peer = PeerId::random();
1946 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1947 let mut peers = PeersManager::default();
1948 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1949
1950 match event!(peers) {
1951 PeerAction::PeerAdded(peer_id) => {
1952 assert_eq!(peer_id, peer);
1953 }
1954 _ => unreachable!(),
1955 }
1956 match event!(peers) {
1957 PeerAction::Connect { peer_id, remote_addr } => {
1958 assert_eq!(peer_id, peer);
1959 assert_eq!(remote_addr, socket_addr);
1960 }
1961 _ => unreachable!(),
1962 }
1963
1964 let p = peers.peers.get_mut(&peer).unwrap();
1965 assert_eq!(p.state, PeerConnectionState::PendingOut);
1966
1967 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1968
1969 let p = peers.peers.get(&peer).unwrap();
1970 assert_eq!(p.state, PeerConnectionState::PendingOut);
1971 assert!(p.is_banned());
1972
1973 peers.on_active_session_gracefully_closed(peer);
1974
1975 let p = peers.peers.get(&peer).unwrap();
1976 assert_eq!(p.state, PeerConnectionState::Idle);
1977 assert!(p.is_banned());
1978
1979 match event!(peers) {
1980 PeerAction::Disconnect { peer_id, .. } => {
1981 assert_eq!(peer_id, peer);
1982 }
1983 _ => unreachable!(),
1984 }
1985 }
1986
1987 #[tokio::test]
1988 async fn retain_trusted_status() {
1989 let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1990 let trusted = PeerId::random();
1991 let mut peers =
1992 PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
1993 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1994 tcp_port: 8008,
1995 udp_port: 8008,
1996 id: trusted,
1997 }]));
1998 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1999 peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2000 assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2001 }
2002
2003 #[tokio::test]
2004 async fn accept_incoming_trusted_unknown_peer_address() {
2005 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2006 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2007 let trusted = PeerId::random();
2009 peers.add_trusted_peer_id(trusted);
2010
2011 for i in 0..peers.connection_info.config.max_inbound {
2013 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2014 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2015 let peer_id = PeerId::random();
2016 peers.on_incoming_session_established(peer_id, addr);
2017
2018 match event!(peers) {
2019 PeerAction::PeerAdded(id) => {
2020 assert_eq!(id, peer_id);
2021 }
2022 _ => unreachable!(),
2023 }
2024 }
2025
2026 let untrusted = PeerId::random();
2028 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2029 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2030 peers.on_incoming_session_established(untrusted, socket_addr);
2031
2032 match event!(peers) {
2033 PeerAction::PeerAdded(id) => {
2034 assert_eq!(id, untrusted);
2035 }
2036 _ => unreachable!(),
2037 }
2038
2039 match event!(peers) {
2040 PeerAction::Disconnect { peer_id, reason } => {
2041 assert_eq!(peer_id, untrusted);
2042 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2043 }
2044 _ => unreachable!(),
2045 }
2046
2047 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2048 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2049 peers.on_incoming_session_established(trusted, socket_addr);
2050
2051 match event!(peers) {
2052 PeerAction::PeerAdded(id) => {
2053 assert_eq!(id, trusted);
2054 }
2055 _ => unreachable!(),
2056 }
2057
2058 poll_fn(|cx| {
2059 assert!(peers.poll(cx).is_pending());
2060 Poll::Ready(())
2061 })
2062 .await;
2063
2064 let peer = peers.peers.get(&trusted).unwrap();
2065 assert_eq!(peer.state, PeerConnectionState::In);
2066 }
2067
2068 #[tokio::test]
2069 async fn test_already_connected() {
2070 let peer = PeerId::random();
2071 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2072 let mut peers = PeersManager::default();
2073
2074 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2076 assert_eq!(peers.connection_info.num_pending_in, 1);
2077
2078 peers.on_incoming_session_established(peer, socket_addr);
2081 let p = peers.peers.get_mut(&peer).expect("peer not found");
2082 assert_eq!(p.addr.tcp(), socket_addr);
2083 assert_eq!(peers.connection_info.num_pending_in, 0);
2084 assert_eq!(peers.connection_info.num_inbound, 1);
2085
2086 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2089 assert_eq!(peers.connection_info.num_pending_in, 1);
2090
2091 peers.on_already_connected(Direction::Incoming);
2095
2096 let p = peers.peers.get_mut(&peer).expect("peer not found");
2097 assert_eq!(p.addr.tcp(), socket_addr);
2098 assert_eq!(peers.connection_info.num_pending_in, 0);
2099 assert_eq!(peers.connection_info.num_inbound, 1);
2100 }
2101
2102 #[tokio::test]
2103 async fn test_reputation_change_trusted_peer() {
2104 let peer = PeerId::random();
2105 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2106 let mut peers = PeersManager::default();
2107 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2108
2109 match event!(peers) {
2110 PeerAction::PeerAdded(peer_id) => {
2111 assert_eq!(peer_id, peer);
2112 }
2113 _ => unreachable!(),
2114 }
2115 match event!(peers) {
2116 PeerAction::Connect { peer_id, remote_addr } => {
2117 assert_eq!(peer_id, peer);
2118 assert_eq!(remote_addr, socket_addr);
2119 }
2120 _ => unreachable!(),
2121 }
2122
2123 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2124 peers.on_active_outgoing_established(peer);
2125 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2126
2127 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2128
2129 {
2130 let p = peers.peers.get(&peer).unwrap();
2131 assert_eq!(p.state, PeerConnectionState::Out);
2132 assert!(!p.is_banned());
2134 }
2135
2136 loop {
2138 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2139
2140 let p = peers.peers.get(&peer).unwrap();
2141 if p.is_banned() {
2142 break
2143 }
2144 }
2145
2146 match event!(peers) {
2147 PeerAction::Disconnect { peer_id, .. } => {
2148 assert_eq!(peer_id, peer);
2149 }
2150 _ => unreachable!(),
2151 }
2152 }
2153
2154 #[tokio::test]
2155 async fn test_reputation_management() {
2156 let peer = PeerId::random();
2157 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2158 let mut peers = PeersManager::default();
2159 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2160 assert_eq!(peers.get_reputation(&peer), Some(0));
2161
2162 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2163 assert_eq!(peers.get_reputation(&peer), Some(1024));
2164
2165 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2166 assert_eq!(peers.get_reputation(&peer), Some(0));
2167 }
2168
2169 #[tokio::test]
2170 async fn test_remove_discovered_active() {
2171 let peer = PeerId::random();
2172 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2173 let mut peers = PeersManager::default();
2174 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2175
2176 match event!(peers) {
2177 PeerAction::PeerAdded(peer_id) => {
2178 assert_eq!(peer_id, peer);
2179 }
2180 _ => unreachable!(),
2181 }
2182 match event!(peers) {
2183 PeerAction::Connect { peer_id, remote_addr } => {
2184 assert_eq!(peer_id, peer);
2185 assert_eq!(remote_addr, socket_addr);
2186 }
2187 _ => unreachable!(),
2188 }
2189
2190 let p = peers.peers.get(&peer).unwrap();
2191 assert_eq!(p.state, PeerConnectionState::PendingOut);
2192
2193 peers.remove_peer(peer);
2194
2195 match event!(peers) {
2196 PeerAction::PeerRemoved(peer_id) => {
2197 assert_eq!(peer_id, peer);
2198 }
2199 _ => unreachable!(),
2200 }
2201 match event!(peers) {
2202 PeerAction::Disconnect { peer_id, .. } => {
2203 assert_eq!(peer_id, peer);
2204 }
2205 _ => unreachable!(),
2206 }
2207
2208 let p = peers.peers.get(&peer).unwrap();
2209 assert_eq!(p.state, PeerConnectionState::PendingOut);
2210
2211 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2212 let p = peers.peers.get(&peer).unwrap();
2213 assert_eq!(p.state, PeerConnectionState::PendingOut);
2214
2215 peers.on_active_session_gracefully_closed(peer);
2216 assert!(!peers.peers.contains_key(&peer));
2217 }
2218
2219 #[tokio::test]
2220 async fn test_fatal_outgoing_connection_error_trusted() {
2221 let peer = PeerId::random();
2222 let config = PeersConfig::test()
2223 .with_trusted_nodes(vec![TrustedPeer {
2224 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2225 tcp_port: 8008,
2226 udp_port: 8008,
2227 id: peer,
2228 }])
2229 .with_trusted_nodes_only(true);
2230 let mut peers = PeersManager::new(config);
2231 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2232
2233 match event!(peers) {
2234 PeerAction::Connect { peer_id, remote_addr } => {
2235 assert_eq!(peer_id, peer);
2236 assert_eq!(remote_addr, socket_addr);
2237 }
2238 _ => unreachable!(),
2239 }
2240
2241 let p = peers.peers.get(&peer).unwrap();
2242 assert_eq!(p.state, PeerConnectionState::PendingOut);
2243
2244 assert_eq!(peers.num_outbound_connections(), 0);
2245
2246 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2247 EthHandshakeError::NonStatusMessageInHandshake,
2248 ));
2249 assert!(err.is_fatal_protocol_error());
2250
2251 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2252 assert_eq!(peers.num_outbound_connections(), 0);
2253
2254 match event!(peers) {
2256 PeerAction::BanPeer { peer_id } => {
2257 assert_eq!(peer_id, peer);
2258 }
2259 err => unreachable!("{err:?}"),
2260 }
2261
2262 assert!(peers.peers.contains_key(&peer));
2264
2265 tokio::time::sleep(peers.backoff_durations.medium).await;
2267
2268 match event!(peers) {
2269 PeerAction::Connect { peer_id, remote_addr } => {
2270 assert_eq!(peer_id, peer);
2271 assert_eq!(remote_addr, socket_addr);
2272 }
2273 err => unreachable!("{err:?}"),
2274 }
2275 }
2276
2277 #[tokio::test]
2278 async fn test_outgoing_connection_error() {
2279 let peer = PeerId::random();
2280 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2281 let mut peers = PeersManager::default();
2282 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2283
2284 match event!(peers) {
2285 PeerAction::PeerAdded(peer_id) => {
2286 assert_eq!(peer_id, peer);
2287 }
2288 _ => unreachable!(),
2289 }
2290 match event!(peers) {
2291 PeerAction::Connect { peer_id, remote_addr } => {
2292 assert_eq!(peer_id, peer);
2293 assert_eq!(remote_addr, socket_addr);
2294 }
2295 _ => unreachable!(),
2296 }
2297
2298 let p = peers.peers.get(&peer).unwrap();
2299 assert_eq!(p.state, PeerConnectionState::PendingOut);
2300
2301 assert_eq!(peers.num_outbound_connections(), 0);
2302
2303 peers.on_outgoing_connection_failure(
2304 &socket_addr,
2305 &peer,
2306 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2307 );
2308
2309 assert_eq!(peers.num_outbound_connections(), 0);
2310 }
2311
2312 #[tokio::test]
2313 async fn test_outgoing_connection_gracefully_closed() {
2314 let peer = PeerId::random();
2315 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2316 let mut peers = PeersManager::default();
2317 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2318
2319 match event!(peers) {
2320 PeerAction::PeerAdded(peer_id) => {
2321 assert_eq!(peer_id, peer);
2322 }
2323 _ => unreachable!(),
2324 }
2325 match event!(peers) {
2326 PeerAction::Connect { peer_id, remote_addr } => {
2327 assert_eq!(peer_id, peer);
2328 assert_eq!(remote_addr, socket_addr);
2329 }
2330 _ => unreachable!(),
2331 }
2332
2333 let p = peers.peers.get(&peer).unwrap();
2334 assert_eq!(p.state, PeerConnectionState::PendingOut);
2335
2336 assert_eq!(peers.num_outbound_connections(), 0);
2337
2338 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2339
2340 assert_eq!(peers.num_outbound_connections(), 0);
2341 assert_eq!(peers.connection_info.num_pending_out, 0);
2342 }
2343
2344 #[tokio::test]
2345 async fn test_discovery_ban_list() {
2346 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2347 let socket_addr = SocketAddr::new(ip, 8008);
2348 let ban_list = BanList::new(vec![], vec![ip]);
2349 let config = PeersConfig::default().with_ban_list(ban_list);
2350 let mut peer_manager = PeersManager::new(config);
2351 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2352
2353 assert!(peer_manager.peers.is_empty());
2354 }
2355
2356 #[tokio::test]
2357 async fn test_on_pending_ban_list() {
2358 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2359 let socket_addr = SocketAddr::new(ip, 8008);
2360 let ban_list = BanList::new(vec![], vec![ip]);
2361 let config = PeersConfig::test().with_ban_list(ban_list);
2362 let mut peer_manager = PeersManager::new(config);
2363 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2364 match a {
2366 Ok(_) => panic!(),
2367 Err(err) => match err {
2368 InboundConnectionError::IpBanned => {
2369 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2370 }
2371 _ => unreachable!(),
2372 },
2373 }
2374 }
2375
2376 #[tokio::test]
2377 async fn test_on_active_inbound_ban_list() {
2378 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2379 let socket_addr = SocketAddr::new(ip, 8008);
2380 let given_peer_id = PeerId::random();
2381 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2382 let config = PeersConfig::test().with_ban_list(ban_list);
2383 let mut peer_manager = PeersManager::new(config);
2384 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2385 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2387 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2388 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2391 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2392
2393 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2394 peer_manager.queued_actions.pop_front()
2395 else {
2396 panic!()
2397 };
2398
2399 assert_eq!(peer_id, given_peer_id)
2400 }
2401
/// Incrementing and decrementing the inbound/outbound counters is reflected in
/// `num_inbound`/`num_outbound`, and a single connection leaves capacity in either direction.
#[test]
fn test_connection_limits() {
    let mut info = ConnectionInfo::default();

    // Inbound: up and down again.
    info.inc_in();
    assert_eq!((info.num_inbound, info.num_outbound), (1, 0));
    assert!(info.has_in_capacity());

    info.decr_in();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));

    // Outbound: up and down again.
    info.inc_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 1));
    assert!(info.has_out_capacity());

    info.decr_out();
    assert_eq!((info.num_inbound, info.num_outbound), (0, 0));
}
2423
/// `decr_state` must undo the matching increment: `In` reverts `inc_in` and `Out` reverts
/// `inc_out`, leaving both counters at zero each time.
#[test]
fn test_connection_peer_state() {
    let mut conns = ConnectionInfo::default();

    conns.inc_in();
    conns.decr_state(PeerConnectionState::In);
    assert_eq!((conns.num_inbound, conns.num_outbound), (0, 0));

    conns.inc_out();
    conns.decr_state(PeerConnectionState::Out);
    assert_eq!((conns.num_inbound, conns.num_outbound), (0, 0));
}
2439
2440 #[tokio::test]
2441 async fn test_trusted_peers_are_prioritized() {
2442 let trusted_peer = PeerId::random();
2443 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2444 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2445 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2446 tcp_port: 8008,
2447 udp_port: 8008,
2448 id: trusted_peer,
2449 }]);
2450 let mut peers = PeersManager::new(config);
2451
2452 let basic_peer = PeerId::random();
2453 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2454 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2455
2456 match event!(peers) {
2457 PeerAction::PeerAdded(peer_id) => {
2458 assert_eq!(peer_id, basic_peer);
2459 }
2460 _ => unreachable!(),
2461 }
2462 match event!(peers) {
2463 PeerAction::Connect { peer_id, remote_addr } => {
2464 assert_eq!(peer_id, trusted_peer);
2465 assert_eq!(remote_addr, trusted_sock);
2466 }
2467 _ => unreachable!(),
2468 }
2469 match event!(peers) {
2470 PeerAction::Connect { peer_id, remote_addr } => {
2471 assert_eq!(peer_id, basic_peer);
2472 assert_eq!(remote_addr, basic_sock);
2473 }
2474 _ => unreachable!(),
2475 }
2476 }
2477
2478 #[tokio::test]
2479 async fn test_connect_trusted_nodes_only() {
2480 let trusted_peer = PeerId::random();
2481 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2482 let config = PeersConfig::test()
2483 .with_trusted_nodes(vec![TrustedPeer {
2484 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2485 tcp_port: 8008,
2486 udp_port: 8008,
2487 id: trusted_peer,
2488 }])
2489 .with_trusted_nodes_only(true);
2490 let mut peers = PeersManager::new(config);
2491
2492 let basic_peer = PeerId::random();
2493 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2494 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2495
2496 match event!(peers) {
2497 PeerAction::PeerAdded(peer_id) => {
2498 assert_eq!(peer_id, basic_peer);
2499 }
2500 _ => unreachable!(),
2501 }
2502 match event!(peers) {
2503 PeerAction::Connect { peer_id, remote_addr } => {
2504 assert_eq!(peer_id, trusted_peer);
2505 assert_eq!(remote_addr, trusted_sock);
2506 }
2507 _ => unreachable!(),
2508 }
2509 poll_fn(|cx| {
2510 assert!(peers.poll(cx).is_pending());
2511 Poll::Ready(())
2512 })
2513 .await;
2514 }
2515
2516 #[tokio::test]
2517 async fn test_incoming_with_trusted_nodes_only() {
2518 let trusted_peer = PeerId::random();
2519 let config = PeersConfig::test()
2520 .with_trusted_nodes(vec![TrustedPeer {
2521 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2522 tcp_port: 8008,
2523 udp_port: 8008,
2524 id: trusted_peer,
2525 }])
2526 .with_trusted_nodes_only(true);
2527 let mut peers = PeersManager::new(config);
2528
2529 let basic_peer = PeerId::random();
2530 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2531 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2532 assert_eq!(peers.connection_info.num_pending_in, 1);
2534 peers.on_incoming_session_established(basic_peer, basic_sock);
2535 assert_eq!(peers.connection_info.num_pending_in, 0);
2538 assert_eq!(peers.connection_info.num_inbound, 0);
2539
2540 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2541 peers.queued_actions.pop_front()
2542 else {
2543 panic!()
2544 };
2545 assert_eq!(basic_peer, peer_id);
2546 assert!(!peers.peers.contains_key(&basic_peer));
2547 }
2548
2549 #[tokio::test]
2550 async fn test_incoming_without_trusted_nodes_only() {
2551 let trusted_peer = PeerId::random();
2552 let config = PeersConfig::test()
2553 .with_trusted_nodes(vec![TrustedPeer {
2554 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2555 tcp_port: 8008,
2556 udp_port: 8008,
2557 id: trusted_peer,
2558 }])
2559 .with_trusted_nodes_only(false);
2560 let mut peers = PeersManager::new(config);
2561
2562 let basic_peer = PeerId::random();
2563 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2564 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2565
2566 assert_eq!(peers.connection_info.num_pending_in, 1);
2568 peers.on_incoming_session_established(basic_peer, basic_sock);
2569 assert_eq!(peers.connection_info.num_pending_in, 0);
2572 assert_eq!(peers.connection_info.num_inbound, 1);
2573 assert!(peers.peers.contains_key(&basic_peer));
2574 }
2575
2576 #[tokio::test]
2577 async fn test_incoming_at_capacity() {
2578 let mut config = PeersConfig::test();
2579 config.connection_info.max_inbound = 1;
2580 let mut peers = PeersManager::new(config);
2581
2582 let peer = PeerId::random();
2583 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2584 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2585
2586 peers.on_incoming_session_established(peer, addr);
2587
2588 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2589 assert_eq!(
2590 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2591 InboundConnectionError::ExceedsCapacity
2592 );
2593 }
2594
2595 #[tokio::test]
2596 async fn test_incoming_rate_limit() {
2597 let config = PeersConfig {
2598 incoming_ip_throttle_duration: Duration::from_millis(100),
2599 ..PeersConfig::test()
2600 };
2601 let mut peers = PeersManager::new(config);
2602
2603 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2604 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2605 assert_eq!(
2606 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2607 InboundConnectionError::IpBanned
2608 );
2609
2610 peers.release_interval.reset_immediately();
2611 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2612
2613 poll_fn(|cx| loop {
2615 if peers.poll(cx).is_pending() {
2616 return Poll::Ready(());
2617 }
2618 })
2619 .await;
2620
2621 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2622 assert_eq!(
2623 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2624 InboundConnectionError::IpBanned
2625 );
2626 }
2627
2628 #[tokio::test]
2629 async fn test_tick() {
2630 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2631 let socket_addr = SocketAddr::new(ip, 8008);
2632 let config = PeersConfig::test();
2633 let mut peer_manager = PeersManager::new(config);
2634 let peer_id = PeerId::random();
2635 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2636
2637 tokio::time::sleep(Duration::from_secs(1)).await;
2638 peer_manager.tick();
2639
2640 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2642
2643 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2645
2646 tokio::time::sleep(Duration::from_secs(1)).await;
2647 peer_manager.tick();
2648
2649 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2651
2652 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2653
2654 tokio::time::sleep(Duration::from_secs(1)).await;
2655 peer_manager.tick();
2656
2657 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2659 }
2660
2661 #[tokio::test]
2662 async fn test_remove_incoming_after_disconnect() {
2663 let peer_id = PeerId::random();
2664 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2665 let mut peers = PeersManager::default();
2666
2667 peers.on_incoming_pending_session(addr.ip()).unwrap();
2668 peers.on_incoming_session_established(peer_id, addr);
2669 let peer = peers.peers.get(&peer_id).unwrap();
2670 assert_eq!(peer.state, PeerConnectionState::In);
2671 assert!(peer.remove_after_disconnect);
2672
2673 peers.on_active_session_gracefully_closed(peer_id);
2674 assert!(!peers.peers.contains_key(&peer_id))
2675 }
2676
2677 #[tokio::test]
2678 async fn test_keep_incoming_after_disconnect_if_discovered() {
2679 let peer_id = PeerId::random();
2680 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2681 let mut peers = PeersManager::default();
2682
2683 peers.on_incoming_pending_session(addr.ip()).unwrap();
2684 peers.on_incoming_session_established(peer_id, addr);
2685 let peer = peers.peers.get(&peer_id).unwrap();
2686 assert_eq!(peer.state, PeerConnectionState::In);
2687 assert!(peer.remove_after_disconnect);
2688
2689 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2691
2692 peers.on_active_session_gracefully_closed(peer_id);
2693
2694 let peer = peers.peers.get(&peer_id).unwrap();
2695 assert_eq!(peer.state, PeerConnectionState::Idle);
2696 assert!(!peer.remove_after_disconnect);
2697 }
2698
2699 #[tokio::test]
2700 async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2701 let throttle_duration = Duration::from_millis(100);
2702 let config =
2703 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2704 let mut peers = PeersManager::new(config);
2705
2706 let peer_id = PeerId::random();
2707 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2708
2709 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2711
2712 match event!(peers) {
2713 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2714 _ => unreachable!(),
2715 }
2716
2717 match event!(peers) {
2718 PeerAction::Connect { .. } => {}
2719 _ => unreachable!(),
2720 }
2721
2722 peers.on_active_outgoing_established(peer_id);
2724 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2725
2726 peers.on_active_session_gracefully_closed(peer_id);
2728
2729 let peer = peers.peers.get(&peer_id).unwrap();
2730 assert_eq!(peer.state, PeerConnectionState::Idle);
2731 assert!(peer.backed_off);
2732
2733 assert!(peers.backed_off_peers.contains_key(&peer_id));
2735
2736 poll_fn(|cx| {
2738 assert!(peers.poll(cx).is_pending());
2739 Poll::Ready(())
2740 })
2741 .await;
2742
2743 assert!(peers.backed_off_peers.contains_key(&peer_id));
2745 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2746
2747 tokio::time::sleep(throttle_duration).await;
2749
2750 match event!(peers) {
2752 PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2753 _ => unreachable!(),
2754 }
2755
2756 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2758 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2759 }
2760
2761 #[tokio::test]
2762 async fn test_backed_off_peer_can_accept_incoming_connection() {
2763 let throttle_duration = Duration::from_millis(100);
2764 let config =
2765 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2766 let mut peers = PeersManager::new(config);
2767
2768 let peer_id = PeerId::random();
2769 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2770
2771 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2773
2774 match event!(peers) {
2775 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2776 _ => unreachable!(),
2777 }
2778
2779 match event!(peers) {
2780 PeerAction::Connect { .. } => {}
2781 _ => unreachable!(),
2782 }
2783
2784 peers.on_active_outgoing_established(peer_id);
2786 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2787
2788 peers.on_active_session_gracefully_closed(peer_id);
2790
2791 let peer = peers.peers.get(&peer_id).unwrap();
2792 assert_eq!(peer.state, PeerConnectionState::Idle);
2793 assert!(peer.backed_off);
2794 assert!(peers.backed_off_peers.contains_key(&peer_id));
2795
2796 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2799 assert_eq!(peers.connection_info.num_pending_in, 1);
2800
2801 peers.on_incoming_session_established(peer_id, addr);
2803
2804 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2806 assert_eq!(peers.connection_info.num_inbound, 1);
2807
2808 assert!(peers.backed_off_peers.contains_key(&peer_id));
2810 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2811
2812 poll_fn(|cx| {
2814 assert!(peers.poll(cx).is_pending());
2815 Poll::Ready(())
2816 })
2817 .await;
2818
2819 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2821
2822 tokio::time::sleep(throttle_duration).await;
2824
2825 poll_fn(|cx| {
2826 let _ = peers.poll(cx);
2827 Poll::Ready(())
2828 })
2829 .await;
2830
2831 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2833 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2834
2835 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2837 }
2838
2839 #[tokio::test]
2840 async fn test_incoming_outgoing_already_connected() {
2841 let peer_id = PeerId::random();
2842 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2843 let mut peers = PeersManager::default();
2844
2845 peers.on_incoming_pending_session(addr.ip()).unwrap();
2846 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2847
2848 match event!(peers) {
2849 PeerAction::PeerAdded(_) => {}
2850 _ => unreachable!(),
2851 }
2852
2853 match event!(peers) {
2854 PeerAction::Connect { .. } => {}
2855 _ => unreachable!(),
2856 }
2857
2858 peers.on_incoming_session_established(peer_id, addr);
2859 peers.on_already_connected(Direction::Outgoing(peer_id));
2860 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2861 assert_eq!(peers.connection_info.num_inbound, 1);
2862 assert_eq!(peers.connection_info.num_pending_out, 0);
2863 assert_eq!(peers.connection_info.num_pending_in, 0);
2864 assert_eq!(peers.connection_info.num_outbound, 0);
2865 }
2866
2867 #[tokio::test]
2868 async fn test_already_connected_incoming_outgoing_connection_error() {
2869 let peer_id = PeerId::random();
2870 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2871 let mut peers = PeersManager::default();
2872
2873 peers.on_incoming_pending_session(addr.ip()).unwrap();
2874 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2875
2876 match event!(peers) {
2877 PeerAction::PeerAdded(_) => {}
2878 _ => unreachable!(),
2879 }
2880
2881 match event!(peers) {
2882 PeerAction::Connect { .. } => {}
2883 _ => unreachable!(),
2884 }
2885
2886 peers.on_incoming_session_established(peer_id, addr);
2887
2888 peers.on_outgoing_connection_failure(
2889 &addr,
2890 &peer_id,
2891 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2892 );
2893 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2894 assert_eq!(peers.connection_info.num_inbound, 1);
2895 assert_eq!(peers.connection_info.num_pending_out, 0);
2896 assert_eq!(peers.connection_info.num_pending_in, 0);
2897 assert_eq!(peers.connection_info.num_outbound, 0);
2898 }
2899
2900 #[tokio::test]
2901 async fn test_max_concurrent_dials() {
2902 let config = PeersConfig::default();
2903 let mut peer_manager = PeersManager::new(config);
2904 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2905 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2906 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2907 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2908 }
2909
2910 peer_manager.fill_outbound_slots();
2911 let dials = peer_manager
2912 .queued_actions
2913 .iter()
2914 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2915 .count();
2916 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2917 }
2918
2919 #[tokio::test]
2920 async fn test_max_num_of_pending_dials() {
2921 let config = PeersConfig::default();
2922 let mut peer_manager = PeersManager::new(config);
2923 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2924 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2925
2926 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2928 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2929 }
2930
2931 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2932 match event!(peer_manager) {
2933 PeerAction::PeerAdded(_) => {}
2934 _ => unreachable!(),
2935 }
2936 }
2937
2938 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2939 match event!(peer_manager) {
2940 PeerAction::Connect { .. } => {}
2941 _ => unreachable!(),
2942 }
2943 }
2944
2945 peer_manager.fill_outbound_slots();
2947
2948 let dials = peer_manager.connection_info.num_pending_out;
2950 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2951
2952 let num_pendingout_states = peer_manager
2953 .peers
2954 .iter()
2955 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2956 .map(|(peer_id, _)| *peer_id)
2957 .collect::<Vec<PeerId>>();
2958 assert_eq!(
2959 num_pendingout_states.len(),
2960 peer_manager.connection_info.config.max_concurrent_outbound_dials
2961 );
2962
2963 for peer_id in &num_pendingout_states {
2965 peer_manager.on_active_outgoing_established(*peer_id);
2966 }
2967
2968 for peer_id in &num_pendingout_states {
2970 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2971 }
2972
2973 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2975 }
2976
2977 #[tokio::test]
2978 async fn test_connect() {
2979 let peer = PeerId::random();
2980 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2981 let mut peers = PeersManager::default();
2982 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2983 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2984
2985 match event!(peers) {
2986 PeerAction::Connect { peer_id, remote_addr } => {
2987 assert_eq!(peer_id, peer);
2988 assert_eq!(remote_addr, socket_addr);
2989 }
2990 _ => unreachable!(),
2991 }
2992
2993 let (record, _) = peers.peer_by_id(peer).unwrap();
2994 assert_eq!(record.tcp_addr(), socket_addr);
2995 assert_eq!(record.udp_addr(), socket_addr);
2996
2997 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2999
3000 let (record, _) = peers.peer_by_id(peer).unwrap();
3001 assert_eq!(record.tcp_addr(), socket_addr);
3002 assert_eq!(record.udp_addr(), socket_addr);
3003 }
3004
3005 #[tokio::test]
3006 async fn test_incoming_connection_from_banned() {
3007 let peer = PeerId::random();
3008 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3009 let config = PeersConfig::test().with_max_inbound(3);
3010 let mut peers = PeersManager::new(config);
3011 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3012
3013 match event!(peers) {
3014 PeerAction::PeerAdded(peer_id) => {
3015 assert_eq!(peer_id, peer);
3016 }
3017 _ => unreachable!(),
3018 }
3019 match event!(peers) {
3020 PeerAction::Connect { peer_id, .. } => {
3021 assert_eq!(peer_id, peer);
3022 }
3023 _ => unreachable!(),
3024 }
3025
3026 poll_fn(|cx| {
3027 assert!(peers.poll(cx).is_pending());
3028 Poll::Ready(())
3029 })
3030 .await;
3031
3032 loop {
3034 peers.on_active_session_dropped(
3035 &socket_addr,
3036 &peer,
3037 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3038 reth_eth_wire::EthVersion::Eth68,
3039 reth_eth_wire::EthMessageID::Status,
3040 )),
3041 );
3042
3043 if peers.peers.get(&peer).unwrap().is_banned() {
3044 break;
3045 }
3046
3047 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3048 peers.on_incoming_session_established(peer, socket_addr);
3049
3050 match event!(peers) {
3051 PeerAction::Connect { peer_id, .. } => {
3052 assert_eq!(peer_id, peer);
3053 }
3054 _ => unreachable!(),
3055 }
3056 }
3057
3058 assert!(peers.peers.get(&peer).unwrap().is_banned());
3059
3060 for _ in 0..peers.connection_info.config.max_inbound {
3062 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3063 peers.on_incoming_session_established(peer, socket_addr);
3064
3065 match event!(peers) {
3066 PeerAction::DisconnectBannedIncoming { peer_id } => {
3067 assert_eq!(peer_id, peer);
3068 }
3069 _ => unreachable!(),
3070 }
3071 }
3072
3073 poll_fn(|cx| {
3074 assert!(peers.poll(cx).is_pending());
3075 Poll::Ready(())
3076 })
3077 .await;
3078
3079 assert_eq!(peers.connection_info.num_inbound, 0);
3080
3081 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3082
3083 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3085 assert_eq!(peers.connection_info.num_pending_in, 1);
3086
3087 peers.on_active_session_gracefully_closed(peer);
3091 assert_eq!(peers.connection_info.num_inbound, 0);
3092 }
3093
3094 #[tokio::test]
3095 async fn test_add_pending_connect() {
3096 let peer = PeerId::random();
3097 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3098 let mut peers = PeersManager::default();
3099 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3100 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3101 assert_eq!(peers.connection_info.num_pending_out, 1);
3102 }
3103
3104 #[tokio::test]
3105 async fn test_dns_updates_peer_address() {
3106 let peer_id = PeerId::random();
3107 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3108 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3109
3110 let trusted = TrustedPeer {
3111 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3112 tcp_port: 8008,
3113 udp_port: 8008,
3114 id: peer_id,
3115 };
3116
3117 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3118 let mut manager = PeersManager::new(config);
3119 manager
3120 .trusted_peers_resolver
3121 .set_interval(tokio::time::interval(Duration::from_millis(1)));
3122
3123 manager.peers.insert(
3124 peer_id,
3125 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3126 );
3127
3128 for _ in 0..100 {
3129 let _ = event!(manager);
3130 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3131 break;
3132 }
3133 tokio::time::sleep(Duration::from_millis(10)).await;
3134 }
3135
3136 let updated_peer = manager.peers.get(&peer_id).unwrap();
3137 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3138 }
3139
3140 #[tokio::test]
3141 async fn test_ip_filter_blocks_inbound_connection() {
3142 use reth_net_banlist::IpFilter;
3143 use std::net::IpAddr;
3144
3145 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3147 let config = PeersConfig::test().with_ip_filter(ip_filter);
3148 let mut peers = PeersManager::new(config);
3149
3150 let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3152 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3153
3154 let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3156 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3157 }
3158
3159 #[tokio::test]
3160 async fn test_ip_filter_blocks_outbound_connection() {
3161 use reth_net_banlist::IpFilter;
3162 use std::net::SocketAddr;
3163
3164 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3166 let config = PeersConfig::test().with_ip_filter(ip_filter);
3167 let mut peers = PeersManager::new(config);
3168
3169 let peer_id = PeerId::new([1; 64]);
3170
3171 let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3173 peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3174 assert!(peers.peers.contains_key(&peer_id));
3175
3176 let peer_id2 = PeerId::new([2; 64]);
3178 let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3179 peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3180 assert!(!peers.peers.contains_key(&peer_id2));
3181 }
3182
3183 #[tokio::test]
3184 async fn test_ip_filter_ipv6() {
3185 use reth_net_banlist::IpFilter;
3186 use std::net::IpAddr;
3187
3188 let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3190 let config = PeersConfig::test().with_ip_filter(ip_filter);
3191 let mut peers = PeersManager::new(config);
3192
3193 let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3195 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3196
3197 let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3199 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3200 }
3201
3202 #[tokio::test]
3203 async fn test_ip_filter_multiple_ranges() {
3204 use reth_net_banlist::IpFilter;
3205 use std::net::IpAddr;
3206
3207 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3209 let config = PeersConfig::test().with_ip_filter(ip_filter);
3210 let mut peers = PeersManager::new(config);
3211
3212 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3214 let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3215 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3216 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3217
3218 let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3220 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3221 }
3222
3223 #[tokio::test]
3224 async fn test_ip_filter_no_restriction() {
3225 use reth_net_banlist::IpFilter;
3226 use std::net::IpAddr;
3227
3228 let ip_filter = IpFilter::allow_all();
3230 let config = PeersConfig::test().with_ip_filter(ip_filter);
3231 let mut peers = PeersManager::new(config);
3232
3233 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3235 let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3236 let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3237 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3238 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3239 assert!(peers.on_incoming_pending_session(ip3).is_ok());
3240 }
3241}