1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41#[derive(Debug)]
48pub struct PeersManager {
49 peers: HashMap<PeerId, Peer>,
51 trusted_peer_ids: HashSet<PeerId>,
56 trusted_peers_resolver: TrustedPeersResolver,
59 manager_tx: mpsc::UnboundedSender<PeerCommand>,
61 handle_rx: UnboundedReceiverStream<PeerCommand>,
63 queued_actions: VecDeque<PeerAction>,
65 refill_slots_interval: Interval,
67 reputation_weights: ReputationChangeWeights,
69 connection_info: ConnectionInfo,
71 ban_list: BanList,
73 backed_off_peers: HashMap<PeerId, std::time::Instant>,
75 release_interval: Interval,
77 ban_duration: Duration,
79 backoff_durations: PeerBackoffDurations,
82 trusted_nodes_only: bool,
85 last_tick: Instant,
87 max_backoff_count: u8,
89 net_connection_state: NetworkConnectionState,
91 incoming_ip_throttle_duration: Duration,
93 ip_filter: reth_net_banlist::IpFilter,
95}
96
97impl PeersManager {
98 pub fn new(config: PeersConfig) -> Self {
100 let PeersConfig {
101 refill_slots_interval,
102 connection_info,
103 reputation_weights,
104 ban_list,
105 ban_duration,
106 backoff_durations,
107 trusted_nodes,
108 trusted_nodes_only,
109 trusted_nodes_resolution_interval,
110 basic_nodes,
111 max_backoff_count,
112 incoming_ip_throttle_duration,
113 ip_filter,
114 } = config;
115 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
116 let now = Instant::now();
117
118 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
120
121 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
122 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
123
124 for trusted_peer in &trusted_nodes {
125 match trusted_peer.resolve_blocking() {
126 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
127 trusted_peer_ids.insert(id);
128 peers.entry(id).or_insert_with(|| {
129 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
130 });
131 }
132 Err(err) => {
133 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
134 }
135 }
136 }
137
138 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
139 peers.entry(id).or_insert_with(|| {
140 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
141 });
142 }
143
144 trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
145
146 Self {
147 peers,
148 trusted_peer_ids,
149 trusted_peers_resolver: TrustedPeersResolver::new(
150 trusted_nodes,
151 tokio::time::interval(trusted_nodes_resolution_interval), ),
153 manager_tx,
154 handle_rx: UnboundedReceiverStream::new(handle_rx),
155 queued_actions: Default::default(),
156 reputation_weights,
157 refill_slots_interval: tokio::time::interval(refill_slots_interval),
158 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
159 connection_info: ConnectionInfo::new(connection_info),
160 ban_list,
161 backed_off_peers: Default::default(),
162 ban_duration,
163 backoff_durations,
164 trusted_nodes_only,
165 last_tick: Instant::now(),
166 max_backoff_count,
167 net_connection_state: NetworkConnectionState::default(),
168 incoming_ip_throttle_duration,
169 ip_filter,
170 }
171 }
172
173 pub(crate) fn handle(&self) -> PeersHandle {
175 PeersHandle::new(self.manager_tx.clone())
176 }
177
178 #[inline]
180 pub(crate) fn num_known_peers(&self) -> usize {
181 self.peers.len()
182 }
183
184 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
186 self.peers.iter().map(|(peer_id, v)| {
187 NodeRecord::new_with_ports(
188 v.addr.tcp().ip(),
189 v.addr.tcp().port(),
190 v.addr.udp().map(|addr| addr.port()),
191 *peer_id,
192 )
193 })
194 }
195
196 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
198 self.peers.get(&peer_id).map(|v| {
199 (
200 NodeRecord::new_with_ports(
201 v.addr.tcp().ip(),
202 v.addr.tcp().port(),
203 v.addr.udp().map(|addr| addr.port()),
204 peer_id,
205 ),
206 v.kind,
207 )
208 })
209 }
210
211 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
213 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
214 }
215
216 #[inline]
218 pub(crate) const fn num_inbound_connections(&self) -> usize {
219 self.connection_info.num_inbound
220 }
221
222 #[inline]
224 pub(crate) const fn num_outbound_connections(&self) -> usize {
225 self.connection_info.num_outbound
226 }
227
228 #[inline]
230 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
231 self.connection_info.num_pending_out
232 }
233
234 #[inline]
236 pub(crate) fn num_backed_off_peers(&self) -> usize {
237 self.backed_off_peers.len()
238 }
239
240 fn num_idle_trusted_peers(&self) -> usize {
242 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
243 }
244
245 pub(crate) fn on_incoming_pending_session(
249 &mut self,
250 addr: IpAddr,
251 ) -> Result<(), InboundConnectionError> {
252 if !self.ip_filter.is_allowed(&addr) {
254 trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
255 return Err(InboundConnectionError::IpBanned)
256 }
257
258 if self.ban_list.is_banned_ip(&addr) {
259 return Err(InboundConnectionError::IpBanned)
260 }
261
262 if !self.connection_info.has_in_capacity() {
264 if self.trusted_peer_ids.is_empty() {
265 return Err(InboundConnectionError::ExceedsCapacity)
268 }
269
270 let num_idle_trusted_peers = self.num_idle_trusted_peers();
274 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
275 let max_inbound =
277 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
278 if self.connection_info.num_pending_in < max_inbound {
279 self.connection_info.inc_pending_in();
280 return Ok(())
281 }
282 }
283
284 return Err(InboundConnectionError::ExceedsCapacity)
286 }
287
288 if !self.connection_info.has_in_pending_capacity() {
290 return Err(InboundConnectionError::ExceedsCapacity)
291 }
292
293 self.throttle_incoming_ip(addr);
295
296 self.connection_info.inc_pending_in();
297 Ok(())
298 }
299
300 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
303 self.connection_info.decr_pending_in();
304 }
305
306 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
308 self.connection_info.decr_pending_in()
309 }
310
311 pub(crate) fn on_incoming_pending_session_dropped(
313 &mut self,
314 remote_addr: SocketAddr,
315 err: &PendingSessionHandshakeError,
316 ) {
317 if err.is_fatal_protocol_error() {
318 self.ban_ip(remote_addr.ip());
319
320 if err.merits_discovery_ban() {
321 self.queued_actions
322 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
323 }
324 }
325
326 self.connection_info.decr_pending_in();
327 }
328
329 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
336 self.connection_info.decr_pending_in();
337
338 if self.ban_list.is_banned_peer(&peer_id) {
341 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
342 return
343 }
344
345 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
347 if self.trusted_nodes_only && !is_trusted {
348 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
349 return
350 }
351
352 self.tick();
354
355 match self.peers.entry(peer_id) {
356 Entry::Occupied(mut entry) => {
357 let peer = entry.get_mut();
358 if peer.is_banned() {
359 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
360 return
361 }
362 if peer.state.is_pending_out() {
365 self.connection_info.decr_state(peer.state);
366 }
367
368 peer.state = PeerConnectionState::In;
369
370 is_trusted = is_trusted || peer.is_trusted();
371 }
372 Entry::Vacant(entry) => {
373 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
376 peer.remove_after_disconnect = true;
377 entry.insert(peer);
378 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
379 }
380 }
381
382 let has_in_capacity = self.connection_info.has_in_capacity();
383 self.connection_info.inc_in();
385
386 if !is_trusted && !has_in_capacity {
388 self.queued_actions.push_back(PeerAction::Disconnect {
389 peer_id,
390 reason: Some(DisconnectReason::TooManyPeers),
391 });
392 }
393 }
394
395 fn ban_peer(&mut self, peer_id: PeerId) {
397 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
398 (peer.is_trusted() || peer.is_static())
399 {
400 self.backoff_durations.low / 2
403 } else {
404 self.ban_duration
405 };
406
407 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
408 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
409 }
410
411 fn ban_ip(&mut self, ip: IpAddr) {
413 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
414 }
415
416 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
418 self.ban_list
419 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
420 }
421
422 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
424 trace!(target: "net::peers", ?peer_id, "backing off");
425
426 if let Some(peer) = self.peers.get_mut(&peer_id) {
427 peer.backed_off = true;
428 self.backed_off_peers.insert(peer_id, until);
429 }
430 }
431
432 fn unban_peer(&mut self, peer_id: PeerId) {
434 self.ban_list.unban_peer(&peer_id);
435 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
436 }
437
438 fn tick(&mut self) {
443 let now = Instant::now();
444 let secs_since_last_tick =
448 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
449 self.last_tick = now;
450
451 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
453 if peer.1.reputation < DEFAULT_REPUTATION {
456 peer.1.reputation += secs_since_last_tick;
457 }
458 }
459 }
460
461 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
463 self.peers.get(peer_id).map(|peer| peer.reputation)
464 }
465
466 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
472 trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
473
474 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
475 if rep.is_reset() {
477 peer.reset_reputation()
478 } else {
479 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
480 if peer.is_trusted() || peer.is_static() {
481 if matches!(
483 rep,
484 ReputationChangeKind::Dropped |
485 ReputationChangeKind::BadAnnouncement |
486 ReputationChangeKind::Timeout |
487 ReputationChangeKind::AlreadySeenTransaction
488 ) {
489 return
490 }
491
492 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
494 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
496 }
497 }
498 peer.apply_reputation(reputation_change, rep)
499 }
500 } else {
501 return
502 };
503
504 match outcome {
505 ReputationChangeOutcome::None => {}
506 ReputationChangeOutcome::Ban => {
507 self.ban_peer(*peer_id);
508 }
509 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
510 ReputationChangeOutcome::DisconnectAndBan => {
511 self.queued_actions.push_back(PeerAction::Disconnect {
512 peer_id: *peer_id,
513 reason: Some(DisconnectReason::DisconnectRequested),
514 });
515 self.ban_peer(*peer_id);
516 }
517 }
518 }
519
520 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
522 if let Some(peer) = self.peers.get_mut(peer_id) {
523 self.connection_info.decr_state(peer.state);
524 peer.state = PeerConnectionState::Idle;
525 }
526 }
527
528 pub(crate) fn on_outgoing_pending_session_dropped(
531 &mut self,
532 remote_addr: &SocketAddr,
533 peer_id: &PeerId,
534 err: &PendingSessionHandshakeError,
535 ) {
536 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
537 }
538
539 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
541 match self.peers.entry(peer_id) {
542 Entry::Occupied(mut entry) => {
543 trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
544 self.connection_info.decr_state(entry.get().state);
545
546 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
547 entry.remove();
549 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
550 } else {
551 let peer = entry.get_mut();
552 peer.severe_backoff_counter = 0;
556 peer.state = PeerConnectionState::Idle;
557
558 peer.backed_off = true;
563 self.backed_off_peers.insert(
564 peer_id,
565 std::time::Instant::now() + self.incoming_ip_throttle_duration,
566 );
567 trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
568 }
569 }
570 Entry::Vacant(_) => return,
571 }
572
573 self.fill_outbound_slots();
574 }
575
576 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
578 if let Some(peer) = self.peers.get_mut(&peer_id) {
579 trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
580 self.connection_info.decr_state(peer.state);
581 self.connection_info.inc_out();
582 peer.state = PeerConnectionState::Out;
583 }
584 }
585
586 pub(crate) fn on_active_session_dropped(
591 &mut self,
592 remote_addr: &SocketAddr,
593 peer_id: &PeerId,
594 err: &EthStreamError,
595 ) {
596 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
597 }
598
599 pub(crate) fn on_outgoing_connection_failure(
602 &mut self,
603 remote_addr: &SocketAddr,
604 peer_id: &PeerId,
605 err: &io::Error,
606 ) {
607 if let Some(peer) = self.peers.get(peer_id) {
611 if peer.state.is_incoming() {
612 return
614 }
615
616 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
617 self.trusted_peers_resolver.interval.reset_immediately();
620 }
621 }
622
623 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
624 }
625
626 fn on_connection_failure(
627 &mut self,
628 remote_addr: &SocketAddr,
629 peer_id: &PeerId,
630 err: impl SessionError,
631 reputation_change: ReputationChangeKind,
632 ) {
633 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
634
635 if err.is_fatal_protocol_error() {
636 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
637 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
640 self.connection_info.decr_state(entry.get().state);
641 if entry.get().is_trusted() {
643 entry.get_mut().state = PeerConnectionState::Idle;
644 } else {
645 entry.remove();
646 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
647 if err.merits_discovery_ban() {
649 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
650 peer_id: *peer_id,
651 ip_addr: remote_addr.ip(),
652 })
653 }
654 }
655 }
656
657 self.ban_peer(*peer_id);
659 } else {
660 let mut backoff_until = None;
661 let mut remove_peer = false;
662
663 if let Some(peer) = self.peers.get_mut(peer_id) {
664 if let Some(kind) = err.should_backoff() {
665 if peer.is_trusted() || peer.is_static() {
666 let backoff = self.backoff_durations.low;
672 backoff_until = Some(std::time::Instant::now() + backoff);
673 trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
674 } else {
675 if kind.is_severe() {
677 peer.severe_backoff_counter =
678 peer.severe_backoff_counter.saturating_add(1);
679 }
680 trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
681
682 let backoff_time =
683 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
684
685 backoff_until = Some(backoff_time);
689 }
690 } else {
691 let reputation_change = self.reputation_weights.change(reputation_change);
693 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
694 };
695
696 self.connection_info.decr_state(peer.state);
697 peer.state = PeerConnectionState::Idle;
698
699 if peer.severe_backoff_counter > self.max_backoff_count &&
700 !peer.is_trusted() &&
701 !peer.is_static()
702 {
703 remove_peer = true;
706 }
707 }
708
709 if remove_peer {
711 trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
712 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
713 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
714 } else if let Some(backoff_until) = backoff_until {
715 self.backoff_peer_until(*peer_id, backoff_until);
717 }
718 }
719
720 self.fill_outbound_slots();
721 }
722
723 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
729 match direction {
730 Direction::Incoming => {
731 self.connection_info.decr_pending_in();
733 }
734 Direction::Outgoing(_) => {
735 }
738 }
739 }
740
741 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
746 if let Some(peer) = self.peers.get_mut(&peer_id) {
747 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
748 peer.fork_id = Some(Box::new(fork_id));
749 }
750 }
751
752 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
756 self.add_peer_kind(peer_id, None, addr, fork_id)
757 }
758
759 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
761 self.trusted_peer_ids.insert(peer_id);
762 if let Some(peer) = self.peers.get_mut(&peer_id) {
763 peer.kind = PeerKind::Trusted;
764 }
765 }
766
767 #[cfg_attr(not(test), expect(dead_code))]
771 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
772 self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
773 }
774
775 pub(crate) fn add_peer_kind(
780 &mut self,
781 peer_id: PeerId,
782 kind: Option<PeerKind>,
783 addr: PeerAddr,
784 fork_id: Option<ForkId>,
785 ) {
786 let ip_addr = addr.tcp().ip();
787
788 if !self.ip_filter.is_allowed(&ip_addr) {
790 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
791 return
792 }
793
794 if self.ban_list.is_banned(&peer_id, &ip_addr) {
795 return
796 }
797
798 match self.peers.entry(peer_id) {
799 Entry::Occupied(mut entry) => {
800 let peer = entry.get_mut();
801 peer.fork_id = fork_id.map(Box::new);
802 peer.addr = addr;
803
804 if let Some(kind) = kind {
805 peer.kind = kind;
806 }
807
808 if peer.state.is_incoming() {
809 peer.remove_after_disconnect = false;
813 }
814 }
815 Entry::Vacant(entry) => {
816 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
817 let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
818 peer.fork_id = fork_id.map(Box::new);
819 entry.insert(peer);
820 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
821 }
822 }
823
824 if kind.filter(|kind| kind.is_trusted()).is_some() {
825 self.trusted_peer_ids.insert(peer_id);
827 }
828 }
829
830 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
832 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
833 if entry.get().is_trusted() {
834 return
835 }
836 let mut peer = entry.remove();
837
838 trace!(target: "net::peers", ?peer_id, "remove discovered node");
839 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
840
841 if peer.state.is_connected() {
842 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
843 peer.remove_after_disconnect = true;
848 peer.state.disconnect();
849 self.peers.insert(peer_id, peer);
850 self.queued_actions.push_back(PeerAction::Disconnect {
851 peer_id,
852 reason: Some(DisconnectReason::DisconnectRequested),
853 })
854 }
855 }
856
857 #[cfg_attr(not(test), expect(dead_code))]
860 pub(crate) fn add_and_connect(
861 &mut self,
862 peer_id: PeerId,
863 addr: PeerAddr,
864 fork_id: Option<ForkId>,
865 ) {
866 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
867 }
868
869 pub(crate) fn add_and_connect_kind(
873 &mut self,
874 peer_id: PeerId,
875 kind: PeerKind,
876 addr: PeerAddr,
877 fork_id: Option<ForkId>,
878 ) {
879 let ip_addr = addr.tcp().ip();
880
881 if !self.ip_filter.is_allowed(&ip_addr) {
883 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
884 return
885 }
886
887 if self.ban_list.is_banned(&peer_id, &ip_addr) {
888 return
889 }
890
891 match self.peers.entry(peer_id) {
892 Entry::Occupied(mut entry) => {
893 let peer = entry.get_mut();
894 peer.kind = kind;
895 peer.fork_id = fork_id.map(Box::new);
896 peer.addr = addr;
897
898 if peer.state == PeerConnectionState::Idle {
899 peer.state = PeerConnectionState::PendingOut;
901 self.connection_info.inc_pending_out();
902 self.queued_actions
903 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
904 }
905 }
906 Entry::Vacant(entry) => {
907 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
908 let mut peer = Peer::with_kind(addr, kind);
909 peer.state = PeerConnectionState::PendingOut;
910 peer.fork_id = fork_id.map(Box::new);
911 entry.insert(peer);
912 self.connection_info.inc_pending_out();
913 self.queued_actions
914 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
915 }
916 }
917
918 if kind.is_trusted() {
919 self.trusted_peer_ids.insert(peer_id);
920 }
921 }
922
923 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
925 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
926 if !entry.get().is_trusted() {
927 return
928 }
929
930 let peer = entry.get_mut();
931 peer.kind = PeerKind::Basic;
932
933 self.trusted_peer_ids.remove(&peer_id);
934 }
935
936 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
946 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
947 !peer.is_backed_off() &&
948 !peer.is_banned() &&
949 peer.state.is_unconnected() &&
950 (!self.trusted_nodes_only || peer.is_trusted())
951 });
952
953 let mut best_peer = unconnected.next()?;
955
956 if best_peer.1.is_trusted() || best_peer.1.is_static() {
957 return Some((*best_peer.0, best_peer.1))
958 }
959
960 for maybe_better in unconnected {
961 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
963 return Some((*maybe_better.0, maybe_better.1))
964 }
965
966 if maybe_better.1.reputation > best_peer.1.reputation {
968 best_peer = maybe_better;
969 }
970 }
971 Some((*best_peer.0, best_peer.1))
972 }
973
974 fn fill_outbound_slots(&mut self) {
980 self.tick();
981
982 if !self.net_connection_state.is_active() {
983 return
985 }
986
987 while self.connection_info.has_out_capacity() {
989 let action = {
990 let (peer_id, peer) = match self.best_unconnected() {
991 Some(peer) => peer,
992 _ => break,
993 };
994
995 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
996
997 peer.state = PeerConnectionState::PendingOut;
998 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
999 };
1000
1001 self.connection_info.inc_pending_out();
1002
1003 self.queued_actions.push_back(action);
1004 }
1005 }
1006
1007 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1008 if let Some(peer) = self.peers.get_mut(&peer_id) {
1009 let new_addr = PeerAddr::new_with_ports(
1010 new_record.address,
1011 new_record.tcp_port,
1012 Some(new_record.udp_port),
1013 );
1014
1015 if peer.addr != new_addr {
1016 peer.addr = new_addr;
1017 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1018 }
1019 }
1020 }
1021
1022 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1024 self.net_connection_state = state;
1025 }
1026
1027 pub const fn connection_state(&self) -> &NetworkConnectionState {
1029 &self.net_connection_state
1030 }
1031
1032 pub const fn on_shutdown(&mut self) {
1034 self.net_connection_state = NetworkConnectionState::ShuttingDown;
1035 }
1036
1037 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1042 loop {
1043 if let Some(action) = self.queued_actions.pop_front() {
1045 return Poll::Ready(action)
1046 }
1047
1048 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1049 match cmd {
1050 PeerCommand::Add(peer_id, addr) => {
1051 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1052 }
1053 PeerCommand::Remove(peer) => self.remove_peer(peer),
1054 PeerCommand::ReputationChange(peer_id, rep) => {
1055 self.apply_reputation_change(&peer_id, rep)
1056 }
1057 PeerCommand::GetPeer(peer, tx) => {
1058 let _ = tx.send(self.peers.get(&peer).cloned());
1059 }
1060 PeerCommand::GetPeers(tx) => {
1061 let _ = tx.send(self.iter_peers().collect());
1062 }
1063 }
1064 }
1065
1066 if self.release_interval.poll_tick(cx).is_ready() {
1067 let now = std::time::Instant::now();
1068 let (_, unbanned_peers) = self.ban_list.evict(now);
1069
1070 for peer_id in unbanned_peers {
1071 if let Some(peer) = self.peers.get_mut(&peer_id) {
1072 peer.unban();
1073 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1074 }
1075 }
1076
1077 self.backed_off_peers.retain(|peer_id, until| {
1080 if now > *until {
1081 if let Some(peer) = self.peers.get_mut(peer_id) {
1082 peer.backed_off = false;
1083 }
1084 return false
1085 }
1086 true
1087 })
1088 }
1089
1090 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1091 self.fill_outbound_slots();
1092 }
1093
1094 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1095 self.on_resolved_peer(peer_id, new_record);
1096 }
1097
1098 if self.queued_actions.is_empty() {
1099 return Poll::Pending
1100 }
1101 }
1102 }
1103}
1104
1105impl Default for PeersManager {
1106 fn default() -> Self {
1107 Self::new(Default::default())
1108 }
1109}
1110
1111#[derive(Debug, Clone, PartialEq, Eq, Default)]
1113pub struct ConnectionInfo {
1114 num_outbound: usize,
1116 num_pending_out: usize,
1118 num_inbound: usize,
1120 num_pending_in: usize,
1122 config: ConnectionsConfig,
1124}
1125
1126impl ConnectionInfo {
1129 const fn new(config: ConnectionsConfig) -> Self {
1131 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1132 }
1133
1134 const fn has_out_capacity(&self) -> bool {
1136 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1137 self.num_outbound < self.config.max_outbound
1138 }
1139
1140 const fn has_in_capacity(&self) -> bool {
1142 self.num_inbound < self.config.max_inbound
1143 }
1144
1145 const fn has_in_pending_capacity(&self) -> bool {
1147 self.num_pending_in < self.config.max_inbound
1148 }
1149
1150 const fn decr_state(&mut self, state: PeerConnectionState) {
1151 match state {
1152 PeerConnectionState::Idle => {}
1153 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1154 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1155 PeerConnectionState::PendingOut => self.decr_pending_out(),
1156 }
1157 }
1158
1159 const fn decr_out(&mut self) {
1160 self.num_outbound -= 1;
1161 }
1162
1163 const fn inc_out(&mut self) {
1164 self.num_outbound += 1;
1165 }
1166
1167 const fn inc_pending_out(&mut self) {
1168 self.num_pending_out += 1;
1169 }
1170
1171 const fn inc_in(&mut self) {
1172 self.num_inbound += 1;
1173 }
1174
1175 const fn inc_pending_in(&mut self) {
1176 self.num_pending_in += 1;
1177 }
1178
1179 const fn decr_in(&mut self) {
1180 self.num_inbound -= 1;
1181 }
1182
1183 const fn decr_pending_out(&mut self) {
1184 self.num_pending_out -= 1;
1185 }
1186
1187 const fn decr_pending_in(&mut self) {
1188 self.num_pending_in -= 1;
1189 }
1190}
1191
1192#[derive(Debug)]
1194pub enum PeerAction {
1195 Connect {
1197 peer_id: PeerId,
1199 remote_addr: SocketAddr,
1201 },
1202 Disconnect {
1204 peer_id: PeerId,
1206 reason: Option<DisconnectReason>,
1208 },
1209 DisconnectBannedIncoming {
1212 peer_id: PeerId,
1214 },
1215 DisconnectUntrustedIncoming {
1217 peer_id: PeerId,
1219 },
1220 DiscoveryBanPeerId {
1222 peer_id: PeerId,
1224 ip_addr: IpAddr,
1226 },
1227 DiscoveryBanIp {
1229 ip_addr: IpAddr,
1231 },
1232 BanPeer {
1234 peer_id: PeerId,
1236 },
1237 UnBanPeer {
1239 peer_id: PeerId,
1241 },
1242 PeerAdded(PeerId),
1244 PeerRemoved(PeerId),
1246}
1247
1248#[derive(Debug, Error, PartialEq, Eq)]
1250pub enum InboundConnectionError {
1251 IpBanned,
1253 ExceedsCapacity,
1255}
1256
1257impl Display for InboundConnectionError {
1258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1259 write!(f, "{self:?}")
1260 }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265 use alloy_primitives::B512;
1266 use reth_eth_wire::{
1267 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1268 DisconnectReason,
1269 };
1270 use reth_net_banlist::BanList;
1271 use reth_network_api::Direction;
1272 use reth_network_peers::{PeerId, TrustedPeer};
1273 use reth_network_types::{
1274 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1275 };
1276 use std::{
1277 future::{poll_fn, Future},
1278 io,
1279 net::{IpAddr, Ipv4Addr, SocketAddr},
1280 pin::Pin,
1281 task::{Context, Poll},
1282 time::Duration,
1283 };
1284 use url::Host;
1285
1286 use super::PeersManager;
1287 use crate::{
1288 error::SessionError,
1289 peers::{
1290 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1291 PeerConnectionState,
1292 },
1293 session::PendingSessionHandshakeError,
1294 PeersConfig,
1295 };
1296
1297 struct PeerActionFuture<'a> {
1298 peers: &'a mut PeersManager,
1299 }
1300
1301 impl Future for PeerActionFuture<'_> {
1302 type Output = PeerAction;
1303
1304 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1305 self.get_mut().peers.poll(cx)
1306 }
1307 }
1308
1309 macro_rules! event {
1310 ($peers:expr) => {
1311 PeerActionFuture { peers: &mut $peers }.await
1312 };
1313 }
1314
1315 #[tokio::test]
1316 async fn test_insert() {
1317 let peer = PeerId::random();
1318 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1319 let mut peers = PeersManager::default();
1320 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1321
1322 match event!(peers) {
1323 PeerAction::PeerAdded(peer_id) => {
1324 assert_eq!(peer_id, peer);
1325 }
1326 _ => unreachable!(),
1327 }
1328 match event!(peers) {
1329 PeerAction::Connect { peer_id, remote_addr } => {
1330 assert_eq!(peer_id, peer);
1331 assert_eq!(remote_addr, socket_addr);
1332 }
1333 _ => unreachable!(),
1334 }
1335
1336 let (record, _) = peers.peer_by_id(peer).unwrap();
1337 assert_eq!(record.tcp_addr(), socket_addr);
1338 assert_eq!(record.udp_addr(), socket_addr);
1339 }
1340
1341 #[tokio::test]
1342 async fn test_insert_udp() {
1343 let peer = PeerId::random();
1344 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1345 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1346 let mut peers = PeersManager::default();
1347 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1348
1349 match event!(peers) {
1350 PeerAction::PeerAdded(peer_id) => {
1351 assert_eq!(peer_id, peer);
1352 }
1353 _ => unreachable!(),
1354 }
1355 match event!(peers) {
1356 PeerAction::Connect { peer_id, remote_addr } => {
1357 assert_eq!(peer_id, peer);
1358 assert_eq!(remote_addr, tcp_addr);
1359 }
1360 _ => unreachable!(),
1361 }
1362
1363 let (record, _) = peers.peer_by_id(peer).unwrap();
1364 assert_eq!(record.tcp_addr(), tcp_addr);
1365 assert_eq!(record.udp_addr(), udp_addr);
1366 }
1367
1368 #[tokio::test]
1369 async fn test_ban() {
1370 let peer = PeerId::random();
1371 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1372 let mut peers = PeersManager::default();
1373 peers.ban_peer(peer);
1374 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1375
1376 match event!(peers) {
1377 PeerAction::BanPeer { peer_id } => {
1378 assert_eq!(peer_id, peer);
1379 }
1380 _ => unreachable!(),
1381 }
1382
1383 poll_fn(|cx| {
1384 assert!(peers.poll(cx).is_pending());
1385 Poll::Ready(())
1386 })
1387 .await;
1388 }
1389
1390 #[tokio::test]
1391 async fn test_unban() {
1392 let peer = PeerId::random();
1393 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1394 let mut peers = PeersManager::default();
1395 peers.ban_peer(peer);
1396 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1397
1398 match event!(peers) {
1399 PeerAction::BanPeer { peer_id } => {
1400 assert_eq!(peer_id, peer);
1401 }
1402 _ => unreachable!(),
1403 }
1404
1405 poll_fn(|cx| {
1406 assert!(peers.poll(cx).is_pending());
1407 Poll::Ready(())
1408 })
1409 .await;
1410
1411 peers.unban_peer(peer);
1412
1413 match event!(peers) {
1414 PeerAction::UnBanPeer { peer_id } => {
1415 assert_eq!(peer_id, peer);
1416 }
1417 _ => unreachable!(),
1418 }
1419
1420 poll_fn(|cx| {
1421 assert!(peers.poll(cx).is_pending());
1422 Poll::Ready(())
1423 })
1424 .await;
1425 }
1426
1427 #[tokio::test]
1428 async fn test_backoff_on_busy() {
1429 let peer = PeerId::random();
1430 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1431
1432 let mut peers = PeersManager::new(PeersConfig::test());
1433 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1434
1435 match event!(peers) {
1436 PeerAction::PeerAdded(peer_id) => {
1437 assert_eq!(peer_id, peer);
1438 }
1439 _ => unreachable!(),
1440 }
1441 match event!(peers) {
1442 PeerAction::Connect { peer_id, .. } => {
1443 assert_eq!(peer_id, peer);
1444 }
1445 _ => unreachable!(),
1446 }
1447
1448 poll_fn(|cx| {
1449 assert!(peers.poll(cx).is_pending());
1450 Poll::Ready(())
1451 })
1452 .await;
1453
1454 peers.on_active_session_dropped(
1455 &socket_addr,
1456 &peer,
1457 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1458 DisconnectReason::TooManyPeers,
1459 )),
1460 );
1461
1462 poll_fn(|cx| {
1463 assert!(peers.poll(cx).is_pending());
1464 Poll::Ready(())
1465 })
1466 .await;
1467
1468 assert!(peers.backed_off_peers.contains_key(&peer));
1469 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1470
1471 tokio::time::sleep(peers.backoff_durations.low).await;
1472
1473 match event!(peers) {
1474 PeerAction::Connect { peer_id, .. } => {
1475 assert_eq!(peer_id, peer);
1476 }
1477 _ => unreachable!(),
1478 }
1479
1480 assert!(!peers.backed_off_peers.contains_key(&peer));
1481 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1482 }
1483
1484 #[tokio::test]
1485 async fn test_backoff_on_no_response() {
1486 let peer = PeerId::random();
1487 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1488
1489 let backoff_durations = PeerBackoffDurations::test();
1490 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1491 let mut peers = PeersManager::new(config);
1492 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1493
1494 match event!(peers) {
1495 PeerAction::PeerAdded(peer_id) => {
1496 assert_eq!(peer_id, peer);
1497 }
1498 _ => unreachable!(),
1499 }
1500 match event!(peers) {
1501 PeerAction::Connect { peer_id, .. } => {
1502 assert_eq!(peer_id, peer);
1503 }
1504 _ => unreachable!(),
1505 }
1506
1507 poll_fn(|cx| {
1508 assert!(peers.poll(cx).is_pending());
1509 Poll::Ready(())
1510 })
1511 .await;
1512
1513 peers.on_outgoing_pending_session_dropped(
1514 &socket_addr,
1515 &peer,
1516 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1517 EthHandshakeError::NoResponse,
1518 )),
1519 );
1520
1521 poll_fn(|cx| {
1522 assert!(peers.poll(cx).is_pending());
1523 Poll::Ready(())
1524 })
1525 .await;
1526
1527 assert!(peers.backed_off_peers.contains_key(&peer));
1528 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1529
1530 tokio::time::sleep(backoff_durations.high).await;
1531
1532 match event!(peers) {
1533 PeerAction::Connect { peer_id, .. } => {
1534 assert_eq!(peer_id, peer);
1535 }
1536 _ => unreachable!(),
1537 }
1538
1539 assert!(!peers.backed_off_peers.contains_key(&peer));
1540 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1541 }
1542
1543 #[tokio::test]
1544 async fn test_low_backoff() {
1545 let peer = PeerId::random();
1546 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1547 let config = PeersConfig::test();
1548 let mut peers = PeersManager::new(config);
1549 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1550 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1551
1552 let backoff_timestamp = peers
1553 .backoff_durations
1554 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1555
1556 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1557 assert!(backoff_timestamp <= expected);
1558 }
1559
1560 #[tokio::test]
1561 async fn test_multiple_backoff_calculations() {
1562 let peer = PeerId::random();
1563 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1564 let config = PeersConfig::default();
1565 let mut peers = PeersManager::new(config);
1566 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1567 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1568
1569 peer_struct.severe_backoff_counter = 1;
1571
1572 let now = std::time::Instant::now();
1573
1574 peer_struct.severe_backoff_counter += 1;
1576 let backoff_time = peers
1578 .backoff_durations
1579 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1580
1581 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1583
1584 assert!(backoff_time.duration_since(now) > backoff_duration);
1587 }
1588
1589 #[tokio::test]
1590 async fn test_ban_on_active_drop() {
1591 let peer = PeerId::random();
1592 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1593 let mut peers = PeersManager::default();
1594 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1595
1596 match event!(peers) {
1597 PeerAction::PeerAdded(peer_id) => {
1598 assert_eq!(peer_id, peer);
1599 }
1600 _ => unreachable!(),
1601 }
1602 match event!(peers) {
1603 PeerAction::Connect { peer_id, .. } => {
1604 assert_eq!(peer_id, peer);
1605 }
1606 _ => unreachable!(),
1607 }
1608
1609 poll_fn(|cx| {
1610 assert!(peers.poll(cx).is_pending());
1611 Poll::Ready(())
1612 })
1613 .await;
1614
1615 peers.on_active_session_dropped(
1616 &socket_addr,
1617 &peer,
1618 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1619 DisconnectReason::UselessPeer,
1620 )),
1621 );
1622
1623 match event!(peers) {
1624 PeerAction::PeerRemoved(peer_id) => {
1625 assert_eq!(peer_id, peer);
1626 }
1627 _ => unreachable!(),
1628 }
1629 match event!(peers) {
1630 PeerAction::BanPeer { peer_id } => {
1631 assert_eq!(peer_id, peer);
1632 }
1633 _ => unreachable!(),
1634 }
1635
1636 poll_fn(|cx| {
1637 assert!(peers.poll(cx).is_pending());
1638 Poll::Ready(())
1639 })
1640 .await;
1641
1642 assert!(!peers.peers.contains_key(&peer));
1643 }
1644
1645 #[tokio::test]
1646 async fn test_remove_on_max_backoff_count() {
1647 let peer = PeerId::random();
1648 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1649 let config = PeersConfig::test();
1650 let mut peers = PeersManager::new(config.clone());
1651 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1652 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1653
1654 peer_struct.severe_backoff_counter = config.max_backoff_count;
1656
1657 match event!(peers) {
1658 PeerAction::PeerAdded(peer_id) => {
1659 assert_eq!(peer_id, peer);
1660 }
1661 _ => unreachable!(),
1662 }
1663 match event!(peers) {
1664 PeerAction::Connect { peer_id, .. } => {
1665 assert_eq!(peer_id, peer);
1666 }
1667 _ => unreachable!(),
1668 }
1669
1670 poll_fn(|cx| {
1671 assert!(peers.poll(cx).is_pending());
1672 Poll::Ready(())
1673 })
1674 .await;
1675
1676 peers.on_outgoing_pending_session_dropped(
1677 &socket_addr,
1678 &peer,
1679 &PendingSessionHandshakeError::Eth(
1680 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1681 ),
1682 );
1683
1684 match event!(peers) {
1685 PeerAction::PeerRemoved(peer_id) => {
1686 assert_eq!(peer_id, peer);
1687 }
1688 _ => unreachable!(),
1689 }
1690
1691 poll_fn(|cx| {
1692 assert!(peers.poll(cx).is_pending());
1693 Poll::Ready(())
1694 })
1695 .await;
1696
1697 assert!(!peers.peers.contains_key(&peer));
1698 }
1699
1700 #[tokio::test]
1701 async fn test_ban_on_pending_drop() {
1702 let peer = PeerId::random();
1703 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1704 let mut peers = PeersManager::default();
1705 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1706
1707 match event!(peers) {
1708 PeerAction::PeerAdded(peer_id) => {
1709 assert_eq!(peer_id, peer);
1710 }
1711 _ => unreachable!(),
1712 }
1713 match event!(peers) {
1714 PeerAction::Connect { peer_id, .. } => {
1715 assert_eq!(peer_id, peer);
1716 }
1717 _ => unreachable!(),
1718 }
1719
1720 poll_fn(|cx| {
1721 assert!(peers.poll(cx).is_pending());
1722 Poll::Ready(())
1723 })
1724 .await;
1725
1726 peers.on_outgoing_pending_session_dropped(
1727 &socket_addr,
1728 &peer,
1729 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1730 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1731 )),
1732 );
1733
1734 match event!(peers) {
1735 PeerAction::PeerRemoved(peer_id) => {
1736 assert_eq!(peer_id, peer);
1737 }
1738 _ => unreachable!(),
1739 }
1740 match event!(peers) {
1741 PeerAction::BanPeer { peer_id } => {
1742 assert_eq!(peer_id, peer);
1743 }
1744 _ => unreachable!(),
1745 }
1746
1747 poll_fn(|cx| {
1748 assert!(peers.poll(cx).is_pending());
1749 Poll::Ready(())
1750 })
1751 .await;
1752
1753 assert!(!peers.peers.contains_key(&peer));
1754 }
1755
1756 #[tokio::test]
1757 async fn test_internally_closed_incoming() {
1758 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1759 let mut peers = PeersManager::default();
1760
1761 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1762 assert_eq!(peers.connection_info.num_pending_in, 1);
1763 peers.on_incoming_pending_session_rejected_internally();
1764 assert_eq!(peers.connection_info.num_pending_in, 0);
1765 }
1766
1767 #[tokio::test]
1768 async fn test_reject_incoming_at_pending_capacity() {
1769 let mut peers = PeersManager::default();
1770
1771 for count in 1..=peers.connection_info.config.max_inbound {
1772 let socket_addr =
1773 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1774 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1775 assert_eq!(peers.connection_info.num_pending_in, count);
1776 }
1777 assert!(peers.connection_info.has_in_capacity());
1778 assert!(!peers.connection_info.has_in_pending_capacity());
1779
1780 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1781 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1782 }
1783
1784 #[tokio::test]
1785 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1786 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1787 let trusted = PeerId::random();
1788 peers.add_trusted_peer_id(trusted);
1789
1790 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1792 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1793 peers.on_incoming_session_established(trusted, addr);
1794
1795 match event!(peers) {
1796 PeerAction::PeerAdded(id) => {
1797 assert_eq!(id, trusted);
1798 }
1799 _ => unreachable!(),
1800 }
1801
1802 let mut connected_untrusted_peer_ids = Vec::new();
1804 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1805 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1806 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1807 let peer_id = PeerId::random();
1808 peers.on_incoming_session_established(peer_id, addr);
1809 connected_untrusted_peer_ids.push(peer_id);
1810
1811 match event!(peers) {
1812 PeerAction::PeerAdded(id) => {
1813 assert_eq!(id, peer_id);
1814 }
1815 _ => unreachable!(),
1816 }
1817 }
1818
1819 let mut pending_addrs = Vec::new();
1820
1821 for i in 0..2 {
1823 let socket_addr =
1824 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1825 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1826
1827 pending_addrs.push(socket_addr);
1828 }
1829
1830 assert_eq!(peers.connection_info.num_pending_in, 2);
1831
1832 for i in 0..2 {
1834 let socket_addr =
1835 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1836 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1837 }
1838
1839 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1840 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1841 DisconnectReason::UselessPeer,
1842 )),
1843 ));
1844
1845 for pending_addr in pending_addrs {
1847 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1848 }
1849
1850 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1851
1852 println!(
1853 "num_inbound: {}, has_in_capacity: {}",
1854 peers.connection_info.num_inbound,
1855 peers.connection_info.has_in_capacity()
1856 );
1857
1858 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1860
1861 println!(
1862 "num_inbound: {}, has_in_capacity: {}",
1863 peers.connection_info.num_inbound,
1864 peers.connection_info.has_in_capacity()
1865 );
1866
1867 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1868 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1869 }
1870
1871 #[tokio::test]
1872 async fn test_closed_incoming() {
1873 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1874 let mut peers = PeersManager::default();
1875
1876 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1877 assert_eq!(peers.connection_info.num_pending_in, 1);
1878 peers.on_incoming_pending_session_gracefully_closed();
1879 assert_eq!(peers.connection_info.num_pending_in, 0);
1880 }
1881
1882 #[tokio::test]
1883 async fn test_dropped_incoming() {
1884 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1885 let ban_duration = Duration::from_millis(500);
1886 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1887 let mut peers = PeersManager::new(config);
1888
1889 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1890 assert_eq!(peers.connection_info.num_pending_in, 1);
1891 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1892 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1893 DisconnectReason::UselessPeer,
1894 )),
1895 ));
1896
1897 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1898 assert_eq!(peers.connection_info.num_pending_in, 0);
1899 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1900
1901 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1902
1903 tokio::time::sleep(ban_duration).await;
1905
1906 poll_fn(|cx| {
1907 let _ = peers.poll(cx);
1908 Poll::Ready(())
1909 })
1910 .await;
1911
1912 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1913 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1914 }
1915
1916 #[tokio::test]
1917 async fn test_reputation_change_connected() {
1918 let peer = PeerId::random();
1919 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1920 let mut peers = PeersManager::default();
1921 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1922
1923 match event!(peers) {
1924 PeerAction::PeerAdded(peer_id) => {
1925 assert_eq!(peer_id, peer);
1926 }
1927 _ => unreachable!(),
1928 }
1929 match event!(peers) {
1930 PeerAction::Connect { peer_id, remote_addr } => {
1931 assert_eq!(peer_id, peer);
1932 assert_eq!(remote_addr, socket_addr);
1933 }
1934 _ => unreachable!(),
1935 }
1936
1937 let p = peers.peers.get_mut(&peer).unwrap();
1938 assert_eq!(p.state, PeerConnectionState::PendingOut);
1939
1940 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1941
1942 let p = peers.peers.get(&peer).unwrap();
1943 assert_eq!(p.state, PeerConnectionState::PendingOut);
1944 assert!(p.is_banned());
1945
1946 peers.on_active_session_gracefully_closed(peer);
1947
1948 let p = peers.peers.get(&peer).unwrap();
1949 assert_eq!(p.state, PeerConnectionState::Idle);
1950 assert!(p.is_banned());
1951
1952 match event!(peers) {
1953 PeerAction::Disconnect { peer_id, .. } => {
1954 assert_eq!(peer_id, peer);
1955 }
1956 _ => unreachable!(),
1957 }
1958 }
1959
1960 #[tokio::test]
1961 async fn retain_trusted_status() {
1962 let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1963 let trusted = PeerId::random();
1964 let mut peers =
1965 PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
1966 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1967 tcp_port: 8008,
1968 udp_port: 8008,
1969 id: trusted,
1970 }]));
1971 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1972 peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
1973 assert!(peers.peers.get(&trusted).unwrap().is_trusted());
1974 }
1975
1976 #[tokio::test]
1977 async fn accept_incoming_trusted_unknown_peer_address() {
1978 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1979 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1980 let trusted = PeerId::random();
1982 peers.add_trusted_peer_id(trusted);
1983
1984 for i in 0..peers.connection_info.config.max_inbound {
1986 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1987 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1988 let peer_id = PeerId::random();
1989 peers.on_incoming_session_established(peer_id, addr);
1990
1991 match event!(peers) {
1992 PeerAction::PeerAdded(id) => {
1993 assert_eq!(id, peer_id);
1994 }
1995 _ => unreachable!(),
1996 }
1997 }
1998
1999 let untrusted = PeerId::random();
2001 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2002 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2003 peers.on_incoming_session_established(untrusted, socket_addr);
2004
2005 match event!(peers) {
2006 PeerAction::PeerAdded(id) => {
2007 assert_eq!(id, untrusted);
2008 }
2009 _ => unreachable!(),
2010 }
2011
2012 match event!(peers) {
2013 PeerAction::Disconnect { peer_id, reason } => {
2014 assert_eq!(peer_id, untrusted);
2015 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2016 }
2017 _ => unreachable!(),
2018 }
2019
2020 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2021 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2022 peers.on_incoming_session_established(trusted, socket_addr);
2023
2024 match event!(peers) {
2025 PeerAction::PeerAdded(id) => {
2026 assert_eq!(id, trusted);
2027 }
2028 _ => unreachable!(),
2029 }
2030
2031 poll_fn(|cx| {
2032 assert!(peers.poll(cx).is_pending());
2033 Poll::Ready(())
2034 })
2035 .await;
2036
2037 let peer = peers.peers.get(&trusted).unwrap();
2038 assert_eq!(peer.state, PeerConnectionState::In);
2039 }
2040
2041 #[tokio::test]
2042 async fn test_already_connected() {
2043 let peer = PeerId::random();
2044 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2045 let mut peers = PeersManager::default();
2046
2047 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2049 assert_eq!(peers.connection_info.num_pending_in, 1);
2050
2051 peers.on_incoming_session_established(peer, socket_addr);
2054 let p = peers.peers.get_mut(&peer).expect("peer not found");
2055 assert_eq!(p.addr.tcp(), socket_addr);
2056 assert_eq!(peers.connection_info.num_pending_in, 0);
2057 assert_eq!(peers.connection_info.num_inbound, 1);
2058
2059 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2062 assert_eq!(peers.connection_info.num_pending_in, 1);
2063
2064 peers.on_already_connected(Direction::Incoming);
2068
2069 let p = peers.peers.get_mut(&peer).expect("peer not found");
2070 assert_eq!(p.addr.tcp(), socket_addr);
2071 assert_eq!(peers.connection_info.num_pending_in, 0);
2072 assert_eq!(peers.connection_info.num_inbound, 1);
2073 }
2074
2075 #[tokio::test]
2076 async fn test_reputation_change_trusted_peer() {
2077 let peer = PeerId::random();
2078 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2079 let mut peers = PeersManager::default();
2080 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2081
2082 match event!(peers) {
2083 PeerAction::PeerAdded(peer_id) => {
2084 assert_eq!(peer_id, peer);
2085 }
2086 _ => unreachable!(),
2087 }
2088 match event!(peers) {
2089 PeerAction::Connect { peer_id, remote_addr } => {
2090 assert_eq!(peer_id, peer);
2091 assert_eq!(remote_addr, socket_addr);
2092 }
2093 _ => unreachable!(),
2094 }
2095
2096 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2097 peers.on_active_outgoing_established(peer);
2098 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2099
2100 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2101
2102 {
2103 let p = peers.peers.get(&peer).unwrap();
2104 assert_eq!(p.state, PeerConnectionState::Out);
2105 assert!(!p.is_banned());
2107 }
2108
2109 loop {
2111 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2112
2113 let p = peers.peers.get(&peer).unwrap();
2114 if p.is_banned() {
2115 break
2116 }
2117 }
2118
2119 match event!(peers) {
2120 PeerAction::Disconnect { peer_id, .. } => {
2121 assert_eq!(peer_id, peer);
2122 }
2123 _ => unreachable!(),
2124 }
2125 }
2126
2127 #[tokio::test]
2128 async fn test_reputation_management() {
2129 let peer = PeerId::random();
2130 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2131 let mut peers = PeersManager::default();
2132 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2133 assert_eq!(peers.get_reputation(&peer), Some(0));
2134
2135 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2136 assert_eq!(peers.get_reputation(&peer), Some(1024));
2137
2138 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2139 assert_eq!(peers.get_reputation(&peer), Some(0));
2140 }
2141
2142 #[tokio::test]
2143 async fn test_remove_discovered_active() {
2144 let peer = PeerId::random();
2145 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2146 let mut peers = PeersManager::default();
2147 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2148
2149 match event!(peers) {
2150 PeerAction::PeerAdded(peer_id) => {
2151 assert_eq!(peer_id, peer);
2152 }
2153 _ => unreachable!(),
2154 }
2155 match event!(peers) {
2156 PeerAction::Connect { peer_id, remote_addr } => {
2157 assert_eq!(peer_id, peer);
2158 assert_eq!(remote_addr, socket_addr);
2159 }
2160 _ => unreachable!(),
2161 }
2162
2163 let p = peers.peers.get(&peer).unwrap();
2164 assert_eq!(p.state, PeerConnectionState::PendingOut);
2165
2166 peers.remove_peer(peer);
2167
2168 match event!(peers) {
2169 PeerAction::PeerRemoved(peer_id) => {
2170 assert_eq!(peer_id, peer);
2171 }
2172 _ => unreachable!(),
2173 }
2174 match event!(peers) {
2175 PeerAction::Disconnect { peer_id, .. } => {
2176 assert_eq!(peer_id, peer);
2177 }
2178 _ => unreachable!(),
2179 }
2180
2181 let p = peers.peers.get(&peer).unwrap();
2182 assert_eq!(p.state, PeerConnectionState::PendingOut);
2183
2184 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2185 let p = peers.peers.get(&peer).unwrap();
2186 assert_eq!(p.state, PeerConnectionState::PendingOut);
2187
2188 peers.on_active_session_gracefully_closed(peer);
2189 assert!(!peers.peers.contains_key(&peer));
2190 }
2191
2192 #[tokio::test]
2193 async fn test_fatal_outgoing_connection_error_trusted() {
2194 let peer = PeerId::random();
2195 let config = PeersConfig::test()
2196 .with_trusted_nodes(vec![TrustedPeer {
2197 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2198 tcp_port: 8008,
2199 udp_port: 8008,
2200 id: peer,
2201 }])
2202 .with_trusted_nodes_only(true);
2203 let mut peers = PeersManager::new(config);
2204 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2205
2206 match event!(peers) {
2207 PeerAction::Connect { peer_id, remote_addr } => {
2208 assert_eq!(peer_id, peer);
2209 assert_eq!(remote_addr, socket_addr);
2210 }
2211 _ => unreachable!(),
2212 }
2213
2214 let p = peers.peers.get(&peer).unwrap();
2215 assert_eq!(p.state, PeerConnectionState::PendingOut);
2216
2217 assert_eq!(peers.num_outbound_connections(), 0);
2218
2219 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2220 EthHandshakeError::NonStatusMessageInHandshake,
2221 ));
2222 assert!(err.is_fatal_protocol_error());
2223
2224 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2225 assert_eq!(peers.num_outbound_connections(), 0);
2226
2227 match event!(peers) {
2229 PeerAction::BanPeer { peer_id } => {
2230 assert_eq!(peer_id, peer);
2231 }
2232 err => unreachable!("{err:?}"),
2233 }
2234
2235 assert!(peers.peers.contains_key(&peer));
2237
2238 tokio::time::sleep(peers.backoff_durations.medium).await;
2240
2241 match event!(peers) {
2242 PeerAction::Connect { peer_id, remote_addr } => {
2243 assert_eq!(peer_id, peer);
2244 assert_eq!(remote_addr, socket_addr);
2245 }
2246 err => unreachable!("{err:?}"),
2247 }
2248 }
2249
2250 #[tokio::test]
2251 async fn test_outgoing_connection_error() {
2252 let peer = PeerId::random();
2253 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2254 let mut peers = PeersManager::default();
2255 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2256
2257 match event!(peers) {
2258 PeerAction::PeerAdded(peer_id) => {
2259 assert_eq!(peer_id, peer);
2260 }
2261 _ => unreachable!(),
2262 }
2263 match event!(peers) {
2264 PeerAction::Connect { peer_id, remote_addr } => {
2265 assert_eq!(peer_id, peer);
2266 assert_eq!(remote_addr, socket_addr);
2267 }
2268 _ => unreachable!(),
2269 }
2270
2271 let p = peers.peers.get(&peer).unwrap();
2272 assert_eq!(p.state, PeerConnectionState::PendingOut);
2273
2274 assert_eq!(peers.num_outbound_connections(), 0);
2275
2276 peers.on_outgoing_connection_failure(
2277 &socket_addr,
2278 &peer,
2279 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2280 );
2281
2282 assert_eq!(peers.num_outbound_connections(), 0);
2283 }
2284
2285 #[tokio::test]
2286 async fn test_outgoing_connection_gracefully_closed() {
2287 let peer = PeerId::random();
2288 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2289 let mut peers = PeersManager::default();
2290 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2291
2292 match event!(peers) {
2293 PeerAction::PeerAdded(peer_id) => {
2294 assert_eq!(peer_id, peer);
2295 }
2296 _ => unreachable!(),
2297 }
2298 match event!(peers) {
2299 PeerAction::Connect { peer_id, remote_addr } => {
2300 assert_eq!(peer_id, peer);
2301 assert_eq!(remote_addr, socket_addr);
2302 }
2303 _ => unreachable!(),
2304 }
2305
2306 let p = peers.peers.get(&peer).unwrap();
2307 assert_eq!(p.state, PeerConnectionState::PendingOut);
2308
2309 assert_eq!(peers.num_outbound_connections(), 0);
2310
2311 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2312
2313 assert_eq!(peers.num_outbound_connections(), 0);
2314 assert_eq!(peers.connection_info.num_pending_out, 0);
2315 }
2316
2317 #[tokio::test]
2318 async fn test_discovery_ban_list() {
2319 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2320 let socket_addr = SocketAddr::new(ip, 8008);
2321 let ban_list = BanList::new(vec![], vec![ip]);
2322 let config = PeersConfig::default().with_ban_list(ban_list);
2323 let mut peer_manager = PeersManager::new(config);
2324 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2325
2326 assert!(peer_manager.peers.is_empty());
2327 }
2328
2329 #[tokio::test]
2330 async fn test_on_pending_ban_list() {
2331 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2332 let socket_addr = SocketAddr::new(ip, 8008);
2333 let ban_list = BanList::new(vec![], vec![ip]);
2334 let config = PeersConfig::test().with_ban_list(ban_list);
2335 let mut peer_manager = PeersManager::new(config);
2336 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2337 match a {
2339 Ok(_) => panic!(),
2340 Err(err) => match err {
2341 InboundConnectionError::IpBanned => {
2342 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2343 }
2344 _ => unreachable!(),
2345 },
2346 }
2347 }
2348
2349 #[tokio::test]
2350 async fn test_on_active_inbound_ban_list() {
2351 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2352 let socket_addr = SocketAddr::new(ip, 8008);
2353 let given_peer_id = PeerId::random();
2354 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2355 let config = PeersConfig::test().with_ban_list(ban_list);
2356 let mut peer_manager = PeersManager::new(config);
2357 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2358 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2360 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2361 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2364 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2365
2366 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2367 peer_manager.queued_actions.pop_front()
2368 else {
2369 panic!()
2370 };
2371
2372 assert_eq!(peer_id, given_peer_id)
2373 }
2374
2375 #[test]
2376 fn test_connection_limits() {
2377 let mut info = ConnectionInfo::default();
2378 info.inc_in();
2379 assert_eq!(info.num_inbound, 1);
2380 assert_eq!(info.num_outbound, 0);
2381 assert!(info.has_in_capacity());
2382
2383 info.decr_in();
2384 assert_eq!(info.num_inbound, 0);
2385 assert_eq!(info.num_outbound, 0);
2386
2387 info.inc_out();
2388 assert_eq!(info.num_inbound, 0);
2389 assert_eq!(info.num_outbound, 1);
2390 assert!(info.has_out_capacity());
2391
2392 info.decr_out();
2393 assert_eq!(info.num_inbound, 0);
2394 assert_eq!(info.num_outbound, 0);
2395 }
2396
2397 #[test]
2398 fn test_connection_peer_state() {
2399 let mut info = ConnectionInfo::default();
2400 info.inc_in();
2401
2402 info.decr_state(PeerConnectionState::In);
2403 assert_eq!(info.num_inbound, 0);
2404 assert_eq!(info.num_outbound, 0);
2405
2406 info.inc_out();
2407
2408 info.decr_state(PeerConnectionState::Out);
2409 assert_eq!(info.num_inbound, 0);
2410 assert_eq!(info.num_outbound, 0);
2411 }
2412
2413 #[tokio::test]
2414 async fn test_trusted_peers_are_prioritized() {
2415 let trusted_peer = PeerId::random();
2416 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2417 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2418 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2419 tcp_port: 8008,
2420 udp_port: 8008,
2421 id: trusted_peer,
2422 }]);
2423 let mut peers = PeersManager::new(config);
2424
2425 let basic_peer = PeerId::random();
2426 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2427 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2428
2429 match event!(peers) {
2430 PeerAction::PeerAdded(peer_id) => {
2431 assert_eq!(peer_id, basic_peer);
2432 }
2433 _ => unreachable!(),
2434 }
2435 match event!(peers) {
2436 PeerAction::Connect { peer_id, remote_addr } => {
2437 assert_eq!(peer_id, trusted_peer);
2438 assert_eq!(remote_addr, trusted_sock);
2439 }
2440 _ => unreachable!(),
2441 }
2442 match event!(peers) {
2443 PeerAction::Connect { peer_id, remote_addr } => {
2444 assert_eq!(peer_id, basic_peer);
2445 assert_eq!(remote_addr, basic_sock);
2446 }
2447 _ => unreachable!(),
2448 }
2449 }
2450
2451 #[tokio::test]
2452 async fn test_connect_trusted_nodes_only() {
2453 let trusted_peer = PeerId::random();
2454 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2455 let config = PeersConfig::test()
2456 .with_trusted_nodes(vec![TrustedPeer {
2457 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2458 tcp_port: 8008,
2459 udp_port: 8008,
2460 id: trusted_peer,
2461 }])
2462 .with_trusted_nodes_only(true);
2463 let mut peers = PeersManager::new(config);
2464
2465 let basic_peer = PeerId::random();
2466 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2467 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2468
2469 match event!(peers) {
2470 PeerAction::PeerAdded(peer_id) => {
2471 assert_eq!(peer_id, basic_peer);
2472 }
2473 _ => unreachable!(),
2474 }
2475 match event!(peers) {
2476 PeerAction::Connect { peer_id, remote_addr } => {
2477 assert_eq!(peer_id, trusted_peer);
2478 assert_eq!(remote_addr, trusted_sock);
2479 }
2480 _ => unreachable!(),
2481 }
2482 poll_fn(|cx| {
2483 assert!(peers.poll(cx).is_pending());
2484 Poll::Ready(())
2485 })
2486 .await;
2487 }
2488
2489 #[tokio::test]
2490 async fn test_incoming_with_trusted_nodes_only() {
2491 let trusted_peer = PeerId::random();
2492 let config = PeersConfig::test()
2493 .with_trusted_nodes(vec![TrustedPeer {
2494 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2495 tcp_port: 8008,
2496 udp_port: 8008,
2497 id: trusted_peer,
2498 }])
2499 .with_trusted_nodes_only(true);
2500 let mut peers = PeersManager::new(config);
2501
2502 let basic_peer = PeerId::random();
2503 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2504 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2505 assert_eq!(peers.connection_info.num_pending_in, 1);
2507 peers.on_incoming_session_established(basic_peer, basic_sock);
2508 assert_eq!(peers.connection_info.num_pending_in, 0);
2511 assert_eq!(peers.connection_info.num_inbound, 0);
2512
2513 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2514 peers.queued_actions.pop_front()
2515 else {
2516 panic!()
2517 };
2518 assert_eq!(basic_peer, peer_id);
2519 assert!(!peers.peers.contains_key(&basic_peer));
2520 }
2521
2522 #[tokio::test]
2523 async fn test_incoming_without_trusted_nodes_only() {
2524 let trusted_peer = PeerId::random();
2525 let config = PeersConfig::test()
2526 .with_trusted_nodes(vec![TrustedPeer {
2527 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2528 tcp_port: 8008,
2529 udp_port: 8008,
2530 id: trusted_peer,
2531 }])
2532 .with_trusted_nodes_only(false);
2533 let mut peers = PeersManager::new(config);
2534
2535 let basic_peer = PeerId::random();
2536 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2537 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2538
2539 assert_eq!(peers.connection_info.num_pending_in, 1);
2541 peers.on_incoming_session_established(basic_peer, basic_sock);
2542 assert_eq!(peers.connection_info.num_pending_in, 0);
2545 assert_eq!(peers.connection_info.num_inbound, 1);
2546 assert!(peers.peers.contains_key(&basic_peer));
2547 }
2548
2549 #[tokio::test]
2550 async fn test_incoming_at_capacity() {
2551 let mut config = PeersConfig::test();
2552 config.connection_info.max_inbound = 1;
2553 let mut peers = PeersManager::new(config);
2554
2555 let peer = PeerId::random();
2556 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2557 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2558
2559 peers.on_incoming_session_established(peer, addr);
2560
2561 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2562 assert_eq!(
2563 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2564 InboundConnectionError::ExceedsCapacity
2565 );
2566 }
2567
2568 #[tokio::test]
2569 async fn test_incoming_rate_limit() {
2570 let config = PeersConfig {
2571 incoming_ip_throttle_duration: Duration::from_millis(100),
2572 ..PeersConfig::test()
2573 };
2574 let mut peers = PeersManager::new(config);
2575
2576 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2577 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2578 assert_eq!(
2579 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2580 InboundConnectionError::IpBanned
2581 );
2582
2583 peers.release_interval.reset_immediately();
2584 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2585
2586 poll_fn(|cx| loop {
2588 if peers.poll(cx).is_pending() {
2589 return Poll::Ready(());
2590 }
2591 })
2592 .await;
2593
2594 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2595 assert_eq!(
2596 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2597 InboundConnectionError::IpBanned
2598 );
2599 }
2600
2601 #[tokio::test]
2602 async fn test_tick() {
2603 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2604 let socket_addr = SocketAddr::new(ip, 8008);
2605 let config = PeersConfig::test();
2606 let mut peer_manager = PeersManager::new(config);
2607 let peer_id = PeerId::random();
2608 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2609
2610 tokio::time::sleep(Duration::from_secs(1)).await;
2611 peer_manager.tick();
2612
2613 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2615
2616 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2618
2619 tokio::time::sleep(Duration::from_secs(1)).await;
2620 peer_manager.tick();
2621
2622 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2624
2625 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2626
2627 tokio::time::sleep(Duration::from_secs(1)).await;
2628 peer_manager.tick();
2629
2630 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2632 }
2633
2634 #[tokio::test]
2635 async fn test_remove_incoming_after_disconnect() {
2636 let peer_id = PeerId::random();
2637 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2638 let mut peers = PeersManager::default();
2639
2640 peers.on_incoming_pending_session(addr.ip()).unwrap();
2641 peers.on_incoming_session_established(peer_id, addr);
2642 let peer = peers.peers.get(&peer_id).unwrap();
2643 assert_eq!(peer.state, PeerConnectionState::In);
2644 assert!(peer.remove_after_disconnect);
2645
2646 peers.on_active_session_gracefully_closed(peer_id);
2647 assert!(!peers.peers.contains_key(&peer_id))
2648 }
2649
2650 #[tokio::test]
2651 async fn test_keep_incoming_after_disconnect_if_discovered() {
2652 let peer_id = PeerId::random();
2653 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2654 let mut peers = PeersManager::default();
2655
2656 peers.on_incoming_pending_session(addr.ip()).unwrap();
2657 peers.on_incoming_session_established(peer_id, addr);
2658 let peer = peers.peers.get(&peer_id).unwrap();
2659 assert_eq!(peer.state, PeerConnectionState::In);
2660 assert!(peer.remove_after_disconnect);
2661
2662 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2664
2665 peers.on_active_session_gracefully_closed(peer_id);
2666
2667 let peer = peers.peers.get(&peer_id).unwrap();
2668 assert_eq!(peer.state, PeerConnectionState::Idle);
2669 assert!(!peer.remove_after_disconnect);
2670 }
2671
2672 #[tokio::test]
2673 async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2674 let throttle_duration = Duration::from_millis(100);
2675 let config =
2676 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2677 let mut peers = PeersManager::new(config);
2678
2679 let peer_id = PeerId::random();
2680 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2681
2682 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2684
2685 match event!(peers) {
2686 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2687 _ => unreachable!(),
2688 }
2689
2690 match event!(peers) {
2691 PeerAction::Connect { .. } => {}
2692 _ => unreachable!(),
2693 }
2694
2695 peers.on_active_outgoing_established(peer_id);
2697 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2698
2699 peers.on_active_session_gracefully_closed(peer_id);
2701
2702 let peer = peers.peers.get(&peer_id).unwrap();
2703 assert_eq!(peer.state, PeerConnectionState::Idle);
2704 assert!(peer.backed_off);
2705
2706 assert!(peers.backed_off_peers.contains_key(&peer_id));
2708
2709 poll_fn(|cx| {
2711 assert!(peers.poll(cx).is_pending());
2712 Poll::Ready(())
2713 })
2714 .await;
2715
2716 assert!(peers.backed_off_peers.contains_key(&peer_id));
2718 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2719
2720 tokio::time::sleep(throttle_duration).await;
2722
2723 match event!(peers) {
2725 PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2726 _ => unreachable!(),
2727 }
2728
2729 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2731 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2732 }
2733
2734 #[tokio::test]
2735 async fn test_backed_off_peer_can_accept_incoming_connection() {
2736 let throttle_duration = Duration::from_millis(100);
2737 let config =
2738 PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2739 let mut peers = PeersManager::new(config);
2740
2741 let peer_id = PeerId::random();
2742 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2743
2744 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2746
2747 match event!(peers) {
2748 PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2749 _ => unreachable!(),
2750 }
2751
2752 match event!(peers) {
2753 PeerAction::Connect { .. } => {}
2754 _ => unreachable!(),
2755 }
2756
2757 peers.on_active_outgoing_established(peer_id);
2759 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2760
2761 peers.on_active_session_gracefully_closed(peer_id);
2763
2764 let peer = peers.peers.get(&peer_id).unwrap();
2765 assert_eq!(peer.state, PeerConnectionState::Idle);
2766 assert!(peer.backed_off);
2767 assert!(peers.backed_off_peers.contains_key(&peer_id));
2768
2769 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2772 assert_eq!(peers.connection_info.num_pending_in, 1);
2773
2774 peers.on_incoming_session_established(peer_id, addr);
2776
2777 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2779 assert_eq!(peers.connection_info.num_inbound, 1);
2780
2781 assert!(peers.backed_off_peers.contains_key(&peer_id));
2783 assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2784
2785 poll_fn(|cx| {
2787 assert!(peers.poll(cx).is_pending());
2788 Poll::Ready(())
2789 })
2790 .await;
2791
2792 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2794
2795 tokio::time::sleep(throttle_duration).await;
2797
2798 poll_fn(|cx| {
2799 let _ = peers.poll(cx);
2800 Poll::Ready(())
2801 })
2802 .await;
2803
2804 assert!(!peers.backed_off_peers.contains_key(&peer_id));
2806 assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2807
2808 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2810 }
2811
2812 #[tokio::test]
2813 async fn test_incoming_outgoing_already_connected() {
2814 let peer_id = PeerId::random();
2815 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2816 let mut peers = PeersManager::default();
2817
2818 peers.on_incoming_pending_session(addr.ip()).unwrap();
2819 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2820
2821 match event!(peers) {
2822 PeerAction::PeerAdded(_) => {}
2823 _ => unreachable!(),
2824 }
2825
2826 match event!(peers) {
2827 PeerAction::Connect { .. } => {}
2828 _ => unreachable!(),
2829 }
2830
2831 peers.on_incoming_session_established(peer_id, addr);
2832 peers.on_already_connected(Direction::Outgoing(peer_id));
2833 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2834 assert_eq!(peers.connection_info.num_inbound, 1);
2835 assert_eq!(peers.connection_info.num_pending_out, 0);
2836 assert_eq!(peers.connection_info.num_pending_in, 0);
2837 assert_eq!(peers.connection_info.num_outbound, 0);
2838 }
2839
2840 #[tokio::test]
2841 async fn test_already_connected_incoming_outgoing_connection_error() {
2842 let peer_id = PeerId::random();
2843 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2844 let mut peers = PeersManager::default();
2845
2846 peers.on_incoming_pending_session(addr.ip()).unwrap();
2847 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2848
2849 match event!(peers) {
2850 PeerAction::PeerAdded(_) => {}
2851 _ => unreachable!(),
2852 }
2853
2854 match event!(peers) {
2855 PeerAction::Connect { .. } => {}
2856 _ => unreachable!(),
2857 }
2858
2859 peers.on_incoming_session_established(peer_id, addr);
2860
2861 peers.on_outgoing_connection_failure(
2862 &addr,
2863 &peer_id,
2864 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2865 );
2866 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2867 assert_eq!(peers.connection_info.num_inbound, 1);
2868 assert_eq!(peers.connection_info.num_pending_out, 0);
2869 assert_eq!(peers.connection_info.num_pending_in, 0);
2870 assert_eq!(peers.connection_info.num_outbound, 0);
2871 }
2872
2873 #[tokio::test]
2874 async fn test_max_concurrent_dials() {
2875 let config = PeersConfig::default();
2876 let mut peer_manager = PeersManager::new(config);
2877 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2878 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2879 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2880 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2881 }
2882
2883 peer_manager.fill_outbound_slots();
2884 let dials = peer_manager
2885 .queued_actions
2886 .iter()
2887 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2888 .count();
2889 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2890 }
2891
2892 #[tokio::test]
2893 async fn test_max_num_of_pending_dials() {
2894 let config = PeersConfig::default();
2895 let mut peer_manager = PeersManager::new(config);
2896 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2897 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2898
2899 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2901 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2902 }
2903
2904 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2905 match event!(peer_manager) {
2906 PeerAction::PeerAdded(_) => {}
2907 _ => unreachable!(),
2908 }
2909 }
2910
2911 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2912 match event!(peer_manager) {
2913 PeerAction::Connect { .. } => {}
2914 _ => unreachable!(),
2915 }
2916 }
2917
2918 peer_manager.fill_outbound_slots();
2920
2921 let dials = peer_manager.connection_info.num_pending_out;
2923 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2924
2925 let num_pendingout_states = peer_manager
2926 .peers
2927 .iter()
2928 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2929 .map(|(peer_id, _)| *peer_id)
2930 .collect::<Vec<PeerId>>();
2931 assert_eq!(
2932 num_pendingout_states.len(),
2933 peer_manager.connection_info.config.max_concurrent_outbound_dials
2934 );
2935
2936 for peer_id in &num_pendingout_states {
2938 peer_manager.on_active_outgoing_established(*peer_id);
2939 }
2940
2941 for peer_id in &num_pendingout_states {
2943 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2944 }
2945
2946 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2948 }
2949
2950 #[tokio::test]
2951 async fn test_connect() {
2952 let peer = PeerId::random();
2953 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2954 let mut peers = PeersManager::default();
2955 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2956 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2957
2958 match event!(peers) {
2959 PeerAction::Connect { peer_id, remote_addr } => {
2960 assert_eq!(peer_id, peer);
2961 assert_eq!(remote_addr, socket_addr);
2962 }
2963 _ => unreachable!(),
2964 }
2965
2966 let (record, _) = peers.peer_by_id(peer).unwrap();
2967 assert_eq!(record.tcp_addr(), socket_addr);
2968 assert_eq!(record.udp_addr(), socket_addr);
2969
2970 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2972
2973 let (record, _) = peers.peer_by_id(peer).unwrap();
2974 assert_eq!(record.tcp_addr(), socket_addr);
2975 assert_eq!(record.udp_addr(), socket_addr);
2976 }
2977
2978 #[tokio::test]
2979 async fn test_incoming_connection_from_banned() {
2980 let peer = PeerId::random();
2981 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2982 let config = PeersConfig::test().with_max_inbound(3);
2983 let mut peers = PeersManager::new(config);
2984 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2985
2986 match event!(peers) {
2987 PeerAction::PeerAdded(peer_id) => {
2988 assert_eq!(peer_id, peer);
2989 }
2990 _ => unreachable!(),
2991 }
2992 match event!(peers) {
2993 PeerAction::Connect { peer_id, .. } => {
2994 assert_eq!(peer_id, peer);
2995 }
2996 _ => unreachable!(),
2997 }
2998
2999 poll_fn(|cx| {
3000 assert!(peers.poll(cx).is_pending());
3001 Poll::Ready(())
3002 })
3003 .await;
3004
3005 loop {
3007 peers.on_active_session_dropped(
3008 &socket_addr,
3009 &peer,
3010 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3011 reth_eth_wire::EthVersion::Eth68,
3012 reth_eth_wire::EthMessageID::Status,
3013 )),
3014 );
3015
3016 if peers.peers.get(&peer).unwrap().is_banned() {
3017 break;
3018 }
3019
3020 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3021 peers.on_incoming_session_established(peer, socket_addr);
3022
3023 match event!(peers) {
3024 PeerAction::Connect { peer_id, .. } => {
3025 assert_eq!(peer_id, peer);
3026 }
3027 _ => unreachable!(),
3028 }
3029 }
3030
3031 assert!(peers.peers.get(&peer).unwrap().is_banned());
3032
3033 for _ in 0..peers.connection_info.config.max_inbound {
3035 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3036 peers.on_incoming_session_established(peer, socket_addr);
3037
3038 match event!(peers) {
3039 PeerAction::DisconnectBannedIncoming { peer_id } => {
3040 assert_eq!(peer_id, peer);
3041 }
3042 _ => unreachable!(),
3043 }
3044 }
3045
3046 poll_fn(|cx| {
3047 assert!(peers.poll(cx).is_pending());
3048 Poll::Ready(())
3049 })
3050 .await;
3051
3052 assert_eq!(peers.connection_info.num_inbound, 0);
3053
3054 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3055
3056 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3058 assert_eq!(peers.connection_info.num_pending_in, 1);
3059
3060 peers.on_active_session_gracefully_closed(peer);
3064 assert_eq!(peers.connection_info.num_inbound, 0);
3065 }
3066
3067 #[tokio::test]
3068 async fn test_add_pending_connect() {
3069 let peer = PeerId::random();
3070 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3071 let mut peers = PeersManager::default();
3072 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3073 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3074 assert_eq!(peers.connection_info.num_pending_out, 1);
3075 }
3076
3077 #[tokio::test]
3078 async fn test_dns_updates_peer_address() {
3079 let peer_id = PeerId::random();
3080 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3081 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3082
3083 let trusted = TrustedPeer {
3084 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3085 tcp_port: 8008,
3086 udp_port: 8008,
3087 id: peer_id,
3088 };
3089
3090 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3091 let mut manager = PeersManager::new(config);
3092 manager
3093 .trusted_peers_resolver
3094 .set_interval(tokio::time::interval(Duration::from_millis(1)));
3095
3096 manager.peers.insert(
3097 peer_id,
3098 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3099 );
3100
3101 for _ in 0..100 {
3102 let _ = event!(manager);
3103 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3104 break;
3105 }
3106 tokio::time::sleep(Duration::from_millis(10)).await;
3107 }
3108
3109 let updated_peer = manager.peers.get(&peer_id).unwrap();
3110 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3111 }
3112
3113 #[tokio::test]
3114 async fn test_ip_filter_blocks_inbound_connection() {
3115 use reth_net_banlist::IpFilter;
3116 use std::net::IpAddr;
3117
3118 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3120 let config = PeersConfig::test().with_ip_filter(ip_filter);
3121 let mut peers = PeersManager::new(config);
3122
3123 let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3125 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3126
3127 let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3129 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3130 }
3131
3132 #[tokio::test]
3133 async fn test_ip_filter_blocks_outbound_connection() {
3134 use reth_net_banlist::IpFilter;
3135 use std::net::SocketAddr;
3136
3137 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3139 let config = PeersConfig::test().with_ip_filter(ip_filter);
3140 let mut peers = PeersManager::new(config);
3141
3142 let peer_id = PeerId::new([1; 64]);
3143
3144 let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3146 peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3147 assert!(peers.peers.contains_key(&peer_id));
3148
3149 let peer_id2 = PeerId::new([2; 64]);
3151 let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3152 peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3153 assert!(!peers.peers.contains_key(&peer_id2));
3154 }
3155
3156 #[tokio::test]
3157 async fn test_ip_filter_ipv6() {
3158 use reth_net_banlist::IpFilter;
3159 use std::net::IpAddr;
3160
3161 let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3163 let config = PeersConfig::test().with_ip_filter(ip_filter);
3164 let mut peers = PeersManager::new(config);
3165
3166 let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3168 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3169
3170 let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3172 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3173 }
3174
3175 #[tokio::test]
3176 async fn test_ip_filter_multiple_ranges() {
3177 use reth_net_banlist::IpFilter;
3178 use std::net::IpAddr;
3179
3180 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3182 let config = PeersConfig::test().with_ip_filter(ip_filter);
3183 let mut peers = PeersManager::new(config);
3184
3185 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3187 let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3188 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3189 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3190
3191 let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3193 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3194 }
3195
3196 #[tokio::test]
3197 async fn test_ip_filter_no_restriction() {
3198 use reth_net_banlist::IpFilter;
3199 use std::net::IpAddr;
3200
3201 let ip_filter = IpFilter::allow_all();
3203 let config = PeersConfig::test().with_ip_filter(ip_filter);
3204 let mut peers = PeersManager::new(config);
3205
3206 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3208 let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3209 let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3210 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3211 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3212 assert!(peers.on_incoming_pending_session(ip3).is_ok());
3213 }
3214}