1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 PersistedPeerInfo, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41#[derive(Debug)]
48pub struct PeersManager {
49 peers: HashMap<PeerId, Peer>,
51 trusted_peer_ids: HashSet<PeerId>,
56 trusted_peers_resolver: TrustedPeersResolver,
59 manager_tx: mpsc::UnboundedSender<PeerCommand>,
61 handle_rx: UnboundedReceiverStream<PeerCommand>,
63 queued_actions: VecDeque<PeerAction>,
65 refill_slots_interval: Interval,
67 reputation_weights: ReputationChangeWeights,
69 connection_info: ConnectionInfo,
71 ban_list: BanList,
73 backed_off_peers: HashMap<PeerId, std::time::Instant>,
75 release_interval: Interval,
77 ban_duration: Duration,
79 backoff_durations: PeerBackoffDurations,
82 trusted_nodes_only: bool,
85 last_tick: Instant,
87 max_backoff_count: u8,
89 net_connection_state: NetworkConnectionState,
91 incoming_ip_throttle_duration: Duration,
93 ip_filter: reth_net_banlist::IpFilter,
95 enforce_enr_fork_id: bool,
98}
99
100impl PeersManager {
101 pub fn new(config: PeersConfig) -> Self {
103 let PeersConfig {
104 refill_slots_interval,
105 connection_info,
106 reputation_weights,
107 ban_list,
108 ban_duration,
109 backoff_durations,
110 trusted_nodes,
111 trusted_nodes_only,
112 trusted_nodes_resolution_interval,
113 basic_nodes,
114 persisted_peers,
115 max_backoff_count,
116 incoming_ip_throttle_duration,
117 ip_filter,
118 enforce_enr_fork_id,
119 } = config;
120 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
121 let now = Instant::now();
122
123 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
125
126 let mut peers =
127 HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len() + persisted_peers.len());
128 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
129
130 for trusted_peer in &trusted_nodes {
131 match trusted_peer.resolve_blocking() {
132 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
133 trusted_peer_ids.insert(id);
134 peers.entry(id).or_insert_with(|| {
135 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
136 });
137 }
138 Err(err) => {
139 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
140 }
141 }
142 }
143
144 for PersistedPeerInfo { record, kind, fork_id, reputation } in persisted_peers {
145 if enforce_enr_fork_id && fork_id.is_none() {
149 continue
150 }
151 let NodeRecord { address, tcp_port, udp_port, id } = record;
152 peers.entry(id).or_insert_with(|| {
153 let mut peer = Peer::with_kind(
154 PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)),
155 kind,
156 );
157 peer.fork_id = fork_id.map(Box::new);
158 peer.reputation = reputation;
159 peer
160 });
161 }
162
163 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
164 peers.entry(id).or_insert_with(|| {
165 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
166 });
167 }
168
169 trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
170
171 Self {
172 peers,
173 trusted_peer_ids,
174 trusted_peers_resolver: TrustedPeersResolver::new(
175 trusted_nodes,
176 tokio::time::interval(trusted_nodes_resolution_interval), ),
178 manager_tx,
179 handle_rx: UnboundedReceiverStream::new(handle_rx),
180 queued_actions: Default::default(),
181 reputation_weights,
182 refill_slots_interval: tokio::time::interval(refill_slots_interval),
183 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
184 connection_info: ConnectionInfo::new(connection_info),
185 ban_list,
186 backed_off_peers: Default::default(),
187 ban_duration,
188 backoff_durations,
189 trusted_nodes_only,
190 last_tick: Instant::now(),
191 max_backoff_count,
192 net_connection_state: NetworkConnectionState::default(),
193 incoming_ip_throttle_duration,
194 ip_filter,
195 enforce_enr_fork_id,
196 }
197 }
198
199 pub(crate) fn handle(&self) -> PeersHandle {
201 PeersHandle::new(self.manager_tx.clone())
202 }
203
204 pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
206 self.enforce_enr_fork_id
207 }
208
209 #[inline]
211 pub(crate) fn num_known_peers(&self) -> usize {
212 self.peers.len()
213 }
214
215 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
217 self.peers.iter().map(|(peer_id, v)| {
218 NodeRecord::new_with_ports(
219 v.addr.tcp().ip(),
220 v.addr.tcp().port(),
221 v.addr.udp().map(|addr| addr.port()),
222 *peer_id,
223 )
224 })
225 }
226
227 pub(crate) fn persistable_peers(&self) -> impl Iterator<Item = PersistedPeerInfo> + '_ {
232 self.peers.iter().filter(|(_, peer)| !peer.is_backed_off() && !peer.is_banned()).map(
233 |(peer_id, peer)| PersistedPeerInfo {
234 record: NodeRecord::new_with_ports(
235 peer.addr.tcp().ip(),
236 peer.addr.tcp().port(),
237 peer.addr.udp().map(|addr| addr.port()),
238 *peer_id,
239 ),
240 kind: peer.kind,
241 fork_id: peer.fork_id.as_deref().copied(),
242 reputation: peer.reputation,
243 },
244 )
245 }
246
247 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
249 self.peers.get(&peer_id).map(|v| {
250 (
251 NodeRecord::new_with_ports(
252 v.addr.tcp().ip(),
253 v.addr.tcp().port(),
254 v.addr.udp().map(|addr| addr.port()),
255 peer_id,
256 ),
257 v.kind,
258 )
259 })
260 }
261
262 pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
264 self.peers.get(peer_id).is_some_and(|p| {
265 matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
266 })
267 }
268
269 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
271 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
272 }
273
274 #[inline]
276 pub(crate) const fn num_inbound_connections(&self) -> usize {
277 self.connection_info.num_inbound
278 }
279
280 #[inline]
282 pub(crate) const fn num_outbound_connections(&self) -> usize {
283 self.connection_info.num_outbound
284 }
285
286 #[inline]
288 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
289 self.connection_info.num_pending_out
290 }
291
292 #[inline]
294 pub(crate) fn num_backed_off_peers(&self) -> usize {
295 self.backed_off_peers.len()
296 }
297
298 fn num_idle_trusted_peers(&self) -> usize {
300 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
301 }
302
303 pub(crate) fn on_incoming_pending_session(
307 &mut self,
308 addr: IpAddr,
309 ) -> Result<(), InboundConnectionError> {
310 if !self.ip_filter.is_allowed(&addr) {
312 trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
313 return Err(InboundConnectionError::IpBanned)
314 }
315
316 if self.ban_list.is_banned_ip(&addr) {
317 return Err(InboundConnectionError::IpBanned)
318 }
319
320 if !self.connection_info.has_in_capacity() {
322 if self.trusted_peer_ids.is_empty() {
323 return Err(InboundConnectionError::ExceedsCapacity)
326 }
327
328 let num_idle_trusted_peers = self.num_idle_trusted_peers();
332 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
333 let max_inbound =
335 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
336 if self.connection_info.num_pending_in < max_inbound {
337 self.connection_info.inc_pending_in();
338 return Ok(())
339 }
340 }
341
342 return Err(InboundConnectionError::ExceedsCapacity)
344 }
345
346 if !self.connection_info.has_in_pending_capacity() {
348 return Err(InboundConnectionError::ExceedsCapacity)
349 }
350
351 self.throttle_incoming_ip(addr);
353
354 self.connection_info.inc_pending_in();
355 Ok(())
356 }
357
358 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
361 self.connection_info.decr_pending_in();
362 }
363
364 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
366 self.connection_info.decr_pending_in()
367 }
368
369 pub(crate) fn on_incoming_pending_session_dropped(
371 &mut self,
372 remote_addr: SocketAddr,
373 err: &PendingSessionHandshakeError,
374 ) {
375 if err.is_fatal_protocol_error() {
376 self.ban_ip(remote_addr.ip());
377
378 if err.merits_discovery_ban() {
379 self.queued_actions
380 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
381 }
382 }
383
384 self.connection_info.decr_pending_in();
385 }
386
387 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
394 self.connection_info.decr_pending_in();
395
396 if self.ban_list.is_banned_peer(&peer_id) {
399 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
400 return
401 }
402
403 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
405 if self.trusted_nodes_only && !is_trusted {
406 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
407 return
408 }
409
410 self.tick();
412
413 match self.peers.entry(peer_id) {
414 Entry::Occupied(mut entry) => {
415 let peer = entry.get_mut();
416 if peer.is_banned() {
417 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
418 return
419 }
420 if peer.state.is_pending_out() {
423 self.connection_info.decr_state(peer.state);
424 }
425
426 peer.state = PeerConnectionState::In;
427
428 is_trusted = is_trusted || peer.is_trusted();
429 }
430 Entry::Vacant(entry) => {
431 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
434 peer.remove_after_disconnect = true;
435 entry.insert(peer);
436 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
437 }
438 }
439
440 let has_in_capacity = self.connection_info.has_in_capacity();
441 self.connection_info.inc_in();
443
444 if !is_trusted && !has_in_capacity {
446 self.queued_actions.push_back(PeerAction::Disconnect {
447 peer_id,
448 reason: Some(DisconnectReason::TooManyPeers),
449 });
450 }
451 }
452
453 fn ban_peer(&mut self, peer_id: PeerId) {
455 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
456 (peer.is_trusted() || peer.is_static())
457 {
458 self.backoff_durations.low / 2
461 } else {
462 self.ban_duration
463 };
464
465 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
466 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
467 }
468
469 fn ban_ip(&mut self, ip: IpAddr) {
471 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
472 }
473
474 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
476 self.ban_list
477 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
478 }
479
480 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
482 trace!(target: "net::peers", ?peer_id, "backing off");
483
484 if let Some(peer) = self.peers.get_mut(&peer_id) {
485 peer.backed_off = true;
486 self.backed_off_peers.insert(peer_id, until);
487 }
488 }
489
490 fn unban_peer(&mut self, peer_id: PeerId) {
492 self.ban_list.unban_peer(&peer_id);
493 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
494 }
495
496 fn tick(&mut self) {
501 let now = Instant::now();
502 let secs_since_last_tick =
506 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
507 self.last_tick = now;
508
509 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
511 if peer.1.reputation < DEFAULT_REPUTATION {
514 peer.1.reputation += secs_since_last_tick;
515 }
516 }
517 }
518
519 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
521 self.peers.get(peer_id).map(|peer| peer.reputation)
522 }
523
524 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
530 trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
531
532 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
533 if rep.is_reset() {
535 peer.reset_reputation()
536 } else {
537 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
538 if peer.is_trusted() || peer.is_static() {
539 if matches!(
541 rep,
542 ReputationChangeKind::Dropped |
543 ReputationChangeKind::BadAnnouncement |
544 ReputationChangeKind::Timeout |
545 ReputationChangeKind::AlreadySeenTransaction
546 ) {
547 return
548 }
549
550 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
552 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
554 }
555 }
556 peer.apply_reputation(reputation_change, rep)
557 }
558 } else {
559 return
560 };
561
562 match outcome {
563 ReputationChangeOutcome::None => {}
564 ReputationChangeOutcome::Ban => {
565 self.ban_peer(*peer_id);
566 }
567 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
568 ReputationChangeOutcome::DisconnectAndBan => {
569 self.queued_actions.push_back(PeerAction::Disconnect {
570 peer_id: *peer_id,
571 reason: Some(DisconnectReason::DisconnectRequested),
572 });
573 self.ban_peer(*peer_id);
574 }
575 }
576 }
577
578 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
580 if let Some(peer) = self.peers.get_mut(peer_id) {
581 self.connection_info.decr_state(peer.state);
582 peer.state = PeerConnectionState::Idle;
583 }
584 }
585
586 pub(crate) fn on_outgoing_pending_session_dropped(
589 &mut self,
590 remote_addr: &SocketAddr,
591 peer_id: &PeerId,
592 err: &PendingSessionHandshakeError,
593 ) {
594 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
595 }
596
597 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
599 match self.peers.entry(peer_id) {
600 Entry::Occupied(mut entry) => {
601 trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
602 self.connection_info.decr_state(entry.get().state);
603
604 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
605 entry.remove();
607 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
608 } else {
609 let peer = entry.get_mut();
610 peer.severe_backoff_counter = 0;
614 peer.state = PeerConnectionState::Idle;
615
616 peer.backed_off = true;
621 self.backed_off_peers.insert(
622 peer_id,
623 std::time::Instant::now() + self.incoming_ip_throttle_duration,
624 );
625 trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
626 }
627 }
628 Entry::Vacant(_) => return,
629 }
630
631 self.fill_outbound_slots();
632 }
633
634 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
636 if let Some(peer) = self.peers.get_mut(&peer_id) {
637 trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
638 self.connection_info.decr_state(peer.state);
639 self.connection_info.inc_out();
640 peer.state = PeerConnectionState::Out;
641 }
642 }
643
644 pub(crate) fn on_active_session_dropped(
649 &mut self,
650 remote_addr: &SocketAddr,
651 peer_id: &PeerId,
652 err: &EthStreamError,
653 ) {
654 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
655 }
656
657 pub(crate) fn on_outgoing_connection_failure(
660 &mut self,
661 remote_addr: &SocketAddr,
662 peer_id: &PeerId,
663 err: &io::Error,
664 ) {
665 if let Some(peer) = self.peers.get(peer_id) {
669 if peer.state.is_incoming() {
670 return
672 }
673
674 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
675 self.trusted_peers_resolver.interval.reset_immediately();
678 }
679 }
680
681 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
682 }
683
684 fn on_connection_failure(
685 &mut self,
686 remote_addr: &SocketAddr,
687 peer_id: &PeerId,
688 err: impl SessionError,
689 reputation_change: ReputationChangeKind,
690 ) {
691 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
692
693 if err.is_fatal_protocol_error() {
694 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
695 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
698 self.connection_info.decr_state(entry.get().state);
699 if entry.get().is_trusted() {
701 entry.get_mut().state = PeerConnectionState::Idle;
702 } else {
703 entry.remove();
704 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
705 if err.merits_discovery_ban() {
707 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
708 peer_id: *peer_id,
709 ip_addr: remote_addr.ip(),
710 })
711 }
712 }
713 }
714
715 self.ban_peer(*peer_id);
717 } else {
718 let mut backoff_until = None;
719 let mut remove_peer = false;
720
721 if let Some(peer) = self.peers.get_mut(peer_id) {
722 if let Some(kind) = err.should_backoff() {
723 if peer.is_trusted() || peer.is_static() {
724 let backoff = self.backoff_durations.low;
730 backoff_until = Some(std::time::Instant::now() + backoff);
731 trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
732 } else {
733 if kind.is_severe() {
735 peer.severe_backoff_counter =
736 peer.severe_backoff_counter.saturating_add(1);
737 }
738 trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
739
740 let backoff_time =
741 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
742
743 backoff_until = Some(backoff_time);
747 }
748 } else {
749 let reputation_change = self.reputation_weights.change(reputation_change);
751 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
752 };
753
754 self.connection_info.decr_state(peer.state);
755 peer.state = PeerConnectionState::Idle;
756
757 if peer.severe_backoff_counter > self.max_backoff_count &&
758 !peer.is_trusted() &&
759 !peer.is_static()
760 {
761 remove_peer = true;
764 }
765 }
766
767 if remove_peer {
769 trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
770 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
771 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
772 } else if let Some(backoff_until) = backoff_until {
773 self.backoff_peer_until(*peer_id, backoff_until);
775 }
776 }
777
778 self.fill_outbound_slots();
779 }
780
781 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
787 match direction {
788 Direction::Incoming => {
789 self.connection_info.decr_pending_in();
791 }
792 Direction::Outgoing(_) => {
793 }
796 }
797 }
798
799 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
803 self.add_peer_kind(peer_id, None, addr, fork_id)
804 }
805
806 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
808 self.trusted_peer_ids.insert(peer_id);
809 if let Some(peer) = self.peers.get_mut(&peer_id) {
810 peer.kind = PeerKind::Trusted;
811 }
812 }
813
814 #[cfg_attr(not(test), expect(dead_code))]
818 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
819 self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
820 }
821
822 pub(crate) fn add_peer_kind(
827 &mut self,
828 peer_id: PeerId,
829 kind: Option<PeerKind>,
830 addr: PeerAddr,
831 fork_id: Option<ForkId>,
832 ) {
833 let ip_addr = addr.tcp().ip();
834
835 if !self.ip_filter.is_allowed(&ip_addr) {
837 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
838 return
839 }
840
841 if self.ban_list.is_banned(&peer_id, &ip_addr) {
842 return
843 }
844
845 match self.peers.entry(peer_id) {
846 Entry::Occupied(mut entry) => {
847 let peer = entry.get_mut();
848 peer.fork_id = fork_id.map(Box::new);
849 peer.addr = addr;
850
851 if let Some(kind) = kind {
852 peer.kind = kind;
853 }
854
855 if peer.state.is_incoming() {
856 peer.remove_after_disconnect = false;
860 }
861 }
862 Entry::Vacant(entry) => {
863 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
864 let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
865 peer.fork_id = fork_id.map(Box::new);
866 entry.insert(peer);
867 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
868 }
869 }
870
871 if kind.is_some_and(|kind| kind.is_trusted()) {
872 self.trusted_peer_ids.insert(peer_id);
874 }
875 }
876
877 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
879 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
880 if entry.get().is_trusted() {
881 return
882 }
883 let mut peer = entry.remove();
884
885 trace!(target: "net::peers", ?peer_id, "remove discovered node");
886 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
887
888 if peer.state.is_connected() {
889 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
890 peer.remove_after_disconnect = true;
895 peer.state.disconnect();
896 self.peers.insert(peer_id, peer);
897 self.queued_actions.push_back(PeerAction::Disconnect {
898 peer_id,
899 reason: Some(DisconnectReason::DisconnectRequested),
900 })
901 }
902 }
903
904 #[cfg_attr(not(test), expect(dead_code))]
907 pub(crate) fn add_and_connect(
908 &mut self,
909 peer_id: PeerId,
910 addr: PeerAddr,
911 fork_id: Option<ForkId>,
912 ) {
913 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
914 }
915
916 pub(crate) fn add_and_connect_kind(
920 &mut self,
921 peer_id: PeerId,
922 kind: PeerKind,
923 addr: PeerAddr,
924 fork_id: Option<ForkId>,
925 ) {
926 let ip_addr = addr.tcp().ip();
927
928 if !self.ip_filter.is_allowed(&ip_addr) {
930 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
931 return
932 }
933
934 if self.ban_list.is_banned(&peer_id, &ip_addr) {
935 return
936 }
937
938 match self.peers.entry(peer_id) {
939 Entry::Occupied(mut entry) => {
940 let peer = entry.get_mut();
941 peer.kind = kind;
942 peer.fork_id = fork_id.map(Box::new);
943 peer.addr = addr;
944
945 if peer.state == PeerConnectionState::Idle {
946 peer.state = PeerConnectionState::PendingOut;
948 self.connection_info.inc_pending_out();
949 self.queued_actions
950 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
951 }
952 }
953 Entry::Vacant(entry) => {
954 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
955 let mut peer = Peer::with_kind(addr, kind);
956 peer.state = PeerConnectionState::PendingOut;
957 peer.fork_id = fork_id.map(Box::new);
958 entry.insert(peer);
959 self.connection_info.inc_pending_out();
960 self.queued_actions
961 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
962 }
963 }
964
965 if kind.is_trusted() {
966 self.trusted_peer_ids.insert(peer_id);
967 }
968 }
969
970 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
972 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
973 if !entry.get().is_trusted() {
974 return
975 }
976
977 let peer = entry.get_mut();
978 peer.kind = PeerKind::Basic;
979
980 self.trusted_peer_ids.remove(&peer_id);
981 }
982
983 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
996 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
997 !peer.is_backed_off() &&
998 !peer.is_banned() &&
999 peer.state.is_unconnected() &&
1000 (!self.trusted_nodes_only || peer.is_trusted())
1001 });
1002
1003 let mut best_peer = unconnected.next()?;
1005
1006 if best_peer.1.is_trusted() || best_peer.1.is_static() {
1007 return Some((*best_peer.0, best_peer.1))
1008 }
1009
1010 for maybe_better in unconnected {
1011 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
1013 return Some((*maybe_better.0, maybe_better.1))
1014 }
1015
1016 match maybe_better.1.reputation.cmp(&best_peer.1.reputation) {
1018 std::cmp::Ordering::Greater => best_peer = maybe_better,
1019 std::cmp::Ordering::Equal
1020 if maybe_better.1.fork_id.is_some() && best_peer.1.fork_id.is_none() =>
1021 {
1022 best_peer = maybe_better
1023 }
1024 _ => {}
1025 }
1026 }
1027 Some((*best_peer.0, best_peer.1))
1028 }
1029
1030 fn fill_outbound_slots(&mut self) {
1036 self.tick();
1037
1038 if !self.net_connection_state.is_active() {
1039 return
1041 }
1042
1043 while self.connection_info.has_out_capacity() {
1045 let action = {
1046 let (peer_id, peer) = match self.best_unconnected() {
1047 Some(peer) => peer,
1048 _ => break,
1049 };
1050
1051 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1052
1053 peer.state = PeerConnectionState::PendingOut;
1054 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1055 };
1056
1057 self.connection_info.inc_pending_out();
1058
1059 self.queued_actions.push_back(action);
1060 }
1061 }
1062
1063 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1064 if let Some(peer) = self.peers.get_mut(&peer_id) {
1065 let new_addr = PeerAddr::new_with_ports(
1066 new_record.address,
1067 new_record.tcp_port,
1068 Some(new_record.udp_port),
1069 );
1070
1071 if peer.addr != new_addr {
1072 peer.addr = new_addr;
1073 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1074 }
1075 }
1076 }
1077
1078 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1080 self.net_connection_state = state;
1081 }
1082
1083 pub const fn connection_state(&self) -> &NetworkConnectionState {
1085 &self.net_connection_state
1086 }
1087
1088 pub const fn on_shutdown(&mut self) {
1090 self.net_connection_state = NetworkConnectionState::ShuttingDown;
1091 }
1092
1093 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1098 loop {
1099 if let Some(action) = self.queued_actions.pop_front() {
1101 return Poll::Ready(action)
1102 }
1103
1104 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1105 match cmd {
1106 PeerCommand::Add(peer_id, addr) => {
1107 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1108 }
1109 PeerCommand::Remove(peer) => self.remove_peer(peer),
1110 PeerCommand::ReputationChange(peer_id, rep) => {
1111 self.apply_reputation_change(&peer_id, rep)
1112 }
1113 PeerCommand::GetPeer(peer, tx) => {
1114 let _ = tx.send(self.peers.get(&peer).cloned());
1115 }
1116 PeerCommand::GetPeers(tx) => {
1117 let _ = tx.send(self.iter_peers().collect());
1118 }
1119 }
1120 }
1121
1122 if self.release_interval.poll_tick(cx).is_ready() {
1123 let now = std::time::Instant::now();
1124 let (_, unbanned_peers) = self.ban_list.evict(now);
1125
1126 for peer_id in unbanned_peers {
1127 if let Some(peer) = self.peers.get_mut(&peer_id) {
1128 peer.unban();
1129 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1130 }
1131 }
1132
1133 self.backed_off_peers.retain(|peer_id, until| {
1136 if now > *until {
1137 if let Some(peer) = self.peers.get_mut(peer_id) {
1138 peer.backed_off = false;
1139 }
1140 return false
1141 }
1142 true
1143 })
1144 }
1145
1146 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1147 self.fill_outbound_slots();
1148 }
1149
1150 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1151 self.on_resolved_peer(peer_id, new_record);
1152 }
1153
1154 if self.queued_actions.is_empty() {
1155 return Poll::Pending
1156 }
1157 }
1158 }
1159}
1160
1161impl Default for PeersManager {
1162 fn default() -> Self {
1163 Self::new(Default::default())
1164 }
1165}
1166
1167#[derive(Debug, Clone, PartialEq, Eq, Default)]
1169pub struct ConnectionInfo {
1170 num_outbound: usize,
1172 num_pending_out: usize,
1174 num_inbound: usize,
1176 num_pending_in: usize,
1178 config: ConnectionsConfig,
1180}
1181
1182impl ConnectionInfo {
1185 const fn new(config: ConnectionsConfig) -> Self {
1187 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1188 }
1189
1190 const fn has_out_capacity(&self) -> bool {
1192 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1193 self.num_outbound < self.config.max_outbound
1194 }
1195
1196 const fn has_in_capacity(&self) -> bool {
1198 self.num_inbound < self.config.max_inbound
1199 }
1200
1201 const fn has_in_pending_capacity(&self) -> bool {
1203 self.num_pending_in < self.config.max_inbound
1204 }
1205
1206 const fn decr_state(&mut self, state: PeerConnectionState) {
1207 match state {
1208 PeerConnectionState::Idle => {}
1209 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1210 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1211 PeerConnectionState::PendingOut => self.decr_pending_out(),
1212 }
1213 }
1214
1215 const fn decr_out(&mut self) {
1216 self.num_outbound -= 1;
1217 }
1218
1219 const fn inc_out(&mut self) {
1220 self.num_outbound += 1;
1221 }
1222
1223 const fn inc_pending_out(&mut self) {
1224 self.num_pending_out += 1;
1225 }
1226
1227 const fn inc_in(&mut self) {
1228 self.num_inbound += 1;
1229 }
1230
1231 const fn inc_pending_in(&mut self) {
1232 self.num_pending_in += 1;
1233 }
1234
1235 const fn decr_in(&mut self) {
1236 self.num_inbound -= 1;
1237 }
1238
1239 const fn decr_pending_out(&mut self) {
1240 self.num_pending_out -= 1;
1241 }
1242
1243 const fn decr_pending_in(&mut self) {
1244 self.num_pending_in -= 1;
1245 }
1246}
1247
1248#[derive(Debug)]
1250pub enum PeerAction {
1251 Connect {
1253 peer_id: PeerId,
1255 remote_addr: SocketAddr,
1257 },
1258 Disconnect {
1260 peer_id: PeerId,
1262 reason: Option<DisconnectReason>,
1264 },
1265 DisconnectBannedIncoming {
1268 peer_id: PeerId,
1270 },
1271 DisconnectUntrustedIncoming {
1273 peer_id: PeerId,
1275 },
1276 DiscoveryBanPeerId {
1278 peer_id: PeerId,
1280 ip_addr: IpAddr,
1282 },
1283 DiscoveryBanIp {
1285 ip_addr: IpAddr,
1287 },
1288 BanPeer {
1290 peer_id: PeerId,
1292 },
1293 UnBanPeer {
1295 peer_id: PeerId,
1297 },
1298 PeerAdded(PeerId),
1300 PeerRemoved(PeerId),
1302}
1303
1304#[derive(Debug, Error, PartialEq, Eq)]
1306pub enum InboundConnectionError {
1307 IpBanned,
1309 ExceedsCapacity,
1311}
1312
1313impl Display for InboundConnectionError {
1314 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1315 write!(f, "{self:?}")
1316 }
1317}
1318
/// Why a peer is being backed off.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
    /// The remote disconnected with `TooManyPeers`.
    TooManyPeers,
    /// The session was closed gracefully.
    GracefulClose,
    /// Any other connection error.
    ConnectionError,
}
1329
1330impl BackoffReason {
1331 pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1333 match reason {
1334 Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1335 _ => Self::ConnectionError,
1336 }
1337 }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342 use alloy_primitives::B512;
1343 use reth_eth_wire::{
1344 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1345 DisconnectReason,
1346 };
1347 use reth_ethereum_forks::{ForkHash, ForkId};
1348 use reth_net_banlist::BanList;
1349 use reth_network_api::Direction;
1350 use reth_network_peers::{PeerId, TrustedPeer};
1351 use reth_network_types::{
1352 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1353 };
1354 use std::{
1355 future::{poll_fn, Future},
1356 io,
1357 net::{IpAddr, Ipv4Addr, SocketAddr},
1358 pin::Pin,
1359 task::{Context, Poll},
1360 time::Duration,
1361 };
1362 use url::Host;
1363
1364 use super::PeersManager;
1365 use crate::{
1366 error::SessionError,
1367 peers::{
1368 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1369 PeerConnectionState,
1370 },
1371 session::PendingSessionHandshakeError,
1372 PeersConfig,
1373 };
1374
/// Test helper that borrows a [`PeersManager`] so it can be awaited as a future.
struct PeerActionFuture<'a> {
    /// The manager polled by the `Future` impl.
    peers: &'a mut PeersManager,
}
1378
1379 impl Future for PeerActionFuture<'_> {
1380 type Output = PeerAction;
1381
1382 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1383 self.get_mut().peers.poll(cx)
1384 }
1385 }
1386
/// Awaits the next [`PeerAction`] from the given manager expression.
macro_rules! event {
    ($manager:expr) => {
        PeerActionFuture { peers: &mut $manager }.await
    };
}
1392
1393 #[tokio::test]
1394 async fn test_insert() {
1395 let peer = PeerId::random();
1396 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1397 let mut peers = PeersManager::default();
1398 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1399
1400 match event!(peers) {
1401 PeerAction::PeerAdded(peer_id) => {
1402 assert_eq!(peer_id, peer);
1403 }
1404 _ => unreachable!(),
1405 }
1406 match event!(peers) {
1407 PeerAction::Connect { peer_id, remote_addr } => {
1408 assert_eq!(peer_id, peer);
1409 assert_eq!(remote_addr, socket_addr);
1410 }
1411 _ => unreachable!(),
1412 }
1413
1414 let (record, _) = peers.peer_by_id(peer).unwrap();
1415 assert_eq!(record.tcp_addr(), socket_addr);
1416 assert_eq!(record.udp_addr(), socket_addr);
1417 }
1418
1419 #[tokio::test]
1420 async fn test_insert_udp() {
1421 let peer = PeerId::random();
1422 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1423 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1424 let mut peers = PeersManager::default();
1425 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1426
1427 match event!(peers) {
1428 PeerAction::PeerAdded(peer_id) => {
1429 assert_eq!(peer_id, peer);
1430 }
1431 _ => unreachable!(),
1432 }
1433 match event!(peers) {
1434 PeerAction::Connect { peer_id, remote_addr } => {
1435 assert_eq!(peer_id, peer);
1436 assert_eq!(remote_addr, tcp_addr);
1437 }
1438 _ => unreachable!(),
1439 }
1440
1441 let (record, _) = peers.peer_by_id(peer).unwrap();
1442 assert_eq!(record.tcp_addr(), tcp_addr);
1443 assert_eq!(record.udp_addr(), udp_addr);
1444 }
1445
1446 #[tokio::test]
1447 async fn test_ban() {
1448 let peer = PeerId::random();
1449 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1450 let mut peers = PeersManager::default();
1451 peers.ban_peer(peer);
1452 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1453
1454 match event!(peers) {
1455 PeerAction::BanPeer { peer_id } => {
1456 assert_eq!(peer_id, peer);
1457 }
1458 _ => unreachable!(),
1459 }
1460
1461 poll_fn(|cx| {
1462 assert!(peers.poll(cx).is_pending());
1463 Poll::Ready(())
1464 })
1465 .await;
1466 }
1467
1468 #[tokio::test]
1469 async fn test_unban() {
1470 let peer = PeerId::random();
1471 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1472 let mut peers = PeersManager::default();
1473 peers.ban_peer(peer);
1474 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1475
1476 match event!(peers) {
1477 PeerAction::BanPeer { peer_id } => {
1478 assert_eq!(peer_id, peer);
1479 }
1480 _ => unreachable!(),
1481 }
1482
1483 poll_fn(|cx| {
1484 assert!(peers.poll(cx).is_pending());
1485 Poll::Ready(())
1486 })
1487 .await;
1488
1489 peers.unban_peer(peer);
1490
1491 match event!(peers) {
1492 PeerAction::UnBanPeer { peer_id } => {
1493 assert_eq!(peer_id, peer);
1494 }
1495 _ => unreachable!(),
1496 }
1497
1498 poll_fn(|cx| {
1499 assert!(peers.poll(cx).is_pending());
1500 Poll::Ready(())
1501 })
1502 .await;
1503 }
1504
1505 #[tokio::test]
1506 async fn test_backoff_on_busy() {
1507 let peer = PeerId::random();
1508 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1509
1510 let mut peers = PeersManager::new(PeersConfig::test());
1511 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1512
1513 match event!(peers) {
1514 PeerAction::PeerAdded(peer_id) => {
1515 assert_eq!(peer_id, peer);
1516 }
1517 _ => unreachable!(),
1518 }
1519 match event!(peers) {
1520 PeerAction::Connect { peer_id, .. } => {
1521 assert_eq!(peer_id, peer);
1522 }
1523 _ => unreachable!(),
1524 }
1525
1526 poll_fn(|cx| {
1527 assert!(peers.poll(cx).is_pending());
1528 Poll::Ready(())
1529 })
1530 .await;
1531
1532 peers.on_active_session_dropped(
1533 &socket_addr,
1534 &peer,
1535 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1536 DisconnectReason::TooManyPeers,
1537 )),
1538 );
1539
1540 poll_fn(|cx| {
1541 assert!(peers.poll(cx).is_pending());
1542 Poll::Ready(())
1543 })
1544 .await;
1545
1546 assert!(peers.backed_off_peers.contains_key(&peer));
1547 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1548
1549 tokio::time::sleep(peers.backoff_durations.low).await;
1550
1551 match event!(peers) {
1552 PeerAction::Connect { peer_id, .. } => {
1553 assert_eq!(peer_id, peer);
1554 }
1555 _ => unreachable!(),
1556 }
1557
1558 assert!(!peers.backed_off_peers.contains_key(&peer));
1559 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1560 }
1561
1562 #[tokio::test]
1563 async fn test_backoff_on_no_response() {
1564 let peer = PeerId::random();
1565 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1566
1567 let backoff_durations = PeerBackoffDurations::test();
1568 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1569 let mut peers = PeersManager::new(config);
1570 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1571
1572 match event!(peers) {
1573 PeerAction::PeerAdded(peer_id) => {
1574 assert_eq!(peer_id, peer);
1575 }
1576 _ => unreachable!(),
1577 }
1578 match event!(peers) {
1579 PeerAction::Connect { peer_id, .. } => {
1580 assert_eq!(peer_id, peer);
1581 }
1582 _ => unreachable!(),
1583 }
1584
1585 poll_fn(|cx| {
1586 assert!(peers.poll(cx).is_pending());
1587 Poll::Ready(())
1588 })
1589 .await;
1590
1591 peers.on_outgoing_pending_session_dropped(
1592 &socket_addr,
1593 &peer,
1594 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1595 EthHandshakeError::NoResponse,
1596 )),
1597 );
1598
1599 poll_fn(|cx| {
1600 assert!(peers.poll(cx).is_pending());
1601 Poll::Ready(())
1602 })
1603 .await;
1604
1605 assert!(peers.backed_off_peers.contains_key(&peer));
1606 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1607
1608 tokio::time::sleep(backoff_durations.high).await;
1609
1610 match event!(peers) {
1611 PeerAction::Connect { peer_id, .. } => {
1612 assert_eq!(peer_id, peer);
1613 }
1614 _ => unreachable!(),
1615 }
1616
1617 assert!(!peers.backed_off_peers.contains_key(&peer));
1618 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1619 }
1620
1621 #[tokio::test]
1622 async fn test_low_backoff() {
1623 let peer = PeerId::random();
1624 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1625 let config = PeersConfig::test();
1626 let mut peers = PeersManager::new(config);
1627 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1628 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1629
1630 let backoff_timestamp = peers
1631 .backoff_durations
1632 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1633
1634 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1635 assert!(backoff_timestamp <= expected);
1636 }
1637
1638 #[tokio::test]
1639 async fn test_multiple_backoff_calculations() {
1640 let peer = PeerId::random();
1641 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1642 let config = PeersConfig::default();
1643 let mut peers = PeersManager::new(config);
1644 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1645 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1646
1647 peer_struct.severe_backoff_counter = 1;
1649
1650 let now = std::time::Instant::now();
1651
1652 peer_struct.severe_backoff_counter += 1;
1654 let backoff_time = peers
1656 .backoff_durations
1657 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1658
1659 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1661
1662 assert!(backoff_time.duration_since(now) > backoff_duration);
1665 }
1666
1667 #[tokio::test]
1668 async fn test_ban_on_active_drop() {
1669 let peer = PeerId::random();
1670 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1671 let mut peers = PeersManager::default();
1672 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1673
1674 match event!(peers) {
1675 PeerAction::PeerAdded(peer_id) => {
1676 assert_eq!(peer_id, peer);
1677 }
1678 _ => unreachable!(),
1679 }
1680 match event!(peers) {
1681 PeerAction::Connect { peer_id, .. } => {
1682 assert_eq!(peer_id, peer);
1683 }
1684 _ => unreachable!(),
1685 }
1686
1687 poll_fn(|cx| {
1688 assert!(peers.poll(cx).is_pending());
1689 Poll::Ready(())
1690 })
1691 .await;
1692
1693 peers.on_active_session_dropped(
1694 &socket_addr,
1695 &peer,
1696 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1697 DisconnectReason::UselessPeer,
1698 )),
1699 );
1700
1701 match event!(peers) {
1702 PeerAction::PeerRemoved(peer_id) => {
1703 assert_eq!(peer_id, peer);
1704 }
1705 _ => unreachable!(),
1706 }
1707 match event!(peers) {
1708 PeerAction::BanPeer { peer_id } => {
1709 assert_eq!(peer_id, peer);
1710 }
1711 _ => unreachable!(),
1712 }
1713
1714 poll_fn(|cx| {
1715 assert!(peers.poll(cx).is_pending());
1716 Poll::Ready(())
1717 })
1718 .await;
1719
1720 assert!(!peers.peers.contains_key(&peer));
1721 }
1722
1723 #[tokio::test]
1724 async fn test_remove_on_max_backoff_count() {
1725 let peer = PeerId::random();
1726 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1727 let config = PeersConfig::test();
1728 let mut peers = PeersManager::new(config.clone());
1729 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1730 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1731
1732 peer_struct.severe_backoff_counter = config.max_backoff_count;
1734
1735 match event!(peers) {
1736 PeerAction::PeerAdded(peer_id) => {
1737 assert_eq!(peer_id, peer);
1738 }
1739 _ => unreachable!(),
1740 }
1741 match event!(peers) {
1742 PeerAction::Connect { peer_id, .. } => {
1743 assert_eq!(peer_id, peer);
1744 }
1745 _ => unreachable!(),
1746 }
1747
1748 poll_fn(|cx| {
1749 assert!(peers.poll(cx).is_pending());
1750 Poll::Ready(())
1751 })
1752 .await;
1753
1754 peers.on_outgoing_pending_session_dropped(
1755 &socket_addr,
1756 &peer,
1757 &PendingSessionHandshakeError::Eth(
1758 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1759 ),
1760 );
1761
1762 match event!(peers) {
1763 PeerAction::PeerRemoved(peer_id) => {
1764 assert_eq!(peer_id, peer);
1765 }
1766 _ => unreachable!(),
1767 }
1768
1769 poll_fn(|cx| {
1770 assert!(peers.poll(cx).is_pending());
1771 Poll::Ready(())
1772 })
1773 .await;
1774
1775 assert!(!peers.peers.contains_key(&peer));
1776 }
1777
1778 #[tokio::test]
1779 async fn test_ban_on_pending_drop() {
1780 let peer = PeerId::random();
1781 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1782 let mut peers = PeersManager::default();
1783 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1784
1785 match event!(peers) {
1786 PeerAction::PeerAdded(peer_id) => {
1787 assert_eq!(peer_id, peer);
1788 }
1789 _ => unreachable!(),
1790 }
1791 match event!(peers) {
1792 PeerAction::Connect { peer_id, .. } => {
1793 assert_eq!(peer_id, peer);
1794 }
1795 _ => unreachable!(),
1796 }
1797
1798 poll_fn(|cx| {
1799 assert!(peers.poll(cx).is_pending());
1800 Poll::Ready(())
1801 })
1802 .await;
1803
1804 peers.on_outgoing_pending_session_dropped(
1805 &socket_addr,
1806 &peer,
1807 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1808 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1809 )),
1810 );
1811
1812 match event!(peers) {
1813 PeerAction::PeerRemoved(peer_id) => {
1814 assert_eq!(peer_id, peer);
1815 }
1816 _ => unreachable!(),
1817 }
1818 match event!(peers) {
1819 PeerAction::BanPeer { peer_id } => {
1820 assert_eq!(peer_id, peer);
1821 }
1822 _ => unreachable!(),
1823 }
1824
1825 poll_fn(|cx| {
1826 assert!(peers.poll(cx).is_pending());
1827 Poll::Ready(())
1828 })
1829 .await;
1830
1831 assert!(!peers.peers.contains_key(&peer));
1832 }
1833
1834 #[tokio::test]
1835 async fn test_internally_closed_incoming() {
1836 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1837 let mut peers = PeersManager::default();
1838
1839 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1840 assert_eq!(peers.connection_info.num_pending_in, 1);
1841 peers.on_incoming_pending_session_rejected_internally();
1842 assert_eq!(peers.connection_info.num_pending_in, 0);
1843 }
1844
1845 #[tokio::test]
1846 async fn test_reject_incoming_at_pending_capacity() {
1847 let mut peers = PeersManager::default();
1848
1849 for count in 1..=peers.connection_info.config.max_inbound {
1850 let socket_addr =
1851 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1852 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1853 assert_eq!(peers.connection_info.num_pending_in, count);
1854 }
1855 assert!(peers.connection_info.has_in_capacity());
1856 assert!(!peers.connection_info.has_in_pending_capacity());
1857
1858 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1859 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1860 }
1861
1862 #[tokio::test]
1863 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1864 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1865 let trusted = PeerId::random();
1866 peers.add_trusted_peer_id(trusted);
1867
1868 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1870 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1871 peers.on_incoming_session_established(trusted, addr);
1872
1873 match event!(peers) {
1874 PeerAction::PeerAdded(id) => {
1875 assert_eq!(id, trusted);
1876 }
1877 _ => unreachable!(),
1878 }
1879
1880 let mut connected_untrusted_peer_ids = Vec::new();
1882 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1883 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1884 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1885 let peer_id = PeerId::random();
1886 peers.on_incoming_session_established(peer_id, addr);
1887 connected_untrusted_peer_ids.push(peer_id);
1888
1889 match event!(peers) {
1890 PeerAction::PeerAdded(id) => {
1891 assert_eq!(id, peer_id);
1892 }
1893 _ => unreachable!(),
1894 }
1895 }
1896
1897 let mut pending_addrs = Vec::new();
1898
1899 for i in 0..2 {
1901 let socket_addr =
1902 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1903 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1904
1905 pending_addrs.push(socket_addr);
1906 }
1907
1908 assert_eq!(peers.connection_info.num_pending_in, 2);
1909
1910 for i in 0..2 {
1912 let socket_addr =
1913 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1914 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1915 }
1916
1917 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1918 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1919 DisconnectReason::UselessPeer,
1920 )),
1921 ));
1922
1923 for pending_addr in pending_addrs {
1925 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1926 }
1927
1928 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1929
1930 println!(
1931 "num_inbound: {}, has_in_capacity: {}",
1932 peers.connection_info.num_inbound,
1933 peers.connection_info.has_in_capacity()
1934 );
1935
1936 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1938
1939 println!(
1940 "num_inbound: {}, has_in_capacity: {}",
1941 peers.connection_info.num_inbound,
1942 peers.connection_info.has_in_capacity()
1943 );
1944
1945 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1946 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1947 }
1948
1949 #[tokio::test]
1950 async fn test_closed_incoming() {
1951 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1952 let mut peers = PeersManager::default();
1953
1954 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1955 assert_eq!(peers.connection_info.num_pending_in, 1);
1956 peers.on_incoming_pending_session_gracefully_closed();
1957 assert_eq!(peers.connection_info.num_pending_in, 0);
1958 }
1959
1960 #[tokio::test]
1961 async fn test_dropped_incoming() {
1962 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1963 let ban_duration = Duration::from_millis(500);
1964 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1965 let mut peers = PeersManager::new(config);
1966
1967 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1968 assert_eq!(peers.connection_info.num_pending_in, 1);
1969 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1970 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1971 DisconnectReason::UselessPeer,
1972 )),
1973 ));
1974
1975 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1976 assert_eq!(peers.connection_info.num_pending_in, 0);
1977 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1978
1979 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1980
1981 tokio::time::sleep(ban_duration).await;
1983
1984 poll_fn(|cx| {
1985 let _ = peers.poll(cx);
1986 Poll::Ready(())
1987 })
1988 .await;
1989
1990 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1991 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1992 }
1993
1994 #[tokio::test]
1995 async fn test_reputation_change_connected() {
1996 let peer = PeerId::random();
1997 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1998 let mut peers = PeersManager::default();
1999 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2000
2001 match event!(peers) {
2002 PeerAction::PeerAdded(peer_id) => {
2003 assert_eq!(peer_id, peer);
2004 }
2005 _ => unreachable!(),
2006 }
2007 match event!(peers) {
2008 PeerAction::Connect { peer_id, remote_addr } => {
2009 assert_eq!(peer_id, peer);
2010 assert_eq!(remote_addr, socket_addr);
2011 }
2012 _ => unreachable!(),
2013 }
2014
2015 let p = peers.peers.get_mut(&peer).unwrap();
2016 assert_eq!(p.state, PeerConnectionState::PendingOut);
2017
2018 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
2019
2020 let p = peers.peers.get(&peer).unwrap();
2021 assert_eq!(p.state, PeerConnectionState::PendingOut);
2022 assert!(p.is_banned());
2023
2024 peers.on_active_session_gracefully_closed(peer);
2025
2026 let p = peers.peers.get(&peer).unwrap();
2027 assert_eq!(p.state, PeerConnectionState::Idle);
2028 assert!(p.is_banned());
2029
2030 match event!(peers) {
2031 PeerAction::Disconnect { peer_id, .. } => {
2032 assert_eq!(peer_id, peer);
2033 }
2034 _ => unreachable!(),
2035 }
2036 }
2037
2038 #[tokio::test]
2039 async fn retain_trusted_status() {
2040 let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2041 let trusted = PeerId::random();
2042 let mut peers =
2043 PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2044 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2045 tcp_port: 8008,
2046 udp_port: 8008,
2047 id: trusted,
2048 }]));
2049 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2050 peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2051 assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2052 }
2053
2054 #[tokio::test]
2055 async fn accept_incoming_trusted_unknown_peer_address() {
2056 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2057 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2058 let trusted = PeerId::random();
2060 peers.add_trusted_peer_id(trusted);
2061
2062 for i in 0..peers.connection_info.config.max_inbound {
2064 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2065 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2066 let peer_id = PeerId::random();
2067 peers.on_incoming_session_established(peer_id, addr);
2068
2069 match event!(peers) {
2070 PeerAction::PeerAdded(id) => {
2071 assert_eq!(id, peer_id);
2072 }
2073 _ => unreachable!(),
2074 }
2075 }
2076
2077 let untrusted = PeerId::random();
2079 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2080 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2081 peers.on_incoming_session_established(untrusted, socket_addr);
2082
2083 match event!(peers) {
2084 PeerAction::PeerAdded(id) => {
2085 assert_eq!(id, untrusted);
2086 }
2087 _ => unreachable!(),
2088 }
2089
2090 match event!(peers) {
2091 PeerAction::Disconnect { peer_id, reason } => {
2092 assert_eq!(peer_id, untrusted);
2093 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2094 }
2095 _ => unreachable!(),
2096 }
2097
2098 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2099 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2100 peers.on_incoming_session_established(trusted, socket_addr);
2101
2102 match event!(peers) {
2103 PeerAction::PeerAdded(id) => {
2104 assert_eq!(id, trusted);
2105 }
2106 _ => unreachable!(),
2107 }
2108
2109 poll_fn(|cx| {
2110 assert!(peers.poll(cx).is_pending());
2111 Poll::Ready(())
2112 })
2113 .await;
2114
2115 let peer = peers.peers.get(&trusted).unwrap();
2116 assert_eq!(peer.state, PeerConnectionState::In);
2117 }
2118
2119 #[tokio::test]
2120 async fn test_already_connected() {
2121 let peer = PeerId::random();
2122 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2123 let mut peers = PeersManager::default();
2124
2125 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2127 assert_eq!(peers.connection_info.num_pending_in, 1);
2128
2129 peers.on_incoming_session_established(peer, socket_addr);
2132 let p = peers.peers.get_mut(&peer).expect("peer not found");
2133 assert_eq!(p.addr.tcp(), socket_addr);
2134 assert_eq!(peers.connection_info.num_pending_in, 0);
2135 assert_eq!(peers.connection_info.num_inbound, 1);
2136
2137 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2140 assert_eq!(peers.connection_info.num_pending_in, 1);
2141
2142 peers.on_already_connected(Direction::Incoming);
2146
2147 let p = peers.peers.get_mut(&peer).expect("peer not found");
2148 assert_eq!(p.addr.tcp(), socket_addr);
2149 assert_eq!(peers.connection_info.num_pending_in, 0);
2150 assert_eq!(peers.connection_info.num_inbound, 1);
2151 }
2152
2153 #[tokio::test]
2154 async fn test_reputation_change_trusted_peer() {
2155 let peer = PeerId::random();
2156 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2157 let mut peers = PeersManager::default();
2158 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2159
2160 match event!(peers) {
2161 PeerAction::PeerAdded(peer_id) => {
2162 assert_eq!(peer_id, peer);
2163 }
2164 _ => unreachable!(),
2165 }
2166 match event!(peers) {
2167 PeerAction::Connect { peer_id, remote_addr } => {
2168 assert_eq!(peer_id, peer);
2169 assert_eq!(remote_addr, socket_addr);
2170 }
2171 _ => unreachable!(),
2172 }
2173
2174 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2175 peers.on_active_outgoing_established(peer);
2176 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2177
2178 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2179
2180 {
2181 let p = peers.peers.get(&peer).unwrap();
2182 assert_eq!(p.state, PeerConnectionState::Out);
2183 assert!(!p.is_banned());
2185 }
2186
2187 loop {
2189 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2190
2191 let p = peers.peers.get(&peer).unwrap();
2192 if p.is_banned() {
2193 break
2194 }
2195 }
2196
2197 match event!(peers) {
2198 PeerAction::Disconnect { peer_id, .. } => {
2199 assert_eq!(peer_id, peer);
2200 }
2201 _ => unreachable!(),
2202 }
2203 }
2204
2205 #[tokio::test]
2206 async fn test_reputation_management() {
2207 let peer = PeerId::random();
2208 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2209 let mut peers = PeersManager::default();
2210 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2211 assert_eq!(peers.get_reputation(&peer), Some(0));
2212
2213 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2214 assert_eq!(peers.get_reputation(&peer), Some(1024));
2215
2216 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2217 assert_eq!(peers.get_reputation(&peer), Some(0));
2218 }
2219
2220 #[tokio::test]
2221 async fn test_remove_discovered_active() {
2222 let peer = PeerId::random();
2223 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2224 let mut peers = PeersManager::default();
2225 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2226
2227 match event!(peers) {
2228 PeerAction::PeerAdded(peer_id) => {
2229 assert_eq!(peer_id, peer);
2230 }
2231 _ => unreachable!(),
2232 }
2233 match event!(peers) {
2234 PeerAction::Connect { peer_id, remote_addr } => {
2235 assert_eq!(peer_id, peer);
2236 assert_eq!(remote_addr, socket_addr);
2237 }
2238 _ => unreachable!(),
2239 }
2240
2241 let p = peers.peers.get(&peer).unwrap();
2242 assert_eq!(p.state, PeerConnectionState::PendingOut);
2243
2244 peers.remove_peer(peer);
2245
2246 match event!(peers) {
2247 PeerAction::PeerRemoved(peer_id) => {
2248 assert_eq!(peer_id, peer);
2249 }
2250 _ => unreachable!(),
2251 }
2252 match event!(peers) {
2253 PeerAction::Disconnect { peer_id, .. } => {
2254 assert_eq!(peer_id, peer);
2255 }
2256 _ => unreachable!(),
2257 }
2258
2259 let p = peers.peers.get(&peer).unwrap();
2260 assert_eq!(p.state, PeerConnectionState::PendingOut);
2261
2262 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2263 let p = peers.peers.get(&peer).unwrap();
2264 assert_eq!(p.state, PeerConnectionState::PendingOut);
2265
2266 peers.on_active_session_gracefully_closed(peer);
2267 assert!(!peers.peers.contains_key(&peer));
2268 }
2269
2270 #[tokio::test]
2271 async fn test_fatal_outgoing_connection_error_trusted() {
2272 let peer = PeerId::random();
2273 let config = PeersConfig::test()
2274 .with_trusted_nodes(vec![TrustedPeer {
2275 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2276 tcp_port: 8008,
2277 udp_port: 8008,
2278 id: peer,
2279 }])
2280 .with_trusted_nodes_only(true);
2281 let mut peers = PeersManager::new(config);
2282 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2283
2284 match event!(peers) {
2285 PeerAction::Connect { peer_id, remote_addr } => {
2286 assert_eq!(peer_id, peer);
2287 assert_eq!(remote_addr, socket_addr);
2288 }
2289 _ => unreachable!(),
2290 }
2291
2292 let p = peers.peers.get(&peer).unwrap();
2293 assert_eq!(p.state, PeerConnectionState::PendingOut);
2294
2295 assert_eq!(peers.num_outbound_connections(), 0);
2296
2297 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2298 EthHandshakeError::NonStatusMessageInHandshake,
2299 ));
2300 assert!(err.is_fatal_protocol_error());
2301
2302 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2303 assert_eq!(peers.num_outbound_connections(), 0);
2304
2305 match event!(peers) {
2307 PeerAction::BanPeer { peer_id } => {
2308 assert_eq!(peer_id, peer);
2309 }
2310 err => unreachable!("{err:?}"),
2311 }
2312
2313 assert!(peers.peers.contains_key(&peer));
2315
2316 tokio::time::sleep(peers.backoff_durations.medium).await;
2318
2319 match event!(peers) {
2320 PeerAction::Connect { peer_id, remote_addr } => {
2321 assert_eq!(peer_id, peer);
2322 assert_eq!(remote_addr, socket_addr);
2323 }
2324 err => unreachable!("{err:?}"),
2325 }
2326 }
2327
2328 #[tokio::test]
2329 async fn test_outgoing_connection_error() {
2330 let peer = PeerId::random();
2331 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2332 let mut peers = PeersManager::default();
2333 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2334
2335 match event!(peers) {
2336 PeerAction::PeerAdded(peer_id) => {
2337 assert_eq!(peer_id, peer);
2338 }
2339 _ => unreachable!(),
2340 }
2341 match event!(peers) {
2342 PeerAction::Connect { peer_id, remote_addr } => {
2343 assert_eq!(peer_id, peer);
2344 assert_eq!(remote_addr, socket_addr);
2345 }
2346 _ => unreachable!(),
2347 }
2348
2349 let p = peers.peers.get(&peer).unwrap();
2350 assert_eq!(p.state, PeerConnectionState::PendingOut);
2351
2352 assert_eq!(peers.num_outbound_connections(), 0);
2353
2354 peers.on_outgoing_connection_failure(
2355 &socket_addr,
2356 &peer,
2357 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2358 );
2359
2360 assert_eq!(peers.num_outbound_connections(), 0);
2361 }
2362
2363 #[tokio::test]
2364 async fn test_outgoing_connection_gracefully_closed() {
2365 let peer = PeerId::random();
2366 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2367 let mut peers = PeersManager::default();
2368 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2369
2370 match event!(peers) {
2371 PeerAction::PeerAdded(peer_id) => {
2372 assert_eq!(peer_id, peer);
2373 }
2374 _ => unreachable!(),
2375 }
2376 match event!(peers) {
2377 PeerAction::Connect { peer_id, remote_addr } => {
2378 assert_eq!(peer_id, peer);
2379 assert_eq!(remote_addr, socket_addr);
2380 }
2381 _ => unreachable!(),
2382 }
2383
2384 let p = peers.peers.get(&peer).unwrap();
2385 assert_eq!(p.state, PeerConnectionState::PendingOut);
2386
2387 assert_eq!(peers.num_outbound_connections(), 0);
2388
2389 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2390
2391 assert_eq!(peers.num_outbound_connections(), 0);
2392 assert_eq!(peers.connection_info.num_pending_out, 0);
2393 }
2394
2395 #[tokio::test]
2396 async fn test_discovery_ban_list() {
2397 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2398 let socket_addr = SocketAddr::new(ip, 8008);
2399 let ban_list = BanList::new(vec![], vec![ip]);
2400 let config = PeersConfig::default().with_ban_list(ban_list);
2401 let mut peer_manager = PeersManager::new(config);
2402 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2403
2404 assert!(peer_manager.peers.is_empty());
2405 }
2406
2407 #[tokio::test]
2408 async fn test_on_pending_ban_list() {
2409 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2410 let socket_addr = SocketAddr::new(ip, 8008);
2411 let ban_list = BanList::new(vec![], vec![ip]);
2412 let config = PeersConfig::test().with_ban_list(ban_list);
2413 let mut peer_manager = PeersManager::new(config);
2414 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2415 match a {
2417 Ok(_) => panic!(),
2418 Err(err) => match err {
2419 InboundConnectionError::IpBanned => {
2420 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2421 }
2422 _ => unreachable!(),
2423 },
2424 }
2425 }
2426
2427 #[tokio::test]
2428 async fn test_on_active_inbound_ban_list() {
2429 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2430 let socket_addr = SocketAddr::new(ip, 8008);
2431 let given_peer_id = PeerId::random();
2432 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2433 let config = PeersConfig::test().with_ban_list(ban_list);
2434 let mut peer_manager = PeersManager::new(config);
2435 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2436 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2438 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2439 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2442 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2443
2444 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2445 peer_manager.queued_actions.pop_front()
2446 else {
2447 panic!()
2448 };
2449
2450 assert_eq!(peer_id, given_peer_id)
2451 }
2452
/// Incrementing and decrementing the inbound/outbound counters of [`ConnectionInfo`] only
/// changes the matching counter, and capacity remains after a single increment.
#[test]
fn test_connection_limits() {
    // Snapshot of the `(num_inbound, num_outbound)` counters.
    let counters = |info: &ConnectionInfo| (info.num_inbound, info.num_outbound);

    let mut info = ConnectionInfo::default();

    info.inc_in();
    assert_eq!(counters(&info), (1, 0));
    assert!(info.has_in_capacity());

    info.decr_in();
    assert_eq!(counters(&info), (0, 0));

    info.inc_out();
    assert_eq!(counters(&info), (0, 1));
    assert!(info.has_out_capacity());

    info.decr_out();
    assert_eq!(counters(&info), (0, 0));
}
2474
/// `decr_state` with the `In` / `Out` connection state undoes the matching `inc_in` / `inc_out`.
#[test]
fn test_connection_peer_state() {
    // Snapshot of the `(num_inbound, num_outbound)` counters.
    let counters = |info: &ConnectionInfo| (info.num_inbound, info.num_outbound);

    let mut info = ConnectionInfo::default();

    // Inbound: increment, then release through the state-based decrement.
    info.inc_in();
    info.decr_state(PeerConnectionState::In);
    assert_eq!(counters(&info), (0, 0));

    // Outbound: same round trip.
    info.inc_out();
    info.decr_state(PeerConnectionState::Out);
    assert_eq!(counters(&info), (0, 0));
}
2490
2491 #[tokio::test]
2492 async fn test_trusted_peers_are_prioritized() {
2493 let trusted_peer = PeerId::random();
2494 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2495 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2496 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2497 tcp_port: 8008,
2498 udp_port: 8008,
2499 id: trusted_peer,
2500 }]);
2501 let mut peers = PeersManager::new(config);
2502
2503 let basic_peer = PeerId::random();
2504 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2505 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2506
2507 match event!(peers) {
2508 PeerAction::PeerAdded(peer_id) => {
2509 assert_eq!(peer_id, basic_peer);
2510 }
2511 _ => unreachable!(),
2512 }
2513 match event!(peers) {
2514 PeerAction::Connect { peer_id, remote_addr } => {
2515 assert_eq!(peer_id, trusted_peer);
2516 assert_eq!(remote_addr, trusted_sock);
2517 }
2518 _ => unreachable!(),
2519 }
2520 match event!(peers) {
2521 PeerAction::Connect { peer_id, remote_addr } => {
2522 assert_eq!(peer_id, basic_peer);
2523 assert_eq!(remote_addr, basic_sock);
2524 }
2525 _ => unreachable!(),
2526 }
2527 }
2528
2529 #[tokio::test]
2530 async fn test_connect_trusted_nodes_only() {
2531 let trusted_peer = PeerId::random();
2532 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2533 let config = PeersConfig::test()
2534 .with_trusted_nodes(vec![TrustedPeer {
2535 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2536 tcp_port: 8008,
2537 udp_port: 8008,
2538 id: trusted_peer,
2539 }])
2540 .with_trusted_nodes_only(true);
2541 let mut peers = PeersManager::new(config);
2542
2543 let basic_peer = PeerId::random();
2544 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2545 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2546
2547 match event!(peers) {
2548 PeerAction::PeerAdded(peer_id) => {
2549 assert_eq!(peer_id, basic_peer);
2550 }
2551 _ => unreachable!(),
2552 }
2553 match event!(peers) {
2554 PeerAction::Connect { peer_id, remote_addr } => {
2555 assert_eq!(peer_id, trusted_peer);
2556 assert_eq!(remote_addr, trusted_sock);
2557 }
2558 _ => unreachable!(),
2559 }
2560 poll_fn(|cx| {
2561 assert!(peers.poll(cx).is_pending());
2562 Poll::Ready(())
2563 })
2564 .await;
2565 }
2566
2567 #[tokio::test]
2568 async fn test_incoming_with_trusted_nodes_only() {
2569 let trusted_peer = PeerId::random();
2570 let config = PeersConfig::test()
2571 .with_trusted_nodes(vec![TrustedPeer {
2572 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2573 tcp_port: 8008,
2574 udp_port: 8008,
2575 id: trusted_peer,
2576 }])
2577 .with_trusted_nodes_only(true);
2578 let mut peers = PeersManager::new(config);
2579
2580 let basic_peer = PeerId::random();
2581 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2582 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2583 assert_eq!(peers.connection_info.num_pending_in, 1);
2585 peers.on_incoming_session_established(basic_peer, basic_sock);
2586 assert_eq!(peers.connection_info.num_pending_in, 0);
2589 assert_eq!(peers.connection_info.num_inbound, 0);
2590
2591 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2592 peers.queued_actions.pop_front()
2593 else {
2594 panic!()
2595 };
2596 assert_eq!(basic_peer, peer_id);
2597 assert!(!peers.peers.contains_key(&basic_peer));
2598 }
2599
2600 #[tokio::test]
2601 async fn test_incoming_without_trusted_nodes_only() {
2602 let trusted_peer = PeerId::random();
2603 let config = PeersConfig::test()
2604 .with_trusted_nodes(vec![TrustedPeer {
2605 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2606 tcp_port: 8008,
2607 udp_port: 8008,
2608 id: trusted_peer,
2609 }])
2610 .with_trusted_nodes_only(false);
2611 let mut peers = PeersManager::new(config);
2612
2613 let basic_peer = PeerId::random();
2614 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2615 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2616
2617 assert_eq!(peers.connection_info.num_pending_in, 1);
2619 peers.on_incoming_session_established(basic_peer, basic_sock);
2620 assert_eq!(peers.connection_info.num_pending_in, 0);
2623 assert_eq!(peers.connection_info.num_inbound, 1);
2624 assert!(peers.peers.contains_key(&basic_peer));
2625 }
2626
2627 #[tokio::test]
2628 async fn test_incoming_at_capacity() {
2629 let mut config = PeersConfig::test();
2630 config.connection_info.max_inbound = 1;
2631 let mut peers = PeersManager::new(config);
2632
2633 let peer = PeerId::random();
2634 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2635 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2636
2637 peers.on_incoming_session_established(peer, addr);
2638
2639 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2640 assert_eq!(
2641 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2642 InboundConnectionError::ExceedsCapacity
2643 );
2644 }
2645
2646 #[tokio::test]
2647 async fn test_incoming_rate_limit() {
2648 let config = PeersConfig {
2649 incoming_ip_throttle_duration: Duration::from_millis(100),
2650 ..PeersConfig::test()
2651 };
2652 let mut peers = PeersManager::new(config);
2653
2654 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2655 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2656 assert_eq!(
2657 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2658 InboundConnectionError::IpBanned
2659 );
2660
2661 peers.release_interval.reset_immediately();
2662 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2663
2664 poll_fn(|cx| loop {
2666 if peers.poll(cx).is_pending() {
2667 return Poll::Ready(());
2668 }
2669 })
2670 .await;
2671
2672 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2673 assert_eq!(
2674 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2675 InboundConnectionError::IpBanned
2676 );
2677 }
2678
2679 #[tokio::test]
2680 async fn test_tick() {
2681 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2682 let socket_addr = SocketAddr::new(ip, 8008);
2683 let config = PeersConfig::test();
2684 let mut peer_manager = PeersManager::new(config);
2685 let peer_id = PeerId::random();
2686 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2687
2688 tokio::time::sleep(Duration::from_secs(1)).await;
2689 peer_manager.tick();
2690
2691 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2693
2694 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2696
2697 tokio::time::sleep(Duration::from_secs(1)).await;
2698 peer_manager.tick();
2699
2700 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2702
2703 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2704
2705 tokio::time::sleep(Duration::from_secs(1)).await;
2706 peer_manager.tick();
2707
2708 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2710 }
2711
2712 #[tokio::test]
2713 async fn test_remove_incoming_after_disconnect() {
2714 let peer_id = PeerId::random();
2715 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2716 let mut peers = PeersManager::default();
2717
2718 peers.on_incoming_pending_session(addr.ip()).unwrap();
2719 peers.on_incoming_session_established(peer_id, addr);
2720 let peer = peers.peers.get(&peer_id).unwrap();
2721 assert_eq!(peer.state, PeerConnectionState::In);
2722 assert!(peer.remove_after_disconnect);
2723
2724 peers.on_active_session_gracefully_closed(peer_id);
2725 assert!(!peers.peers.contains_key(&peer_id))
2726 }
2727
2728 #[tokio::test]
2729 async fn test_keep_incoming_after_disconnect_if_discovered() {
2730 let peer_id = PeerId::random();
2731 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2732 let mut peers = PeersManager::default();
2733
2734 peers.on_incoming_pending_session(addr.ip()).unwrap();
2735 peers.on_incoming_session_established(peer_id, addr);
2736 let peer = peers.peers.get(&peer_id).unwrap();
2737 assert_eq!(peer.state, PeerConnectionState::In);
2738 assert!(peer.remove_after_disconnect);
2739
2740 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2742
2743 peers.on_active_session_gracefully_closed(peer_id);
2744
2745 let peer = peers.peers.get(&peer_id).unwrap();
2746 assert_eq!(peer.state, PeerConnectionState::Idle);
2747 assert!(!peer.remove_after_disconnect);
2748 }
2749
2750 #[tokio::test]
2751 async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2752 let throttle_duration = Duration::from_millis(100);
2753 let config =
2754 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2755 let mut peers = PeersManager::new(config);
2756
2757 let peer_id = PeerId::random();
2758 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2759
2760 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2762
2763 match event!(peers) {
2764 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2765 _ => unreachable!(),
2766 }
2767
2768 match event!(peers) {
2769 PeerAction::Connect { .. } => {}
2770 _ => unreachable!(),
2771 }
2772
2773 peers.on_active_outgoing_established(peer_id);
2775 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2776
2777 peers.on_active_session_gracefully_closed(peer_id);
2779
2780 let peer = peers.peers.get(&peer_id).unwrap();
2781 assert_eq!(peer.state, PeerConnectionState::Idle);
2782 assert!(peer.backed_off);
2783
2784 assert!(peers.backed_off_peers.contains_key(&peer_id));
2786
2787 poll_fn(|cx| {
2789 assert!(peers.poll(cx).is_pending());
2790 Poll::Ready(())
2791 })
2792 .await;
2793
2794 assert!(peers.backed_off_peers.contains_key(&peer_id));
2796 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2797
2798 tokio::time::sleep(throttle_duration).await;
2800
2801 match event!(peers) {
2803 PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2804 _ => unreachable!(),
2805 }
2806
2807 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2809 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2810 }
2811
2812 #[tokio::test]
2813 async fn test_backed_off_peer_can_accept_incoming_connection() {
2814 let throttle_duration = Duration::from_millis(100);
2815 let config =
2816 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2817 let mut peers = PeersManager::new(config);
2818
2819 let peer_id = PeerId::random();
2820 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2821
2822 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2824
2825 match event!(peers) {
2826 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2827 _ => unreachable!(),
2828 }
2829
2830 match event!(peers) {
2831 PeerAction::Connect { .. } => {}
2832 _ => unreachable!(),
2833 }
2834
2835 peers.on_active_outgoing_established(peer_id);
2837 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2838
2839 peers.on_active_session_gracefully_closed(peer_id);
2841
2842 let peer = peers.peers.get(&peer_id).unwrap();
2843 assert_eq!(peer.state, PeerConnectionState::Idle);
2844 assert!(peer.backed_off);
2845 assert!(peers.backed_off_peers.contains_key(&peer_id));
2846
2847 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2850 assert_eq!(peers.connection_info.num_pending_in, 1);
2851
2852 peers.on_incoming_session_established(peer_id, addr);
2854
2855 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2857 assert_eq!(peers.connection_info.num_inbound, 1);
2858
2859 assert!(peers.backed_off_peers.contains_key(&peer_id));
2861 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2862
2863 poll_fn(|cx| {
2865 assert!(peers.poll(cx).is_pending());
2866 Poll::Ready(())
2867 })
2868 .await;
2869
2870 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2872
2873 tokio::time::sleep(throttle_duration).await;
2875
2876 poll_fn(|cx| {
2877 let _ = peers.poll(cx);
2878 Poll::Ready(())
2879 })
2880 .await;
2881
2882 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2884 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2885
2886 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2888 }
2889
2890 #[tokio::test]
2891 async fn test_incoming_outgoing_already_connected() {
2892 let peer_id = PeerId::random();
2893 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2894 let mut peers = PeersManager::default();
2895
2896 peers.on_incoming_pending_session(addr.ip()).unwrap();
2897 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2898
2899 match event!(peers) {
2900 PeerAction::PeerAdded(_) => {}
2901 _ => unreachable!(),
2902 }
2903
2904 match event!(peers) {
2905 PeerAction::Connect { .. } => {}
2906 _ => unreachable!(),
2907 }
2908
2909 peers.on_incoming_session_established(peer_id, addr);
2910 peers.on_already_connected(Direction::Outgoing(peer_id));
2911 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2912 assert_eq!(peers.connection_info.num_inbound, 1);
2913 assert_eq!(peers.connection_info.num_pending_out, 0);
2914 assert_eq!(peers.connection_info.num_pending_in, 0);
2915 assert_eq!(peers.connection_info.num_outbound, 0);
2916 }
2917
2918 #[tokio::test]
2919 async fn test_already_connected_incoming_outgoing_connection_error() {
2920 let peer_id = PeerId::random();
2921 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2922 let mut peers = PeersManager::default();
2923
2924 peers.on_incoming_pending_session(addr.ip()).unwrap();
2925 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2926
2927 match event!(peers) {
2928 PeerAction::PeerAdded(_) => {}
2929 _ => unreachable!(),
2930 }
2931
2932 match event!(peers) {
2933 PeerAction::Connect { .. } => {}
2934 _ => unreachable!(),
2935 }
2936
2937 peers.on_incoming_session_established(peer_id, addr);
2938
2939 peers.on_outgoing_connection_failure(
2940 &addr,
2941 &peer_id,
2942 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2943 );
2944 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2945 assert_eq!(peers.connection_info.num_inbound, 1);
2946 assert_eq!(peers.connection_info.num_pending_out, 0);
2947 assert_eq!(peers.connection_info.num_pending_in, 0);
2948 assert_eq!(peers.connection_info.num_outbound, 0);
2949 }
2950
2951 #[tokio::test]
2952 async fn test_max_concurrent_dials() {
2953 let config = PeersConfig::default();
2954 let mut peer_manager = PeersManager::new(config);
2955 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2956 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2957 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2958 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2959 }
2960
2961 peer_manager.fill_outbound_slots();
2962 let dials = peer_manager
2963 .queued_actions
2964 .iter()
2965 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2966 .count();
2967 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2968 }
2969
2970 #[tokio::test]
2971 async fn test_max_num_of_pending_dials() {
2972 let config = PeersConfig::default();
2973 let mut peer_manager = PeersManager::new(config);
2974 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2975 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2976
2977 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2979 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2980 }
2981
2982 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2983 match event!(peer_manager) {
2984 PeerAction::PeerAdded(_) => {}
2985 _ => unreachable!(),
2986 }
2987 }
2988
2989 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2990 match event!(peer_manager) {
2991 PeerAction::Connect { .. } => {}
2992 _ => unreachable!(),
2993 }
2994 }
2995
2996 peer_manager.fill_outbound_slots();
2998
2999 let dials = peer_manager.connection_info.num_pending_out;
3001 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3002
3003 let num_pendingout_states = peer_manager
3004 .peers
3005 .iter()
3006 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
3007 .map(|(peer_id, _)| *peer_id)
3008 .collect::<Vec<PeerId>>();
3009 assert_eq!(
3010 num_pendingout_states.len(),
3011 peer_manager.connection_info.config.max_concurrent_outbound_dials
3012 );
3013
3014 for peer_id in &num_pendingout_states {
3016 peer_manager.on_active_outgoing_established(*peer_id);
3017 }
3018
3019 for peer_id in &num_pendingout_states {
3021 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
3022 }
3023
3024 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
3026 }
3027
3028 #[tokio::test]
3029 async fn test_connect() {
3030 let peer = PeerId::random();
3031 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3032 let mut peers = PeersManager::default();
3033 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3034 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3035
3036 match event!(peers) {
3037 PeerAction::Connect { peer_id, remote_addr } => {
3038 assert_eq!(peer_id, peer);
3039 assert_eq!(remote_addr, socket_addr);
3040 }
3041 _ => unreachable!(),
3042 }
3043
3044 let (record, _) = peers.peer_by_id(peer).unwrap();
3045 assert_eq!(record.tcp_addr(), socket_addr);
3046 assert_eq!(record.udp_addr(), socket_addr);
3047
3048 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3050
3051 let (record, _) = peers.peer_by_id(peer).unwrap();
3052 assert_eq!(record.tcp_addr(), socket_addr);
3053 assert_eq!(record.udp_addr(), socket_addr);
3054 }
3055
3056 #[tokio::test]
3057 async fn test_incoming_connection_from_banned() {
3058 let peer = PeerId::random();
3059 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3060 let config = PeersConfig::test().with_max_inbound(3);
3061 let mut peers = PeersManager::new(config);
3062 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3063
3064 match event!(peers) {
3065 PeerAction::PeerAdded(peer_id) => {
3066 assert_eq!(peer_id, peer);
3067 }
3068 _ => unreachable!(),
3069 }
3070 match event!(peers) {
3071 PeerAction::Connect { peer_id, .. } => {
3072 assert_eq!(peer_id, peer);
3073 }
3074 _ => unreachable!(),
3075 }
3076
3077 poll_fn(|cx| {
3078 assert!(peers.poll(cx).is_pending());
3079 Poll::Ready(())
3080 })
3081 .await;
3082
3083 loop {
3085 peers.on_active_session_dropped(
3086 &socket_addr,
3087 &peer,
3088 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3089 reth_eth_wire::EthVersion::Eth68,
3090 reth_eth_wire::EthMessageID::Status,
3091 )),
3092 );
3093
3094 if peers.peers.get(&peer).unwrap().is_banned() {
3095 break;
3096 }
3097
3098 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3099 peers.on_incoming_session_established(peer, socket_addr);
3100
3101 match event!(peers) {
3102 PeerAction::Connect { peer_id, .. } => {
3103 assert_eq!(peer_id, peer);
3104 }
3105 _ => unreachable!(),
3106 }
3107 }
3108
3109 assert!(peers.peers.get(&peer).unwrap().is_banned());
3110
3111 for _ in 0..peers.connection_info.config.max_inbound {
3113 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3114 peers.on_incoming_session_established(peer, socket_addr);
3115
3116 match event!(peers) {
3117 PeerAction::DisconnectBannedIncoming { peer_id } => {
3118 assert_eq!(peer_id, peer);
3119 }
3120 _ => unreachable!(),
3121 }
3122 }
3123
3124 poll_fn(|cx| {
3125 assert!(peers.poll(cx).is_pending());
3126 Poll::Ready(())
3127 })
3128 .await;
3129
3130 assert_eq!(peers.connection_info.num_inbound, 0);
3131
3132 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3133
3134 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3136 assert_eq!(peers.connection_info.num_pending_in, 1);
3137
3138 peers.on_active_session_gracefully_closed(peer);
3142 assert_eq!(peers.connection_info.num_inbound, 0);
3143 }
3144
3145 #[tokio::test]
3146 async fn test_add_pending_connect() {
3147 let peer = PeerId::random();
3148 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3149 let mut peers = PeersManager::default();
3150 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3151 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3152 assert_eq!(peers.connection_info.num_pending_out, 1);
3153 }
3154
3155 #[tokio::test]
3156 async fn test_dns_updates_peer_address() {
3157 let peer_id = PeerId::random();
3158 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3159 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3160
3161 let trusted = TrustedPeer {
3162 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3163 tcp_port: 8008,
3164 udp_port: 8008,
3165 id: peer_id,
3166 };
3167
3168 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3169 let mut manager = PeersManager::new(config);
3170 manager
3171 .trusted_peers_resolver
3172 .set_interval(tokio::time::interval(Duration::from_millis(1)));
3173
3174 manager.peers.insert(
3175 peer_id,
3176 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3177 );
3178
3179 for _ in 0..100 {
3180 let _ = event!(manager);
3181 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3182 break;
3183 }
3184 tokio::time::sleep(Duration::from_millis(10)).await;
3185 }
3186
3187 let updated_peer = manager.peers.get(&peer_id).unwrap();
3188 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3189 }
3190
3191 #[tokio::test]
3192 async fn test_ip_filter_blocks_inbound_connection() {
3193 use reth_net_banlist::IpFilter;
3194 use std::net::IpAddr;
3195
3196 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3198 let config = PeersConfig::test().with_ip_filter(ip_filter);
3199 let mut peers = PeersManager::new(config);
3200
3201 let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3203 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3204
3205 let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3207 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3208 }
3209
3210 #[tokio::test]
3211 async fn test_ip_filter_blocks_outbound_connection() {
3212 use reth_net_banlist::IpFilter;
3213 use std::net::SocketAddr;
3214
3215 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3217 let config = PeersConfig::test().with_ip_filter(ip_filter);
3218 let mut peers = PeersManager::new(config);
3219
3220 let peer_id = PeerId::new([1; 64]);
3221
3222 let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3224 peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3225 assert!(peers.peers.contains_key(&peer_id));
3226
3227 let peer_id2 = PeerId::new([2; 64]);
3229 let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3230 peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3231 assert!(!peers.peers.contains_key(&peer_id2));
3232 }
3233
3234 #[tokio::test]
3235 async fn test_ip_filter_ipv6() {
3236 use reth_net_banlist::IpFilter;
3237 use std::net::IpAddr;
3238
3239 let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3241 let config = PeersConfig::test().with_ip_filter(ip_filter);
3242 let mut peers = PeersManager::new(config);
3243
3244 let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3246 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3247
3248 let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3250 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3251 }
3252
3253 #[tokio::test]
3254 async fn test_ip_filter_multiple_ranges() {
3255 use reth_net_banlist::IpFilter;
3256 use std::net::IpAddr;
3257
3258 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3260 let config = PeersConfig::test().with_ip_filter(ip_filter);
3261 let mut peers = PeersManager::new(config);
3262
3263 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3265 let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3266 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3267 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3268
3269 let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3271 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3272 }
3273
3274 #[tokio::test]
3275 async fn test_ip_filter_no_restriction() {
3276 use reth_net_banlist::IpFilter;
3277 use std::net::IpAddr;
3278
3279 let ip_filter = IpFilter::allow_all();
3281 let config = PeersConfig::test().with_ip_filter(ip_filter);
3282 let mut peers = PeersManager::new(config);
3283
3284 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3286 let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3287 let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3288 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3289 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3290 assert!(peers.on_incoming_pending_session(ip3).is_ok());
3291 }
3292
3293 #[tokio::test]
3294 async fn test_best_unconnected_prefers_fork_id_as_tiebreaker() {
3295 let mut peers = PeersManager::default();
3296 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8008);
3297
3298 let fork_id = ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 };
3299
3300 let no_fork = PeerId::random();
3302 peers.add_peer(no_fork, PeerAddr::from_tcp(addr), None);
3303
3304 let with_fork = PeerId::random();
3305 peers.add_peer(with_fork, PeerAddr::from_tcp(addr), None);
3306 peers.peers.get_mut(&with_fork).unwrap().fork_id = Some(Box::new(fork_id));
3307
3308 let (best_id, _) = peers.best_unconnected().unwrap();
3309 assert_eq!(best_id, with_fork, "fork_id should break tie when reputation is equal");
3310 }
3311}