Skip to main content

reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use rand::Rng;
12use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
13use reth_ethereum_forks::ForkId;
14use reth_net_banlist::BanList;
15use reth_network_api::test_utils::{PeerCommand, PeersHandle};
16use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
17use reth_network_types::{
18    is_connection_failed_reputation,
19    peers::{
20        config::{PeerBackoffDurations, PEER_ROTATION_MIN_UPTIME},
21        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
22    },
23    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
24    PersistedPeerInfo, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
25};
26use std::{
27    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
28    fmt::Display,
29    io::{self},
30    net::{IpAddr, SocketAddr},
31    pin::Pin,
32    task::{Context, Poll},
33    time::Duration,
34};
35use thiserror::Error;
36use tokio::{
37    sync::mpsc,
38    time::{Instant, Interval, Sleep},
39};
40use tokio_stream::wrappers::UnboundedReceiverStream;
41use tracing::{trace, warn};
42
43/// Maintains the state of _all_ the peers known to the network.
44///
45/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
46/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
47///
48/// The [`PeersManager`] will be notified on peer related changes
49#[derive(Debug)]
50pub struct PeersManager {
51    /// All peers known to the network
52    peers: HashMap<PeerId, Peer>,
53    /// The set of trusted peer ids.
54    ///
55    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
56    /// an address: [`Self::add_trusted_peer_id`]
57    trusted_peer_ids: HashSet<PeerId>,
58    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
59    /// peer's address when the DNS records change.
60    trusted_peers_resolver: TrustedPeersResolver,
61    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
62    manager_tx: mpsc::UnboundedSender<PeerCommand>,
63    /// Receiver half of the command channel.
64    handle_rx: UnboundedReceiverStream<PeerCommand>,
65    /// Buffered actions until the manager is polled.
66    queued_actions: VecDeque<PeerAction>,
67    /// Interval for triggering connections if there are free slots.
68    refill_slots_interval: Interval,
69    /// How to weigh reputation changes
70    reputation_weights: ReputationChangeWeights,
71    /// Tracks current slot stats.
72    connection_info: ConnectionInfo,
73    /// Tracks unwanted ips/peer ids.
74    ban_list: BanList,
75    /// Tracks currently backed off peers.
76    backed_off_peers: HashMap<PeerId, std::time::Instant>,
77    /// Interval at which to check for peers to unban and release from the backoff map.
78    release_interval: Interval,
79    /// How long to ban bad peers.
80    ban_duration: Duration,
81    /// How long peers to which we could not connect for non-fatal reasons, e.g.
82    /// [`DisconnectReason::TooManyPeers`], are put in time out.
83    backoff_durations: PeerBackoffDurations,
84    /// If non-trusted peers should be connected to, or the connection from non-trusted
85    /// incoming peers should be accepted.
86    trusted_nodes_only: bool,
87    /// Timestamp of the last time [`Self::tick`] was called.
88    last_tick: Instant,
89    /// Maximum number of backoff attempts before we give up on a peer and dropping.
90    max_backoff_count: u8,
91    /// Tracks the connection state of the node
92    net_connection_state: NetworkConnectionState,
93    /// How long to temporarily ban ip on an incoming connection attempt.
94    incoming_ip_throttle_duration: Duration,
95    /// IP address filter for restricting network connections to specific IP ranges.
96    ip_filter: reth_net_banlist::IpFilter,
97    /// If true, discovered peers without a confirmed ENR fork ID will not be added until their
98    /// fork ID is verified via EIP-868.
99    enforce_enr_fork_id: bool,
100    /// One-shot sleep that fires when it's time to rotate a peer; reset with jitter after each
101    /// fire. `None` when rotation is disabled.
102    peer_rotation_sleep: Option<Pin<Box<Sleep>>>,
103    /// Mean duration for computing jittered rotation intervals. `None` when rotation is disabled.
104    peer_rotation_mean: Option<Duration>,
105}
106
107impl PeersManager {
108    /// Create a new instance with the given config
109    pub fn new(config: PeersConfig) -> Self {
110        let PeersConfig {
111            refill_slots_interval,
112            connection_info,
113            reputation_weights,
114            ban_list,
115            ban_duration,
116            backoff_durations,
117            trusted_nodes,
118            trusted_nodes_only,
119            trusted_nodes_resolution_interval,
120            basic_nodes,
121            persisted_peers,
122            max_backoff_count,
123            incoming_ip_throttle_duration,
124            ip_filter,
125            enforce_enr_fork_id,
126            peer_rotation_interval,
127        } = config;
128        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
129        let now = Instant::now();
130
131        // We use half of the interval to decrease the max duration to `150%` in worst case
132        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
133
134        let mut peers =
135            HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len() + persisted_peers.len());
136        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
137
138        for trusted_peer in &trusted_nodes {
139            match trusted_peer.resolve_blocking() {
140                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
141                    trusted_peer_ids.insert(id);
142                    peers.entry(id).or_insert_with(|| {
143                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
144                    });
145                }
146                Err(err) => {
147                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
148                }
149            }
150        }
151
152        for PersistedPeerInfo { record, kind, fork_id, reputation } in persisted_peers {
153            // When enforce_enr_fork_id is enabled, skip persisted peers that don't have a
154            // confirmed fork ID. These were likely accumulated from a different network during
155            // a prior run without the flag.
156            if enforce_enr_fork_id && fork_id.is_none() {
157                continue
158            }
159            let NodeRecord { address, tcp_port, udp_port, id } = record;
160            peers.entry(id).or_insert_with(|| {
161                let mut peer = Peer::with_kind(
162                    PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)),
163                    kind,
164                );
165                peer.fork_id = fork_id.map(Box::new);
166                peer.reputation = reputation;
167                peer
168            });
169        }
170
171        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
172            peers.entry(id).or_insert_with(|| {
173                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
174            });
175        }
176
177        trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
178
179        Self {
180            peers,
181            trusted_peer_ids,
182            trusted_peers_resolver: TrustedPeersResolver::new(
183                trusted_nodes,
184                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
185            ),
186            manager_tx,
187            handle_rx: UnboundedReceiverStream::new(handle_rx),
188            queued_actions: Default::default(),
189            reputation_weights,
190            refill_slots_interval: tokio::time::interval(refill_slots_interval),
191            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
192            connection_info: ConnectionInfo::new(connection_info),
193            ban_list,
194            backed_off_peers: Default::default(),
195            ban_duration,
196            backoff_durations,
197            trusted_nodes_only,
198            last_tick: Instant::now(),
199            max_backoff_count,
200            net_connection_state: NetworkConnectionState::default(),
201            incoming_ip_throttle_duration,
202            ip_filter,
203            enforce_enr_fork_id,
204            peer_rotation_sleep: peer_rotation_interval
205                .map(|mean| Box::pin(tokio::time::sleep(jitter_rotation_interval(mean)))),
206            peer_rotation_mean: peer_rotation_interval,
207        }
208    }
209
210    /// Returns a new [`PeersHandle`] that can send commands to this type.
211    pub(crate) fn handle(&self) -> PeersHandle {
212        PeersHandle::new(self.manager_tx.clone())
213    }
214
215    /// Returns `true` if discovered peers must have a confirmed ENR fork ID before being added.
216    pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
217        self.enforce_enr_fork_id
218    }
219
220    /// Returns the number of peers in the peer set
221    #[inline]
222    pub(crate) fn num_known_peers(&self) -> usize {
223        self.peers.len()
224    }
225
226    /// Returns an iterator over all peers as [`NodeRecord`]s.
227    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
228        self.peers.iter().map(|(peer_id, v)| {
229            NodeRecord::new_with_ports(
230                v.addr.tcp().ip(),
231                v.addr.tcp().port(),
232                v.addr.udp().map(|addr| addr.port()),
233                *peer_id,
234            )
235        })
236    }
237
238    /// Returns an iterator over peers suitable for persisting to disk.
239    ///
240    /// Filters out backed-off and banned peers, and includes metadata like kind, fork ID, and
241    /// reputation.
242    pub(crate) fn persistable_peers(&self) -> impl Iterator<Item = PersistedPeerInfo> + '_ {
243        self.peers.iter().filter(|(_, peer)| !peer.is_backed_off() && !peer.is_banned()).map(
244            |(peer_id, peer)| PersistedPeerInfo {
245                record: NodeRecord::new_with_ports(
246                    peer.addr.tcp().ip(),
247                    peer.addr.tcp().port(),
248                    peer.addr.udp().map(|addr| addr.port()),
249                    *peer_id,
250                ),
251                kind: peer.kind,
252                fork_id: peer.fork_id.as_deref().copied(),
253                reputation: peer.reputation,
254            },
255        )
256    }
257
258    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
259    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
260        self.peers.get(&peer_id).map(|v| {
261            (
262                NodeRecord::new_with_ports(
263                    v.addr.tcp().ip(),
264                    v.addr.tcp().port(),
265                    v.addr.udp().map(|addr| addr.port()),
266                    peer_id,
267                ),
268                v.kind,
269            )
270        })
271    }
272
273    /// Returns `true` if the given peer is connected via an inbound session.
274    pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
275        self.peers.get(peer_id).is_some_and(|p| {
276            matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
277        })
278    }
279
280    /// Returns an iterator over all peer ids for peers with the given kind
281    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
282        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
283    }
284
285    /// Returns the number of currently active inbound connections.
286    #[inline]
287    pub(crate) const fn num_inbound_connections(&self) -> usize {
288        self.connection_info.num_inbound
289    }
290
291    /// Returns the number of currently __active__ outbound connections.
292    #[inline]
293    pub(crate) const fn num_outbound_connections(&self) -> usize {
294        self.connection_info.num_outbound
295    }
296
297    /// Returns the number of currently pending outbound connections.
298    #[inline]
299    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
300        self.connection_info.num_pending_out
301    }
302
303    /// Returns the number of currently backed off peers.
304    #[inline]
305    pub(crate) fn num_backed_off_peers(&self) -> usize {
306        self.backed_off_peers.len()
307    }
308
309    /// Returns the number of idle trusted peers.
310    fn num_idle_trusted_peers(&self) -> usize {
311        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
312    }
313
314    /// Invoked when a new _incoming_ tcp connection is accepted.
315    ///
316    /// returns an error if the inbound ip address is on the ban list
317    pub(crate) fn on_incoming_pending_session(
318        &mut self,
319        addr: IpAddr,
320    ) -> Result<(), InboundConnectionError> {
321        // Check if the IP is in the allowed ranges (netrestrict)
322        if !self.ip_filter.is_allowed(&addr) {
323            trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
324            return Err(InboundConnectionError::IpBanned)
325        }
326
327        if self.ban_list.is_banned_ip(&addr) {
328            return Err(InboundConnectionError::IpBanned)
329        }
330
331        // check if we even have slots for a new incoming connection
332        if !self.connection_info.has_in_capacity() {
333            if self.trusted_peer_ids.is_empty() {
334                // if we don't have any incoming slots and no trusted peers, we don't accept any new
335                // connections
336                return Err(InboundConnectionError::ExceedsCapacity)
337            }
338
339            // there's an edge case here where no incoming connections besides from trusted peers
340            // are allowed (max_inbound == 0), in which case we still need to allow new pending
341            // incoming connections until all trusted peers are connected.
342            let num_idle_trusted_peers = self.num_idle_trusted_peers();
343            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
344                // we still want to limit concurrent pending connections
345                let max_inbound =
346                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
347                if self.connection_info.num_pending_in < max_inbound {
348                    self.connection_info.inc_pending_in();
349                    return Ok(())
350                }
351            }
352
353            // all trusted peers are either connected or connecting
354            return Err(InboundConnectionError::ExceedsCapacity)
355        }
356
357        // also cap the incoming connections we can process at once
358        if !self.connection_info.has_in_pending_capacity() {
359            return Err(InboundConnectionError::ExceedsCapacity)
360        }
361
362        // apply the rate limit
363        self.throttle_incoming_ip(addr);
364
365        self.connection_info.inc_pending_in();
366        Ok(())
367    }
368
369    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
370    /// rejected.
371    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
372        self.connection_info.decr_pending_in();
373    }
374
375    /// Invoked when a pending session was closed.
376    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
377        self.connection_info.decr_pending_in()
378    }
379
380    /// Invoked when a pending session was closed.
381    pub(crate) fn on_incoming_pending_session_dropped(
382        &mut self,
383        remote_addr: SocketAddr,
384        err: &PendingSessionHandshakeError,
385    ) {
386        if err.is_fatal_protocol_error() {
387            self.ban_ip(remote_addr.ip());
388
389            if err.merits_discovery_ban() {
390                self.queued_actions
391                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
392            }
393        }
394
395        self.connection_info.decr_pending_in();
396    }
397
398    /// Called when a new _incoming_ active session was established to the given peer.
399    ///
400    /// This will update the state of the peer if not yet tracked.
401    ///
402    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
403    /// be scheduled.
404    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
405        self.connection_info.decr_pending_in();
406
407        // we only need to check the peer id here as the ip address will have been checked at
408        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
409        if self.ban_list.is_banned_peer(&peer_id) {
410            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
411            return
412        }
413
414        // check if the peer is trustable or not
415        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
416        if self.trusted_nodes_only && !is_trusted {
417            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
418            return
419        }
420
421        // start a new tick, so the peer is not immediately rewarded for the time since last tick
422        self.tick();
423
424        match self.peers.entry(peer_id) {
425            Entry::Occupied(mut entry) => {
426                let peer = entry.get_mut();
427                if peer.is_banned() {
428                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
429                    return
430                }
431                // it might be the case that we're also trying to connect to this peer at the same
432                // time, so we need to adjust the state here
433                if peer.state.is_pending_out() {
434                    self.connection_info.decr_state(peer.state);
435                }
436
437                peer.state = PeerConnectionState::In;
438                peer.mark_connected();
439
440                is_trusted = is_trusted || peer.is_trusted();
441            }
442            Entry::Vacant(entry) => {
443                // peer is missing in the table, we add it but mark it as to be removed after
444                // disconnect, because we only know the outgoing port
445                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
446                peer.mark_connected();
447                peer.remove_after_disconnect = true;
448                entry.insert(peer);
449                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
450            }
451        }
452
453        let has_in_capacity = self.connection_info.has_in_capacity();
454        // increment new incoming connection
455        self.connection_info.inc_in();
456
457        // disconnect the peer if we don't have capacity for more inbound connections
458        if !is_trusted && !has_in_capacity {
459            self.queued_actions.push_back(PeerAction::Disconnect {
460                peer_id,
461                reason: Some(DisconnectReason::TooManyPeers),
462            });
463        }
464    }
465
466    /// Bans the peer temporarily with the configured ban timeout
467    fn ban_peer(&mut self, peer_id: PeerId) {
468        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
469            (peer.is_trusted() || peer.is_static())
470        {
471            // For misbehaving trusted or static peers, we provide a bit more leeway when
472            // penalizing them.
473            self.backoff_durations.low / 2
474        } else {
475            self.ban_duration
476        };
477
478        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
479        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
480    }
481
482    /// Bans the IP temporarily with the configured ban timeout
483    fn ban_ip(&mut self, ip: IpAddr) {
484        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
485    }
486
487    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
488    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
489        self.ban_list
490            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
491    }
492
493    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
494    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
495        trace!(target: "net::peers", ?peer_id, "backing off");
496
497        if let Some(peer) = self.peers.get_mut(&peer_id) {
498            peer.backed_off = true;
499            self.backed_off_peers.insert(peer_id, until);
500        }
501    }
502
503    /// Unbans the peer
504    fn unban_peer(&mut self, peer_id: PeerId) {
505        self.ban_list.unban_peer(&peer_id);
506        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
507    }
508
509    /// Tick function to update reputation of all connected peers.
510    /// Peers are rewarded with reputation increases for the time they are connected since the last
511    /// tick. This is to prevent peers from being disconnected eventually due to slashed
512    /// reputation because of some bad messages (most likely transaction related)
513    fn tick(&mut self) {
514        let now = Instant::now();
515        // Determine the number of seconds since the last tick.
516        // Ensuring that now is always greater than last_tick to account for issues with system
517        // time.
518        let secs_since_last_tick =
519            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
520        self.last_tick = now;
521
522        // update reputation via seconds connected
523        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
524            // update reputation via seconds connected, but keep the target _around_ the default
525            // reputation.
526            if peer.1.reputation < DEFAULT_REPUTATION {
527                peer.1.reputation += secs_since_last_tick;
528            }
529        }
530    }
531
532    /// Returns the tracked reputation for a peer.
533    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
534        self.peers.get(peer_id).map(|peer| peer.reputation)
535    }
536
537    /// Apply the corresponding reputation change to the given peer.
538    ///
539    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
540    /// reputation changes that can be attributed to network conditions. If the peer is a
541    /// trusted peer, it will also be less strict with the reputation slashing.
542    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
543        trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
544
545        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
546            // First check if we should reset the reputation
547            if rep.is_reset() {
548                peer.reset_reputation()
549            } else {
550                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
551                if peer.is_trusted() || peer.is_static() {
552                    // exempt trusted and static peers from reputation slashing for
553                    if matches!(
554                        rep,
555                        ReputationChangeKind::Dropped |
556                            ReputationChangeKind::BadAnnouncement |
557                            ReputationChangeKind::Timeout |
558                            ReputationChangeKind::AlreadySeenTransaction
559                    ) {
560                        return
561                    }
562
563                    // also be less strict with the reputation slashing for trusted peers
564                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
565                        // this caps the reputation change to the maximum allowed for trusted peers
566                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
567                    }
568                }
569                peer.apply_reputation(reputation_change, rep)
570            }
571        } else {
572            return
573        };
574
575        match outcome {
576            ReputationChangeOutcome::None => {}
577            ReputationChangeOutcome::Ban => {
578                self.ban_peer(*peer_id);
579            }
580            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
581            ReputationChangeOutcome::DisconnectAndBan => {
582                self.queued_actions.push_back(PeerAction::Disconnect {
583                    peer_id: *peer_id,
584                    reason: Some(DisconnectReason::DisconnectRequested),
585                });
586                self.ban_peer(*peer_id);
587            }
588        }
589    }
590
591    /// Gracefully disconnected a pending _outgoing_ session
592    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
593        if let Some(peer) = self.peers.get_mut(peer_id) {
594            self.connection_info.decr_state(peer.state);
595            peer.state = PeerConnectionState::Idle;
596            peer.mark_disconnected();
597        }
598    }
599
600    /// Invoked when an _outgoing_ pending session was closed during authentication or the
601    /// handshake.
602    pub(crate) fn on_outgoing_pending_session_dropped(
603        &mut self,
604        remote_addr: &SocketAddr,
605        peer_id: &PeerId,
606        err: &PendingSessionHandshakeError,
607    ) {
608        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
609    }
610
611    /// Gracefully disconnected an active session
612    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
613        match self.peers.entry(peer_id) {
614            Entry::Occupied(mut entry) => {
615                trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
616                self.connection_info.decr_state(entry.get().state);
617
618                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
619                    // this peer should be removed from the set
620                    entry.remove();
621                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
622                } else {
623                    let peer = entry.get_mut();
624                    // reset the peer's state
625                    // we reset the backoff counter since we're able to establish a successful
626                    // session to that peer
627                    peer.severe_backoff_counter = 0;
628                    peer.state = PeerConnectionState::Idle;
629                    peer.mark_disconnected();
630
631                    // but we're backing off slightly to avoid dialing the peer again right away, to
632                    // give the remote time to also properly register the closed session and clean
633                    // up and to avoid any issues with ip throttling on the remote in case this
634                    // session was terminated right away.
635                    peer.backed_off = true;
636                    self.backed_off_peers.insert(
637                        peer_id,
638                        std::time::Instant::now() + self.incoming_ip_throttle_duration,
639                    );
640                    trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
641                }
642            }
643            Entry::Vacant(_) => return,
644        }
645
646        self.fill_outbound_slots();
647    }
648
649    /// Called when a _pending_ outbound connection is successful.
650    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
651        if let Some(peer) = self.peers.get_mut(&peer_id) {
652            trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
653            self.connection_info.decr_state(peer.state);
654            self.connection_info.inc_out();
655            peer.state = PeerConnectionState::Out;
656            peer.mark_connected();
657        }
658    }
659
660    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
661    ///
662    /// Depending on whether the error is fatal, the peer will be removed from the peer set
663    /// otherwise its reputation is slashed.
664    pub(crate) fn on_active_session_dropped(
665        &mut self,
666        remote_addr: &SocketAddr,
667        peer_id: &PeerId,
668        err: &EthStreamError,
669    ) {
670        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
671    }
672
673    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
674    /// connection.
675    pub(crate) fn on_outgoing_connection_failure(
676        &mut self,
677        remote_addr: &SocketAddr,
678        peer_id: &PeerId,
679        err: &io::Error,
680    ) {
681        // there's a race condition where we accepted an incoming connection while we were trying to
682        // connect to the same peer at the same time. if the outgoing connection failed
683        // after the incoming connection was accepted, we can ignore this error
684        if let Some(peer) = self.peers.get(peer_id) {
685            if peer.state.is_incoming() {
686                // we already have an active connection to the peer, so we can ignore this error
687                return
688            }
689
690            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
691                // trigger resolution task for trusted peer since multiple connection failures
692                // occurred
693                self.trusted_peers_resolver.interval.reset_immediately();
694            }
695        }
696
697        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
698    }
699
700    fn on_connection_failure(
701        &mut self,
702        remote_addr: &SocketAddr,
703        peer_id: &PeerId,
704        err: impl SessionError,
705        reputation_change: ReputationChangeKind,
706    ) {
707        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
708
709        if err.is_fatal_protocol_error() {
710            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
711            // remove the peer to which we can't establish a connection due to protocol related
712            // issues.
713            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
714                self.connection_info.decr_state(entry.get().state);
715                // only remove if the peer is not trusted
716                if entry.get().is_trusted() {
717                    let peer = entry.get_mut();
718                    peer.state = PeerConnectionState::Idle;
719                    peer.mark_disconnected();
720                } else {
721                    entry.remove();
722                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
723                    // If the error is caused by a peer that should be banned from discovery
724                    if err.merits_discovery_ban() {
725                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
726                            peer_id: *peer_id,
727                            ip_addr: remote_addr.ip(),
728                        })
729                    }
730                }
731            }
732
733            // ban the peer
734            self.ban_peer(*peer_id);
735        } else {
736            let mut backoff_until = None;
737            let mut remove_peer = false;
738
739            if let Some(peer) = self.peers.get_mut(peer_id) {
740                if let Some(kind) = err.should_backoff() {
741                    if peer.is_trusted() || peer.is_static() {
742                        // provide a bit more leeway for trusted peers and use a lower backoff so
743                        // that we keep re-trying them after backing off shortly, but we should at
744                        // least backoff for the low duration to not violate the ip based inbound
745                        // connection throttle that peer has in place, because this peer might not
746                        // have us registered as a trusted peer.
747                        let backoff = self.backoff_durations.low;
748                        backoff_until = Some(std::time::Instant::now() + backoff);
749                        trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
750                    } else {
751                        // Increment peer.backoff_counter
752                        if kind.is_severe() {
753                            peer.severe_backoff_counter =
754                                peer.severe_backoff_counter.saturating_add(1);
755                        }
756                        trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
757
758                        let backoff_time =
759                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
760
761                        // The peer has signaled that it is currently unable to process any more
762                        // connections, so we will hold off on attempting any new connections for a
763                        // while
764                        backoff_until = Some(backoff_time);
765                    }
766                } else {
767                    // If the error was not a backoff error, we reduce the peer's reputation
768                    let reputation_change = self.reputation_weights.change(reputation_change);
769                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
770                };
771
772                self.connection_info.decr_state(peer.state);
773                peer.state = PeerConnectionState::Idle;
774                peer.mark_disconnected();
775
776                if peer.severe_backoff_counter > self.max_backoff_count &&
777                    !peer.is_trusted() &&
778                    !peer.is_static()
779                {
780                    // mark peer for removal if it has been backoff too many times and is _not_
781                    // trusted or static
782                    remove_peer = true;
783                }
784            }
785
786            // remove peer if it has been marked for removal
787            if remove_peer {
788                trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
789                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
790                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
791            } else if let Some(backoff_until) = backoff_until {
792                // otherwise, backoff the peer if marked as such
793                self.backoff_peer_until(*peer_id, backoff_until);
794            }
795        }
796
797        self.fill_outbound_slots();
798    }
799
800    /// Invoked if a pending session was disconnected because there's already a connection to the
801    /// peer.
802    ///
803    /// If the session was an outgoing connection, this means that the peer initiated a connection
804    /// to us at the same time and this connection is already established.
805    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
806        match direction {
807            Direction::Incoming => {
808                // need to decrement the ingoing counter
809                self.connection_info.decr_pending_in();
810            }
811            Direction::Outgoing(_) => {
812                // cleanup is handled when the incoming active session is activated in
813                // `on_incoming_session_established`
814            }
815        }
816    }
817
818    /// Called for a newly discovered peer.
819    ///
820    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
821    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
822        self.add_peer_kind(peer_id, None, addr, fork_id)
823    }
824
825    /// Marks the given peer as trusted.
826    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
827        self.trusted_peer_ids.insert(peer_id);
828        if let Some(peer) = self.peers.get_mut(&peer_id) {
829            peer.kind = PeerKind::Trusted;
830        }
831    }
832
833    /// Called for a newly discovered trusted peer.
834    ///
835    /// If the peer already exists, then the address and kind will be updated.
836    #[cfg_attr(not(test), expect(dead_code))]
837    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
838        self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
839    }
840
841    /// Adds a trusted peer that may use a hostname instead of an IP address.
842    pub(crate) fn add_trusted_peer_node(&mut self, trusted: TrustedPeer) {
843        let peer_id = trusted.id;
844        self.trusted_peer_ids.insert(peer_id);
845        self.trusted_peers_resolver.remove(peer_id);
846        self.trusted_peers_resolver.trusted_peers.push(trusted);
847        self.trusted_peers_resolver.interval.reset_immediately();
848    }
849
850    /// Called for a newly discovered peer.
851    ///
852    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
853    /// If the peer exists and a [`PeerKind`] is provided then the peer's kind is updated
854    pub(crate) fn add_peer_kind(
855        &mut self,
856        peer_id: PeerId,
857        kind: Option<PeerKind>,
858        addr: PeerAddr,
859        fork_id: Option<ForkId>,
860    ) {
861        let ip_addr = addr.tcp().ip();
862
863        // Check if the IP is in the allowed ranges (netrestrict)
864        if !self.ip_filter.is_allowed(&ip_addr) {
865            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
866            return
867        }
868
869        if self.ban_list.is_banned(&peer_id, &ip_addr) {
870            return
871        }
872
873        match self.peers.entry(peer_id) {
874            Entry::Occupied(mut entry) => {
875                let peer = entry.get_mut();
876                peer.fork_id = fork_id.map(Box::new);
877                peer.addr = addr;
878
879                if let Some(kind) = kind {
880                    peer.kind = kind;
881                }
882
883                if peer.state.is_incoming() {
884                    // now that we have an actual discovered address, for that peer and not just the
885                    // ip of the incoming connection, we don't need to remove the peer after
886                    // disconnecting, See `on_incoming_session_established`
887                    peer.remove_after_disconnect = false;
888                }
889            }
890            Entry::Vacant(entry) => {
891                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
892                let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
893                peer.fork_id = fork_id.map(Box::new);
894                entry.insert(peer);
895                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
896            }
897        }
898
899        if kind.is_some_and(|kind| kind.is_trusted()) {
900            // also track the peer in the peer id set
901            self.trusted_peer_ids.insert(peer_id);
902        }
903    }
904
905    /// Removes the tracked node from the set.
906    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
907        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
908        if entry.get().is_trusted() {
909            return
910        }
911        let mut peer = entry.remove();
912
913        trace!(target: "net::peers", ?peer_id, "remove discovered node");
914        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
915
916        if peer.state.is_connected() {
917            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
918            // we terminate the active session here, but only remove the peer after the session
919            // was disconnected, this prevents the case where the session is scheduled for
920            // disconnect but the node is immediately rediscovered, See also
921            // [`Self::on_disconnected()`]
922            peer.remove_after_disconnect = true;
923            peer.state.disconnect();
924            self.peers.insert(peer_id, peer);
925            self.queued_actions.push_back(PeerAction::Disconnect {
926                peer_id,
927                reason: Some(DisconnectReason::DisconnectRequested),
928            })
929        }
930    }
931
932    /// Bans the peer indefinitely and removes it from the peer set.
933    ///
934    /// This follows [`Self::remove_peer`] and does not override trusted status. Remove trusted
935    /// peers from the trusted set before banning them.
936    pub(crate) fn ban_peer_by_admin(&mut self, peer_id: PeerId) {
937        if self.trusted_peer_ids.contains(&peer_id) ||
938            self.peers.get(&peer_id).is_some_and(Peer::is_trusted)
939        {
940            return
941        }
942
943        self.remove_peer(peer_id);
944        self.ban_list.ban_peer(peer_id);
945    }
946
947    /// Removes the peer from the ban list.
948    pub(crate) fn unban_peer_by_admin(&mut self, peer_id: PeerId) {
949        self.ban_list.unban_peer(&peer_id);
950    }
951
952    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
953    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
954    #[cfg_attr(not(test), expect(dead_code))]
955    pub(crate) fn add_and_connect(
956        &mut self,
957        peer_id: PeerId,
958        addr: PeerAddr,
959        fork_id: Option<ForkId>,
960    ) {
961        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
962    }
963
964    /// Connects a peer and its address with the given kind.
965    ///
966    /// Note: This is invoked on demand via an external command received by the manager
967    pub(crate) fn add_and_connect_kind(
968        &mut self,
969        peer_id: PeerId,
970        kind: PeerKind,
971        addr: PeerAddr,
972        fork_id: Option<ForkId>,
973    ) {
974        let ip_addr = addr.tcp().ip();
975
976        // Check if the IP is in the allowed ranges (netrestrict)
977        if !self.ip_filter.is_allowed(&ip_addr) {
978            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
979            return
980        }
981
982        if self.ban_list.is_banned(&peer_id, &ip_addr) {
983            return
984        }
985
986        match self.peers.entry(peer_id) {
987            Entry::Occupied(mut entry) => {
988                let peer = entry.get_mut();
989                peer.kind = kind;
990                peer.fork_id = fork_id.map(Box::new);
991                peer.addr = addr;
992
993                if peer.state == PeerConnectionState::Idle {
994                    // Try connecting again.
995                    peer.state = PeerConnectionState::PendingOut;
996                    self.connection_info.inc_pending_out();
997                    self.queued_actions
998                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
999                }
1000            }
1001            Entry::Vacant(entry) => {
1002                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
1003                let mut peer = Peer::with_kind(addr, kind);
1004                peer.state = PeerConnectionState::PendingOut;
1005                peer.fork_id = fork_id.map(Box::new);
1006                entry.insert(peer);
1007                self.connection_info.inc_pending_out();
1008                self.queued_actions
1009                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
1010            }
1011        }
1012
1013        if kind.is_trusted() {
1014            self.trusted_peer_ids.insert(peer_id);
1015        }
1016    }
1017
1018    /// Removes the tracked node from the trusted set.
1019    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
1020        self.trusted_peers_resolver.remove(peer_id);
1021
1022        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else {
1023            self.trusted_peer_ids.remove(&peer_id);
1024            return
1025        };
1026        if !entry.get().is_trusted() {
1027            return
1028        }
1029
1030        let peer = entry.get_mut();
1031        peer.kind = PeerKind::Basic;
1032
1033        self.trusted_peer_ids.remove(&peer_id);
1034    }
1035
1036    /// Returns the best idle peer to connect to.
1037    ///
1038    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
1039    /// not currently marked as banned or backed off.
1040    ///
1041    /// Among remaining peers, the one with the highest reputation is selected. When reputation is
1042    /// equal, a peer with a discovered `fork_id` is preferred since it indicates a compatible fork.
1043    ///
1044    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
1045    /// `trusted` peers.
1046    ///
1047    /// Returns `None` if no peer is available.
1048    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
1049        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
1050            !peer.is_backed_off() &&
1051                !peer.is_banned() &&
1052                peer.state.is_unconnected() &&
1053                (!self.trusted_nodes_only || peer.is_trusted())
1054        });
1055
1056        // keep track of the best peer, if there's one
1057        let mut best_peer = unconnected.next()?;
1058
1059        if best_peer.1.is_trusted() || best_peer.1.is_static() {
1060            return Some((*best_peer.0, best_peer.1))
1061        }
1062
1063        for maybe_better in unconnected {
1064            // if the peer is trusted or static, return it immediately
1065            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
1066                return Some((*maybe_better.0, maybe_better.1))
1067            }
1068
1069            // prefer higher reputation, break ties by fork_id presence
1070            match maybe_better.1.reputation.cmp(&best_peer.1.reputation) {
1071                std::cmp::Ordering::Greater => best_peer = maybe_better,
1072                std::cmp::Ordering::Equal
1073                    if maybe_better.1.fork_id.is_some() && best_peer.1.fork_id.is_none() =>
1074                {
1075                    best_peer = maybe_better
1076                }
1077                _ => {}
1078            }
1079        }
1080        Some((*best_peer.0, best_peer.1))
1081    }
1082
1083    /// Disconnects one eligible peer (inbound or outbound) to open a slot for new nodes,
1084    /// mirroring Geth's peer dropper logic:
1085    /// <https://github.com/ethereum/go-ethereum/blob/c5c75977ab55e4d7ea6147cc0e221b588e5e3754/eth/dropper.go#L106-L138>
1086    ///
1087    /// A peer is eligible when its pool (inbound or outbound) is at capacity, the peer is not
1088    /// trusted or static, and it has been connected for at least [`PEER_ROTATION_MIN_UPTIME`].
1089    fn try_rotate_peer(&mut self) {
1090        let outbound_at_capacity = self.connection_info.is_outbound_at_capacity();
1091        let inbound_at_capacity = self.connection_info.is_inbound_at_capacity();
1092
1093        if !outbound_at_capacity && !inbound_at_capacity {
1094            return
1095        }
1096
1097        let now = std::time::Instant::now();
1098
1099        let candidates = self
1100            .peers
1101            .iter()
1102            .filter_map(|(peer_id, peer)| {
1103                let eligible = match peer.state {
1104                    PeerConnectionState::Out => outbound_at_capacity,
1105                    PeerConnectionState::In => inbound_at_capacity,
1106                    _ => false,
1107                };
1108                (eligible &&
1109                    !peer.is_trusted() &&
1110                    !peer.is_static() &&
1111                    !self.trusted_peer_ids.contains(peer_id) &&
1112                    peer.connected_for_at_least(now, PEER_ROTATION_MIN_UPTIME))
1113                .then_some(*peer_id)
1114            })
1115            .collect::<Vec<_>>();
1116
1117        if candidates.is_empty() {
1118            return
1119        }
1120        let peer_id = candidates[rand::rng().random_range(0..candidates.len())];
1121
1122        trace!(target: "net::peers", ?peer_id, "rotating peer to open slot for new nodes");
1123
1124        self.queued_actions.push_back(PeerAction::Disconnect {
1125            peer_id,
1126            reason: Some(DisconnectReason::UselessPeer),
1127        });
1128    }
1129
1130    /// If there's capacity for new outbound connections, this will queue new
1131    /// [`PeerAction::Connect`] actions.
1132    ///
1133    /// New connections are only initiated, if slots are available and appropriate peers are
1134    /// available.
1135    fn fill_outbound_slots(&mut self) {
1136        self.tick();
1137
1138        if !self.net_connection_state.is_active() {
1139            // nothing to fill
1140            return
1141        }
1142
1143        // as long as there are slots available fill them with the best peers
1144        while self.connection_info.has_out_capacity() {
1145            let action = {
1146                let (peer_id, peer) = match self.best_unconnected() {
1147                    Some(peer) => peer,
1148                    _ => break,
1149                };
1150
1151                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1152
1153                peer.state = PeerConnectionState::PendingOut;
1154                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1155            };
1156
1157            self.connection_info.inc_pending_out();
1158
1159            self.queued_actions.push_back(action);
1160        }
1161    }
1162
1163    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1164        if !self.trusted_peer_ids.contains(&peer_id) {
1165            trace!(target: "net::peers", ?peer_id, "Ignoring resolved trusted peer after removal");
1166            return
1167        }
1168
1169        let new_addr = PeerAddr::new_with_ports(
1170            new_record.address,
1171            new_record.tcp_port,
1172            Some(new_record.udp_port),
1173        );
1174
1175        if let Some(peer) = self.peers.get_mut(&peer_id) {
1176            if peer.addr != new_addr {
1177                peer.addr = new_addr;
1178                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1179            }
1180        } else {
1181            trace!(target: "net::peers", ?peer_id, ?new_addr, "Adding trusted peer after first successful resolution");
1182            self.add_peer_kind(peer_id, Some(PeerKind::Trusted), new_addr, None);
1183        }
1184    }
1185
1186    /// Keeps track of network state changes.
1187    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1188        self.net_connection_state = state;
1189    }
1190
1191    /// Returns the current network connection state.
1192    pub const fn connection_state(&self) -> &NetworkConnectionState {
1193        &self.net_connection_state
1194    }
1195
1196    /// Sets `net_connection_state` to `ShuttingDown`.
1197    pub const fn on_shutdown(&mut self) {
1198        self.net_connection_state = NetworkConnectionState::ShuttingDown;
1199    }
1200
1201    /// Advances the state.
1202    ///
1203    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
1204    /// [`PeersManager`] is polled.
1205    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1206        loop {
1207            // drain buffered actions
1208            if let Some(action) = self.queued_actions.pop_front() {
1209                return Poll::Ready(action)
1210            }
1211
1212            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1213                match cmd {
1214                    PeerCommand::Add(peer_id, addr) => {
1215                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1216                    }
1217                    PeerCommand::Remove(peer) => self.remove_peer(peer),
1218                    PeerCommand::ReputationChange(peer_id, rep) => {
1219                        self.apply_reputation_change(&peer_id, rep)
1220                    }
1221                    PeerCommand::GetPeer(peer, tx) => {
1222                        let _ = tx.send(self.peers.get(&peer).cloned());
1223                    }
1224                    PeerCommand::GetPeers(tx) => {
1225                        let _ = tx.send(self.iter_peers().collect());
1226                    }
1227                }
1228            }
1229
1230            if self.release_interval.poll_tick(cx).is_ready() {
1231                let now = std::time::Instant::now();
1232                let (_, unbanned_peers) = self.ban_list.evict(now);
1233
1234                for peer_id in unbanned_peers {
1235                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1236                        peer.unban();
1237                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1238                    }
1239                }
1240
1241                // clear the backoff list of expired backoffs, and mark the relevant peers as
1242                // ready to be dialed
1243                self.backed_off_peers.retain(|peer_id, until| {
1244                    if now > *until {
1245                        if let Some(peer) = self.peers.get_mut(peer_id) {
1246                            peer.backed_off = false;
1247                        }
1248                        return false
1249                    }
1250                    true
1251                })
1252            }
1253
1254            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1255                self.fill_outbound_slots();
1256            }
1257
1258            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1259                self.on_resolved_peer(peer_id, new_record);
1260            }
1261
1262            // Poll the jittered rotation timer and rotate one eligible peer when ready.
1263            let rotation_ready = self
1264                .peer_rotation_sleep
1265                .as_mut()
1266                .is_some_and(|sleep| sleep.as_mut().poll(cx).is_ready());
1267            if rotation_ready {
1268                self.try_rotate_peer();
1269                if let Some((mean, sleep)) =
1270                    self.peer_rotation_mean.zip(self.peer_rotation_sleep.as_mut())
1271                {
1272                    sleep
1273                        .as_mut()
1274                        .reset(tokio::time::Instant::now() + jitter_rotation_interval(mean));
1275                }
1276            }
1277
1278            if self.queued_actions.is_empty() {
1279                return Poll::Pending
1280            }
1281        }
1282    }
1283}
1284
1285impl Default for PeersManager {
1286    fn default() -> Self {
1287        Self::new(Default::default())
1288    }
1289}
1290
1291/// Tracks stats about connected nodes
1292#[derive(Debug, Clone, PartialEq, Eq, Default)]
1293pub struct ConnectionInfo {
1294    /// Counter for currently occupied slots for active outbound connections.
1295    num_outbound: usize,
1296    /// Counter for pending outbound connections.
1297    num_pending_out: usize,
1298    /// Counter for currently occupied slots for active inbound connections.
1299    num_inbound: usize,
1300    /// Counter for pending inbound connections.
1301    num_pending_in: usize,
1302    /// Restrictions on number of connections.
1303    config: ConnectionsConfig,
1304}
1305
1306// === impl ConnectionInfo ===
1307
1308impl ConnectionInfo {
1309    /// Returns a new [`ConnectionInfo`] with the given config.
1310    const fn new(config: ConnectionsConfig) -> Self {
1311        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1312    }
1313
1314    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1315    const fn has_out_capacity(&self) -> bool {
1316        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1317            self.num_outbound < self.config.max_outbound
1318    }
1319
1320    /// Returns `true` if all active outbound slots are occupied (ignoring pending dials).
1321    const fn is_outbound_at_capacity(&self) -> bool {
1322        self.num_outbound >= self.config.max_outbound
1323    }
1324
1325    /// Returns `true` if all active inbound slots are occupied.
1326    const fn is_inbound_at_capacity(&self) -> bool {
1327        self.num_inbound >= self.config.max_inbound
1328    }
1329
1330    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1331    const fn has_in_capacity(&self) -> bool {
1332        self.num_inbound < self.config.max_inbound
1333    }
1334
1335    /// Returns `true` if we can handle an additional incoming pending connection.
1336    const fn has_in_pending_capacity(&self) -> bool {
1337        self.num_pending_in < self.config.max_inbound
1338    }
1339
1340    const fn decr_state(&mut self, state: PeerConnectionState) {
1341        match state {
1342            PeerConnectionState::Idle => {}
1343            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1344            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1345            PeerConnectionState::PendingOut => self.decr_pending_out(),
1346        }
1347    }
1348
1349    const fn decr_out(&mut self) {
1350        self.num_outbound -= 1;
1351    }
1352
1353    const fn inc_out(&mut self) {
1354        self.num_outbound += 1;
1355    }
1356
1357    const fn inc_pending_out(&mut self) {
1358        self.num_pending_out += 1;
1359    }
1360
1361    const fn inc_in(&mut self) {
1362        self.num_inbound += 1;
1363    }
1364
1365    const fn inc_pending_in(&mut self) {
1366        self.num_pending_in += 1;
1367    }
1368
1369    const fn decr_in(&mut self) {
1370        self.num_inbound -= 1;
1371    }
1372
1373    const fn decr_pending_out(&mut self) {
1374        self.num_pending_out -= 1;
1375    }
1376
1377    const fn decr_pending_in(&mut self) {
1378        self.num_pending_in -= 1;
1379    }
1380}
1381
1382/// Actions the peer manager can trigger.
1383#[derive(Debug)]
1384pub enum PeerAction {
1385    /// Start a new connection to a peer.
1386    Connect {
1387        /// The peer to connect to.
1388        peer_id: PeerId,
1389        /// Where to reach the node
1390        remote_addr: SocketAddr,
1391    },
1392    /// Disconnect an existing connection.
1393    Disconnect {
1394        /// The peer ID of the established connection.
1395        peer_id: PeerId,
1396        /// An optional reason for the disconnect.
1397        reason: Option<DisconnectReason>,
1398    },
1399    /// Disconnect an existing incoming connection, because the peers reputation is below the
1400    /// banned threshold or is on the [`BanList`]
1401    DisconnectBannedIncoming {
1402        /// The peer ID of the established connection.
1403        peer_id: PeerId,
1404    },
1405    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1406    DisconnectUntrustedIncoming {
1407        /// The peer ID.
1408        peer_id: PeerId,
1409    },
1410    /// Ban the peer in discovery.
1411    DiscoveryBanPeerId {
1412        /// The peer ID.
1413        peer_id: PeerId,
1414        /// The IP address.
1415        ip_addr: IpAddr,
1416    },
1417    /// Ban the IP in discovery.
1418    DiscoveryBanIp {
1419        /// The IP address.
1420        ip_addr: IpAddr,
1421    },
1422    /// Ban the peer temporarily
1423    BanPeer {
1424        /// The peer ID.
1425        peer_id: PeerId,
1426    },
1427    /// Unban the peer temporarily
1428    UnBanPeer {
1429        /// The peer ID.
1430        peer_id: PeerId,
1431    },
1432    /// Emit peerAdded event
1433    PeerAdded(PeerId),
1434    /// Emit peerRemoved event
1435    PeerRemoved(PeerId),
1436}
1437
1438/// Error thrown when an incoming connection is rejected right away
1439#[derive(Debug, Error, PartialEq, Eq)]
1440pub enum InboundConnectionError {
1441    /// The remote's ip address is banned
1442    IpBanned,
1443    /// No capacity for new inbound connections
1444    ExceedsCapacity,
1445}
1446
1447impl Display for InboundConnectionError {
1448    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1449        write!(f, "{self:?}")
1450    }
1451}
1452
1453/// The reason a peer was backed off.
1454#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1455pub enum BackoffReason {
1456    /// The remote peer responded with `TooManyPeers` (0x04).
1457    TooManyPeers,
1458    /// The session was gracefully closed and we're backing off briefly.
1459    GracefulClose,
1460    /// A connection or protocol-level error occurred.
1461    ConnectionError,
1462}
1463
1464impl BackoffReason {
1465    /// Derives the backoff reason from an optional [`DisconnectReason`].
1466    pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1467        match reason {
1468            Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1469            _ => Self::ConnectionError,
1470        }
1471    }
1472}
1473
1474/// Returns a random duration uniformly distributed in `[mean * 3/5, mean * 7/5]`.
1475///
1476/// With the default 5-minute mean this gives `[3 min, 7 min]`, matching Geth's peer dropper:
1477/// <https://github.com/ethereum/go-ethereum/blob/c5c75977ab55e4d7ea6147cc0e221b588e5e3754/eth/dropper.go#L32-L36>
1478fn jitter_rotation_interval(mean: Duration) -> Duration {
1479    let min_nanos = (mean * 3 / 5).as_nanos() as u64;
1480    let max_nanos = (mean * 7 / 5).as_nanos() as u64;
1481    Duration::from_nanos(rand::rng().random_range(min_nanos..=max_nanos))
1482}
1483
1484#[cfg(test)]
1485mod tests {
1486    use alloy_primitives::B512;
1487    use reth_eth_wire::{
1488        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1489        DisconnectReason,
1490    };
1491    use reth_ethereum_forks::{ForkHash, ForkId};
1492    use reth_net_banlist::BanList;
1493    use reth_network_api::Direction;
1494    use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
1495    use reth_network_types::{
1496        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1497    };
1498    use std::{
1499        future::{poll_fn, Future},
1500        io,
1501        net::{IpAddr, Ipv4Addr, SocketAddr},
1502        pin::Pin,
1503        task::{Context, Poll},
1504        time::Duration,
1505    };
1506    use url::Host;
1507
1508    use super::PeersManager;
1509    use crate::{
1510        error::SessionError,
1511        peers::{
1512            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1513            PeerConnectionState,
1514        },
1515        session::PendingSessionHandshakeError,
1516        PeersConfig,
1517    };
1518
1519    struct PeerActionFuture<'a> {
1520        peers: &'a mut PeersManager,
1521    }
1522
1523    impl Future for PeerActionFuture<'_> {
1524        type Output = PeerAction;
1525
1526        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1527            self.get_mut().peers.poll(cx)
1528        }
1529    }
1530
1531    macro_rules! event {
1532        ($peers:expr) => {
1533            PeerActionFuture { peers: &mut $peers }.await
1534        };
1535    }
1536
1537    fn set_connected_at(
1538        peers: &mut PeersManager,
1539        peer_id: PeerId,
1540        connected_at: std::time::Instant,
1541    ) {
1542        peers.peers.get_mut(&peer_id).expect("peer exists").connected_at = Some(connected_at);
1543    }
1544
1545    #[tokio::test]
1546    async fn test_insert() {
1547        let peer = PeerId::random();
1548        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1549        let mut peers = PeersManager::default();
1550        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1551
1552        match event!(peers) {
1553            PeerAction::PeerAdded(peer_id) => {
1554                assert_eq!(peer_id, peer);
1555            }
1556            _ => unreachable!(),
1557        }
1558        match event!(peers) {
1559            PeerAction::Connect { peer_id, remote_addr } => {
1560                assert_eq!(peer_id, peer);
1561                assert_eq!(remote_addr, socket_addr);
1562            }
1563            _ => unreachable!(),
1564        }
1565
1566        let (record, _) = peers.peer_by_id(peer).unwrap();
1567        assert_eq!(record.tcp_addr(), socket_addr);
1568        assert_eq!(record.udp_addr(), socket_addr);
1569    }
1570
1571    #[tokio::test]
1572    async fn test_insert_udp() {
1573        let peer = PeerId::random();
1574        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1575        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1576        let mut peers = PeersManager::default();
1577        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1578
1579        match event!(peers) {
1580            PeerAction::PeerAdded(peer_id) => {
1581                assert_eq!(peer_id, peer);
1582            }
1583            _ => unreachable!(),
1584        }
1585        match event!(peers) {
1586            PeerAction::Connect { peer_id, remote_addr } => {
1587                assert_eq!(peer_id, peer);
1588                assert_eq!(remote_addr, tcp_addr);
1589            }
1590            _ => unreachable!(),
1591        }
1592
1593        let (record, _) = peers.peer_by_id(peer).unwrap();
1594        assert_eq!(record.tcp_addr(), tcp_addr);
1595        assert_eq!(record.udp_addr(), udp_addr);
1596    }
1597
1598    #[tokio::test]
1599    async fn test_ban() {
1600        let peer = PeerId::random();
1601        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1602        let mut peers = PeersManager::default();
1603        peers.ban_peer(peer);
1604        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1605
1606        match event!(peers) {
1607            PeerAction::BanPeer { peer_id } => {
1608                assert_eq!(peer_id, peer);
1609            }
1610            _ => unreachable!(),
1611        }
1612
1613        poll_fn(|cx| {
1614            assert!(peers.poll(cx).is_pending());
1615            Poll::Ready(())
1616        })
1617        .await;
1618    }
1619
1620    #[tokio::test]
1621    async fn test_unban() {
1622        let peer = PeerId::random();
1623        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1624        let mut peers = PeersManager::default();
1625        peers.ban_peer(peer);
1626        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1627
1628        match event!(peers) {
1629            PeerAction::BanPeer { peer_id } => {
1630                assert_eq!(peer_id, peer);
1631            }
1632            _ => unreachable!(),
1633        }
1634
1635        poll_fn(|cx| {
1636            assert!(peers.poll(cx).is_pending());
1637            Poll::Ready(())
1638        })
1639        .await;
1640
1641        peers.unban_peer(peer);
1642
1643        match event!(peers) {
1644            PeerAction::UnBanPeer { peer_id } => {
1645                assert_eq!(peer_id, peer);
1646            }
1647            _ => unreachable!(),
1648        }
1649
1650        poll_fn(|cx| {
1651            assert!(peers.poll(cx).is_pending());
1652            Poll::Ready(())
1653        })
1654        .await;
1655    }
1656
1657    #[tokio::test]
1658    async fn test_admin_ban_removes_peer_and_bans_indefinitely() {
1659        let peer = PeerId::random();
1660        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1661        let mut peers = PeersManager::default();
1662        peers.peers.insert(peer, Peer::new(PeerAddr::from_tcp(socket_addr)));
1663
1664        peers.ban_peer_by_admin(peer);
1665
1666        assert!(peers.ban_list.is_banned_peer(&peer));
1667        assert!(peers.peer_by_id(peer).is_none());
1668
1669        match peers.queued_actions.pop_front() {
1670            Some(PeerAction::PeerRemoved(peer_id)) => assert_eq!(peer_id, peer),
1671            other => panic!("unexpected action: {other:?}"),
1672        }
1673
1674        let (_, unbanned_peers) =
1675            peers.ban_list.evict(std::time::Instant::now() + Duration::from_secs(1));
1676        assert!(unbanned_peers.is_empty());
1677    }
1678
1679    #[tokio::test]
1680    async fn test_admin_ban_does_not_override_trusted_peer() {
1681        let peer = PeerId::random();
1682        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1683        let mut peers = PeersManager::default();
1684        peers.peers.insert(peer, Peer::trusted(PeerAddr::from_tcp(socket_addr)));
1685
1686        peers.ban_peer_by_admin(peer);
1687
1688        assert!(!peers.ban_list.is_banned_peer(&peer));
1689        assert!(peers.peer_by_id(peer).is_some());
1690        assert!(peers.queued_actions.is_empty());
1691    }
1692
1693    #[tokio::test]
1694    async fn test_admin_unban_only_removes_banlist_entry() {
1695        let peer = PeerId::random();
1696        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1697        let mut peers = PeersManager::default();
1698        let mut peer_info = Peer::new(PeerAddr::from_tcp(socket_addr));
1699        peer_info.reputation = i32::MIN;
1700        peers.peers.insert(peer, peer_info);
1701        peers.ban_list.ban_peer(peer);
1702        assert!(peers.peers.get(&peer).is_some_and(Peer::is_banned));
1703
1704        peers.unban_peer_by_admin(peer);
1705
1706        assert!(!peers.ban_list.is_banned_peer(&peer));
1707        assert!(peers.peers.get(&peer).is_some_and(Peer::is_banned));
1708    }
1709
1710    #[tokio::test]
1711    async fn test_backoff_on_busy() {
1712        let peer = PeerId::random();
1713        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1714
1715        let mut peers = PeersManager::new(PeersConfig::test());
1716        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1717
1718        match event!(peers) {
1719            PeerAction::PeerAdded(peer_id) => {
1720                assert_eq!(peer_id, peer);
1721            }
1722            _ => unreachable!(),
1723        }
1724        match event!(peers) {
1725            PeerAction::Connect { peer_id, .. } => {
1726                assert_eq!(peer_id, peer);
1727            }
1728            _ => unreachable!(),
1729        }
1730
1731        poll_fn(|cx| {
1732            assert!(peers.poll(cx).is_pending());
1733            Poll::Ready(())
1734        })
1735        .await;
1736
1737        peers.on_active_session_dropped(
1738            &socket_addr,
1739            &peer,
1740            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1741                DisconnectReason::TooManyPeers,
1742            )),
1743        );
1744
1745        poll_fn(|cx| {
1746            assert!(peers.poll(cx).is_pending());
1747            Poll::Ready(())
1748        })
1749        .await;
1750
1751        assert!(peers.backed_off_peers.contains_key(&peer));
1752        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1753
1754        tokio::time::sleep(peers.backoff_durations.low).await;
1755
1756        match event!(peers) {
1757            PeerAction::Connect { peer_id, .. } => {
1758                assert_eq!(peer_id, peer);
1759            }
1760            _ => unreachable!(),
1761        }
1762
1763        assert!(!peers.backed_off_peers.contains_key(&peer));
1764        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1765    }
1766
1767    #[tokio::test]
1768    async fn test_backoff_on_no_response() {
1769        let peer = PeerId::random();
1770        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1771
1772        let backoff_durations = PeerBackoffDurations::test();
1773        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1774        let mut peers = PeersManager::new(config);
1775        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1776
1777        match event!(peers) {
1778            PeerAction::PeerAdded(peer_id) => {
1779                assert_eq!(peer_id, peer);
1780            }
1781            _ => unreachable!(),
1782        }
1783        match event!(peers) {
1784            PeerAction::Connect { peer_id, .. } => {
1785                assert_eq!(peer_id, peer);
1786            }
1787            _ => unreachable!(),
1788        }
1789
1790        poll_fn(|cx| {
1791            assert!(peers.poll(cx).is_pending());
1792            Poll::Ready(())
1793        })
1794        .await;
1795
1796        peers.on_outgoing_pending_session_dropped(
1797            &socket_addr,
1798            &peer,
1799            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1800                EthHandshakeError::NoResponse,
1801            )),
1802        );
1803
1804        poll_fn(|cx| {
1805            assert!(peers.poll(cx).is_pending());
1806            Poll::Ready(())
1807        })
1808        .await;
1809
1810        assert!(peers.backed_off_peers.contains_key(&peer));
1811        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1812
1813        tokio::time::sleep(backoff_durations.high).await;
1814
1815        match event!(peers) {
1816            PeerAction::Connect { peer_id, .. } => {
1817                assert_eq!(peer_id, peer);
1818            }
1819            _ => unreachable!(),
1820        }
1821
1822        assert!(!peers.backed_off_peers.contains_key(&peer));
1823        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1824    }
1825
1826    #[tokio::test]
1827    async fn test_low_backoff() {
1828        let peer = PeerId::random();
1829        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1830        let config = PeersConfig::test();
1831        let mut peers = PeersManager::new(config);
1832        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1833        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1834
1835        let backoff_timestamp = peers
1836            .backoff_durations
1837            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1838
1839        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1840        assert!(backoff_timestamp <= expected);
1841    }
1842
1843    #[tokio::test]
1844    async fn test_multiple_backoff_calculations() {
1845        let peer = PeerId::random();
1846        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1847        let config = PeersConfig::default();
1848        let mut peers = PeersManager::new(config);
1849        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1850        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1851
1852        // Simulate a peer that was already backed off once
1853        peer_struct.severe_backoff_counter = 1;
1854
1855        let now = std::time::Instant::now();
1856
1857        // Simulate the increment that happens in on_connection_failure
1858        peer_struct.severe_backoff_counter += 1;
1859        // Get official backoff time
1860        let backoff_time = peers
1861            .backoff_durations
1862            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1863
1864        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1865        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1866
1867        // We can't use assert_eq! since there is a very small diff in the nano secs
1868        // Usually it is 1800s != 1799.9999996s
1869        assert!(backoff_time.duration_since(now) > backoff_duration);
1870    }
1871
1872    #[tokio::test]
1873    async fn test_ban_on_active_drop() {
1874        let peer = PeerId::random();
1875        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1876        let mut peers = PeersManager::default();
1877        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1878
1879        match event!(peers) {
1880            PeerAction::PeerAdded(peer_id) => {
1881                assert_eq!(peer_id, peer);
1882            }
1883            _ => unreachable!(),
1884        }
1885        match event!(peers) {
1886            PeerAction::Connect { peer_id, .. } => {
1887                assert_eq!(peer_id, peer);
1888            }
1889            _ => unreachable!(),
1890        }
1891
1892        poll_fn(|cx| {
1893            assert!(peers.poll(cx).is_pending());
1894            Poll::Ready(())
1895        })
1896        .await;
1897
1898        peers.on_active_session_dropped(
1899            &socket_addr,
1900            &peer,
1901            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1902                DisconnectReason::UselessPeer,
1903            )),
1904        );
1905
1906        match event!(peers) {
1907            PeerAction::PeerRemoved(peer_id) => {
1908                assert_eq!(peer_id, peer);
1909            }
1910            _ => unreachable!(),
1911        }
1912        match event!(peers) {
1913            PeerAction::BanPeer { peer_id } => {
1914                assert_eq!(peer_id, peer);
1915            }
1916            _ => unreachable!(),
1917        }
1918
1919        poll_fn(|cx| {
1920            assert!(peers.poll(cx).is_pending());
1921            Poll::Ready(())
1922        })
1923        .await;
1924
1925        assert!(!peers.peers.contains_key(&peer));
1926    }
1927
1928    #[tokio::test]
1929    async fn test_remove_on_max_backoff_count() {
1930        let peer = PeerId::random();
1931        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1932        let config = PeersConfig::test();
1933        let mut peers = PeersManager::new(config.clone());
1934        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1935        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1936
1937        // Simulate a peer that was already backed off once
1938        peer_struct.severe_backoff_counter = config.max_backoff_count;
1939
1940        match event!(peers) {
1941            PeerAction::PeerAdded(peer_id) => {
1942                assert_eq!(peer_id, peer);
1943            }
1944            _ => unreachable!(),
1945        }
1946        match event!(peers) {
1947            PeerAction::Connect { peer_id, .. } => {
1948                assert_eq!(peer_id, peer);
1949            }
1950            _ => unreachable!(),
1951        }
1952
1953        poll_fn(|cx| {
1954            assert!(peers.poll(cx).is_pending());
1955            Poll::Ready(())
1956        })
1957        .await;
1958
1959        peers.on_outgoing_pending_session_dropped(
1960            &socket_addr,
1961            &peer,
1962            &PendingSessionHandshakeError::Eth(
1963                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1964            ),
1965        );
1966
1967        match event!(peers) {
1968            PeerAction::PeerRemoved(peer_id) => {
1969                assert_eq!(peer_id, peer);
1970            }
1971            _ => unreachable!(),
1972        }
1973
1974        poll_fn(|cx| {
1975            assert!(peers.poll(cx).is_pending());
1976            Poll::Ready(())
1977        })
1978        .await;
1979
1980        assert!(!peers.peers.contains_key(&peer));
1981    }
1982
1983    #[tokio::test]
1984    async fn test_ban_on_pending_drop() {
1985        let peer = PeerId::random();
1986        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1987        let mut peers = PeersManager::default();
1988        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1989
1990        match event!(peers) {
1991            PeerAction::PeerAdded(peer_id) => {
1992                assert_eq!(peer_id, peer);
1993            }
1994            _ => unreachable!(),
1995        }
1996        match event!(peers) {
1997            PeerAction::Connect { peer_id, .. } => {
1998                assert_eq!(peer_id, peer);
1999            }
2000            _ => unreachable!(),
2001        }
2002
2003        poll_fn(|cx| {
2004            assert!(peers.poll(cx).is_pending());
2005            Poll::Ready(())
2006        })
2007        .await;
2008
2009        peers.on_outgoing_pending_session_dropped(
2010            &socket_addr,
2011            &peer,
2012            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2013                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
2014            )),
2015        );
2016
2017        match event!(peers) {
2018            PeerAction::PeerRemoved(peer_id) => {
2019                assert_eq!(peer_id, peer);
2020            }
2021            _ => unreachable!(),
2022        }
2023        match event!(peers) {
2024            PeerAction::BanPeer { peer_id } => {
2025                assert_eq!(peer_id, peer);
2026            }
2027            _ => unreachable!(),
2028        }
2029
2030        poll_fn(|cx| {
2031            assert!(peers.poll(cx).is_pending());
2032            Poll::Ready(())
2033        })
2034        .await;
2035
2036        assert!(!peers.peers.contains_key(&peer));
2037    }
2038
2039    #[tokio::test]
2040    async fn test_internally_closed_incoming() {
2041        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2042        let mut peers = PeersManager::default();
2043
2044        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2045        assert_eq!(peers.connection_info.num_pending_in, 1);
2046        peers.on_incoming_pending_session_rejected_internally();
2047        assert_eq!(peers.connection_info.num_pending_in, 0);
2048    }
2049
2050    #[tokio::test]
2051    async fn test_reject_incoming_at_pending_capacity() {
2052        let mut peers = PeersManager::default();
2053
2054        for count in 1..=peers.connection_info.config.max_inbound {
2055            let socket_addr =
2056                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
2057            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2058            assert_eq!(peers.connection_info.num_pending_in, count);
2059        }
2060        assert!(peers.connection_info.has_in_capacity());
2061        assert!(!peers.connection_info.has_in_pending_capacity());
2062
2063        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2064        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2065    }
2066
2067    #[tokio::test]
2068    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
2069        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2070        let trusted = PeerId::random();
2071        peers.add_trusted_peer_id(trusted);
2072
2073        // connect the trusted peer
2074        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
2075        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2076        peers.on_incoming_session_established(trusted, addr);
2077
2078        match event!(peers) {
2079            PeerAction::PeerAdded(id) => {
2080                assert_eq!(id, trusted);
2081            }
2082            _ => unreachable!(),
2083        }
2084
2085        // saturate the remaining inbound slots with untrusted peers
2086        let mut connected_untrusted_peer_ids = Vec::new();
2087        for i in 0..(peers.connection_info.config.max_inbound - 1) {
2088            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
2089            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2090            let peer_id = PeerId::random();
2091            peers.on_incoming_session_established(peer_id, addr);
2092            connected_untrusted_peer_ids.push(peer_id);
2093
2094            match event!(peers) {
2095                PeerAction::PeerAdded(id) => {
2096                    assert_eq!(id, peer_id);
2097                }
2098                _ => unreachable!(),
2099            }
2100        }
2101
2102        let mut pending_addrs = Vec::new();
2103
2104        // saturate available slots
2105        for i in 0..2 {
2106            let socket_addr =
2107                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
2108            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2109
2110            pending_addrs.push(socket_addr);
2111        }
2112
2113        assert_eq!(peers.connection_info.num_pending_in, 2);
2114
2115        // try to handle additional incoming connections at capacity
2116        for i in 0..2 {
2117            let socket_addr =
2118                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
2119            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2120        }
2121
2122        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2123            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
2124                DisconnectReason::UselessPeer,
2125            )),
2126        ));
2127
2128        // Remove all pending peers
2129        for pending_addr in pending_addrs {
2130            peers.on_incoming_pending_session_dropped(pending_addr, &err);
2131        }
2132
2133        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
2134
2135        println!(
2136            "num_inbound: {}, has_in_capacity: {}",
2137            peers.connection_info.num_inbound,
2138            peers.connection_info.has_in_capacity()
2139        );
2140
2141        // disconnect a connected peer
2142        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
2143
2144        println!(
2145            "num_inbound: {}, has_in_capacity: {}",
2146            peers.connection_info.num_inbound,
2147            peers.connection_info.has_in_capacity()
2148        );
2149
2150        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2151        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2152    }
2153
2154    #[tokio::test]
2155    async fn test_closed_incoming() {
2156        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2157        let mut peers = PeersManager::default();
2158
2159        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2160        assert_eq!(peers.connection_info.num_pending_in, 1);
2161        peers.on_incoming_pending_session_gracefully_closed();
2162        assert_eq!(peers.connection_info.num_pending_in, 0);
2163    }
2164
2165    #[tokio::test]
2166    async fn test_dropped_incoming() {
2167        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
2168        let ban_duration = Duration::from_millis(500);
2169        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
2170        let mut peers = PeersManager::new(config);
2171
2172        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2173        assert_eq!(peers.connection_info.num_pending_in, 1);
2174        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
2175            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
2176                DisconnectReason::UselessPeer,
2177            )),
2178        ));
2179
2180        peers.on_incoming_pending_session_dropped(socket_addr, &err);
2181        assert_eq!(peers.connection_info.num_pending_in, 0);
2182        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
2183
2184        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
2185
2186        // unbanned after timeout
2187        tokio::time::sleep(ban_duration).await;
2188
2189        poll_fn(|cx| {
2190            let _ = peers.poll(cx);
2191            Poll::Ready(())
2192        })
2193        .await;
2194
2195        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
2196        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2197    }
2198
2199    #[tokio::test]
2200    async fn test_reputation_change_connected() {
2201        let peer = PeerId::random();
2202        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2203        let mut peers = PeersManager::default();
2204        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2205
2206        match event!(peers) {
2207            PeerAction::PeerAdded(peer_id) => {
2208                assert_eq!(peer_id, peer);
2209            }
2210            _ => unreachable!(),
2211        }
2212        match event!(peers) {
2213            PeerAction::Connect { peer_id, remote_addr } => {
2214                assert_eq!(peer_id, peer);
2215                assert_eq!(remote_addr, socket_addr);
2216            }
2217            _ => unreachable!(),
2218        }
2219
2220        let p = peers.peers.get_mut(&peer).unwrap();
2221        assert_eq!(p.state, PeerConnectionState::PendingOut);
2222
2223        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
2224
2225        let p = peers.peers.get(&peer).unwrap();
2226        assert_eq!(p.state, PeerConnectionState::PendingOut);
2227        assert!(p.is_banned());
2228
2229        peers.on_active_session_gracefully_closed(peer);
2230
2231        let p = peers.peers.get(&peer).unwrap();
2232        assert_eq!(p.state, PeerConnectionState::Idle);
2233        assert!(p.is_banned());
2234
2235        match event!(peers) {
2236            PeerAction::Disconnect { peer_id, .. } => {
2237                assert_eq!(peer_id, peer);
2238            }
2239            _ => unreachable!(),
2240        }
2241    }
2242
2243    #[tokio::test]
2244    async fn retain_trusted_status() {
2245        let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2246        let trusted = PeerId::random();
2247        let mut peers =
2248            PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2249                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2250                tcp_port: 8008,
2251                udp_port: 8008,
2252                id: trusted,
2253            }]));
2254        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2255        peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2256        assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2257    }
2258
2259    #[tokio::test]
2260    async fn accept_incoming_trusted_unknown_peer_address() {
2261        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2262        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2263        // try to connect trusted peer
2264        let trusted = PeerId::random();
2265        peers.add_trusted_peer_id(trusted);
2266
2267        // saturate the inbound slots
2268        for i in 0..peers.connection_info.config.max_inbound {
2269            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2270            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2271            let peer_id = PeerId::random();
2272            peers.on_incoming_session_established(peer_id, addr);
2273
2274            match event!(peers) {
2275                PeerAction::PeerAdded(id) => {
2276                    assert_eq!(id, peer_id);
2277                }
2278                _ => unreachable!(),
2279            }
2280        }
2281
2282        // try to connect untrusted peer
2283        let untrusted = PeerId::random();
2284        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2285        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2286        peers.on_incoming_session_established(untrusted, socket_addr);
2287
2288        match event!(peers) {
2289            PeerAction::PeerAdded(id) => {
2290                assert_eq!(id, untrusted);
2291            }
2292            _ => unreachable!(),
2293        }
2294
2295        match event!(peers) {
2296            PeerAction::Disconnect { peer_id, reason } => {
2297                assert_eq!(peer_id, untrusted);
2298                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2299            }
2300            _ => unreachable!(),
2301        }
2302
2303        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2304        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2305        peers.on_incoming_session_established(trusted, socket_addr);
2306
2307        match event!(peers) {
2308            PeerAction::PeerAdded(id) => {
2309                assert_eq!(id, trusted);
2310            }
2311            _ => unreachable!(),
2312        }
2313
2314        poll_fn(|cx| {
2315            assert!(peers.poll(cx).is_pending());
2316            Poll::Ready(())
2317        })
2318        .await;
2319
2320        let peer = peers.peers.get(&trusted).unwrap();
2321        assert_eq!(peer.state, PeerConnectionState::In);
2322    }
2323
2324    #[tokio::test]
2325    async fn test_already_connected() {
2326        let peer = PeerId::random();
2327        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2328        let mut peers = PeersManager::default();
2329
2330        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
2331        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2332        assert_eq!(peers.connection_info.num_pending_in, 1);
2333
2334        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
2335        // to increase by 1
2336        peers.on_incoming_session_established(peer, socket_addr);
2337        let p = peers.peers.get_mut(&peer).expect("peer not found");
2338        assert_eq!(p.addr.tcp(), socket_addr);
2339        assert_eq!(peers.connection_info.num_pending_in, 0);
2340        assert_eq!(peers.connection_info.num_inbound, 1);
2341
2342        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
2343        // by 1
2344        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2345        assert_eq!(peers.connection_info.num_pending_in, 1);
2346
2347        // Simulate a rejection due to an already established connection, expecting the
2348        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
2349        // should not be changed.
2350        peers.on_already_connected(Direction::Incoming);
2351
2352        let p = peers.peers.get_mut(&peer).expect("peer not found");
2353        assert_eq!(p.addr.tcp(), socket_addr);
2354        assert_eq!(peers.connection_info.num_pending_in, 0);
2355        assert_eq!(peers.connection_info.num_inbound, 1);
2356    }
2357
2358    #[tokio::test]
2359    async fn test_reputation_change_trusted_peer() {
2360        let peer = PeerId::random();
2361        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2362        let mut peers = PeersManager::default();
2363        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2364
2365        match event!(peers) {
2366            PeerAction::PeerAdded(peer_id) => {
2367                assert_eq!(peer_id, peer);
2368            }
2369            _ => unreachable!(),
2370        }
2371        match event!(peers) {
2372            PeerAction::Connect { peer_id, remote_addr } => {
2373                assert_eq!(peer_id, peer);
2374                assert_eq!(remote_addr, socket_addr);
2375            }
2376            _ => unreachable!(),
2377        }
2378
2379        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2380        peers.on_active_outgoing_established(peer);
2381        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2382
2383        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2384
2385        {
2386            let p = peers.peers.get(&peer).unwrap();
2387            assert_eq!(p.state, PeerConnectionState::Out);
2388            // not banned yet
2389            assert!(!p.is_banned());
2390        }
2391
2392        // ensure peer is banned eventually
2393        loop {
2394            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2395
2396            let p = peers.peers.get(&peer).unwrap();
2397            if p.is_banned() {
2398                break
2399            }
2400        }
2401
2402        match event!(peers) {
2403            PeerAction::Disconnect { peer_id, .. } => {
2404                assert_eq!(peer_id, peer);
2405            }
2406            _ => unreachable!(),
2407        }
2408    }
2409
2410    #[tokio::test]
2411    async fn test_reputation_management() {
2412        let peer = PeerId::random();
2413        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2414        let mut peers = PeersManager::default();
2415        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2416        assert_eq!(peers.get_reputation(&peer), Some(0));
2417
2418        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2419        assert_eq!(peers.get_reputation(&peer), Some(1024));
2420
2421        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2422        assert_eq!(peers.get_reputation(&peer), Some(0));
2423    }
2424
2425    #[tokio::test]
2426    async fn test_remove_discovered_active() {
2427        let peer = PeerId::random();
2428        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2429        let mut peers = PeersManager::default();
2430        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2431
2432        match event!(peers) {
2433            PeerAction::PeerAdded(peer_id) => {
2434                assert_eq!(peer_id, peer);
2435            }
2436            _ => unreachable!(),
2437        }
2438        match event!(peers) {
2439            PeerAction::Connect { peer_id, remote_addr } => {
2440                assert_eq!(peer_id, peer);
2441                assert_eq!(remote_addr, socket_addr);
2442            }
2443            _ => unreachable!(),
2444        }
2445
2446        let p = peers.peers.get(&peer).unwrap();
2447        assert_eq!(p.state, PeerConnectionState::PendingOut);
2448
2449        peers.remove_peer(peer);
2450
2451        match event!(peers) {
2452            PeerAction::PeerRemoved(peer_id) => {
2453                assert_eq!(peer_id, peer);
2454            }
2455            _ => unreachable!(),
2456        }
2457        match event!(peers) {
2458            PeerAction::Disconnect { peer_id, .. } => {
2459                assert_eq!(peer_id, peer);
2460            }
2461            _ => unreachable!(),
2462        }
2463
2464        let p = peers.peers.get(&peer).unwrap();
2465        assert_eq!(p.state, PeerConnectionState::PendingOut);
2466
2467        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2468        let p = peers.peers.get(&peer).unwrap();
2469        assert_eq!(p.state, PeerConnectionState::PendingOut);
2470
2471        peers.on_active_session_gracefully_closed(peer);
2472        assert!(!peers.peers.contains_key(&peer));
2473    }
2474
2475    #[tokio::test]
2476    async fn test_fatal_outgoing_connection_error_trusted() {
2477        let peer = PeerId::random();
2478        let config = PeersConfig::test()
2479            .with_trusted_nodes(vec![TrustedPeer {
2480                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2481                tcp_port: 8008,
2482                udp_port: 8008,
2483                id: peer,
2484            }])
2485            .with_trusted_nodes_only(true);
2486        let mut peers = PeersManager::new(config);
2487        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2488
2489        match event!(peers) {
2490            PeerAction::Connect { peer_id, remote_addr } => {
2491                assert_eq!(peer_id, peer);
2492                assert_eq!(remote_addr, socket_addr);
2493            }
2494            _ => unreachable!(),
2495        }
2496
2497        let p = peers.peers.get(&peer).unwrap();
2498        assert_eq!(p.state, PeerConnectionState::PendingOut);
2499
2500        assert_eq!(peers.num_outbound_connections(), 0);
2501
2502        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2503            EthHandshakeError::NonStatusMessageInHandshake,
2504        ));
2505        assert!(err.is_fatal_protocol_error());
2506
2507        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2508        assert_eq!(peers.num_outbound_connections(), 0);
2509
2510        // try tmp ban peer
2511        match event!(peers) {
2512            PeerAction::BanPeer { peer_id } => {
2513                assert_eq!(peer_id, peer);
2514            }
2515            err => unreachable!("{err:?}"),
2516        }
2517
2518        // ensure we still have trusted peer
2519        assert!(peers.peers.contains_key(&peer));
2520
2521        // await for the ban to expire
2522        tokio::time::sleep(peers.backoff_durations.medium).await;
2523
2524        match event!(peers) {
2525            PeerAction::Connect { peer_id, remote_addr } => {
2526                assert_eq!(peer_id, peer);
2527                assert_eq!(remote_addr, socket_addr);
2528            }
2529            err => unreachable!("{err:?}"),
2530        }
2531    }
2532
2533    #[tokio::test]
2534    async fn test_outgoing_connection_error() {
2535        let peer = PeerId::random();
2536        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2537        let mut peers = PeersManager::default();
2538        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2539
2540        match event!(peers) {
2541            PeerAction::PeerAdded(peer_id) => {
2542                assert_eq!(peer_id, peer);
2543            }
2544            _ => unreachable!(),
2545        }
2546        match event!(peers) {
2547            PeerAction::Connect { peer_id, remote_addr } => {
2548                assert_eq!(peer_id, peer);
2549                assert_eq!(remote_addr, socket_addr);
2550            }
2551            _ => unreachable!(),
2552        }
2553
2554        let p = peers.peers.get(&peer).unwrap();
2555        assert_eq!(p.state, PeerConnectionState::PendingOut);
2556
2557        assert_eq!(peers.num_outbound_connections(), 0);
2558
2559        peers.on_outgoing_connection_failure(
2560            &socket_addr,
2561            &peer,
2562            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2563        );
2564
2565        assert_eq!(peers.num_outbound_connections(), 0);
2566    }
2567
2568    #[tokio::test]
2569    async fn test_outgoing_connection_gracefully_closed() {
2570        let peer = PeerId::random();
2571        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2572        let mut peers = PeersManager::default();
2573        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2574
2575        match event!(peers) {
2576            PeerAction::PeerAdded(peer_id) => {
2577                assert_eq!(peer_id, peer);
2578            }
2579            _ => unreachable!(),
2580        }
2581        match event!(peers) {
2582            PeerAction::Connect { peer_id, remote_addr } => {
2583                assert_eq!(peer_id, peer);
2584                assert_eq!(remote_addr, socket_addr);
2585            }
2586            _ => unreachable!(),
2587        }
2588
2589        let p = peers.peers.get(&peer).unwrap();
2590        assert_eq!(p.state, PeerConnectionState::PendingOut);
2591
2592        assert_eq!(peers.num_outbound_connections(), 0);
2593
2594        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2595
2596        assert_eq!(peers.num_outbound_connections(), 0);
2597        assert_eq!(peers.connection_info.num_pending_out, 0);
2598    }
2599
2600    #[tokio::test]
2601    async fn test_discovery_ban_list() {
2602        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2603        let socket_addr = SocketAddr::new(ip, 8008);
2604        let ban_list = BanList::new(vec![], vec![ip]);
2605        let config = PeersConfig::default().with_ban_list(ban_list);
2606        let mut peer_manager = PeersManager::new(config);
2607        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2608
2609        assert!(peer_manager.peers.is_empty());
2610    }
2611
2612    #[tokio::test]
2613    async fn test_on_pending_ban_list() {
2614        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2615        let socket_addr = SocketAddr::new(ip, 8008);
2616        let ban_list = BanList::new(vec![], vec![ip]);
2617        let config = PeersConfig::test().with_ban_list(ban_list);
2618        let mut peer_manager = PeersManager::new(config);
2619        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2620        // because we have no active peers this should be fine for testings
2621        match a {
2622            Ok(_) => panic!(),
2623            Err(err) => match err {
2624                InboundConnectionError::IpBanned => {
2625                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2626                }
2627                _ => unreachable!(),
2628            },
2629        }
2630    }
2631
2632    #[tokio::test]
2633    async fn test_on_active_inbound_ban_list() {
2634        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2635        let socket_addr = SocketAddr::new(ip, 8008);
2636        let given_peer_id = PeerId::random();
2637        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2638        let config = PeersConfig::test().with_ban_list(ban_list);
2639        let mut peer_manager = PeersManager::new(config);
2640        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2641        // non-trusted nodes should also increase pending_in
2642        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2643        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2644        // after the connection is established, the peer should be removed, the num_pending_in
2645        // should be decreased, and the num_inbound should not be increased
2646        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2647        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2648
2649        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2650            peer_manager.queued_actions.pop_front()
2651        else {
2652            panic!()
2653        };
2654
2655        assert_eq!(peer_id, given_peer_id)
2656    }
2657
2658    #[test]
2659    fn test_connection_limits() {
2660        let mut info = ConnectionInfo::default();
2661        info.inc_in();
2662        assert_eq!(info.num_inbound, 1);
2663        assert_eq!(info.num_outbound, 0);
2664        assert!(info.has_in_capacity());
2665
2666        info.decr_in();
2667        assert_eq!(info.num_inbound, 0);
2668        assert_eq!(info.num_outbound, 0);
2669
2670        info.inc_out();
2671        assert_eq!(info.num_inbound, 0);
2672        assert_eq!(info.num_outbound, 1);
2673        assert!(info.has_out_capacity());
2674
2675        info.decr_out();
2676        assert_eq!(info.num_inbound, 0);
2677        assert_eq!(info.num_outbound, 0);
2678    }
2679
2680    #[test]
2681    fn test_connection_peer_state() {
2682        let mut info = ConnectionInfo::default();
2683        info.inc_in();
2684
2685        info.decr_state(PeerConnectionState::In);
2686        assert_eq!(info.num_inbound, 0);
2687        assert_eq!(info.num_outbound, 0);
2688
2689        info.inc_out();
2690
2691        info.decr_state(PeerConnectionState::Out);
2692        assert_eq!(info.num_inbound, 0);
2693        assert_eq!(info.num_outbound, 0);
2694    }
2695
2696    #[tokio::test]
2697    async fn test_trusted_peers_are_prioritized() {
2698        let trusted_peer = PeerId::random();
2699        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2700        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2701            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2702            tcp_port: 8008,
2703            udp_port: 8008,
2704            id: trusted_peer,
2705        }]);
2706        let mut peers = PeersManager::new(config);
2707
2708        let basic_peer = PeerId::random();
2709        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2710        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2711
2712        match event!(peers) {
2713            PeerAction::PeerAdded(peer_id) => {
2714                assert_eq!(peer_id, basic_peer);
2715            }
2716            _ => unreachable!(),
2717        }
2718        match event!(peers) {
2719            PeerAction::Connect { peer_id, remote_addr } => {
2720                assert_eq!(peer_id, trusted_peer);
2721                assert_eq!(remote_addr, trusted_sock);
2722            }
2723            _ => unreachable!(),
2724        }
2725        match event!(peers) {
2726            PeerAction::Connect { peer_id, remote_addr } => {
2727                assert_eq!(peer_id, basic_peer);
2728                assert_eq!(remote_addr, basic_sock);
2729            }
2730            _ => unreachable!(),
2731        }
2732    }
2733
2734    #[tokio::test]
2735    async fn test_connect_trusted_nodes_only() {
2736        let trusted_peer = PeerId::random();
2737        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2738        let config = PeersConfig::test()
2739            .with_trusted_nodes(vec![TrustedPeer {
2740                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2741                tcp_port: 8008,
2742                udp_port: 8008,
2743                id: trusted_peer,
2744            }])
2745            .with_trusted_nodes_only(true);
2746        let mut peers = PeersManager::new(config);
2747
2748        let basic_peer = PeerId::random();
2749        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2750        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2751
2752        match event!(peers) {
2753            PeerAction::PeerAdded(peer_id) => {
2754                assert_eq!(peer_id, basic_peer);
2755            }
2756            _ => unreachable!(),
2757        }
2758        match event!(peers) {
2759            PeerAction::Connect { peer_id, remote_addr } => {
2760                assert_eq!(peer_id, trusted_peer);
2761                assert_eq!(remote_addr, trusted_sock);
2762            }
2763            _ => unreachable!(),
2764        }
2765        poll_fn(|cx| {
2766            assert!(peers.poll(cx).is_pending());
2767            Poll::Ready(())
2768        })
2769        .await;
2770    }
2771
2772    #[tokio::test]
2773    async fn test_incoming_with_trusted_nodes_only() {
2774        let trusted_peer = PeerId::random();
2775        let config = PeersConfig::test()
2776            .with_trusted_nodes(vec![TrustedPeer {
2777                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2778                tcp_port: 8008,
2779                udp_port: 8008,
2780                id: trusted_peer,
2781            }])
2782            .with_trusted_nodes_only(true);
2783        let mut peers = PeersManager::new(config);
2784
2785        let basic_peer = PeerId::random();
2786        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2787        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2788        // non-trusted nodes should also increase pending_in
2789        assert_eq!(peers.connection_info.num_pending_in, 1);
2790        peers.on_incoming_session_established(basic_peer, basic_sock);
2791        // after the connection is established, the peer should be removed, the num_pending_in
2792        // should be decreased, and the num_inbound mut not be increased
2793        assert_eq!(peers.connection_info.num_pending_in, 0);
2794        assert_eq!(peers.connection_info.num_inbound, 0);
2795
2796        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2797            peers.queued_actions.pop_front()
2798        else {
2799            panic!()
2800        };
2801        assert_eq!(basic_peer, peer_id);
2802        assert!(!peers.peers.contains_key(&basic_peer));
2803    }
2804
2805    #[tokio::test]
2806    async fn test_incoming_without_trusted_nodes_only() {
2807        let trusted_peer = PeerId::random();
2808        let config = PeersConfig::test()
2809            .with_trusted_nodes(vec![TrustedPeer {
2810                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2811                tcp_port: 8008,
2812                udp_port: 8008,
2813                id: trusted_peer,
2814            }])
2815            .with_trusted_nodes_only(false);
2816        let mut peers = PeersManager::new(config);
2817
2818        let basic_peer = PeerId::random();
2819        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2820        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2821
2822        // non-trusted nodes should also increase pending_in
2823        assert_eq!(peers.connection_info.num_pending_in, 1);
2824        peers.on_incoming_session_established(basic_peer, basic_sock);
2825        // after the connection is established, the peer should be removed, the num_pending_in
2826        // should be decreased, and the num_inbound must be increased
2827        assert_eq!(peers.connection_info.num_pending_in, 0);
2828        assert_eq!(peers.connection_info.num_inbound, 1);
2829        assert!(peers.peers.contains_key(&basic_peer));
2830    }
2831
2832    #[tokio::test]
2833    async fn test_incoming_at_capacity() {
2834        let mut config = PeersConfig::test();
2835        config.connection_info.max_inbound = 1;
2836        let mut peers = PeersManager::new(config);
2837
2838        let peer = PeerId::random();
2839        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2840        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2841
2842        peers.on_incoming_session_established(peer, addr);
2843
2844        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2845        assert_eq!(
2846            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2847            InboundConnectionError::ExceedsCapacity
2848        );
2849    }
2850
2851    #[tokio::test]
2852    async fn test_incoming_rate_limit() {
2853        let config = PeersConfig {
2854            incoming_ip_throttle_duration: Duration::from_millis(100),
2855            ..PeersConfig::test()
2856        };
2857        let mut peers = PeersManager::new(config);
2858
2859        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2860        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2861        assert_eq!(
2862            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2863            InboundConnectionError::IpBanned
2864        );
2865
2866        peers.release_interval.reset_immediately();
2867        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2868
2869        // await unban
2870        poll_fn(|cx| loop {
2871            if peers.poll(cx).is_pending() {
2872                return Poll::Ready(());
2873            }
2874        })
2875        .await;
2876
2877        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2878        assert_eq!(
2879            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2880            InboundConnectionError::IpBanned
2881        );
2882    }
2883
2884    #[tokio::test]
2885    async fn test_tick() {
2886        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2887        let socket_addr = SocketAddr::new(ip, 8008);
2888        let config = PeersConfig::test();
2889        let mut peer_manager = PeersManager::new(config);
2890        let peer_id = PeerId::random();
2891        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2892
2893        tokio::time::sleep(Duration::from_secs(1)).await;
2894        peer_manager.tick();
2895
2896        // still unconnected
2897        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2898
2899        // mark as connected
2900        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2901
2902        tokio::time::sleep(Duration::from_secs(1)).await;
2903        peer_manager.tick();
2904
2905        // still at default reputation
2906        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2907
2908        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2909
2910        tokio::time::sleep(Duration::from_secs(1)).await;
2911        peer_manager.tick();
2912
2913        // tick applied
2914        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2915    }
2916
2917    #[tokio::test]
2918    async fn test_remove_incoming_after_disconnect() {
2919        let peer_id = PeerId::random();
2920        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2921        let mut peers = PeersManager::default();
2922
2923        peers.on_incoming_pending_session(addr.ip()).unwrap();
2924        peers.on_incoming_session_established(peer_id, addr);
2925        let peer = peers.peers.get(&peer_id).unwrap();
2926        assert_eq!(peer.state, PeerConnectionState::In);
2927        assert!(peer.remove_after_disconnect);
2928
2929        peers.on_active_session_gracefully_closed(peer_id);
2930        assert!(!peers.peers.contains_key(&peer_id))
2931    }
2932
2933    #[tokio::test]
2934    async fn test_keep_incoming_after_disconnect_if_discovered() {
2935        let peer_id = PeerId::random();
2936        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2937        let mut peers = PeersManager::default();
2938
2939        peers.on_incoming_pending_session(addr.ip()).unwrap();
2940        peers.on_incoming_session_established(peer_id, addr);
2941        let peer = peers.peers.get(&peer_id).unwrap();
2942        assert_eq!(peer.state, PeerConnectionState::In);
2943        assert!(peer.remove_after_disconnect);
2944
2945        // trigger discovery manually while the peer is still connected
2946        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2947
2948        peers.on_active_session_gracefully_closed(peer_id);
2949
2950        let peer = peers.peers.get(&peer_id).unwrap();
2951        assert_eq!(peer.state, PeerConnectionState::Idle);
2952        assert!(!peer.remove_after_disconnect);
2953    }
2954
2955    #[tokio::test]
2956    async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2957        let throttle_duration = Duration::from_millis(100);
2958        let config =
2959            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2960        let mut peers = PeersManager::new(config);
2961
2962        let peer_id = PeerId::random();
2963        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2964
2965        // Add as regular peer
2966        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2967
2968        match event!(peers) {
2969            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2970            _ => unreachable!(),
2971        }
2972
2973        match event!(peers) {
2974            PeerAction::Connect { .. } => {}
2975            _ => unreachable!(),
2976        }
2977
2978        // Simulate outbound connection established
2979        peers.on_active_outgoing_established(peer_id);
2980        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2981
2982        // Gracefully close the session
2983        peers.on_active_session_gracefully_closed(peer_id);
2984
2985        let peer = peers.peers.get(&peer_id).unwrap();
2986        assert_eq!(peer.state, PeerConnectionState::Idle);
2987        assert!(peer.backed_off);
2988
2989        // Verify the peer is in the backed_off_peers set
2990        assert!(peers.backed_off_peers.contains_key(&peer_id));
2991
2992        // Immediately try to poll - should not trigger any actions yet
2993        poll_fn(|cx| {
2994            assert!(peers.poll(cx).is_pending());
2995            Poll::Ready(())
2996        })
2997        .await;
2998
2999        // Peer should still be backed off
3000        assert!(peers.backed_off_peers.contains_key(&peer_id));
3001        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
3002
3003        // Sleep for the throttle duration
3004        tokio::time::sleep(throttle_duration).await;
3005
3006        // After throttle duration, event! will poll until we get a Connect action
3007        match event!(peers) {
3008            PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
3009            _ => unreachable!(),
3010        }
3011
3012        // After connection is initiated, peer should no longer be backed off
3013        assert!(!peers.backed_off_peers.contains_key(&peer_id));
3014        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
3015    }
3016
3017    #[tokio::test]
3018    async fn test_backed_off_peer_can_accept_incoming_connection() {
3019        let throttle_duration = Duration::from_millis(100);
3020        let config =
3021            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
3022        let mut peers = PeersManager::new(config);
3023
3024        let peer_id = PeerId::random();
3025        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3026
3027        // Add as regular peer
3028        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3029
3030        match event!(peers) {
3031            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
3032            _ => unreachable!(),
3033        }
3034
3035        match event!(peers) {
3036            PeerAction::Connect { .. } => {}
3037            _ => unreachable!(),
3038        }
3039
3040        // Simulate outbound connection established
3041        peers.on_active_outgoing_established(peer_id);
3042        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
3043
3044        // Gracefully close the session - this will back off the peer
3045        peers.on_active_session_gracefully_closed(peer_id);
3046
3047        let peer = peers.peers.get(&peer_id).unwrap();
3048        assert_eq!(peer.state, PeerConnectionState::Idle);
3049        assert!(peer.backed_off);
3050        assert!(peers.backed_off_peers.contains_key(&peer_id));
3051
3052        // Now simulate an incoming connection from the backed-off peer
3053        // First, handle the incoming pending session
3054        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
3055        assert_eq!(peers.connection_info.num_pending_in, 1);
3056
3057        // Establish the incoming session
3058        peers.on_incoming_session_established(peer_id, addr);
3059
3060        // Peer should have been added to incoming connections
3061        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3062        assert_eq!(peers.connection_info.num_inbound, 1);
3063
3064        // Peer should still be backed off for outbound connections
3065        assert!(peers.backed_off_peers.contains_key(&peer_id));
3066        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
3067
3068        // Verify we don't try to reconnect outbound while peer is backed off
3069        poll_fn(|cx| {
3070            assert!(peers.poll(cx).is_pending());
3071            Poll::Ready(())
3072        })
3073        .await;
3074
3075        // No outbound connection should be attempted while backed off
3076        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3077
3078        // After throttle duration, the backoff should be cleared
3079        tokio::time::sleep(throttle_duration).await;
3080
3081        poll_fn(|cx| {
3082            let _ = peers.poll(cx);
3083            Poll::Ready(())
3084        })
3085        .await;
3086
3087        // Backoff should be cleared now
3088        assert!(!peers.backed_off_peers.contains_key(&peer_id));
3089        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
3090
3091        // Peer should still be in incoming state
3092        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3093    }
3094
3095    #[tokio::test]
3096    async fn test_incoming_outgoing_already_connected() {
3097        let peer_id = PeerId::random();
3098        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3099        let mut peers = PeersManager::default();
3100
3101        peers.on_incoming_pending_session(addr.ip()).unwrap();
3102        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3103
3104        match event!(peers) {
3105            PeerAction::PeerAdded(_) => {}
3106            _ => unreachable!(),
3107        }
3108
3109        match event!(peers) {
3110            PeerAction::Connect { .. } => {}
3111            _ => unreachable!(),
3112        }
3113
3114        peers.on_incoming_session_established(peer_id, addr);
3115        peers.on_already_connected(Direction::Outgoing(peer_id));
3116        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3117        assert_eq!(peers.connection_info.num_inbound, 1);
3118        assert_eq!(peers.connection_info.num_pending_out, 0);
3119        assert_eq!(peers.connection_info.num_pending_in, 0);
3120        assert_eq!(peers.connection_info.num_outbound, 0);
3121    }
3122
3123    #[tokio::test]
3124    async fn test_already_connected_incoming_outgoing_connection_error() {
3125        let peer_id = PeerId::random();
3126        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
3127        let mut peers = PeersManager::default();
3128
3129        peers.on_incoming_pending_session(addr.ip()).unwrap();
3130        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
3131
3132        match event!(peers) {
3133            PeerAction::PeerAdded(_) => {}
3134            _ => unreachable!(),
3135        }
3136
3137        match event!(peers) {
3138            PeerAction::Connect { .. } => {}
3139            _ => unreachable!(),
3140        }
3141
3142        peers.on_incoming_session_established(peer_id, addr);
3143
3144        peers.on_outgoing_connection_failure(
3145            &addr,
3146            &peer_id,
3147            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
3148        );
3149        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
3150        assert_eq!(peers.connection_info.num_inbound, 1);
3151        assert_eq!(peers.connection_info.num_pending_out, 0);
3152        assert_eq!(peers.connection_info.num_pending_in, 0);
3153        assert_eq!(peers.connection_info.num_outbound, 0);
3154    }
3155
3156    #[tokio::test]
3157    async fn test_max_concurrent_dials() {
3158        let config = PeersConfig::default();
3159        let mut peer_manager = PeersManager::new(config);
3160        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
3161        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
3162        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3163            peer_manager.add_peer(PeerId::random(), peer_addr, None);
3164        }
3165
3166        peer_manager.fill_outbound_slots();
3167        let dials = peer_manager
3168            .queued_actions
3169            .iter()
3170            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
3171            .count();
3172        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3173    }
3174
3175    #[tokio::test]
3176    async fn test_max_num_of_pending_dials() {
3177        let config = PeersConfig::default();
3178        let mut peer_manager = PeersManager::new(config);
3179        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
3180        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
3181
3182        // add more peers than allowed
3183        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3184            peer_manager.add_peer(PeerId::random(), peer_addr, None);
3185        }
3186
3187        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
3188            match event!(peer_manager) {
3189                PeerAction::PeerAdded(_) => {}
3190                _ => unreachable!(),
3191            }
3192        }
3193
3194        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
3195            match event!(peer_manager) {
3196                PeerAction::Connect { .. } => {}
3197                _ => unreachable!(),
3198            }
3199        }
3200
3201        // generate 'Connect' actions
3202        peer_manager.fill_outbound_slots();
3203
3204        // all dialed connections should be in 'PendingOut' state
3205        let dials = peer_manager.connection_info.num_pending_out;
3206        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3207
3208        let num_pendingout_states = peer_manager
3209            .peers
3210            .iter()
3211            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
3212            .map(|(peer_id, _)| *peer_id)
3213            .collect::<Vec<PeerId>>();
3214        assert_eq!(
3215            num_pendingout_states.len(),
3216            peer_manager.connection_info.config.max_concurrent_outbound_dials
3217        );
3218
3219        // establish dialed connections
3220        for peer_id in &num_pendingout_states {
3221            peer_manager.on_active_outgoing_established(*peer_id);
3222        }
3223
3224        // all dialed connections should now be in 'Out' state
3225        for peer_id in &num_pendingout_states {
3226            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
3227        }
3228
3229        // no more pending outbound connections
3230        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
3231    }
3232
3233    #[tokio::test]
3234    async fn test_connect() {
3235        let peer = PeerId::random();
3236        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3237        let mut peers = PeersManager::default();
3238        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3239        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3240
3241        match event!(peers) {
3242            PeerAction::Connect { peer_id, remote_addr } => {
3243                assert_eq!(peer_id, peer);
3244                assert_eq!(remote_addr, socket_addr);
3245            }
3246            _ => unreachable!(),
3247        }
3248
3249        let (record, _) = peers.peer_by_id(peer).unwrap();
3250        assert_eq!(record.tcp_addr(), socket_addr);
3251        assert_eq!(record.udp_addr(), socket_addr);
3252
3253        // connect again
3254        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3255
3256        let (record, _) = peers.peer_by_id(peer).unwrap();
3257        assert_eq!(record.tcp_addr(), socket_addr);
3258        assert_eq!(record.udp_addr(), socket_addr);
3259    }
3260
3261    #[tokio::test]
3262    async fn test_incoming_connection_from_banned() {
3263        let peer = PeerId::random();
3264        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3265        let config = PeersConfig::test().with_max_inbound(3);
3266        let mut peers = PeersManager::new(config);
3267        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3268
3269        match event!(peers) {
3270            PeerAction::PeerAdded(peer_id) => {
3271                assert_eq!(peer_id, peer);
3272            }
3273            _ => unreachable!(),
3274        }
3275        match event!(peers) {
3276            PeerAction::Connect { peer_id, .. } => {
3277                assert_eq!(peer_id, peer);
3278            }
3279            _ => unreachable!(),
3280        }
3281
3282        poll_fn(|cx| {
3283            assert!(peers.poll(cx).is_pending());
3284            Poll::Ready(())
3285        })
3286        .await;
3287
3288        // simulate new connection drops with error
3289        loop {
3290            peers.on_active_session_dropped(
3291                &socket_addr,
3292                &peer,
3293                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3294                    reth_eth_wire::EthVersion::Eth68,
3295                    reth_eth_wire::EthMessageID::Status,
3296                )),
3297            );
3298
3299            if peers.peers.get(&peer).unwrap().is_banned() {
3300                break;
3301            }
3302
3303            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3304            peers.on_incoming_session_established(peer, socket_addr);
3305
3306            match event!(peers) {
3307                PeerAction::Connect { peer_id, .. } => {
3308                    assert_eq!(peer_id, peer);
3309                }
3310                _ => unreachable!(),
3311            }
3312        }
3313
3314        assert!(peers.peers.get(&peer).unwrap().is_banned());
3315
3316        // fill all incoming slots
3317        for _ in 0..peers.connection_info.config.max_inbound {
3318            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3319            peers.on_incoming_session_established(peer, socket_addr);
3320
3321            match event!(peers) {
3322                PeerAction::DisconnectBannedIncoming { peer_id } => {
3323                    assert_eq!(peer_id, peer);
3324                }
3325                _ => unreachable!(),
3326            }
3327        }
3328
3329        poll_fn(|cx| {
3330            assert!(peers.poll(cx).is_pending());
3331            Poll::Ready(())
3332        })
3333        .await;
3334
3335        assert_eq!(peers.connection_info.num_inbound, 0);
3336
3337        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3338
3339        // Assert we can still accept new connections
3340        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3341        assert_eq!(peers.connection_info.num_pending_in, 1);
3342
3343        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
3344        // connection info is updated via the peer's state which would be a noop here since the
3345        // banned peer's state is idle
3346        peers.on_active_session_gracefully_closed(peer);
3347        assert_eq!(peers.connection_info.num_inbound, 0);
3348    }
3349
3350    #[tokio::test]
3351    async fn test_add_pending_connect() {
3352        let peer = PeerId::random();
3353        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3354        let mut peers = PeersManager::default();
3355        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3356        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3357        assert_eq!(peers.connection_info.num_pending_out, 1);
3358    }
3359
3360    #[tokio::test]
3361    async fn test_dns_updates_peer_address() {
3362        let peer_id = PeerId::random();
3363        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3364        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3365
3366        let trusted = TrustedPeer {
3367            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3368            tcp_port: 8008,
3369            udp_port: 8008,
3370            id: peer_id,
3371        };
3372
3373        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3374        let mut manager = PeersManager::new(config);
3375        manager
3376            .trusted_peers_resolver
3377            .set_interval(tokio::time::interval(Duration::from_millis(1)));
3378
3379        manager.peers.insert(
3380            peer_id,
3381            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3382        );
3383
3384        for _ in 0..100 {
3385            let _ = event!(manager);
3386            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3387                break;
3388            }
3389            tokio::time::sleep(Duration::from_millis(10)).await;
3390        }
3391
3392        let updated_peer = manager.peers.get(&peer_id).unwrap();
3393        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3394    }
3395
3396    #[tokio::test]
3397    async fn test_ip_filter_blocks_inbound_connection() {
3398        use reth_net_banlist::IpFilter;
3399        use std::net::IpAddr;
3400
3401        // Create a filter that only allows 192.168.0.0/16
3402        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3403        let config = PeersConfig::test().with_ip_filter(ip_filter);
3404        let mut peers = PeersManager::new(config);
3405
3406        // Try to connect from an allowed IP
3407        let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3408        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3409
3410        // Try to connect from a disallowed IP
3411        let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3412        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3413    }
3414
3415    #[tokio::test]
3416    async fn test_ip_filter_blocks_outbound_connection() {
3417        use reth_net_banlist::IpFilter;
3418        use std::net::SocketAddr;
3419
3420        // Create a filter that only allows 192.168.0.0/16
3421        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3422        let config = PeersConfig::test().with_ip_filter(ip_filter);
3423        let mut peers = PeersManager::new(config);
3424
3425        let peer_id = PeerId::new([1; 64]);
3426
3427        // Try to add a peer with an allowed IP
3428        let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3429        peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3430        assert!(peers.peers.contains_key(&peer_id));
3431
3432        // Try to add a peer with a disallowed IP
3433        let peer_id2 = PeerId::new([2; 64]);
3434        let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3435        peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3436        assert!(!peers.peers.contains_key(&peer_id2));
3437    }
3438
3439    #[tokio::test]
3440    async fn test_ip_filter_ipv6() {
3441        use reth_net_banlist::IpFilter;
3442        use std::net::IpAddr;
3443
3444        // Create a filter that only allows IPv6 range 2001:db8::/32
3445        let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3446        let config = PeersConfig::test().with_ip_filter(ip_filter);
3447        let mut peers = PeersManager::new(config);
3448
3449        // Try to connect from an allowed IPv6 address
3450        let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3451        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3452
3453        // Try to connect from a disallowed IPv6 address
3454        let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3455        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3456    }
3457
3458    #[tokio::test]
3459    async fn test_ip_filter_multiple_ranges() {
3460        use reth_net_banlist::IpFilter;
3461        use std::net::IpAddr;
3462
3463        // Create a filter that allows multiple ranges
3464        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3465        let config = PeersConfig::test().with_ip_filter(ip_filter);
3466        let mut peers = PeersManager::new(config);
3467
3468        // Try IPs from both allowed ranges
3469        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3470        let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3471        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3472        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3473
3474        // Try IP from disallowed range
3475        let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3476        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3477    }
3478
3479    #[tokio::test]
3480    async fn test_ip_filter_no_restriction() {
3481        use reth_net_banlist::IpFilter;
3482        use std::net::IpAddr;
3483
3484        // Create a filter with no restrictions (allow all)
3485        let ip_filter = IpFilter::allow_all();
3486        let config = PeersConfig::test().with_ip_filter(ip_filter);
3487        let mut peers = PeersManager::new(config);
3488
3489        // All IPs should be allowed
3490        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3491        let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3492        let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3493        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3494        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3495        assert!(peers.on_incoming_pending_session(ip3).is_ok());
3496    }
3497
3498    #[tokio::test]
3499    async fn test_best_unconnected_prefers_fork_id_as_tiebreaker() {
3500        let mut peers = PeersManager::default();
3501        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8008);
3502
3503        let fork_id = ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 };
3504
3505        // add two peers with equal reputation, only one has a fork_id
3506        let no_fork = PeerId::random();
3507        peers.add_peer(no_fork, PeerAddr::from_tcp(addr), None);
3508
3509        let with_fork = PeerId::random();
3510        peers.add_peer(with_fork, PeerAddr::from_tcp(addr), None);
3511        peers.peers.get_mut(&with_fork).unwrap().fork_id = Some(Box::new(fork_id));
3512
3513        let (best_id, _) = peers.best_unconnected().unwrap();
3514        assert_eq!(best_id, with_fork, "fork_id should break tie when reputation is equal");
3515    }
3516
3517    #[tokio::test]
3518    async fn test_add_trusted_peer_node_resolves_absent_peer() {
3519        let peer_id = PeerId::random();
3520        let trusted = TrustedPeer {
3521            host: url::Host::Domain("example.invalid".to_string()),
3522            tcp_port: 30303,
3523            udp_port: 30303,
3524            id: peer_id,
3525        };
3526
3527        let mut manager = PeersManager::default();
3528        manager.add_trusted_peer_node(trusted);
3529
3530        assert!(manager.trusted_peer_ids.contains(&peer_id));
3531        assert_eq!(manager.trusted_peers_resolver.trusted_peers.len(), 1);
3532        assert!(!manager.peers.contains_key(&peer_id));
3533
3534        let resolved = NodeRecord {
3535            address: "10.0.0.1".parse::<IpAddr>().unwrap(),
3536            tcp_port: 30303,
3537            udp_port: 30303,
3538            id: peer_id,
3539        };
3540        manager.on_resolved_peer(peer_id, resolved);
3541
3542        let peer = manager.peers.get(&peer_id).expect("peer should be added after resolution");
3543        assert!(peer.kind.is_trusted());
3544        assert_eq!(peer.addr.tcp().ip(), "10.0.0.1".parse::<IpAddr>().unwrap());
3545    }
3546
3547    #[tokio::test]
3548    async fn test_removed_trusted_peer_ignores_late_resolution() {
3549        let peer_id = PeerId::random();
3550        let trusted = TrustedPeer {
3551            host: url::Host::Domain("example.invalid".to_string()),
3552            tcp_port: 30303,
3553            udp_port: 30303,
3554            id: peer_id,
3555        };
3556
3557        let mut manager = PeersManager::default();
3558        manager.add_trusted_peer_node(trusted);
3559        manager.remove_peer_from_trusted_set(peer_id);
3560
3561        let resolved = NodeRecord {
3562            address: "10.0.0.1".parse::<IpAddr>().unwrap(),
3563            tcp_port: 30303,
3564            udp_port: 30303,
3565            id: peer_id,
3566        };
3567        manager.on_resolved_peer(peer_id, resolved);
3568
3569        assert!(!manager.peers.contains_key(&peer_id));
3570        assert!(!manager.trusted_peer_ids.contains(&peer_id));
3571    }
3572
3573    #[tokio::test]
3574    async fn test_rotation_disconnects_eligible_peer() {
3575        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3576        let peer = PeerId::random();
3577        let mut peers = PeersManager::new(
3578            PeersConfig::test()
3579                .with_max_outbound(1)
3580                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3581        );
3582
3583        peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3584        match event!(peers) {
3585            PeerAction::PeerAdded(_) => {}
3586            _ => unreachable!(),
3587        }
3588        match event!(peers) {
3589            PeerAction::Connect { .. } => {}
3590            _ => unreachable!(),
3591        }
3592        peers.on_active_outgoing_established(peer);
3593
3594        set_connected_at(
3595            &mut peers,
3596            peer,
3597            std::time::Instant::now() - Duration::from_secs(11 * 60),
3598        );
3599
3600        tokio::time::sleep(Duration::from_millis(100)).await;
3601
3602        match event!(peers) {
3603            PeerAction::Disconnect { peer_id, reason } => {
3604                assert_eq!(peer_id, peer);
3605                assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3606            }
3607            _ => unreachable!(),
3608        }
3609    }
3610
3611    #[tokio::test]
3612    async fn test_rotation_then_remove_uses_active_session_removal() {
3613        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 7)), 8008);
3614        let peer = PeerId::random();
3615        let mut peers = PeersManager::new(
3616            PeersConfig::test()
3617                .with_max_outbound(1)
3618                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3619        );
3620
3621        peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3622        match event!(peers) {
3623            PeerAction::PeerAdded(_) => {}
3624            _ => unreachable!(),
3625        }
3626        match event!(peers) {
3627            PeerAction::Connect { .. } => {}
3628            _ => unreachable!(),
3629        }
3630        peers.on_active_outgoing_established(peer);
3631        set_connected_at(
3632            &mut peers,
3633            peer,
3634            std::time::Instant::now() - Duration::from_secs(11 * 60),
3635        );
3636
3637        tokio::time::sleep(Duration::from_millis(100)).await;
3638
3639        match event!(peers) {
3640            PeerAction::Disconnect { peer_id, reason } => {
3641                assert_eq!(peer_id, peer);
3642                assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3643            }
3644            _ => unreachable!(),
3645        }
3646        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::Out);
3647        assert_eq!(peers.connection_info.num_outbound, 1);
3648
3649        peers.remove_peer(peer);
3650        match event!(peers) {
3651            PeerAction::PeerRemoved(peer_id) => assert_eq!(peer_id, peer),
3652            _ => unreachable!(),
3653        }
3654        match event!(peers) {
3655            PeerAction::Disconnect { peer_id, reason } => {
3656                assert_eq!(peer_id, peer);
3657                assert_eq!(reason, Some(DisconnectReason::DisconnectRequested));
3658            }
3659            _ => unreachable!(),
3660        }
3661
3662        let p = peers.peers.get(&peer).unwrap();
3663        assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
3664        assert!(p.remove_after_disconnect);
3665        assert_eq!(peers.connection_info.num_outbound, 1);
3666
3667        peers.on_active_session_gracefully_closed(peer);
3668        assert_eq!(peers.connection_info.num_outbound, 0);
3669        assert!(!peers.peers.contains_key(&peer));
3670    }
3671
3672    #[tokio::test]
3673    async fn test_rotation_skips_trusted_peers() {
3674        let _addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3675        let peer = PeerId::random();
3676        let trusted = TrustedPeer {
3677            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 3)),
3678            tcp_port: 8008,
3679            udp_port: 8008,
3680            id: peer,
3681        };
3682        let mut peers = PeersManager::new(
3683            PeersConfig::test()
3684                .with_max_outbound(1)
3685                .with_trusted_nodes(vec![trusted])
3686                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3687        );
3688
3689        match event!(peers) {
3690            PeerAction::Connect { .. } => {}
3691            _ => unreachable!(),
3692        }
3693        peers.on_active_outgoing_established(peer);
3694        set_connected_at(
3695            &mut peers,
3696            peer,
3697            std::time::Instant::now() - Duration::from_secs(11 * 60),
3698        );
3699
3700        tokio::time::sleep(Duration::from_millis(100)).await;
3701
3702        poll_fn(|cx| {
3703            assert!(peers.poll(cx).is_pending(), "trusted peer must not be rotated");
3704            Poll::Ready(())
3705        })
3706        .await;
3707    }
3708
3709    #[tokio::test]
3710    async fn test_rotation_skips_recently_connected_peers() {
3711        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 4)), 8008);
3712        let peer = PeerId::random();
3713        let mut peers = PeersManager::new(
3714            PeersConfig::test()
3715                .with_max_outbound(1)
3716                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3717        );
3718
3719        peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3720        match event!(peers) {
3721            PeerAction::PeerAdded(_) => {}
3722            _ => unreachable!(),
3723        }
3724        match event!(peers) {
3725            PeerAction::Connect { .. } => {}
3726            _ => unreachable!(),
3727        }
3728        peers.on_active_outgoing_established(peer);
3729
3730        tokio::time::sleep(Duration::from_millis(100)).await;
3731
3732        poll_fn(|cx| {
3733            assert!(peers.poll(cx).is_pending(), "recently connected peer must not be rotated");
3734            Poll::Ready(())
3735        })
3736        .await;
3737    }
3738
3739    #[tokio::test]
3740    async fn test_rotation_skips_when_slots_available() {
3741        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 5)), 8008);
3742        let peer = PeerId::random();
3743        let mut peers = PeersManager::new(
3744            PeersConfig::test()
3745                .with_max_outbound(2)
3746                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3747        );
3748
3749        peers.add_peer(peer, PeerAddr::from_tcp(addr), None);
3750        match event!(peers) {
3751            PeerAction::PeerAdded(_) => {}
3752            _ => unreachable!(),
3753        }
3754        match event!(peers) {
3755            PeerAction::Connect { .. } => {}
3756            _ => unreachable!(),
3757        }
3758        peers.on_active_outgoing_established(peer);
3759        set_connected_at(
3760            &mut peers,
3761            peer,
3762            std::time::Instant::now() - Duration::from_secs(11 * 60),
3763        );
3764
3765        tokio::time::sleep(Duration::from_millis(100)).await;
3766
3767        poll_fn(|cx| {
3768            assert!(
3769                peers.poll(cx).is_pending(),
3770                "rotation must not fire when outbound slots are available"
3771            );
3772            Poll::Ready(())
3773        })
3774        .await;
3775    }
3776
3777    #[tokio::test]
3778    async fn test_rotation_disconnects_inbound_peer() {
3779        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 6)), 8008);
3780        let peer = PeerId::random();
3781        let mut peers = PeersManager::new(
3782            PeersConfig::test()
3783                .with_max_inbound(1)
3784                .with_peer_rotation_interval(Some(Duration::from_millis(50))),
3785        );
3786
3787        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
3788        peers.on_incoming_session_established(peer, addr);
3789
3790        match event!(peers) {
3791            PeerAction::PeerAdded(_) => {}
3792            _ => unreachable!(),
3793        }
3794
3795        set_connected_at(
3796            &mut peers,
3797            peer,
3798            std::time::Instant::now() - Duration::from_secs(11 * 60),
3799        );
3800
3801        tokio::time::sleep(Duration::from_millis(100)).await;
3802
3803        match event!(peers) {
3804            PeerAction::Disconnect { peer_id, reason } => {
3805                assert_eq!(peer_id, peer);
3806                assert_eq!(reason, Some(DisconnectReason::UselessPeer));
3807            }
3808            _ => unreachable!(),
3809        }
3810    }
3811}