1use crate::{
4 error::SessionError,
5 session::{Direction, PendingSessionHandshakeError},
6 swarm::NetworkConnectionState,
7 trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17 is_connection_failed_reputation,
18 peers::{
19 config::PeerBackoffDurations,
20 reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21 },
22 ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23 ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27 fmt::Display,
28 io::{self},
29 net::{IpAddr, SocketAddr},
30 task::{Context, Poll},
31 time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35 sync::mpsc,
36 time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41#[derive(Debug)]
48pub struct PeersManager {
49 peers: HashMap<PeerId, Peer>,
51 trusted_peer_ids: HashSet<PeerId>,
56 trusted_peers_resolver: TrustedPeersResolver,
59 manager_tx: mpsc::UnboundedSender<PeerCommand>,
61 handle_rx: UnboundedReceiverStream<PeerCommand>,
63 queued_actions: VecDeque<PeerAction>,
65 refill_slots_interval: Interval,
67 reputation_weights: ReputationChangeWeights,
69 connection_info: ConnectionInfo,
71 ban_list: BanList,
73 backed_off_peers: HashMap<PeerId, std::time::Instant>,
75 release_interval: Interval,
77 ban_duration: Duration,
79 backoff_durations: PeerBackoffDurations,
82 trusted_nodes_only: bool,
85 last_tick: Instant,
87 max_backoff_count: u8,
89 net_connection_state: NetworkConnectionState,
91 incoming_ip_throttle_duration: Duration,
93 ip_filter: reth_net_banlist::IpFilter,
95}
96
97impl PeersManager {
98 pub fn new(config: PeersConfig) -> Self {
100 let PeersConfig {
101 refill_slots_interval,
102 connection_info,
103 reputation_weights,
104 ban_list,
105 ban_duration,
106 backoff_durations,
107 trusted_nodes,
108 trusted_nodes_only,
109 trusted_nodes_resolution_interval,
110 basic_nodes,
111 max_backoff_count,
112 incoming_ip_throttle_duration,
113 ip_filter,
114 } = config;
115 let (manager_tx, handle_rx) = mpsc::unbounded_channel();
116 let now = Instant::now();
117
118 let unban_interval = ban_duration.min(backoff_durations.low) / 2;
120
121 let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
122 let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
123
124 for trusted_peer in &trusted_nodes {
125 match trusted_peer.resolve_blocking() {
126 Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
127 trusted_peer_ids.insert(id);
128 peers.entry(id).or_insert_with(|| {
129 Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
130 });
131 }
132 Err(err) => {
133 warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
134 }
135 }
136 }
137
138 for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
139 peers.entry(id).or_insert_with(|| {
140 Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
141 });
142 }
143
144 Self {
145 peers,
146 trusted_peer_ids,
147 trusted_peers_resolver: TrustedPeersResolver::new(
148 trusted_nodes,
149 tokio::time::interval(trusted_nodes_resolution_interval), ),
151 manager_tx,
152 handle_rx: UnboundedReceiverStream::new(handle_rx),
153 queued_actions: Default::default(),
154 reputation_weights,
155 refill_slots_interval: tokio::time::interval(refill_slots_interval),
156 release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
157 connection_info: ConnectionInfo::new(connection_info),
158 ban_list,
159 backed_off_peers: Default::default(),
160 ban_duration,
161 backoff_durations,
162 trusted_nodes_only,
163 last_tick: Instant::now(),
164 max_backoff_count,
165 net_connection_state: NetworkConnectionState::default(),
166 incoming_ip_throttle_duration,
167 ip_filter,
168 }
169 }
170
171 pub(crate) fn handle(&self) -> PeersHandle {
173 PeersHandle::new(self.manager_tx.clone())
174 }
175
176 #[inline]
178 pub(crate) fn num_known_peers(&self) -> usize {
179 self.peers.len()
180 }
181
182 pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
184 self.peers.iter().map(|(peer_id, v)| {
185 NodeRecord::new_with_ports(
186 v.addr.tcp().ip(),
187 v.addr.tcp().port(),
188 v.addr.udp().map(|addr| addr.port()),
189 *peer_id,
190 )
191 })
192 }
193
194 pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
196 self.peers.get(&peer_id).map(|v| {
197 (
198 NodeRecord::new_with_ports(
199 v.addr.tcp().ip(),
200 v.addr.tcp().port(),
201 v.addr.udp().map(|addr| addr.port()),
202 peer_id,
203 ),
204 v.kind,
205 )
206 })
207 }
208
209 pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
211 self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
212 }
213
214 #[inline]
216 pub(crate) const fn num_inbound_connections(&self) -> usize {
217 self.connection_info.num_inbound
218 }
219
220 #[inline]
222 pub(crate) const fn num_outbound_connections(&self) -> usize {
223 self.connection_info.num_outbound
224 }
225
226 #[inline]
228 pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
229 self.connection_info.num_pending_out
230 }
231
232 #[inline]
234 pub(crate) fn num_backed_off_peers(&self) -> usize {
235 self.backed_off_peers.len()
236 }
237
238 fn num_idle_trusted_peers(&self) -> usize {
240 self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
241 }
242
243 pub(crate) fn on_incoming_pending_session(
247 &mut self,
248 addr: IpAddr,
249 ) -> Result<(), InboundConnectionError> {
250 if !self.ip_filter.is_allowed(&addr) {
252 trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
253 return Err(InboundConnectionError::IpBanned)
254 }
255
256 if self.ban_list.is_banned_ip(&addr) {
257 return Err(InboundConnectionError::IpBanned)
258 }
259
260 if !self.connection_info.has_in_capacity() {
262 if self.trusted_peer_ids.is_empty() {
263 return Err(InboundConnectionError::ExceedsCapacity)
266 }
267
268 let num_idle_trusted_peers = self.num_idle_trusted_peers();
272 if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
273 let max_inbound =
275 self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
276 if self.connection_info.num_pending_in < max_inbound {
277 self.connection_info.inc_pending_in();
278 return Ok(())
279 }
280 }
281
282 return Err(InboundConnectionError::ExceedsCapacity)
284 }
285
286 if !self.connection_info.has_in_pending_capacity() {
288 return Err(InboundConnectionError::ExceedsCapacity)
289 }
290
291 self.throttle_incoming_ip(addr);
293
294 self.connection_info.inc_pending_in();
295 Ok(())
296 }
297
298 pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
301 self.connection_info.decr_pending_in();
302 }
303
304 pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
306 self.connection_info.decr_pending_in()
307 }
308
309 pub(crate) fn on_incoming_pending_session_dropped(
311 &mut self,
312 remote_addr: SocketAddr,
313 err: &PendingSessionHandshakeError,
314 ) {
315 if err.is_fatal_protocol_error() {
316 self.ban_ip(remote_addr.ip());
317
318 if err.merits_discovery_ban() {
319 self.queued_actions
320 .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
321 }
322 }
323
324 self.connection_info.decr_pending_in();
325 }
326
327 pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
334 self.connection_info.decr_pending_in();
335
336 if self.ban_list.is_banned_peer(&peer_id) {
339 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
340 return
341 }
342
343 let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
345 if self.trusted_nodes_only && !is_trusted {
346 self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
347 return
348 }
349
350 self.tick();
352
353 match self.peers.entry(peer_id) {
354 Entry::Occupied(mut entry) => {
355 let peer = entry.get_mut();
356 if peer.is_banned() {
357 self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
358 return
359 }
360 if peer.state.is_pending_out() {
363 self.connection_info.decr_state(peer.state);
364 }
365
366 peer.state = PeerConnectionState::In;
367
368 is_trusted = is_trusted || peer.is_trusted();
369 }
370 Entry::Vacant(entry) => {
371 let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
374 peer.remove_after_disconnect = true;
375 entry.insert(peer);
376 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
377 }
378 }
379
380 let has_in_capacity = self.connection_info.has_in_capacity();
381 self.connection_info.inc_in();
383
384 if !is_trusted && !has_in_capacity {
386 self.queued_actions.push_back(PeerAction::Disconnect {
387 peer_id,
388 reason: Some(DisconnectReason::TooManyPeers),
389 });
390 }
391 }
392
393 fn ban_peer(&mut self, peer_id: PeerId) {
395 let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
396 (peer.is_trusted() || peer.is_static())
397 {
398 self.backoff_durations.low / 2
401 } else {
402 self.ban_duration
403 };
404
405 self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
406 self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
407 }
408
409 fn ban_ip(&mut self, ip: IpAddr) {
411 self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
412 }
413
414 fn throttle_incoming_ip(&mut self, ip: IpAddr) {
416 self.ban_list
417 .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
418 }
419
420 fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
422 trace!(target: "net::peers", ?peer_id, "backing off");
423
424 if let Some(peer) = self.peers.get_mut(&peer_id) {
425 peer.backed_off = true;
426 self.backed_off_peers.insert(peer_id, until);
427 }
428 }
429
430 fn unban_peer(&mut self, peer_id: PeerId) {
432 self.ban_list.unban_peer(&peer_id);
433 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
434 }
435
436 fn tick(&mut self) {
441 let now = Instant::now();
442 let secs_since_last_tick =
446 if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
447 self.last_tick = now;
448
449 for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
451 if peer.1.reputation < DEFAULT_REPUTATION {
454 peer.1.reputation += secs_since_last_tick;
455 }
456 }
457 }
458
459 pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
461 self.peers.get(peer_id).map(|peer| peer.reputation)
462 }
463
464 pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
470 let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
471 if rep.is_reset() {
473 peer.reset_reputation()
474 } else {
475 let mut reputation_change = self.reputation_weights.change(rep).as_i32();
476 if peer.is_trusted() || peer.is_static() {
477 if matches!(
479 rep,
480 ReputationChangeKind::Dropped |
481 ReputationChangeKind::BadAnnouncement |
482 ReputationChangeKind::Timeout |
483 ReputationChangeKind::AlreadySeenTransaction
484 ) {
485 return
486 }
487
488 if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
490 reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
492 }
493 }
494 peer.apply_reputation(reputation_change, rep)
495 }
496 } else {
497 return
498 };
499
500 match outcome {
501 ReputationChangeOutcome::None => {}
502 ReputationChangeOutcome::Ban => {
503 self.ban_peer(*peer_id);
504 }
505 ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
506 ReputationChangeOutcome::DisconnectAndBan => {
507 self.queued_actions.push_back(PeerAction::Disconnect {
508 peer_id: *peer_id,
509 reason: Some(DisconnectReason::DisconnectRequested),
510 });
511 self.ban_peer(*peer_id);
512 }
513 }
514 }
515
516 pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
518 if let Some(peer) = self.peers.get_mut(peer_id) {
519 self.connection_info.decr_state(peer.state);
520 peer.state = PeerConnectionState::Idle;
521 }
522 }
523
524 pub(crate) fn on_outgoing_pending_session_dropped(
527 &mut self,
528 remote_addr: &SocketAddr,
529 peer_id: &PeerId,
530 err: &PendingSessionHandshakeError,
531 ) {
532 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
533 }
534
535 pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
537 match self.peers.entry(peer_id) {
538 Entry::Occupied(mut entry) => {
539 self.connection_info.decr_state(entry.get().state);
540
541 if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
542 entry.remove();
544 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
545 } else {
546 entry.get_mut().severe_backoff_counter = 0;
550 entry.get_mut().state = PeerConnectionState::Idle;
551 return
552 }
553 }
554 Entry::Vacant(_) => return,
555 }
556
557 self.fill_outbound_slots();
558 }
559
560 pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
562 if let Some(peer) = self.peers.get_mut(&peer_id) {
563 self.connection_info.decr_state(peer.state);
564 self.connection_info.inc_out();
565 peer.state = PeerConnectionState::Out;
566 }
567 }
568
569 pub(crate) fn on_active_session_dropped(
574 &mut self,
575 remote_addr: &SocketAddr,
576 peer_id: &PeerId,
577 err: &EthStreamError,
578 ) {
579 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
580 }
581
582 pub(crate) fn on_outgoing_connection_failure(
585 &mut self,
586 remote_addr: &SocketAddr,
587 peer_id: &PeerId,
588 err: &io::Error,
589 ) {
590 if let Some(peer) = self.peers.get(peer_id) {
594 if peer.state.is_incoming() {
595 return
597 }
598
599 if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
600 self.trusted_peers_resolver.interval.reset_immediately();
603 }
604 }
605
606 self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
607 }
608
609 fn on_connection_failure(
610 &mut self,
611 remote_addr: &SocketAddr,
612 peer_id: &PeerId,
613 err: impl SessionError,
614 reputation_change: ReputationChangeKind,
615 ) {
616 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
617
618 if err.is_fatal_protocol_error() {
619 trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
620 if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
623 self.connection_info.decr_state(entry.get().state);
624 if entry.get().is_trusted() {
626 entry.get_mut().state = PeerConnectionState::Idle;
627 } else {
628 entry.remove();
629 self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
630 if err.merits_discovery_ban() {
632 self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
633 peer_id: *peer_id,
634 ip_addr: remote_addr.ip(),
635 })
636 }
637 }
638 }
639
640 self.ban_peer(*peer_id);
642 } else {
643 let mut backoff_until = None;
644 let mut remove_peer = false;
645
646 if let Some(peer) = self.peers.get_mut(peer_id) {
647 if let Some(kind) = err.should_backoff() {
648 if peer.is_trusted() || peer.is_static() {
649 let backoff = self.backoff_durations.low;
655 backoff_until = Some(std::time::Instant::now() + backoff);
656 } else {
657 if kind.is_severe() {
659 peer.severe_backoff_counter =
660 peer.severe_backoff_counter.saturating_add(1);
661 }
662
663 let backoff_time =
664 self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
665
666 backoff_until = Some(backoff_time);
670 }
671 } else {
672 let reputation_change = self.reputation_weights.change(reputation_change);
674 peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
675 };
676
677 self.connection_info.decr_state(peer.state);
678 peer.state = PeerConnectionState::Idle;
679
680 if peer.severe_backoff_counter > self.max_backoff_count &&
681 !peer.is_trusted() &&
682 !peer.is_static()
683 {
684 remove_peer = true;
687 }
688 }
689
690 if remove_peer {
692 let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
693 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
694 } else if let Some(backoff_until) = backoff_until {
695 self.backoff_peer_until(*peer_id, backoff_until);
697 }
698 }
699
700 self.fill_outbound_slots();
701 }
702
703 pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
709 match direction {
710 Direction::Incoming => {
711 self.connection_info.decr_pending_in();
713 }
714 Direction::Outgoing(_) => {
715 }
718 }
719 }
720
721 pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
726 if let Some(peer) = self.peers.get_mut(&peer_id) {
727 trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
728 peer.fork_id = Some(Box::new(fork_id));
729 }
730 }
731
732 pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
736 self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
737 }
738
739 pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
741 self.trusted_peer_ids.insert(peer_id);
742 }
743
744 #[cfg_attr(not(test), expect(dead_code))]
748 pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
749 self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
750 }
751
752 pub(crate) fn add_peer_kind(
756 &mut self,
757 peer_id: PeerId,
758 kind: PeerKind,
759 addr: PeerAddr,
760 fork_id: Option<ForkId>,
761 ) {
762 let ip_addr = addr.tcp().ip();
763
764 if !self.ip_filter.is_allowed(&ip_addr) {
766 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
767 return
768 }
769
770 if self.ban_list.is_banned(&peer_id, &ip_addr) {
771 return
772 }
773
774 match self.peers.entry(peer_id) {
775 Entry::Occupied(mut entry) => {
776 let peer = entry.get_mut();
777 peer.kind = kind;
778 peer.fork_id = fork_id.map(Box::new);
779 peer.addr = addr;
780
781 if peer.state.is_incoming() {
782 peer.remove_after_disconnect = false;
786 }
787 }
788 Entry::Vacant(entry) => {
789 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
790 let mut peer = Peer::with_kind(addr, kind);
791 peer.fork_id = fork_id.map(Box::new);
792 entry.insert(peer);
793 self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
794 }
795 }
796
797 if kind.is_trusted() {
798 self.trusted_peer_ids.insert(peer_id);
799 }
800 }
801
802 pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
804 let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
805 if entry.get().is_trusted() {
806 return
807 }
808 let mut peer = entry.remove();
809
810 trace!(target: "net::peers", ?peer_id, "remove discovered node");
811 self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
812
813 if peer.state.is_connected() {
814 trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
815 peer.remove_after_disconnect = true;
820 peer.state.disconnect();
821 self.peers.insert(peer_id, peer);
822 self.queued_actions.push_back(PeerAction::Disconnect {
823 peer_id,
824 reason: Some(DisconnectReason::DisconnectRequested),
825 })
826 }
827 }
828
829 #[cfg_attr(not(test), expect(dead_code))]
832 pub(crate) fn add_and_connect(
833 &mut self,
834 peer_id: PeerId,
835 addr: PeerAddr,
836 fork_id: Option<ForkId>,
837 ) {
838 self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
839 }
840
841 pub(crate) fn add_and_connect_kind(
845 &mut self,
846 peer_id: PeerId,
847 kind: PeerKind,
848 addr: PeerAddr,
849 fork_id: Option<ForkId>,
850 ) {
851 let ip_addr = addr.tcp().ip();
852
853 if !self.ip_filter.is_allowed(&ip_addr) {
855 trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
856 return
857 }
858
859 if self.ban_list.is_banned(&peer_id, &ip_addr) {
860 return
861 }
862
863 match self.peers.entry(peer_id) {
864 Entry::Occupied(mut entry) => {
865 let peer = entry.get_mut();
866 peer.kind = kind;
867 peer.fork_id = fork_id.map(Box::new);
868 peer.addr = addr;
869
870 if peer.state == PeerConnectionState::Idle {
871 peer.state = PeerConnectionState::PendingOut;
873 self.connection_info.inc_pending_out();
874 self.queued_actions
875 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
876 }
877 }
878 Entry::Vacant(entry) => {
879 trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
880 let mut peer = Peer::with_kind(addr, kind);
881 peer.state = PeerConnectionState::PendingOut;
882 peer.fork_id = fork_id.map(Box::new);
883 entry.insert(peer);
884 self.connection_info.inc_pending_out();
885 self.queued_actions
886 .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
887 }
888 }
889
890 if kind.is_trusted() {
891 self.trusted_peer_ids.insert(peer_id);
892 }
893 }
894
895 pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
897 let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
898 if !entry.get().is_trusted() {
899 return
900 }
901
902 let peer = entry.get_mut();
903 peer.kind = PeerKind::Basic;
904
905 self.trusted_peer_ids.remove(&peer_id);
906 }
907
908 fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
918 let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
919 !peer.is_backed_off() &&
920 !peer.is_banned() &&
921 peer.state.is_unconnected() &&
922 (!self.trusted_nodes_only || peer.is_trusted())
923 });
924
925 let mut best_peer = unconnected.next()?;
927
928 if best_peer.1.is_trusted() || best_peer.1.is_static() {
929 return Some((*best_peer.0, best_peer.1))
930 }
931
932 for maybe_better in unconnected {
933 if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
935 return Some((*maybe_better.0, maybe_better.1))
936 }
937
938 if maybe_better.1.reputation > best_peer.1.reputation {
940 best_peer = maybe_better;
941 }
942 }
943 Some((*best_peer.0, best_peer.1))
944 }
945
946 fn fill_outbound_slots(&mut self) {
952 self.tick();
953
954 if !self.net_connection_state.is_active() {
955 return
957 }
958
959 while self.connection_info.has_out_capacity() {
961 let action = {
962 let (peer_id, peer) = match self.best_unconnected() {
963 Some(peer) => peer,
964 _ => break,
965 };
966
967 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
968
969 peer.state = PeerConnectionState::PendingOut;
970 PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
971 };
972
973 self.connection_info.inc_pending_out();
974
975 self.queued_actions.push_back(action);
976 }
977 }
978
979 fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
980 if let Some(peer) = self.peers.get_mut(&peer_id) {
981 let new_addr = PeerAddr::new_with_ports(
982 new_record.address,
983 new_record.tcp_port,
984 Some(new_record.udp_port),
985 );
986
987 if peer.addr != new_addr {
988 peer.addr = new_addr;
989 trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
990 }
991 }
992 }
993
994 pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
996 self.net_connection_state = state;
997 }
998
999 pub const fn connection_state(&self) -> &NetworkConnectionState {
1001 &self.net_connection_state
1002 }
1003
1004 pub const fn on_shutdown(&mut self) {
1006 self.net_connection_state = NetworkConnectionState::ShuttingDown;
1007 }
1008
1009 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1014 loop {
1015 if let Some(action) = self.queued_actions.pop_front() {
1017 return Poll::Ready(action)
1018 }
1019
1020 while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1021 match cmd {
1022 PeerCommand::Add(peer_id, addr) => {
1023 self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1024 }
1025 PeerCommand::Remove(peer) => self.remove_peer(peer),
1026 PeerCommand::ReputationChange(peer_id, rep) => {
1027 self.apply_reputation_change(&peer_id, rep)
1028 }
1029 PeerCommand::GetPeer(peer, tx) => {
1030 let _ = tx.send(self.peers.get(&peer).cloned());
1031 }
1032 PeerCommand::GetPeers(tx) => {
1033 let _ = tx.send(self.iter_peers().collect());
1034 }
1035 }
1036 }
1037
1038 if self.release_interval.poll_tick(cx).is_ready() {
1039 let now = std::time::Instant::now();
1040 let (_, unbanned_peers) = self.ban_list.evict(now);
1041
1042 for peer_id in unbanned_peers {
1043 if let Some(peer) = self.peers.get_mut(&peer_id) {
1044 peer.unban();
1045 self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1046 }
1047 }
1048
1049 self.backed_off_peers.retain(|peer_id, until| {
1052 if now > *until {
1053 if let Some(peer) = self.peers.get_mut(peer_id) {
1054 peer.backed_off = false;
1055 }
1056 return false
1057 }
1058 true
1059 })
1060 }
1061
1062 while self.refill_slots_interval.poll_tick(cx).is_ready() {
1063 self.fill_outbound_slots();
1064 }
1065
1066 if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1067 self.on_resolved_peer(peer_id, new_record);
1068 }
1069
1070 if self.queued_actions.is_empty() {
1071 return Poll::Pending
1072 }
1073 }
1074 }
1075}
1076
1077impl Default for PeersManager {
1078 fn default() -> Self {
1079 Self::new(Default::default())
1080 }
1081}
1082
1083#[derive(Debug, Clone, PartialEq, Eq, Default)]
1085pub struct ConnectionInfo {
1086 num_outbound: usize,
1088 num_pending_out: usize,
1090 num_inbound: usize,
1092 num_pending_in: usize,
1094 config: ConnectionsConfig,
1096}
1097
1098impl ConnectionInfo {
1101 const fn new(config: ConnectionsConfig) -> Self {
1103 Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1104 }
1105
1106 const fn has_out_capacity(&self) -> bool {
1108 self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1109 self.num_outbound < self.config.max_outbound
1110 }
1111
1112 const fn has_in_capacity(&self) -> bool {
1114 self.num_inbound < self.config.max_inbound
1115 }
1116
1117 const fn has_in_pending_capacity(&self) -> bool {
1119 self.num_pending_in < self.config.max_inbound
1120 }
1121
1122 const fn decr_state(&mut self, state: PeerConnectionState) {
1123 match state {
1124 PeerConnectionState::Idle => {}
1125 PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1126 PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1127 PeerConnectionState::PendingOut => self.decr_pending_out(),
1128 }
1129 }
1130
1131 const fn decr_out(&mut self) {
1132 self.num_outbound -= 1;
1133 }
1134
1135 const fn inc_out(&mut self) {
1136 self.num_outbound += 1;
1137 }
1138
1139 const fn inc_pending_out(&mut self) {
1140 self.num_pending_out += 1;
1141 }
1142
1143 const fn inc_in(&mut self) {
1144 self.num_inbound += 1;
1145 }
1146
1147 const fn inc_pending_in(&mut self) {
1148 self.num_pending_in += 1;
1149 }
1150
1151 const fn decr_in(&mut self) {
1152 self.num_inbound -= 1;
1153 }
1154
1155 const fn decr_pending_out(&mut self) {
1156 self.num_pending_out -= 1;
1157 }
1158
1159 const fn decr_pending_in(&mut self) {
1160 self.num_pending_in -= 1;
1161 }
1162}
1163
1164#[derive(Debug)]
1166pub enum PeerAction {
1167 Connect {
1169 peer_id: PeerId,
1171 remote_addr: SocketAddr,
1173 },
1174 Disconnect {
1176 peer_id: PeerId,
1178 reason: Option<DisconnectReason>,
1180 },
1181 DisconnectBannedIncoming {
1184 peer_id: PeerId,
1186 },
1187 DisconnectUntrustedIncoming {
1189 peer_id: PeerId,
1191 },
1192 DiscoveryBanPeerId {
1194 peer_id: PeerId,
1196 ip_addr: IpAddr,
1198 },
1199 DiscoveryBanIp {
1201 ip_addr: IpAddr,
1203 },
1204 BanPeer {
1206 peer_id: PeerId,
1208 },
1209 UnBanPeer {
1211 peer_id: PeerId,
1213 },
1214 PeerAdded(PeerId),
1216 PeerRemoved(PeerId),
1218}
1219
1220#[derive(Debug, Error, PartialEq, Eq)]
1222pub enum InboundConnectionError {
1223 IpBanned,
1225 ExceedsCapacity,
1227}
1228
1229impl Display for InboundConnectionError {
1230 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1231 write!(f, "{self:?}")
1232 }
1233}
1234
1235#[cfg(test)]
1236mod tests {
1237 use alloy_primitives::B512;
1238 use reth_eth_wire::{
1239 errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1240 DisconnectReason,
1241 };
1242 use reth_net_banlist::BanList;
1243 use reth_network_api::Direction;
1244 use reth_network_peers::{PeerId, TrustedPeer};
1245 use reth_network_types::{
1246 peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1247 };
1248 use std::{
1249 future::{poll_fn, Future},
1250 io,
1251 net::{IpAddr, Ipv4Addr, SocketAddr},
1252 pin::Pin,
1253 task::{Context, Poll},
1254 time::Duration,
1255 };
1256 use url::Host;
1257
1258 use super::PeersManager;
1259 use crate::{
1260 error::SessionError,
1261 peers::{
1262 ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1263 PeerConnectionState,
1264 },
1265 session::PendingSessionHandshakeError,
1266 PeersConfig,
1267 };
1268
1269 struct PeerActionFuture<'a> {
1270 peers: &'a mut PeersManager,
1271 }
1272
1273 impl Future for PeerActionFuture<'_> {
1274 type Output = PeerAction;
1275
1276 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1277 self.get_mut().peers.poll(cx)
1278 }
1279 }
1280
1281 macro_rules! event {
1282 ($peers:expr) => {
1283 PeerActionFuture { peers: &mut $peers }.await
1284 };
1285 }
1286
1287 #[tokio::test]
1288 async fn test_insert() {
1289 let peer = PeerId::random();
1290 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1291 let mut peers = PeersManager::default();
1292 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1293
1294 match event!(peers) {
1295 PeerAction::PeerAdded(peer_id) => {
1296 assert_eq!(peer_id, peer);
1297 }
1298 _ => unreachable!(),
1299 }
1300 match event!(peers) {
1301 PeerAction::Connect { peer_id, remote_addr } => {
1302 assert_eq!(peer_id, peer);
1303 assert_eq!(remote_addr, socket_addr);
1304 }
1305 _ => unreachable!(),
1306 }
1307
1308 let (record, _) = peers.peer_by_id(peer).unwrap();
1309 assert_eq!(record.tcp_addr(), socket_addr);
1310 assert_eq!(record.udp_addr(), socket_addr);
1311 }
1312
1313 #[tokio::test]
1314 async fn test_insert_udp() {
1315 let peer = PeerId::random();
1316 let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1317 let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1318 let mut peers = PeersManager::default();
1319 peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1320
1321 match event!(peers) {
1322 PeerAction::PeerAdded(peer_id) => {
1323 assert_eq!(peer_id, peer);
1324 }
1325 _ => unreachable!(),
1326 }
1327 match event!(peers) {
1328 PeerAction::Connect { peer_id, remote_addr } => {
1329 assert_eq!(peer_id, peer);
1330 assert_eq!(remote_addr, tcp_addr);
1331 }
1332 _ => unreachable!(),
1333 }
1334
1335 let (record, _) = peers.peer_by_id(peer).unwrap();
1336 assert_eq!(record.tcp_addr(), tcp_addr);
1337 assert_eq!(record.udp_addr(), udp_addr);
1338 }
1339
1340 #[tokio::test]
1341 async fn test_ban() {
1342 let peer = PeerId::random();
1343 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1344 let mut peers = PeersManager::default();
1345 peers.ban_peer(peer);
1346 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1347
1348 match event!(peers) {
1349 PeerAction::BanPeer { peer_id } => {
1350 assert_eq!(peer_id, peer);
1351 }
1352 _ => unreachable!(),
1353 }
1354
1355 poll_fn(|cx| {
1356 assert!(peers.poll(cx).is_pending());
1357 Poll::Ready(())
1358 })
1359 .await;
1360 }
1361
1362 #[tokio::test]
1363 async fn test_unban() {
1364 let peer = PeerId::random();
1365 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1366 let mut peers = PeersManager::default();
1367 peers.ban_peer(peer);
1368 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1369
1370 match event!(peers) {
1371 PeerAction::BanPeer { peer_id } => {
1372 assert_eq!(peer_id, peer);
1373 }
1374 _ => unreachable!(),
1375 }
1376
1377 poll_fn(|cx| {
1378 assert!(peers.poll(cx).is_pending());
1379 Poll::Ready(())
1380 })
1381 .await;
1382
1383 peers.unban_peer(peer);
1384
1385 match event!(peers) {
1386 PeerAction::UnBanPeer { peer_id } => {
1387 assert_eq!(peer_id, peer);
1388 }
1389 _ => unreachable!(),
1390 }
1391
1392 poll_fn(|cx| {
1393 assert!(peers.poll(cx).is_pending());
1394 Poll::Ready(())
1395 })
1396 .await;
1397 }
1398
1399 #[tokio::test]
1400 async fn test_backoff_on_busy() {
1401 let peer = PeerId::random();
1402 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1403
1404 let mut peers = PeersManager::new(PeersConfig::test());
1405 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1406
1407 match event!(peers) {
1408 PeerAction::PeerAdded(peer_id) => {
1409 assert_eq!(peer_id, peer);
1410 }
1411 _ => unreachable!(),
1412 }
1413 match event!(peers) {
1414 PeerAction::Connect { peer_id, .. } => {
1415 assert_eq!(peer_id, peer);
1416 }
1417 _ => unreachable!(),
1418 }
1419
1420 poll_fn(|cx| {
1421 assert!(peers.poll(cx).is_pending());
1422 Poll::Ready(())
1423 })
1424 .await;
1425
1426 peers.on_active_session_dropped(
1427 &socket_addr,
1428 &peer,
1429 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1430 DisconnectReason::TooManyPeers,
1431 )),
1432 );
1433
1434 poll_fn(|cx| {
1435 assert!(peers.poll(cx).is_pending());
1436 Poll::Ready(())
1437 })
1438 .await;
1439
1440 assert!(peers.backed_off_peers.contains_key(&peer));
1441 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1442
1443 tokio::time::sleep(peers.backoff_durations.low).await;
1444
1445 match event!(peers) {
1446 PeerAction::Connect { peer_id, .. } => {
1447 assert_eq!(peer_id, peer);
1448 }
1449 _ => unreachable!(),
1450 }
1451
1452 assert!(!peers.backed_off_peers.contains_key(&peer));
1453 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1454 }
1455
1456 #[tokio::test]
1457 async fn test_backoff_on_no_response() {
1458 let peer = PeerId::random();
1459 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1460
1461 let backoff_durations = PeerBackoffDurations::test();
1462 let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1463 let mut peers = PeersManager::new(config);
1464 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1465
1466 match event!(peers) {
1467 PeerAction::PeerAdded(peer_id) => {
1468 assert_eq!(peer_id, peer);
1469 }
1470 _ => unreachable!(),
1471 }
1472 match event!(peers) {
1473 PeerAction::Connect { peer_id, .. } => {
1474 assert_eq!(peer_id, peer);
1475 }
1476 _ => unreachable!(),
1477 }
1478
1479 poll_fn(|cx| {
1480 assert!(peers.poll(cx).is_pending());
1481 Poll::Ready(())
1482 })
1483 .await;
1484
1485 peers.on_outgoing_pending_session_dropped(
1486 &socket_addr,
1487 &peer,
1488 &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1489 EthHandshakeError::NoResponse,
1490 )),
1491 );
1492
1493 poll_fn(|cx| {
1494 assert!(peers.poll(cx).is_pending());
1495 Poll::Ready(())
1496 })
1497 .await;
1498
1499 assert!(peers.backed_off_peers.contains_key(&peer));
1500 assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1501
1502 tokio::time::sleep(backoff_durations.high).await;
1503
1504 match event!(peers) {
1505 PeerAction::Connect { peer_id, .. } => {
1506 assert_eq!(peer_id, peer);
1507 }
1508 _ => unreachable!(),
1509 }
1510
1511 assert!(!peers.backed_off_peers.contains_key(&peer));
1512 assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1513 }
1514
1515 #[tokio::test]
1516 async fn test_low_backoff() {
1517 let peer = PeerId::random();
1518 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1519 let config = PeersConfig::test();
1520 let mut peers = PeersManager::new(config);
1521 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1522 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1523
1524 let backoff_timestamp = peers
1525 .backoff_durations
1526 .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1527
1528 let expected = std::time::Instant::now() + peers.backoff_durations.low;
1529 assert!(backoff_timestamp <= expected);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_multiple_backoff_calculations() {
1534 let peer = PeerId::random();
1535 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1536 let config = PeersConfig::default();
1537 let mut peers = PeersManager::new(config);
1538 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1539 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1540
1541 peer_struct.severe_backoff_counter = 1;
1543
1544 let now = std::time::Instant::now();
1545
1546 peer_struct.severe_backoff_counter += 1;
1548 let backoff_time = peers
1550 .backoff_durations
1551 .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1552
1553 let backoff_duration = std::time::Duration::new(30 * 60, 0);
1555
1556 assert!(backoff_time.duration_since(now) > backoff_duration);
1559 }
1560
1561 #[tokio::test]
1562 async fn test_ban_on_active_drop() {
1563 let peer = PeerId::random();
1564 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1565 let mut peers = PeersManager::default();
1566 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1567
1568 match event!(peers) {
1569 PeerAction::PeerAdded(peer_id) => {
1570 assert_eq!(peer_id, peer);
1571 }
1572 _ => unreachable!(),
1573 }
1574 match event!(peers) {
1575 PeerAction::Connect { peer_id, .. } => {
1576 assert_eq!(peer_id, peer);
1577 }
1578 _ => unreachable!(),
1579 }
1580
1581 poll_fn(|cx| {
1582 assert!(peers.poll(cx).is_pending());
1583 Poll::Ready(())
1584 })
1585 .await;
1586
1587 peers.on_active_session_dropped(
1588 &socket_addr,
1589 &peer,
1590 &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1591 DisconnectReason::UselessPeer,
1592 )),
1593 );
1594
1595 match event!(peers) {
1596 PeerAction::PeerRemoved(peer_id) => {
1597 assert_eq!(peer_id, peer);
1598 }
1599 _ => unreachable!(),
1600 }
1601 match event!(peers) {
1602 PeerAction::BanPeer { peer_id } => {
1603 assert_eq!(peer_id, peer);
1604 }
1605 _ => unreachable!(),
1606 }
1607
1608 poll_fn(|cx| {
1609 assert!(peers.poll(cx).is_pending());
1610 Poll::Ready(())
1611 })
1612 .await;
1613
1614 assert!(!peers.peers.contains_key(&peer));
1615 }
1616
1617 #[tokio::test]
1618 async fn test_remove_on_max_backoff_count() {
1619 let peer = PeerId::random();
1620 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1621 let config = PeersConfig::test();
1622 let mut peers = PeersManager::new(config.clone());
1623 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1624 let peer_struct = peers.peers.get_mut(&peer).unwrap();
1625
1626 peer_struct.severe_backoff_counter = config.max_backoff_count;
1628
1629 match event!(peers) {
1630 PeerAction::PeerAdded(peer_id) => {
1631 assert_eq!(peer_id, peer);
1632 }
1633 _ => unreachable!(),
1634 }
1635 match event!(peers) {
1636 PeerAction::Connect { peer_id, .. } => {
1637 assert_eq!(peer_id, peer);
1638 }
1639 _ => unreachable!(),
1640 }
1641
1642 poll_fn(|cx| {
1643 assert!(peers.poll(cx).is_pending());
1644 Poll::Ready(())
1645 })
1646 .await;
1647
1648 peers.on_outgoing_pending_session_dropped(
1649 &socket_addr,
1650 &peer,
1651 &PendingSessionHandshakeError::Eth(
1652 io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1653 ),
1654 );
1655
1656 match event!(peers) {
1657 PeerAction::PeerRemoved(peer_id) => {
1658 assert_eq!(peer_id, peer);
1659 }
1660 _ => unreachable!(),
1661 }
1662
1663 poll_fn(|cx| {
1664 assert!(peers.poll(cx).is_pending());
1665 Poll::Ready(())
1666 })
1667 .await;
1668
1669 assert!(!peers.peers.contains_key(&peer));
1670 }
1671
1672 #[tokio::test]
1673 async fn test_ban_on_pending_drop() {
1674 let peer = PeerId::random();
1675 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1676 let mut peers = PeersManager::default();
1677 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1678
1679 match event!(peers) {
1680 PeerAction::PeerAdded(peer_id) => {
1681 assert_eq!(peer_id, peer);
1682 }
1683 _ => unreachable!(),
1684 }
1685 match event!(peers) {
1686 PeerAction::Connect { peer_id, .. } => {
1687 assert_eq!(peer_id, peer);
1688 }
1689 _ => unreachable!(),
1690 }
1691
1692 poll_fn(|cx| {
1693 assert!(peers.poll(cx).is_pending());
1694 Poll::Ready(())
1695 })
1696 .await;
1697
1698 peers.on_outgoing_pending_session_dropped(
1699 &socket_addr,
1700 &peer,
1701 &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1702 P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1703 )),
1704 );
1705
1706 match event!(peers) {
1707 PeerAction::PeerRemoved(peer_id) => {
1708 assert_eq!(peer_id, peer);
1709 }
1710 _ => unreachable!(),
1711 }
1712 match event!(peers) {
1713 PeerAction::BanPeer { peer_id } => {
1714 assert_eq!(peer_id, peer);
1715 }
1716 _ => unreachable!(),
1717 }
1718
1719 poll_fn(|cx| {
1720 assert!(peers.poll(cx).is_pending());
1721 Poll::Ready(())
1722 })
1723 .await;
1724
1725 assert!(!peers.peers.contains_key(&peer));
1726 }
1727
1728 #[tokio::test]
1729 async fn test_internally_closed_incoming() {
1730 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1731 let mut peers = PeersManager::default();
1732
1733 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1734 assert_eq!(peers.connection_info.num_pending_in, 1);
1735 peers.on_incoming_pending_session_rejected_internally();
1736 assert_eq!(peers.connection_info.num_pending_in, 0);
1737 }
1738
1739 #[tokio::test]
1740 async fn test_reject_incoming_at_pending_capacity() {
1741 let mut peers = PeersManager::default();
1742
1743 for count in 1..=peers.connection_info.config.max_inbound {
1744 let socket_addr =
1745 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1746 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1747 assert_eq!(peers.connection_info.num_pending_in, count);
1748 }
1749 assert!(peers.connection_info.has_in_capacity());
1750 assert!(!peers.connection_info.has_in_pending_capacity());
1751
1752 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1753 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1754 }
1755
1756 #[tokio::test]
1757 async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1758 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1759 let trusted = PeerId::random();
1760 peers.add_trusted_peer_id(trusted);
1761
1762 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1764 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1765 peers.on_incoming_session_established(trusted, addr);
1766
1767 match event!(peers) {
1768 PeerAction::PeerAdded(id) => {
1769 assert_eq!(id, trusted);
1770 }
1771 _ => unreachable!(),
1772 }
1773
1774 let mut connected_untrusted_peer_ids = Vec::new();
1776 for i in 0..(peers.connection_info.config.max_inbound - 1) {
1777 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1778 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1779 let peer_id = PeerId::random();
1780 peers.on_incoming_session_established(peer_id, addr);
1781 connected_untrusted_peer_ids.push(peer_id);
1782
1783 match event!(peers) {
1784 PeerAction::PeerAdded(id) => {
1785 assert_eq!(id, peer_id);
1786 }
1787 _ => unreachable!(),
1788 }
1789 }
1790
1791 let mut pending_addrs = Vec::new();
1792
1793 for i in 0..2 {
1795 let socket_addr =
1796 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1797 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1798
1799 pending_addrs.push(socket_addr);
1800 }
1801
1802 assert_eq!(peers.connection_info.num_pending_in, 2);
1803
1804 for i in 0..2 {
1806 let socket_addr =
1807 SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1808 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1809 }
1810
1811 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1812 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1813 DisconnectReason::UselessPeer,
1814 )),
1815 ));
1816
1817 for pending_addr in pending_addrs {
1819 peers.on_incoming_pending_session_dropped(pending_addr, &err);
1820 }
1821
1822 println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1823
1824 println!(
1825 "num_inbound: {}, has_in_capacity: {}",
1826 peers.connection_info.num_inbound,
1827 peers.connection_info.has_in_capacity()
1828 );
1829
1830 peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1832
1833 println!(
1834 "num_inbound: {}, has_in_capacity: {}",
1835 peers.connection_info.num_inbound,
1836 peers.connection_info.has_in_capacity()
1837 );
1838
1839 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1840 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1841 }
1842
1843 #[tokio::test]
1844 async fn test_closed_incoming() {
1845 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1846 let mut peers = PeersManager::default();
1847
1848 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1849 assert_eq!(peers.connection_info.num_pending_in, 1);
1850 peers.on_incoming_pending_session_gracefully_closed();
1851 assert_eq!(peers.connection_info.num_pending_in, 0);
1852 }
1853
1854 #[tokio::test]
1855 async fn test_dropped_incoming() {
1856 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1857 let ban_duration = Duration::from_millis(500);
1858 let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1859 let mut peers = PeersManager::new(config);
1860
1861 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1862 assert_eq!(peers.connection_info.num_pending_in, 1);
1863 let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1864 P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1865 DisconnectReason::UselessPeer,
1866 )),
1867 ));
1868
1869 peers.on_incoming_pending_session_dropped(socket_addr, &err);
1870 assert_eq!(peers.connection_info.num_pending_in, 0);
1871 assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1872
1873 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1874
1875 tokio::time::sleep(ban_duration).await;
1877
1878 poll_fn(|cx| {
1879 let _ = peers.poll(cx);
1880 Poll::Ready(())
1881 })
1882 .await;
1883
1884 assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1885 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1886 }
1887
1888 #[tokio::test]
1889 async fn test_reputation_change_connected() {
1890 let peer = PeerId::random();
1891 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1892 let mut peers = PeersManager::default();
1893 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1894
1895 match event!(peers) {
1896 PeerAction::PeerAdded(peer_id) => {
1897 assert_eq!(peer_id, peer);
1898 }
1899 _ => unreachable!(),
1900 }
1901 match event!(peers) {
1902 PeerAction::Connect { peer_id, remote_addr } => {
1903 assert_eq!(peer_id, peer);
1904 assert_eq!(remote_addr, socket_addr);
1905 }
1906 _ => unreachable!(),
1907 }
1908
1909 let p = peers.peers.get_mut(&peer).unwrap();
1910 assert_eq!(p.state, PeerConnectionState::PendingOut);
1911
1912 peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1913
1914 let p = peers.peers.get(&peer).unwrap();
1915 assert_eq!(p.state, PeerConnectionState::PendingOut);
1916 assert!(p.is_banned());
1917
1918 peers.on_active_session_gracefully_closed(peer);
1919
1920 let p = peers.peers.get(&peer).unwrap();
1921 assert_eq!(p.state, PeerConnectionState::Idle);
1922 assert!(p.is_banned());
1923
1924 match event!(peers) {
1925 PeerAction::Disconnect { peer_id, .. } => {
1926 assert_eq!(peer_id, peer);
1927 }
1928 _ => unreachable!(),
1929 }
1930 }
1931
1932 #[tokio::test]
1933 async fn accept_incoming_trusted_unknown_peer_address() {
1934 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1935 let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1936 let trusted = PeerId::random();
1938 peers.add_trusted_peer_id(trusted);
1939
1940 for i in 0..peers.connection_info.config.max_inbound {
1942 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1943 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1944 let peer_id = PeerId::random();
1945 peers.on_incoming_session_established(peer_id, addr);
1946
1947 match event!(peers) {
1948 PeerAction::PeerAdded(id) => {
1949 assert_eq!(id, peer_id);
1950 }
1951 _ => unreachable!(),
1952 }
1953 }
1954
1955 let untrusted = PeerId::random();
1957 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1958 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1959 peers.on_incoming_session_established(untrusted, socket_addr);
1960
1961 match event!(peers) {
1962 PeerAction::PeerAdded(id) => {
1963 assert_eq!(id, untrusted);
1964 }
1965 _ => unreachable!(),
1966 }
1967
1968 match event!(peers) {
1969 PeerAction::Disconnect { peer_id, reason } => {
1970 assert_eq!(peer_id, untrusted);
1971 assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1972 }
1973 _ => unreachable!(),
1974 }
1975
1976 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1977 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1978 peers.on_incoming_session_established(trusted, socket_addr);
1979
1980 match event!(peers) {
1981 PeerAction::PeerAdded(id) => {
1982 assert_eq!(id, trusted);
1983 }
1984 _ => unreachable!(),
1985 }
1986
1987 poll_fn(|cx| {
1988 assert!(peers.poll(cx).is_pending());
1989 Poll::Ready(())
1990 })
1991 .await;
1992
1993 let peer = peers.peers.get(&trusted).unwrap();
1994 assert_eq!(peer.state, PeerConnectionState::In);
1995 }
1996
1997 #[tokio::test]
1998 async fn test_already_connected() {
1999 let peer = PeerId::random();
2000 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2001 let mut peers = PeersManager::default();
2002
2003 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2005 assert_eq!(peers.connection_info.num_pending_in, 1);
2006
2007 peers.on_incoming_session_established(peer, socket_addr);
2010 let p = peers.peers.get_mut(&peer).expect("peer not found");
2011 assert_eq!(p.addr.tcp(), socket_addr);
2012 assert_eq!(peers.connection_info.num_pending_in, 0);
2013 assert_eq!(peers.connection_info.num_inbound, 1);
2014
2015 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2018 assert_eq!(peers.connection_info.num_pending_in, 1);
2019
2020 peers.on_already_connected(Direction::Incoming);
2024
2025 let p = peers.peers.get_mut(&peer).expect("peer not found");
2026 assert_eq!(p.addr.tcp(), socket_addr);
2027 assert_eq!(peers.connection_info.num_pending_in, 0);
2028 assert_eq!(peers.connection_info.num_inbound, 1);
2029 }
2030
2031 #[tokio::test]
2032 async fn test_reputation_change_trusted_peer() {
2033 let peer = PeerId::random();
2034 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2035 let mut peers = PeersManager::default();
2036 peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2037
2038 match event!(peers) {
2039 PeerAction::PeerAdded(peer_id) => {
2040 assert_eq!(peer_id, peer);
2041 }
2042 _ => unreachable!(),
2043 }
2044 match event!(peers) {
2045 PeerAction::Connect { peer_id, remote_addr } => {
2046 assert_eq!(peer_id, peer);
2047 assert_eq!(remote_addr, socket_addr);
2048 }
2049 _ => unreachable!(),
2050 }
2051
2052 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2053 peers.on_active_outgoing_established(peer);
2054 assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2055
2056 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2057
2058 {
2059 let p = peers.peers.get(&peer).unwrap();
2060 assert_eq!(p.state, PeerConnectionState::Out);
2061 assert!(!p.is_banned());
2063 }
2064
2065 loop {
2067 peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2068
2069 let p = peers.peers.get(&peer).unwrap();
2070 if p.is_banned() {
2071 break
2072 }
2073 }
2074
2075 match event!(peers) {
2076 PeerAction::Disconnect { peer_id, .. } => {
2077 assert_eq!(peer_id, peer);
2078 }
2079 _ => unreachable!(),
2080 }
2081 }
2082
2083 #[tokio::test]
2084 async fn test_reputation_management() {
2085 let peer = PeerId::random();
2086 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2087 let mut peers = PeersManager::default();
2088 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2089 assert_eq!(peers.get_reputation(&peer), Some(0));
2090
2091 peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2092 assert_eq!(peers.get_reputation(&peer), Some(1024));
2093
2094 peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2095 assert_eq!(peers.get_reputation(&peer), Some(0));
2096 }
2097
2098 #[tokio::test]
2099 async fn test_remove_discovered_active() {
2100 let peer = PeerId::random();
2101 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2102 let mut peers = PeersManager::default();
2103 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2104
2105 match event!(peers) {
2106 PeerAction::PeerAdded(peer_id) => {
2107 assert_eq!(peer_id, peer);
2108 }
2109 _ => unreachable!(),
2110 }
2111 match event!(peers) {
2112 PeerAction::Connect { peer_id, remote_addr } => {
2113 assert_eq!(peer_id, peer);
2114 assert_eq!(remote_addr, socket_addr);
2115 }
2116 _ => unreachable!(),
2117 }
2118
2119 let p = peers.peers.get(&peer).unwrap();
2120 assert_eq!(p.state, PeerConnectionState::PendingOut);
2121
2122 peers.remove_peer(peer);
2123
2124 match event!(peers) {
2125 PeerAction::PeerRemoved(peer_id) => {
2126 assert_eq!(peer_id, peer);
2127 }
2128 _ => unreachable!(),
2129 }
2130 match event!(peers) {
2131 PeerAction::Disconnect { peer_id, .. } => {
2132 assert_eq!(peer_id, peer);
2133 }
2134 _ => unreachable!(),
2135 }
2136
2137 let p = peers.peers.get(&peer).unwrap();
2138 assert_eq!(p.state, PeerConnectionState::PendingOut);
2139
2140 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2141 let p = peers.peers.get(&peer).unwrap();
2142 assert_eq!(p.state, PeerConnectionState::PendingOut);
2143
2144 peers.on_active_session_gracefully_closed(peer);
2145 assert!(!peers.peers.contains_key(&peer));
2146 }
2147
2148 #[tokio::test]
2149 async fn test_fatal_outgoing_connection_error_trusted() {
2150 let peer = PeerId::random();
2151 let config = PeersConfig::test()
2152 .with_trusted_nodes(vec![TrustedPeer {
2153 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2154 tcp_port: 8008,
2155 udp_port: 8008,
2156 id: peer,
2157 }])
2158 .with_trusted_nodes_only(true);
2159 let mut peers = PeersManager::new(config);
2160 let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2161
2162 match event!(peers) {
2163 PeerAction::Connect { peer_id, remote_addr } => {
2164 assert_eq!(peer_id, peer);
2165 assert_eq!(remote_addr, socket_addr);
2166 }
2167 _ => unreachable!(),
2168 }
2169
2170 let p = peers.peers.get(&peer).unwrap();
2171 assert_eq!(p.state, PeerConnectionState::PendingOut);
2172
2173 assert_eq!(peers.num_outbound_connections(), 0);
2174
2175 let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2176 EthHandshakeError::NonStatusMessageInHandshake,
2177 ));
2178 assert!(err.is_fatal_protocol_error());
2179
2180 peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2181 assert_eq!(peers.num_outbound_connections(), 0);
2182
2183 match event!(peers) {
2185 PeerAction::BanPeer { peer_id } => {
2186 assert_eq!(peer_id, peer);
2187 }
2188 err => unreachable!("{err:?}"),
2189 }
2190
2191 assert!(peers.peers.contains_key(&peer));
2193
2194 tokio::time::sleep(peers.backoff_durations.medium).await;
2196
2197 match event!(peers) {
2198 PeerAction::Connect { peer_id, remote_addr } => {
2199 assert_eq!(peer_id, peer);
2200 assert_eq!(remote_addr, socket_addr);
2201 }
2202 err => unreachable!("{err:?}"),
2203 }
2204 }
2205
2206 #[tokio::test]
2207 async fn test_outgoing_connection_error() {
2208 let peer = PeerId::random();
2209 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2210 let mut peers = PeersManager::default();
2211 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2212
2213 match event!(peers) {
2214 PeerAction::PeerAdded(peer_id) => {
2215 assert_eq!(peer_id, peer);
2216 }
2217 _ => unreachable!(),
2218 }
2219 match event!(peers) {
2220 PeerAction::Connect { peer_id, remote_addr } => {
2221 assert_eq!(peer_id, peer);
2222 assert_eq!(remote_addr, socket_addr);
2223 }
2224 _ => unreachable!(),
2225 }
2226
2227 let p = peers.peers.get(&peer).unwrap();
2228 assert_eq!(p.state, PeerConnectionState::PendingOut);
2229
2230 assert_eq!(peers.num_outbound_connections(), 0);
2231
2232 peers.on_outgoing_connection_failure(
2233 &socket_addr,
2234 &peer,
2235 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2236 );
2237
2238 assert_eq!(peers.num_outbound_connections(), 0);
2239 }
2240
2241 #[tokio::test]
2242 async fn test_outgoing_connection_gracefully_closed() {
2243 let peer = PeerId::random();
2244 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2245 let mut peers = PeersManager::default();
2246 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2247
2248 match event!(peers) {
2249 PeerAction::PeerAdded(peer_id) => {
2250 assert_eq!(peer_id, peer);
2251 }
2252 _ => unreachable!(),
2253 }
2254 match event!(peers) {
2255 PeerAction::Connect { peer_id, remote_addr } => {
2256 assert_eq!(peer_id, peer);
2257 assert_eq!(remote_addr, socket_addr);
2258 }
2259 _ => unreachable!(),
2260 }
2261
2262 let p = peers.peers.get(&peer).unwrap();
2263 assert_eq!(p.state, PeerConnectionState::PendingOut);
2264
2265 assert_eq!(peers.num_outbound_connections(), 0);
2266
2267 peers.on_outgoing_pending_session_gracefully_closed(&peer);
2268
2269 assert_eq!(peers.num_outbound_connections(), 0);
2270 assert_eq!(peers.connection_info.num_pending_out, 0);
2271 }
2272
2273 #[tokio::test]
2274 async fn test_discovery_ban_list() {
2275 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2276 let socket_addr = SocketAddr::new(ip, 8008);
2277 let ban_list = BanList::new(vec![], vec![ip]);
2278 let config = PeersConfig::default().with_ban_list(ban_list);
2279 let mut peer_manager = PeersManager::new(config);
2280 peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2281
2282 assert!(peer_manager.peers.is_empty());
2283 }
2284
2285 #[tokio::test]
2286 async fn test_on_pending_ban_list() {
2287 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2288 let socket_addr = SocketAddr::new(ip, 8008);
2289 let ban_list = BanList::new(vec![], vec![ip]);
2290 let config = PeersConfig::test().with_ban_list(ban_list);
2291 let mut peer_manager = PeersManager::new(config);
2292 let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2293 match a {
2295 Ok(_) => panic!(),
2296 Err(err) => match err {
2297 InboundConnectionError::IpBanned => {
2298 assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2299 }
2300 _ => unreachable!(),
2301 },
2302 }
2303 }
2304
2305 #[tokio::test]
2306 async fn test_on_active_inbound_ban_list() {
2307 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2308 let socket_addr = SocketAddr::new(ip, 8008);
2309 let given_peer_id = PeerId::random();
2310 let ban_list = BanList::new(vec![given_peer_id], vec![]);
2311 let config = PeersConfig::test().with_ban_list(ban_list);
2312 let mut peer_manager = PeersManager::new(config);
2313 assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2314 assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2316 peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2317 assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2320 assert_eq!(peer_manager.connection_info.num_inbound, 0);
2321
2322 let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2323 peer_manager.queued_actions.pop_front()
2324 else {
2325 panic!()
2326 };
2327
2328 assert_eq!(peer_id, given_peer_id)
2329 }
2330
2331 #[test]
2332 fn test_connection_limits() {
2333 let mut info = ConnectionInfo::default();
2334 info.inc_in();
2335 assert_eq!(info.num_inbound, 1);
2336 assert_eq!(info.num_outbound, 0);
2337 assert!(info.has_in_capacity());
2338
2339 info.decr_in();
2340 assert_eq!(info.num_inbound, 0);
2341 assert_eq!(info.num_outbound, 0);
2342
2343 info.inc_out();
2344 assert_eq!(info.num_inbound, 0);
2345 assert_eq!(info.num_outbound, 1);
2346 assert!(info.has_out_capacity());
2347
2348 info.decr_out();
2349 assert_eq!(info.num_inbound, 0);
2350 assert_eq!(info.num_outbound, 0);
2351 }
2352
2353 #[test]
2354 fn test_connection_peer_state() {
2355 let mut info = ConnectionInfo::default();
2356 info.inc_in();
2357
2358 info.decr_state(PeerConnectionState::In);
2359 assert_eq!(info.num_inbound, 0);
2360 assert_eq!(info.num_outbound, 0);
2361
2362 info.inc_out();
2363
2364 info.decr_state(PeerConnectionState::Out);
2365 assert_eq!(info.num_inbound, 0);
2366 assert_eq!(info.num_outbound, 0);
2367 }
2368
2369 #[tokio::test]
2370 async fn test_trusted_peers_are_prioritized() {
2371 let trusted_peer = PeerId::random();
2372 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2373 let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2374 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2375 tcp_port: 8008,
2376 udp_port: 8008,
2377 id: trusted_peer,
2378 }]);
2379 let mut peers = PeersManager::new(config);
2380
2381 let basic_peer = PeerId::random();
2382 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2383 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2384
2385 match event!(peers) {
2386 PeerAction::PeerAdded(peer_id) => {
2387 assert_eq!(peer_id, basic_peer);
2388 }
2389 _ => unreachable!(),
2390 }
2391 match event!(peers) {
2392 PeerAction::Connect { peer_id, remote_addr } => {
2393 assert_eq!(peer_id, trusted_peer);
2394 assert_eq!(remote_addr, trusted_sock);
2395 }
2396 _ => unreachable!(),
2397 }
2398 match event!(peers) {
2399 PeerAction::Connect { peer_id, remote_addr } => {
2400 assert_eq!(peer_id, basic_peer);
2401 assert_eq!(remote_addr, basic_sock);
2402 }
2403 _ => unreachable!(),
2404 }
2405 }
2406
2407 #[tokio::test]
2408 async fn test_connect_trusted_nodes_only() {
2409 let trusted_peer = PeerId::random();
2410 let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2411 let config = PeersConfig::test()
2412 .with_trusted_nodes(vec![TrustedPeer {
2413 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2414 tcp_port: 8008,
2415 udp_port: 8008,
2416 id: trusted_peer,
2417 }])
2418 .with_trusted_nodes_only(true);
2419 let mut peers = PeersManager::new(config);
2420
2421 let basic_peer = PeerId::random();
2422 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2423 peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2424
2425 match event!(peers) {
2426 PeerAction::PeerAdded(peer_id) => {
2427 assert_eq!(peer_id, basic_peer);
2428 }
2429 _ => unreachable!(),
2430 }
2431 match event!(peers) {
2432 PeerAction::Connect { peer_id, remote_addr } => {
2433 assert_eq!(peer_id, trusted_peer);
2434 assert_eq!(remote_addr, trusted_sock);
2435 }
2436 _ => unreachable!(),
2437 }
2438 poll_fn(|cx| {
2439 assert!(peers.poll(cx).is_pending());
2440 Poll::Ready(())
2441 })
2442 .await;
2443 }
2444
2445 #[tokio::test]
2446 async fn test_incoming_with_trusted_nodes_only() {
2447 let trusted_peer = PeerId::random();
2448 let config = PeersConfig::test()
2449 .with_trusted_nodes(vec![TrustedPeer {
2450 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2451 tcp_port: 8008,
2452 udp_port: 8008,
2453 id: trusted_peer,
2454 }])
2455 .with_trusted_nodes_only(true);
2456 let mut peers = PeersManager::new(config);
2457
2458 let basic_peer = PeerId::random();
2459 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2460 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2461 assert_eq!(peers.connection_info.num_pending_in, 1);
2463 peers.on_incoming_session_established(basic_peer, basic_sock);
2464 assert_eq!(peers.connection_info.num_pending_in, 0);
2467 assert_eq!(peers.connection_info.num_inbound, 0);
2468
2469 let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2470 peers.queued_actions.pop_front()
2471 else {
2472 panic!()
2473 };
2474 assert_eq!(basic_peer, peer_id);
2475 assert!(!peers.peers.contains_key(&basic_peer));
2476 }
2477
2478 #[tokio::test]
2479 async fn test_incoming_without_trusted_nodes_only() {
2480 let trusted_peer = PeerId::random();
2481 let config = PeersConfig::test()
2482 .with_trusted_nodes(vec![TrustedPeer {
2483 host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2484 tcp_port: 8008,
2485 udp_port: 8008,
2486 id: trusted_peer,
2487 }])
2488 .with_trusted_nodes_only(false);
2489 let mut peers = PeersManager::new(config);
2490
2491 let basic_peer = PeerId::random();
2492 let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2493 assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2494
2495 assert_eq!(peers.connection_info.num_pending_in, 1);
2497 peers.on_incoming_session_established(basic_peer, basic_sock);
2498 assert_eq!(peers.connection_info.num_pending_in, 0);
2501 assert_eq!(peers.connection_info.num_inbound, 1);
2502 assert!(peers.peers.contains_key(&basic_peer));
2503 }
2504
2505 #[tokio::test]
2506 async fn test_incoming_at_capacity() {
2507 let mut config = PeersConfig::test();
2508 config.connection_info.max_inbound = 1;
2509 let mut peers = PeersManager::new(config);
2510
2511 let peer = PeerId::random();
2512 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2513 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2514
2515 peers.on_incoming_session_established(peer, addr);
2516
2517 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2518 assert_eq!(
2519 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2520 InboundConnectionError::ExceedsCapacity
2521 );
2522 }
2523
2524 #[tokio::test]
2525 async fn test_incoming_rate_limit() {
2526 let config = PeersConfig {
2527 incoming_ip_throttle_duration: Duration::from_millis(100),
2528 ..PeersConfig::test()
2529 };
2530 let mut peers = PeersManager::new(config);
2531
2532 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2533 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2534 assert_eq!(
2535 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2536 InboundConnectionError::IpBanned
2537 );
2538
2539 peers.release_interval.reset_immediately();
2540 tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2541
2542 poll_fn(|cx| loop {
2544 if peers.poll(cx).is_pending() {
2545 return Poll::Ready(());
2546 }
2547 })
2548 .await;
2549
2550 assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2551 assert_eq!(
2552 peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2553 InboundConnectionError::IpBanned
2554 );
2555 }
2556
2557 #[tokio::test]
2558 async fn test_tick() {
2559 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2560 let socket_addr = SocketAddr::new(ip, 8008);
2561 let config = PeersConfig::test();
2562 let mut peer_manager = PeersManager::new(config);
2563 let peer_id = PeerId::random();
2564 peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2565
2566 tokio::time::sleep(Duration::from_secs(1)).await;
2567 peer_manager.tick();
2568
2569 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2571
2572 peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2574
2575 tokio::time::sleep(Duration::from_secs(1)).await;
2576 peer_manager.tick();
2577
2578 assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2580
2581 peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2582
2583 tokio::time::sleep(Duration::from_secs(1)).await;
2584 peer_manager.tick();
2585
2586 assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2588 }
2589
2590 #[tokio::test]
2591 async fn test_remove_incoming_after_disconnect() {
2592 let peer_id = PeerId::random();
2593 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2594 let mut peers = PeersManager::default();
2595
2596 peers.on_incoming_pending_session(addr.ip()).unwrap();
2597 peers.on_incoming_session_established(peer_id, addr);
2598 let peer = peers.peers.get(&peer_id).unwrap();
2599 assert_eq!(peer.state, PeerConnectionState::In);
2600 assert!(peer.remove_after_disconnect);
2601
2602 peers.on_active_session_gracefully_closed(peer_id);
2603 assert!(!peers.peers.contains_key(&peer_id))
2604 }
2605
2606 #[tokio::test]
2607 async fn test_keep_incoming_after_disconnect_if_discovered() {
2608 let peer_id = PeerId::random();
2609 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2610 let mut peers = PeersManager::default();
2611
2612 peers.on_incoming_pending_session(addr.ip()).unwrap();
2613 peers.on_incoming_session_established(peer_id, addr);
2614 let peer = peers.peers.get(&peer_id).unwrap();
2615 assert_eq!(peer.state, PeerConnectionState::In);
2616 assert!(peer.remove_after_disconnect);
2617
2618 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2620
2621 peers.on_active_session_gracefully_closed(peer_id);
2622
2623 let peer = peers.peers.get(&peer_id).unwrap();
2624 assert_eq!(peer.state, PeerConnectionState::Idle);
2625 assert!(!peer.remove_after_disconnect);
2626 }
2627
2628 #[tokio::test]
2629 async fn test_incoming_outgoing_already_connected() {
2630 let peer_id = PeerId::random();
2631 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2632 let mut peers = PeersManager::default();
2633
2634 peers.on_incoming_pending_session(addr.ip()).unwrap();
2635 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2636
2637 match event!(peers) {
2638 PeerAction::PeerAdded(_) => {}
2639 _ => unreachable!(),
2640 }
2641
2642 match event!(peers) {
2643 PeerAction::Connect { .. } => {}
2644 _ => unreachable!(),
2645 }
2646
2647 peers.on_incoming_session_established(peer_id, addr);
2648 peers.on_already_connected(Direction::Outgoing(peer_id));
2649 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2650 assert_eq!(peers.connection_info.num_inbound, 1);
2651 assert_eq!(peers.connection_info.num_pending_out, 0);
2652 assert_eq!(peers.connection_info.num_pending_in, 0);
2653 assert_eq!(peers.connection_info.num_outbound, 0);
2654 }
2655
2656 #[tokio::test]
2657 async fn test_already_connected_incoming_outgoing_connection_error() {
2658 let peer_id = PeerId::random();
2659 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2660 let mut peers = PeersManager::default();
2661
2662 peers.on_incoming_pending_session(addr.ip()).unwrap();
2663 peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2664
2665 match event!(peers) {
2666 PeerAction::PeerAdded(_) => {}
2667 _ => unreachable!(),
2668 }
2669
2670 match event!(peers) {
2671 PeerAction::Connect { .. } => {}
2672 _ => unreachable!(),
2673 }
2674
2675 peers.on_incoming_session_established(peer_id, addr);
2676
2677 peers.on_outgoing_connection_failure(
2678 &addr,
2679 &peer_id,
2680 &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2681 );
2682 assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2683 assert_eq!(peers.connection_info.num_inbound, 1);
2684 assert_eq!(peers.connection_info.num_pending_out, 0);
2685 assert_eq!(peers.connection_info.num_pending_in, 0);
2686 assert_eq!(peers.connection_info.num_outbound, 0);
2687 }
2688
2689 #[tokio::test]
2690 async fn test_max_concurrent_dials() {
2691 let config = PeersConfig::default();
2692 let mut peer_manager = PeersManager::new(config);
2693 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2694 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2695 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2696 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2697 }
2698
2699 peer_manager.fill_outbound_slots();
2700 let dials = peer_manager
2701 .queued_actions
2702 .iter()
2703 .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2704 .count();
2705 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2706 }
2707
2708 #[tokio::test]
2709 async fn test_max_num_of_pending_dials() {
2710 let config = PeersConfig::default();
2711 let mut peer_manager = PeersManager::new(config);
2712 let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2713 let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2714
2715 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2717 peer_manager.add_peer(PeerId::random(), peer_addr, None);
2718 }
2719
2720 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2721 match event!(peer_manager) {
2722 PeerAction::PeerAdded(_) => {}
2723 _ => unreachable!(),
2724 }
2725 }
2726
2727 for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2728 match event!(peer_manager) {
2729 PeerAction::Connect { .. } => {}
2730 _ => unreachable!(),
2731 }
2732 }
2733
2734 peer_manager.fill_outbound_slots();
2736
2737 let dials = peer_manager.connection_info.num_pending_out;
2739 assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2740
2741 let num_pendingout_states = peer_manager
2742 .peers
2743 .iter()
2744 .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2745 .map(|(peer_id, _)| *peer_id)
2746 .collect::<Vec<PeerId>>();
2747 assert_eq!(
2748 num_pendingout_states.len(),
2749 peer_manager.connection_info.config.max_concurrent_outbound_dials
2750 );
2751
2752 for peer_id in &num_pendingout_states {
2754 peer_manager.on_active_outgoing_established(*peer_id);
2755 }
2756
2757 for peer_id in &num_pendingout_states {
2759 assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2760 }
2761
2762 assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2764 }
2765
2766 #[tokio::test]
2767 async fn test_connect() {
2768 let peer = PeerId::random();
2769 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2770 let mut peers = PeersManager::default();
2771 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2772 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2773
2774 match event!(peers) {
2775 PeerAction::Connect { peer_id, remote_addr } => {
2776 assert_eq!(peer_id, peer);
2777 assert_eq!(remote_addr, socket_addr);
2778 }
2779 _ => unreachable!(),
2780 }
2781
2782 let (record, _) = peers.peer_by_id(peer).unwrap();
2783 assert_eq!(record.tcp_addr(), socket_addr);
2784 assert_eq!(record.udp_addr(), socket_addr);
2785
2786 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2788
2789 let (record, _) = peers.peer_by_id(peer).unwrap();
2790 assert_eq!(record.tcp_addr(), socket_addr);
2791 assert_eq!(record.udp_addr(), socket_addr);
2792 }
2793
2794 #[tokio::test]
2795 async fn test_incoming_connection_from_banned() {
2796 let peer = PeerId::random();
2797 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2798 let config = PeersConfig::test().with_max_inbound(3);
2799 let mut peers = PeersManager::new(config);
2800 peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2801
2802 match event!(peers) {
2803 PeerAction::PeerAdded(peer_id) => {
2804 assert_eq!(peer_id, peer);
2805 }
2806 _ => unreachable!(),
2807 }
2808 match event!(peers) {
2809 PeerAction::Connect { peer_id, .. } => {
2810 assert_eq!(peer_id, peer);
2811 }
2812 _ => unreachable!(),
2813 }
2814
2815 poll_fn(|cx| {
2816 assert!(peers.poll(cx).is_pending());
2817 Poll::Ready(())
2818 })
2819 .await;
2820
2821 loop {
2823 peers.on_active_session_dropped(
2824 &socket_addr,
2825 &peer,
2826 &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2827 reth_eth_wire::EthVersion::Eth68,
2828 reth_eth_wire::EthMessageID::Status,
2829 )),
2830 );
2831
2832 if peers.peers.get(&peer).unwrap().is_banned() {
2833 break;
2834 }
2835
2836 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2837 peers.on_incoming_session_established(peer, socket_addr);
2838
2839 match event!(peers) {
2840 PeerAction::Connect { peer_id, .. } => {
2841 assert_eq!(peer_id, peer);
2842 }
2843 _ => unreachable!(),
2844 }
2845 }
2846
2847 assert!(peers.peers.get(&peer).unwrap().is_banned());
2848
2849 for _ in 0..peers.connection_info.config.max_inbound {
2851 assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2852 peers.on_incoming_session_established(peer, socket_addr);
2853
2854 match event!(peers) {
2855 PeerAction::DisconnectBannedIncoming { peer_id } => {
2856 assert_eq!(peer_id, peer);
2857 }
2858 _ => unreachable!(),
2859 }
2860 }
2861
2862 poll_fn(|cx| {
2863 assert!(peers.poll(cx).is_pending());
2864 Poll::Ready(())
2865 })
2866 .await;
2867
2868 assert_eq!(peers.connection_info.num_inbound, 0);
2869
2870 let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2871
2872 assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2874 assert_eq!(peers.connection_info.num_pending_in, 1);
2875
2876 peers.on_active_session_gracefully_closed(peer);
2880 assert_eq!(peers.connection_info.num_inbound, 0);
2881 }
2882
2883 #[tokio::test]
2884 async fn test_add_pending_connect() {
2885 let peer = PeerId::random();
2886 let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2887 let mut peers = PeersManager::default();
2888 peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2889 assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2890 assert_eq!(peers.connection_info.num_pending_out, 1);
2891 }
2892
2893 #[tokio::test]
2894 async fn test_dns_updates_peer_address() {
2895 let peer_id = PeerId::random();
2896 let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2897 let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2898
2899 let trusted = TrustedPeer {
2900 host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2901 tcp_port: 8008,
2902 udp_port: 8008,
2903 id: peer_id,
2904 };
2905
2906 let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2907 let mut manager = PeersManager::new(config);
2908 manager
2909 .trusted_peers_resolver
2910 .set_interval(tokio::time::interval(Duration::from_millis(1)));
2911
2912 manager.peers.insert(
2913 peer_id,
2914 Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2915 );
2916
2917 for _ in 0..100 {
2918 let _ = event!(manager);
2919 if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2920 break;
2921 }
2922 tokio::time::sleep(Duration::from_millis(10)).await;
2923 }
2924
2925 let updated_peer = manager.peers.get(&peer_id).unwrap();
2926 assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2927 }
2928
2929 #[tokio::test]
2930 async fn test_ip_filter_blocks_inbound_connection() {
2931 use reth_net_banlist::IpFilter;
2932 use std::net::IpAddr;
2933
2934 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
2936 let config = PeersConfig::test().with_ip_filter(ip_filter);
2937 let mut peers = PeersManager::new(config);
2938
2939 let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
2941 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
2942
2943 let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
2945 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
2946 }
2947
2948 #[tokio::test]
2949 async fn test_ip_filter_blocks_outbound_connection() {
2950 use reth_net_banlist::IpFilter;
2951 use std::net::SocketAddr;
2952
2953 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
2955 let config = PeersConfig::test().with_ip_filter(ip_filter);
2956 let mut peers = PeersManager::new(config);
2957
2958 let peer_id = PeerId::new([1; 64]);
2959
2960 let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
2962 peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
2963 assert!(peers.peers.contains_key(&peer_id));
2964
2965 let peer_id2 = PeerId::new([2; 64]);
2967 let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
2968 peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
2969 assert!(!peers.peers.contains_key(&peer_id2));
2970 }
2971
2972 #[tokio::test]
2973 async fn test_ip_filter_ipv6() {
2974 use reth_net_banlist::IpFilter;
2975 use std::net::IpAddr;
2976
2977 let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
2979 let config = PeersConfig::test().with_ip_filter(ip_filter);
2980 let mut peers = PeersManager::new(config);
2981
2982 let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
2984 assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
2985
2986 let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
2988 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
2989 }
2990
2991 #[tokio::test]
2992 async fn test_ip_filter_multiple_ranges() {
2993 use reth_net_banlist::IpFilter;
2994 use std::net::IpAddr;
2995
2996 let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
2998 let config = PeersConfig::test().with_ip_filter(ip_filter);
2999 let mut peers = PeersManager::new(config);
3000
3001 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3003 let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3004 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3005 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3006
3007 let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3009 assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3010 }
3011
3012 #[tokio::test]
3013 async fn test_ip_filter_no_restriction() {
3014 use reth_net_banlist::IpFilter;
3015 use std::net::IpAddr;
3016
3017 let ip_filter = IpFilter::allow_all();
3019 let config = PeersConfig::test().with_ip_filter(ip_filter);
3020 let mut peers = PeersManager::new(config);
3021
3022 let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3024 let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3025 let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3026 assert!(peers.on_incoming_pending_session(ip1).is_ok());
3027 assert!(peers.on_incoming_pending_session(ip2).is_ok());
3028 assert!(peers.on_incoming_pending_session(ip3).is_ok());
3029 }
3030}