Skip to main content

reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    PersistedPeerInfo, ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes.
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network, keyed by their [`PeerId`].
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so a new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel that `manager_tx` sends into.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Actions that are buffered until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes.
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers, mapped to the instant until which they are backed off.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If `true`, outgoing connections are only made to trusted peers and incoming connections
83    /// from non-trusted peers are disconnected.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and drop it.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node.
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban an ip after an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93    /// IP address filter for restricting network connections to specific IP ranges.
94    ip_filter: reth_net_banlist::IpFilter,
95    /// If true, discovered peers without a confirmed ENR fork ID will not be added until their
96    /// fork ID is verified via EIP-868.
97    enforce_enr_fork_id: bool,
98}
99
100impl PeersManager {
101    /// Create a new instance with the given config
102    pub fn new(config: PeersConfig) -> Self {
103        let PeersConfig {
104            refill_slots_interval,
105            connection_info,
106            reputation_weights,
107            ban_list,
108            ban_duration,
109            backoff_durations,
110            trusted_nodes,
111            trusted_nodes_only,
112            trusted_nodes_resolution_interval,
113            basic_nodes,
114            persisted_peers,
115            max_backoff_count,
116            incoming_ip_throttle_duration,
117            ip_filter,
118            enforce_enr_fork_id,
119        } = config;
120        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
121        let now = Instant::now();
122
123        // We use half of the interval to decrease the max duration to `150%` in worst case
124        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
125
126        let mut peers =
127            HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len() + persisted_peers.len());
128        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
129
130        for trusted_peer in &trusted_nodes {
131            match trusted_peer.resolve_blocking() {
132                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
133                    trusted_peer_ids.insert(id);
134                    peers.entry(id).or_insert_with(|| {
135                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
136                    });
137                }
138                Err(err) => {
139                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
140                }
141            }
142        }
143
144        for PersistedPeerInfo { record, kind, fork_id, reputation } in persisted_peers {
145            // When enforce_enr_fork_id is enabled, skip persisted peers that don't have a
146            // confirmed fork ID. These were likely accumulated from a different network during
147            // a prior run without the flag.
148            if enforce_enr_fork_id && fork_id.is_none() {
149                continue
150            }
151            let NodeRecord { address, tcp_port, udp_port, id } = record;
152            peers.entry(id).or_insert_with(|| {
153                let mut peer = Peer::with_kind(
154                    PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)),
155                    kind,
156                );
157                peer.fork_id = fork_id.map(Box::new);
158                peer.reputation = reputation;
159                peer
160            });
161        }
162
163        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
164            peers.entry(id).or_insert_with(|| {
165                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
166            });
167        }
168
169        trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
170
171        Self {
172            peers,
173            trusted_peer_ids,
174            trusted_peers_resolver: TrustedPeersResolver::new(
175                trusted_nodes,
176                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
177            ),
178            manager_tx,
179            handle_rx: UnboundedReceiverStream::new(handle_rx),
180            queued_actions: Default::default(),
181            reputation_weights,
182            refill_slots_interval: tokio::time::interval(refill_slots_interval),
183            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
184            connection_info: ConnectionInfo::new(connection_info),
185            ban_list,
186            backed_off_peers: Default::default(),
187            ban_duration,
188            backoff_durations,
189            trusted_nodes_only,
190            last_tick: Instant::now(),
191            max_backoff_count,
192            net_connection_state: NetworkConnectionState::default(),
193            incoming_ip_throttle_duration,
194            ip_filter,
195            enforce_enr_fork_id,
196        }
197    }
198
199    /// Returns a new [`PeersHandle`] backed by a clone of the command sender of this manager.
200    pub(crate) fn handle(&self) -> PeersHandle {
201        PeersHandle::new(self.manager_tx.clone())
202    }
203
204    /// Returns `true` if discovered peers need a confirmed ENR fork ID before they are added.
205    pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
206        self.enforce_enr_fork_id
207    }
208
209    /// Returns the number of peers currently tracked in the peer set, regardless of their state.
210    #[inline]
211    pub(crate) fn num_known_peers(&self) -> usize {
212        self.peers.len()
213    }
214
215    /// Returns an iterator over all peers as [`NodeRecord`]s.
216    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
217        self.peers.iter().map(|(peer_id, v)| {
218            NodeRecord::new_with_ports(
219                v.addr.tcp().ip(),
220                v.addr.tcp().port(),
221                v.addr.udp().map(|addr| addr.port()),
222                *peer_id,
223            )
224        })
225    }
226
227    /// Returns an iterator over peers suitable for persisting to disk.
228    ///
229    /// Filters out backed-off and banned peers, and includes metadata like kind, fork ID, and
230    /// reputation.
231    pub(crate) fn persistable_peers(&self) -> impl Iterator<Item = PersistedPeerInfo> + '_ {
232        self.peers.iter().filter(|(_, peer)| !peer.is_backed_off() && !peer.is_banned()).map(
233            |(peer_id, peer)| PersistedPeerInfo {
234                record: NodeRecord::new_with_ports(
235                    peer.addr.tcp().ip(),
236                    peer.addr.tcp().port(),
237                    peer.addr.udp().map(|addr| addr.port()),
238                    *peer_id,
239                ),
240                kind: peer.kind,
241                fork_id: peer.fork_id.as_deref().copied(),
242                reputation: peer.reputation,
243            },
244        )
245    }
246
247    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
248    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
249        self.peers.get(&peer_id).map(|v| {
250            (
251                NodeRecord::new_with_ports(
252                    v.addr.tcp().ip(),
253                    v.addr.tcp().port(),
254                    v.addr.udp().map(|addr| addr.port()),
255                    peer_id,
256                ),
257                v.kind,
258            )
259        })
260    }
261
262    /// Returns `true` if the given peer is connected via an inbound session.
263    pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
264        self.peers.get(peer_id).is_some_and(|p| {
265            matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
266        })
267    }
268
269    /// Returns an iterator over all peer ids for peers with the given kind
270    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
271        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
272    }
273
274    /// Returns the number of currently active inbound connections (the `num_inbound` counter).
275    #[inline]
276    pub(crate) const fn num_inbound_connections(&self) -> usize {
277        self.connection_info.num_inbound
278    }
279
280    /// Returns the number of currently __active__ outbound connections (the `num_outbound` counter).
281    #[inline]
282    pub(crate) const fn num_outbound_connections(&self) -> usize {
283        self.connection_info.num_outbound
284    }
285
286    /// Returns the number of currently pending outbound connections (the `num_pending_out` counter).
287    #[inline]
288    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
289        self.connection_info.num_pending_out
290    }
291
292    /// Returns the number of currently backed off peers, i.e. the size of the backoff map.
293    #[inline]
294    pub(crate) fn num_backed_off_peers(&self) -> usize {
295        self.backed_off_peers.len()
296    }
297
298    /// Returns the number of idle trusted peers.
299    fn num_idle_trusted_peers(&self) -> usize {
300        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
301    }
302
303    /// Invoked when a new _incoming_ tcp connection is accepted.
304    ///
305    /// returns an error if the inbound ip address is on the ban list
306    pub(crate) fn on_incoming_pending_session(
307        &mut self,
308        addr: IpAddr,
309    ) -> Result<(), InboundConnectionError> {
310        // Check if the IP is in the allowed ranges (netrestrict)
311        if !self.ip_filter.is_allowed(&addr) {
312            trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
313            return Err(InboundConnectionError::IpBanned)
314        }
315
316        if self.ban_list.is_banned_ip(&addr) {
317            return Err(InboundConnectionError::IpBanned)
318        }
319
320        // check if we even have slots for a new incoming connection
321        if !self.connection_info.has_in_capacity() {
322            if self.trusted_peer_ids.is_empty() {
323                // if we don't have any incoming slots and no trusted peers, we don't accept any new
324                // connections
325                return Err(InboundConnectionError::ExceedsCapacity)
326            }
327
328            // there's an edge case here where no incoming connections besides from trusted peers
329            // are allowed (max_inbound == 0), in which case we still need to allow new pending
330            // incoming connections until all trusted peers are connected.
331            let num_idle_trusted_peers = self.num_idle_trusted_peers();
332            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
333                // we still want to limit concurrent pending connections
334                let max_inbound =
335                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
336                if self.connection_info.num_pending_in < max_inbound {
337                    self.connection_info.inc_pending_in();
338                    return Ok(())
339                }
340            }
341
342            // all trusted peers are either connected or connecting
343            return Err(InboundConnectionError::ExceedsCapacity)
344        }
345
346        // also cap the incoming connections we can process at once
347        if !self.connection_info.has_in_pending_capacity() {
348            return Err(InboundConnectionError::ExceedsCapacity)
349        }
350
351        // apply the rate limit
352        self.throttle_incoming_ip(addr);
353
354        self.connection_info.inc_pending_in();
355        Ok(())
356    }
357
358    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but the
359    /// session was then rejected internally; releases the reserved pending inbound slot.
360    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
361        self.connection_info.decr_pending_in();
362    }
363
364    /// Invoked when a pending incoming session was gracefully closed; releases its pending slot.
365    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
366        self.connection_info.decr_pending_in()
367    }
368
369    /// Invoked when a pending session was closed.
370    pub(crate) fn on_incoming_pending_session_dropped(
371        &mut self,
372        remote_addr: SocketAddr,
373        err: &PendingSessionHandshakeError,
374    ) {
375        if err.is_fatal_protocol_error() {
376            self.ban_ip(remote_addr.ip());
377
378            if err.merits_discovery_ban() {
379                self.queued_actions
380                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
381            }
382        }
383
384        self.connection_info.decr_pending_in();
385    }
386
387    /// Called when a new _incoming_ active session was established to the given peer.
388    ///
389    /// This will update the state of the peer if not yet tracked.
390    ///
391    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
392    /// be scheduled.
393    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
394        self.connection_info.decr_pending_in();
395
396        // we only need to check the peer id here as the ip address will have been checked at
397        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
398        if self.ban_list.is_banned_peer(&peer_id) {
399            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
400            return
401        }
402
403        // check if the peer is trustable or not
404        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
405        if self.trusted_nodes_only && !is_trusted {
406            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
407            return
408        }
409
410        // start a new tick, so the peer is not immediately rewarded for the time since last tick
411        self.tick();
412
413        match self.peers.entry(peer_id) {
414            Entry::Occupied(mut entry) => {
415                let peer = entry.get_mut();
416                if peer.is_banned() {
417                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
418                    return
419                }
420                // it might be the case that we're also trying to connect to this peer at the same
421                // time, so we need to adjust the state here
422                if peer.state.is_pending_out() {
423                    self.connection_info.decr_state(peer.state);
424                }
425
426                peer.state = PeerConnectionState::In;
427
428                is_trusted = is_trusted || peer.is_trusted();
429            }
430            Entry::Vacant(entry) => {
431                // peer is missing in the table, we add it but mark it as to be removed after
432                // disconnect, because we only know the outgoing port
433                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
434                peer.remove_after_disconnect = true;
435                entry.insert(peer);
436                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
437            }
438        }
439
440        let has_in_capacity = self.connection_info.has_in_capacity();
441        // increment new incoming connection
442        self.connection_info.inc_in();
443
444        // disconnect the peer if we don't have capacity for more inbound connections
445        if !is_trusted && !has_in_capacity {
446            self.queued_actions.push_back(PeerAction::Disconnect {
447                peer_id,
448                reason: Some(DisconnectReason::TooManyPeers),
449            });
450        }
451    }
452
453    /// Bans the peer temporarily with the configured ban timeout
454    fn ban_peer(&mut self, peer_id: PeerId) {
455        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
456            (peer.is_trusted() || peer.is_static())
457        {
458            // For misbehaving trusted or static peers, we provide a bit more leeway when
459            // penalizing them.
460            self.backoff_durations.low / 2
461        } else {
462            self.ban_duration
463        };
464
465        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
466        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
467    }
468
469    /// Bans the IP temporarily, until the configured `ban_duration` has elapsed from now.
470    fn ban_ip(&mut self, ip: IpAddr) {
471        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
472    }
473
474    /// Bans the IP for `incoming_ip_throttle_duration` to rate limit inbound attempts per IP.
475    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
476        self.ban_list
477            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
478    }
479
480    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
481    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
482        trace!(target: "net::peers", ?peer_id, "backing off");
483
484        if let Some(peer) = self.peers.get_mut(&peer_id) {
485            peer.backed_off = true;
486            self.backed_off_peers.insert(peer_id, until);
487        }
488    }
489
490    /// Removes the peer from the ban list and queues a [`PeerAction::UnBanPeer`].
491    fn unban_peer(&mut self, peer_id: PeerId) {
492        self.ban_list.unban_peer(&peer_id);
493        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
494    }
495
496    /// Tick function to update reputation of all connected peers.
497    /// Peers are rewarded with reputation increases for the time they are connected since the last
498    /// tick. This is to prevent peers from being disconnected eventually due to slashed
499    /// reputation because of some bad messages (most likely transaction related)
500    fn tick(&mut self) {
501        let now = Instant::now();
502        // Determine the number of seconds since the last tick.
503        // Ensuring that now is always greater than last_tick to account for issues with system
504        // time.
505        let secs_since_last_tick =
506            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
507        self.last_tick = now;
508
509        // update reputation via seconds connected
510        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
511            // update reputation via seconds connected, but keep the target _around_ the default
512            // reputation.
513            if peer.1.reputation < DEFAULT_REPUTATION {
514                peer.1.reputation += secs_since_last_tick;
515            }
516        }
517    }
518
519    /// Returns the tracked reputation for a peer, or `None` if the peer is not in the peer set.
520    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
521        self.peers.get(peer_id).map(|peer| peer.reputation)
522    }
523
524    /// Apply the corresponding reputation change to the given peer.
525    ///
526    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
527    /// reputation changes that can be attributed to network conditions. If the peer is a
528    /// trusted peer, it will also be less strict with the reputation slashing.
529    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
530        trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
531
532        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
533            // First check if we should reset the reputation
534            if rep.is_reset() {
535                peer.reset_reputation()
536            } else {
537                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
538                if peer.is_trusted() || peer.is_static() {
539                    // exempt trusted and static peers from reputation slashing for
540                    if matches!(
541                        rep,
542                        ReputationChangeKind::Dropped |
543                            ReputationChangeKind::BadAnnouncement |
544                            ReputationChangeKind::Timeout |
545                            ReputationChangeKind::AlreadySeenTransaction
546                    ) {
547                        return
548                    }
549
550                    // also be less strict with the reputation slashing for trusted peers
551                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
552                        // this caps the reputation change to the maximum allowed for trusted peers
553                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
554                    }
555                }
556                peer.apply_reputation(reputation_change, rep)
557            }
558        } else {
559            return
560        };
561
562        match outcome {
563            ReputationChangeOutcome::None => {}
564            ReputationChangeOutcome::Ban => {
565                self.ban_peer(*peer_id);
566            }
567            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
568            ReputationChangeOutcome::DisconnectAndBan => {
569                self.queued_actions.push_back(PeerAction::Disconnect {
570                    peer_id: *peer_id,
571                    reason: Some(DisconnectReason::DisconnectRequested),
572                });
573                self.ban_peer(*peer_id);
574            }
575        }
576    }
577
578    /// Gracefully disconnected a pending _outgoing_ session
579    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
580        if let Some(peer) = self.peers.get_mut(peer_id) {
581            self.connection_info.decr_state(peer.state);
582            peer.state = PeerConnectionState::Idle;
583        }
584    }
585
586    /// Invoked when an _outgoing_ pending session was closed during authentication or the
587    /// handshake. Handled as a connection failure with [`ReputationChangeKind::FailedToConnect`].
588    pub(crate) fn on_outgoing_pending_session_dropped(
589        &mut self,
590        remote_addr: &SocketAddr,
591        peer_id: &PeerId,
592        err: &PendingSessionHandshakeError,
593    ) {
594        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
595    }
596
597    /// Gracefully disconnected an active session
598    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
599        match self.peers.entry(peer_id) {
600            Entry::Occupied(mut entry) => {
601                trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
602                self.connection_info.decr_state(entry.get().state);
603
604                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
605                    // this peer should be removed from the set
606                    entry.remove();
607                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
608                } else {
609                    let peer = entry.get_mut();
610                    // reset the peer's state
611                    // we reset the backoff counter since we're able to establish a successful
612                    // session to that peer
613                    peer.severe_backoff_counter = 0;
614                    peer.state = PeerConnectionState::Idle;
615
616                    // but we're backing off slightly to avoid dialing the peer again right away, to
617                    // give the remote time to also properly register the closed session and clean
618                    // up and to avoid any issues with ip throttling on the remote in case this
619                    // session was terminated right away.
620                    peer.backed_off = true;
621                    self.backed_off_peers.insert(
622                        peer_id,
623                        std::time::Instant::now() + self.incoming_ip_throttle_duration,
624                    );
625                    trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
626                }
627            }
628            Entry::Vacant(_) => return,
629        }
630
631        self.fill_outbound_slots();
632    }
633
634    /// Called when a _pending_ outbound connection is successful.
635    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
636        if let Some(peer) = self.peers.get_mut(&peer_id) {
637            trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
638            self.connection_info.decr_state(peer.state);
639            self.connection_info.inc_out();
640            peer.state = PeerConnectionState::Out;
641        }
642    }
643
644    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
645    ///
646    /// Handled as a connection failure with [`ReputationChangeKind::Dropped`]: on a fatal protocol
647    /// error a non-trusted peer is removed from the peer set and the peer is banned.
648    pub(crate) fn on_active_session_dropped(
649        &mut self,
650        remote_addr: &SocketAddr,
651        peer_id: &PeerId,
652        err: &EthStreamError,
653    ) {
654        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
655    }
656
657    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
658    /// connection.
659    pub(crate) fn on_outgoing_connection_failure(
660        &mut self,
661        remote_addr: &SocketAddr,
662        peer_id: &PeerId,
663        err: &io::Error,
664    ) {
665        // there's a race condition where we accepted an incoming connection while we were trying to
666        // connect to the same peer at the same time. if the outgoing connection failed
667        // after the incoming connection was accepted, we can ignore this error
668        if let Some(peer) = self.peers.get(peer_id) {
669            if peer.state.is_incoming() {
670                // we already have an active connection to the peer, so we can ignore this error
671                return
672            }
673
674            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
675                // trigger resolution task for trusted peer since multiple connection failures
676                // occurred
677                self.trusted_peers_resolver.interval.reset_immediately();
678            }
679        }
680
681        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
682    }
683
    /// Shared handling for a connection (attempt) to `peer_id` that failed with `err`.
    ///
    /// - Fatal protocol error: the counter matching the peer's current state is decremented, the
    ///   peer is removed unless it is trusted (queuing [`PeerAction::PeerRemoved`] and, if the
    ///   error merits it, [`PeerAction::DiscoveryBanPeerId`]), and `ban_peer` is invoked for the
    ///   peer id.
    /// - Any other error: the peer is either backed off (if the error asks for a backoff) or its
    ///   reputation is adjusted by the weight configured for `reputation_change`. The peer is then
    ///   reset to [`PeerConnectionState::Idle`]. Peers that are neither trusted nor static are
    ///   removed once their severe backoff counter exceeds `max_backoff_count`.
    ///
    /// In both cases `fill_outbound_slots` is called at the end.
    fn on_connection_failure(
        &mut self,
        remote_addr: &SocketAddr,
        peer_id: &PeerId,
        err: impl SessionError,
        reputation_change: ReputationChangeKind,
    ) {
        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");

        if err.is_fatal_protocol_error() {
            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
            // remove the peer to which we can't establish a connection due to protocol related
            // issues.
            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
                // release the counter that corresponds to the state the peer was in
                self.connection_info.decr_state(entry.get().state);
                // only remove if the peer is not trusted
                // NOTE(review): unlike the non-fatal path below, static peers are not exempt
                // from removal here — confirm this is intended
                if entry.get().is_trusted() {
                    entry.get_mut().state = PeerConnectionState::Idle;
                } else {
                    entry.remove();
                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
                    // If the error is caused by a peer that should be banned from discovery
                    if err.merits_discovery_ban() {
                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
                            peer_id: *peer_id,
                            ip_addr: remote_addr.ip(),
                        })
                    }
                }
            }

            // ban the peer, this happens even if the peer was not tracked
            self.ban_peer(*peer_id);
        } else {
            // both are decided while the peer is mutably borrowed and applied afterwards
            let mut backoff_until = None;
            let mut remove_peer = false;

            if let Some(peer) = self.peers.get_mut(peer_id) {
                if let Some(kind) = err.should_backoff() {
                    if peer.is_trusted() || peer.is_static() {
                        // provide a bit more leeway for trusted peers and use a lower backoff so
                        // that we keep re-trying them after backing off shortly, but we should at
                        // least backoff for the low duration to not violate the ip based inbound
                        // connection throttle that peer has in place, because this peer might not
                        // have us registered as a trusted peer.
                        let backoff = self.backoff_durations.low;
                        backoff_until = Some(std::time::Instant::now() + backoff);
                        trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
                    } else {
                        // only severe backoff kinds count towards the removal threshold
                        if kind.is_severe() {
                            peer.severe_backoff_counter =
                                peer.severe_backoff_counter.saturating_add(1);
                        }
                        trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");

                        let backoff_time =
                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);

                        // The peer has signaled that it is currently unable to process any more
                        // connections, so we will hold off on attempting any new connections for a
                        // while
                        backoff_until = Some(backoff_time);
                    }
                } else {
                    // If the error was not a backoff error, we reduce the peer's reputation
                    let reputation_change = self.reputation_weights.change(reputation_change);
                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
                };

                // the attempt is over: release the counter of the previous state and reset it
                self.connection_info.decr_state(peer.state);
                peer.state = PeerConnectionState::Idle;

                if peer.severe_backoff_counter > self.max_backoff_count &&
                    !peer.is_trusted() &&
                    !peer.is_static()
                {
                    // mark peer for removal if it has been backoff too many times and is _not_
                    // trusted or static
                    remove_peer = true;
                }
            }

            // remove peer if it has been marked for removal
            if remove_peer {
                trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
            } else if let Some(backoff_until) = backoff_until {
                // otherwise, backoff the peer if marked as such
                self.backoff_peer_until(*peer_id, backoff_until);
            }
        }

        self.fill_outbound_slots();
    }
780
781    /// Invoked if a pending session was disconnected because there's already a connection to the
782    /// peer.
783    ///
784    /// If the session was an outgoing connection, this means that the peer initiated a connection
785    /// to us at the same time and this connection is already established.
786    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
787        match direction {
788            Direction::Incoming => {
789                // need to decrement the ingoing counter
790                self.connection_info.decr_pending_in();
791            }
792            Direction::Outgoing(_) => {
793                // cleanup is handled when the incoming active session is activated in
794                // `on_incoming_session_established`
795            }
796        }
797    }
798
799    /// Called for a newly discovered peer.
800    ///
801    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
802    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
803        self.add_peer_kind(peer_id, None, addr, fork_id)
804    }
805
806    /// Marks the given peer as trusted.
807    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
808        self.trusted_peer_ids.insert(peer_id);
809        if let Some(peer) = self.peers.get_mut(&peer_id) {
810            peer.kind = PeerKind::Trusted;
811        }
812    }
813
814    /// Called for a newly discovered trusted peer.
815    ///
816    /// If the peer already exists, then the address and kind will be updated.
817    #[cfg_attr(not(test), expect(dead_code))]
818    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
819        self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
820    }
821
822    /// Called for a newly discovered peer.
823    ///
824    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
825    /// If the peer exists and a [`PeerKind`] is provided then the peer's kind is updated
826    pub(crate) fn add_peer_kind(
827        &mut self,
828        peer_id: PeerId,
829        kind: Option<PeerKind>,
830        addr: PeerAddr,
831        fork_id: Option<ForkId>,
832    ) {
833        let ip_addr = addr.tcp().ip();
834
835        // Check if the IP is in the allowed ranges (netrestrict)
836        if !self.ip_filter.is_allowed(&ip_addr) {
837            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
838            return
839        }
840
841        if self.ban_list.is_banned(&peer_id, &ip_addr) {
842            return
843        }
844
845        match self.peers.entry(peer_id) {
846            Entry::Occupied(mut entry) => {
847                let peer = entry.get_mut();
848                peer.fork_id = fork_id.map(Box::new);
849                peer.addr = addr;
850
851                if let Some(kind) = kind {
852                    peer.kind = kind;
853                }
854
855                if peer.state.is_incoming() {
856                    // now that we have an actual discovered address, for that peer and not just the
857                    // ip of the incoming connection, we don't need to remove the peer after
858                    // disconnecting, See `on_incoming_session_established`
859                    peer.remove_after_disconnect = false;
860                }
861            }
862            Entry::Vacant(entry) => {
863                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
864                let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
865                peer.fork_id = fork_id.map(Box::new);
866                entry.insert(peer);
867                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
868            }
869        }
870
871        if kind.is_some_and(|kind| kind.is_trusted()) {
872            // also track the peer in the peer id set
873            self.trusted_peer_ids.insert(peer_id);
874        }
875    }
876
877    /// Removes the tracked node from the set.
878    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
879        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
880        if entry.get().is_trusted() {
881            return
882        }
883        let mut peer = entry.remove();
884
885        trace!(target: "net::peers", ?peer_id, "remove discovered node");
886        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
887
888        if peer.state.is_connected() {
889            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
890            // we terminate the active session here, but only remove the peer after the session
891            // was disconnected, this prevents the case where the session is scheduled for
892            // disconnect but the node is immediately rediscovered, See also
893            // [`Self::on_disconnected()`]
894            peer.remove_after_disconnect = true;
895            peer.state.disconnect();
896            self.peers.insert(peer_id, peer);
897            self.queued_actions.push_back(PeerAction::Disconnect {
898                peer_id,
899                reason: Some(DisconnectReason::DisconnectRequested),
900            })
901        }
902    }
903
904    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
905    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
906    #[cfg_attr(not(test), expect(dead_code))]
907    pub(crate) fn add_and_connect(
908        &mut self,
909        peer_id: PeerId,
910        addr: PeerAddr,
911        fork_id: Option<ForkId>,
912    ) {
913        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
914    }
915
916    /// Connects a peer and its address with the given kind.
917    ///
918    /// Note: This is invoked on demand via an external command received by the manager
919    pub(crate) fn add_and_connect_kind(
920        &mut self,
921        peer_id: PeerId,
922        kind: PeerKind,
923        addr: PeerAddr,
924        fork_id: Option<ForkId>,
925    ) {
926        let ip_addr = addr.tcp().ip();
927
928        // Check if the IP is in the allowed ranges (netrestrict)
929        if !self.ip_filter.is_allowed(&ip_addr) {
930            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
931            return
932        }
933
934        if self.ban_list.is_banned(&peer_id, &ip_addr) {
935            return
936        }
937
938        match self.peers.entry(peer_id) {
939            Entry::Occupied(mut entry) => {
940                let peer = entry.get_mut();
941                peer.kind = kind;
942                peer.fork_id = fork_id.map(Box::new);
943                peer.addr = addr;
944
945                if peer.state == PeerConnectionState::Idle {
946                    // Try connecting again.
947                    peer.state = PeerConnectionState::PendingOut;
948                    self.connection_info.inc_pending_out();
949                    self.queued_actions
950                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
951                }
952            }
953            Entry::Vacant(entry) => {
954                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
955                let mut peer = Peer::with_kind(addr, kind);
956                peer.state = PeerConnectionState::PendingOut;
957                peer.fork_id = fork_id.map(Box::new);
958                entry.insert(peer);
959                self.connection_info.inc_pending_out();
960                self.queued_actions
961                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
962            }
963        }
964
965        if kind.is_trusted() {
966            self.trusted_peer_ids.insert(peer_id);
967        }
968    }
969
970    /// Removes the tracked node from the trusted set.
971    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
972        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
973        if !entry.get().is_trusted() {
974            return
975        }
976
977        let peer = entry.get_mut();
978        peer.kind = PeerKind::Basic;
979
980        self.trusted_peer_ids.remove(&peer_id);
981    }
982
983    /// Returns the best idle peer to connect to.
984    ///
985    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
986    /// not currently marked as banned or backed off.
987    ///
988    /// Among remaining peers, the one with the highest reputation is selected. When reputation is
989    /// equal, a peer with a discovered `fork_id` is preferred since it indicates a compatible fork.
990    ///
991    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
992    /// `trusted` peers.
993    ///
994    /// Returns `None` if no peer is available.
995    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
996        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
997            !peer.is_backed_off() &&
998                !peer.is_banned() &&
999                peer.state.is_unconnected() &&
1000                (!self.trusted_nodes_only || peer.is_trusted())
1001        });
1002
1003        // keep track of the best peer, if there's one
1004        let mut best_peer = unconnected.next()?;
1005
1006        if best_peer.1.is_trusted() || best_peer.1.is_static() {
1007            return Some((*best_peer.0, best_peer.1))
1008        }
1009
1010        for maybe_better in unconnected {
1011            // if the peer is trusted or static, return it immediately
1012            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
1013                return Some((*maybe_better.0, maybe_better.1))
1014            }
1015
1016            // prefer higher reputation, break ties by fork_id presence
1017            match maybe_better.1.reputation.cmp(&best_peer.1.reputation) {
1018                std::cmp::Ordering::Greater => best_peer = maybe_better,
1019                std::cmp::Ordering::Equal
1020                    if maybe_better.1.fork_id.is_some() && best_peer.1.fork_id.is_none() =>
1021                {
1022                    best_peer = maybe_better
1023                }
1024                _ => {}
1025            }
1026        }
1027        Some((*best_peer.0, best_peer.1))
1028    }
1029
1030    /// If there's capacity for new outbound connections, this will queue new
1031    /// [`PeerAction::Connect`] actions.
1032    ///
1033    /// New connections are only initiated, if slots are available and appropriate peers are
1034    /// available.
1035    fn fill_outbound_slots(&mut self) {
1036        self.tick();
1037
1038        if !self.net_connection_state.is_active() {
1039            // nothing to fill
1040            return
1041        }
1042
1043        // as long as there are slots available fill them with the best peers
1044        while self.connection_info.has_out_capacity() {
1045            let action = {
1046                let (peer_id, peer) = match self.best_unconnected() {
1047                    Some(peer) => peer,
1048                    _ => break,
1049                };
1050
1051                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1052
1053                peer.state = PeerConnectionState::PendingOut;
1054                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1055            };
1056
1057            self.connection_info.inc_pending_out();
1058
1059            self.queued_actions.push_back(action);
1060        }
1061    }
1062
1063    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1064        if let Some(peer) = self.peers.get_mut(&peer_id) {
1065            let new_addr = PeerAddr::new_with_ports(
1066                new_record.address,
1067                new_record.tcp_port,
1068                Some(new_record.udp_port),
1069            );
1070
1071            if peer.addr != new_addr {
1072                peer.addr = new_addr;
1073                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1074            }
1075        }
1076    }
1077
    /// Keeps track of network state changes by storing the given `state` as the current
    /// network connection state.
    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
        self.net_connection_state = state;
    }
1082
    /// Returns a reference to the currently stored network connection state.
    pub const fn connection_state(&self) -> &NetworkConnectionState {
        &self.net_connection_state
    }
1087
1088    /// Sets `net_connection_state` to `ShuttingDown`.
1089    pub const fn on_shutdown(&mut self) {
1090        self.net_connection_state = NetworkConnectionState::ShuttingDown;
1091    }
1092
    /// Advances the state.
    ///
    /// Event hooks invoked externally may trigger new [`PeerAction`]s that are buffered until
    /// [`PeersManager`] is polled.
    ///
    /// Returns the next buffered action if there is one. Otherwise this processes the commands
    /// received via the handle, the release interval (expired bans and backoffs), the refill
    /// interval and the trusted peers resolver, and returns [`Poll::Pending`] if none of these
    /// queued a new action.
    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
        loop {
            // drain buffered actions
            if let Some(action) = self.queued_actions.pop_front() {
                return Poll::Ready(action)
            }

            // handle all commands that are ready right now
            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
                match cmd {
                    PeerCommand::Add(peer_id, addr) => {
                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
                    }
                    PeerCommand::Remove(peer) => self.remove_peer(peer),
                    PeerCommand::ReputationChange(peer_id, rep) => {
                        self.apply_reputation_change(&peer_id, rep)
                    }
                    PeerCommand::GetPeer(peer, tx) => {
                        // a failed send is ignored
                        let _ = tx.send(self.peers.get(&peer).cloned());
                    }
                    PeerCommand::GetPeers(tx) => {
                        // a failed send is ignored
                        let _ = tx.send(self.iter_peers().collect());
                    }
                }
            }

            if self.release_interval.poll_tick(cx).is_ready() {
                let now = std::time::Instant::now();
                let (_, unbanned_peers) = self.ban_list.evict(now);

                // unban the evicted peers that are still tracked
                for peer_id in unbanned_peers {
                    if let Some(peer) = self.peers.get_mut(&peer_id) {
                        peer.unban();
                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
                    }
                }

                // clear the backoff list of expired backoffs, and mark the relevant peers as
                // ready to be dialed
                self.backed_off_peers.retain(|peer_id, until| {
                    if now > *until {
                        if let Some(peer) = self.peers.get_mut(peer_id) {
                            peer.backed_off = false;
                        }
                        return false
                    }
                    true
                })
            }

            // try to fill free outbound slots for every elapsed refill tick
            while self.refill_slots_interval.poll_tick(cx).is_ready() {
                self.fill_outbound_slots();
            }

            // at most one resolved record is applied per loop iteration
            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
                self.on_resolved_peer(peer_id, new_record);
            }

            // nothing was queued by the steps above, otherwise loop to return the next action
            if self.queued_actions.is_empty() {
                return Poll::Pending
            }
        }
    }
1159}
1160
1161impl Default for PeersManager {
1162    fn default() -> Self {
1163        Self::new(Default::default())
1164    }
1165}
1166
/// Tracks stats about connected nodes.
///
/// Keeps one counter per direction for active and for pending connections, together with the
/// [`ConnectionsConfig`] limits the counters are checked against.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectionInfo {
    /// Counter for currently occupied slots for active outbound connections.
    num_outbound: usize,
    /// Counter for pending outbound connections.
    num_pending_out: usize,
    /// Counter for currently occupied slots for active inbound connections.
    num_inbound: usize,
    /// Counter for pending inbound connections.
    num_pending_in: usize,
    /// Restrictions on number of connections.
    config: ConnectionsConfig,
}
1181
1182// === impl ConnectionInfo ===
1183
1184impl ConnectionInfo {
1185    /// Returns a new [`ConnectionInfo`] with the given config.
1186    const fn new(config: ConnectionsConfig) -> Self {
1187        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1188    }
1189
1190    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1191    const fn has_out_capacity(&self) -> bool {
1192        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1193            self.num_outbound < self.config.max_outbound
1194    }
1195
1196    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1197    const fn has_in_capacity(&self) -> bool {
1198        self.num_inbound < self.config.max_inbound
1199    }
1200
1201    /// Returns `true` if we can handle an additional incoming pending connection.
1202    const fn has_in_pending_capacity(&self) -> bool {
1203        self.num_pending_in < self.config.max_inbound
1204    }
1205
1206    const fn decr_state(&mut self, state: PeerConnectionState) {
1207        match state {
1208            PeerConnectionState::Idle => {}
1209            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1210            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1211            PeerConnectionState::PendingOut => self.decr_pending_out(),
1212        }
1213    }
1214
1215    const fn decr_out(&mut self) {
1216        self.num_outbound -= 1;
1217    }
1218
1219    const fn inc_out(&mut self) {
1220        self.num_outbound += 1;
1221    }
1222
1223    const fn inc_pending_out(&mut self) {
1224        self.num_pending_out += 1;
1225    }
1226
1227    const fn inc_in(&mut self) {
1228        self.num_inbound += 1;
1229    }
1230
1231    const fn inc_pending_in(&mut self) {
1232        self.num_pending_in += 1;
1233    }
1234
1235    const fn decr_in(&mut self) {
1236        self.num_inbound -= 1;
1237    }
1238
1239    const fn decr_pending_out(&mut self) {
1240        self.num_pending_out -= 1;
1241    }
1242
1243    const fn decr_pending_in(&mut self) {
1244        self.num_pending_in -= 1;
1245    }
1246}
1247
/// Actions the peer manager can trigger.
///
/// Actions are buffered by the [`PeersManager`] and handed out one at a time when it is polled.
#[derive(Debug)]
pub enum PeerAction {
    /// Start a new connection to a peer.
    Connect {
        /// The peer to connect to.
        peer_id: PeerId,
        /// Where to reach the node
        remote_addr: SocketAddr,
    },
    /// Disconnect an existing connection.
    Disconnect {
        /// The peer ID of the established connection.
        peer_id: PeerId,
        /// An optional reason for the disconnect.
        reason: Option<DisconnectReason>,
    },
    /// Disconnect an existing incoming connection, because the peer's reputation is below the
    /// banned threshold or is on the [`BanList`]
    DisconnectBannedIncoming {
        /// The peer ID of the established connection.
        peer_id: PeerId,
    },
    /// Disconnect an untrusted incoming connection when only trusted nodes are allowed.
    DisconnectUntrustedIncoming {
        /// The peer ID.
        peer_id: PeerId,
    },
    /// Ban the peer in discovery.
    DiscoveryBanPeerId {
        /// The peer ID.
        peer_id: PeerId,
        /// The IP address.
        ip_addr: IpAddr,
    },
    /// Ban the IP in discovery.
    DiscoveryBanIp {
        /// The IP address.
        ip_addr: IpAddr,
    },
    /// Ban the peer temporarily
    BanPeer {
        /// The peer ID.
        peer_id: PeerId,
    },
    /// Lift the ban of the peer.
    UnBanPeer {
        /// The peer ID.
        peer_id: PeerId,
    },
    /// Emit peerAdded event
    PeerAdded(PeerId),
    /// Emit peerRemoved event
    PeerRemoved(PeerId),
}
1303
/// Error returned when an incoming connection is rejected right away.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
    /// The remote's ip address is banned
    IpBanned,
    /// No capacity for new inbound connections
    ExceedsCapacity,
}
1312
1313impl Display for InboundConnectionError {
1314    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1315        write!(f, "{self:?}")
1316    }
1317}
1318
/// The reason a peer was backed off.
///
/// See [`BackoffReason::from_disconnect`] for deriving a reason from a disconnect reason.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackoffReason {
    /// The remote peer responded with `TooManyPeers` (0x04).
    TooManyPeers,
    /// The session was gracefully closed and we're backing off briefly.
    GracefulClose,
    /// A connection or protocol-level error occurred.
    ConnectionError,
}
1329
1330impl BackoffReason {
1331    /// Derives the backoff reason from an optional [`DisconnectReason`].
1332    pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1333        match reason {
1334            Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1335            _ => Self::ConnectionError,
1336        }
1337    }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342    use alloy_primitives::B512;
1343    use reth_eth_wire::{
1344        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1345        DisconnectReason,
1346    };
1347    use reth_ethereum_forks::{ForkHash, ForkId};
1348    use reth_net_banlist::BanList;
1349    use reth_network_api::Direction;
1350    use reth_network_peers::{PeerId, TrustedPeer};
1351    use reth_network_types::{
1352        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1353    };
1354    use std::{
1355        future::{poll_fn, Future},
1356        io,
1357        net::{IpAddr, Ipv4Addr, SocketAddr},
1358        pin::Pin,
1359        task::{Context, Poll},
1360        time::Duration,
1361    };
1362    use url::Host;
1363
1364    use super::PeersManager;
1365    use crate::{
1366        error::SessionError,
1367        peers::{
1368            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1369            PeerConnectionState,
1370        },
1371        session::PendingSessionHandshakeError,
1372        PeersConfig,
1373    };
1374
1375    struct PeerActionFuture<'a> {
1376        peers: &'a mut PeersManager,
1377    }
1378
1379    impl Future for PeerActionFuture<'_> {
1380        type Output = PeerAction;
1381
1382        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1383            self.get_mut().peers.poll(cx)
1384        }
1385    }
1386
1387    macro_rules! event {
1388        ($peers:expr) => {
1389            PeerActionFuture { peers: &mut $peers }.await
1390        };
1391    }
1392
1393    #[tokio::test]
1394    async fn test_insert() {
1395        let peer = PeerId::random();
1396        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1397        let mut peers = PeersManager::default();
1398        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1399
1400        match event!(peers) {
1401            PeerAction::PeerAdded(peer_id) => {
1402                assert_eq!(peer_id, peer);
1403            }
1404            _ => unreachable!(),
1405        }
1406        match event!(peers) {
1407            PeerAction::Connect { peer_id, remote_addr } => {
1408                assert_eq!(peer_id, peer);
1409                assert_eq!(remote_addr, socket_addr);
1410            }
1411            _ => unreachable!(),
1412        }
1413
1414        let (record, _) = peers.peer_by_id(peer).unwrap();
1415        assert_eq!(record.tcp_addr(), socket_addr);
1416        assert_eq!(record.udp_addr(), socket_addr);
1417    }
1418
1419    #[tokio::test]
1420    async fn test_insert_udp() {
1421        let peer = PeerId::random();
1422        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1423        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1424        let mut peers = PeersManager::default();
1425        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1426
1427        match event!(peers) {
1428            PeerAction::PeerAdded(peer_id) => {
1429                assert_eq!(peer_id, peer);
1430            }
1431            _ => unreachable!(),
1432        }
1433        match event!(peers) {
1434            PeerAction::Connect { peer_id, remote_addr } => {
1435                assert_eq!(peer_id, peer);
1436                assert_eq!(remote_addr, tcp_addr);
1437            }
1438            _ => unreachable!(),
1439        }
1440
1441        let (record, _) = peers.peer_by_id(peer).unwrap();
1442        assert_eq!(record.tcp_addr(), tcp_addr);
1443        assert_eq!(record.udp_addr(), udp_addr);
1444    }
1445
1446    #[tokio::test]
1447    async fn test_ban() {
1448        let peer = PeerId::random();
1449        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1450        let mut peers = PeersManager::default();
1451        peers.ban_peer(peer);
1452        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1453
1454        match event!(peers) {
1455            PeerAction::BanPeer { peer_id } => {
1456                assert_eq!(peer_id, peer);
1457            }
1458            _ => unreachable!(),
1459        }
1460
1461        poll_fn(|cx| {
1462            assert!(peers.poll(cx).is_pending());
1463            Poll::Ready(())
1464        })
1465        .await;
1466    }
1467
1468    #[tokio::test]
1469    async fn test_unban() {
1470        let peer = PeerId::random();
1471        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1472        let mut peers = PeersManager::default();
1473        peers.ban_peer(peer);
1474        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1475
1476        match event!(peers) {
1477            PeerAction::BanPeer { peer_id } => {
1478                assert_eq!(peer_id, peer);
1479            }
1480            _ => unreachable!(),
1481        }
1482
1483        poll_fn(|cx| {
1484            assert!(peers.poll(cx).is_pending());
1485            Poll::Ready(())
1486        })
1487        .await;
1488
1489        peers.unban_peer(peer);
1490
1491        match event!(peers) {
1492            PeerAction::UnBanPeer { peer_id } => {
1493                assert_eq!(peer_id, peer);
1494            }
1495            _ => unreachable!(),
1496        }
1497
1498        poll_fn(|cx| {
1499            assert!(peers.poll(cx).is_pending());
1500            Poll::Ready(())
1501        })
1502        .await;
1503    }
1504
1505    #[tokio::test]
1506    async fn test_backoff_on_busy() {
1507        let peer = PeerId::random();
1508        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1509
1510        let mut peers = PeersManager::new(PeersConfig::test());
1511        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1512
1513        match event!(peers) {
1514            PeerAction::PeerAdded(peer_id) => {
1515                assert_eq!(peer_id, peer);
1516            }
1517            _ => unreachable!(),
1518        }
1519        match event!(peers) {
1520            PeerAction::Connect { peer_id, .. } => {
1521                assert_eq!(peer_id, peer);
1522            }
1523            _ => unreachable!(),
1524        }
1525
1526        poll_fn(|cx| {
1527            assert!(peers.poll(cx).is_pending());
1528            Poll::Ready(())
1529        })
1530        .await;
1531
1532        peers.on_active_session_dropped(
1533            &socket_addr,
1534            &peer,
1535            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1536                DisconnectReason::TooManyPeers,
1537            )),
1538        );
1539
1540        poll_fn(|cx| {
1541            assert!(peers.poll(cx).is_pending());
1542            Poll::Ready(())
1543        })
1544        .await;
1545
1546        assert!(peers.backed_off_peers.contains_key(&peer));
1547        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1548
1549        tokio::time::sleep(peers.backoff_durations.low).await;
1550
1551        match event!(peers) {
1552            PeerAction::Connect { peer_id, .. } => {
1553                assert_eq!(peer_id, peer);
1554            }
1555            _ => unreachable!(),
1556        }
1557
1558        assert!(!peers.backed_off_peers.contains_key(&peer));
1559        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1560    }
1561
1562    #[tokio::test]
1563    async fn test_backoff_on_no_response() {
1564        let peer = PeerId::random();
1565        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1566
1567        let backoff_durations = PeerBackoffDurations::test();
1568        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1569        let mut peers = PeersManager::new(config);
1570        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1571
1572        match event!(peers) {
1573            PeerAction::PeerAdded(peer_id) => {
1574                assert_eq!(peer_id, peer);
1575            }
1576            _ => unreachable!(),
1577        }
1578        match event!(peers) {
1579            PeerAction::Connect { peer_id, .. } => {
1580                assert_eq!(peer_id, peer);
1581            }
1582            _ => unreachable!(),
1583        }
1584
1585        poll_fn(|cx| {
1586            assert!(peers.poll(cx).is_pending());
1587            Poll::Ready(())
1588        })
1589        .await;
1590
1591        peers.on_outgoing_pending_session_dropped(
1592            &socket_addr,
1593            &peer,
1594            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1595                EthHandshakeError::NoResponse,
1596            )),
1597        );
1598
1599        poll_fn(|cx| {
1600            assert!(peers.poll(cx).is_pending());
1601            Poll::Ready(())
1602        })
1603        .await;
1604
1605        assert!(peers.backed_off_peers.contains_key(&peer));
1606        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1607
1608        tokio::time::sleep(backoff_durations.high).await;
1609
1610        match event!(peers) {
1611            PeerAction::Connect { peer_id, .. } => {
1612                assert_eq!(peer_id, peer);
1613            }
1614            _ => unreachable!(),
1615        }
1616
1617        assert!(!peers.backed_off_peers.contains_key(&peer));
1618        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1619    }
1620
1621    #[tokio::test]
1622    async fn test_low_backoff() {
1623        let peer = PeerId::random();
1624        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1625        let config = PeersConfig::test();
1626        let mut peers = PeersManager::new(config);
1627        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1628        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1629
1630        let backoff_timestamp = peers
1631            .backoff_durations
1632            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1633
1634        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1635        assert!(backoff_timestamp <= expected);
1636    }
1637
1638    #[tokio::test]
1639    async fn test_multiple_backoff_calculations() {
1640        let peer = PeerId::random();
1641        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1642        let config = PeersConfig::default();
1643        let mut peers = PeersManager::new(config);
1644        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1645        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1646
1647        // Simulate a peer that was already backed off once
1648        peer_struct.severe_backoff_counter = 1;
1649
1650        let now = std::time::Instant::now();
1651
1652        // Simulate the increment that happens in on_connection_failure
1653        peer_struct.severe_backoff_counter += 1;
1654        // Get official backoff time
1655        let backoff_time = peers
1656            .backoff_durations
1657            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1658
1659        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1660        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1661
1662        // We can't use assert_eq! since there is a very small diff in the nano secs
1663        // Usually it is 1800s != 1799.9999996s
1664        assert!(backoff_time.duration_since(now) > backoff_duration);
1665    }
1666
1667    #[tokio::test]
1668    async fn test_ban_on_active_drop() {
1669        let peer = PeerId::random();
1670        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1671        let mut peers = PeersManager::default();
1672        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1673
1674        match event!(peers) {
1675            PeerAction::PeerAdded(peer_id) => {
1676                assert_eq!(peer_id, peer);
1677            }
1678            _ => unreachable!(),
1679        }
1680        match event!(peers) {
1681            PeerAction::Connect { peer_id, .. } => {
1682                assert_eq!(peer_id, peer);
1683            }
1684            _ => unreachable!(),
1685        }
1686
1687        poll_fn(|cx| {
1688            assert!(peers.poll(cx).is_pending());
1689            Poll::Ready(())
1690        })
1691        .await;
1692
1693        peers.on_active_session_dropped(
1694            &socket_addr,
1695            &peer,
1696            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1697                DisconnectReason::UselessPeer,
1698            )),
1699        );
1700
1701        match event!(peers) {
1702            PeerAction::PeerRemoved(peer_id) => {
1703                assert_eq!(peer_id, peer);
1704            }
1705            _ => unreachable!(),
1706        }
1707        match event!(peers) {
1708            PeerAction::BanPeer { peer_id } => {
1709                assert_eq!(peer_id, peer);
1710            }
1711            _ => unreachable!(),
1712        }
1713
1714        poll_fn(|cx| {
1715            assert!(peers.poll(cx).is_pending());
1716            Poll::Ready(())
1717        })
1718        .await;
1719
1720        assert!(!peers.peers.contains_key(&peer));
1721    }
1722
1723    #[tokio::test]
1724    async fn test_remove_on_max_backoff_count() {
1725        let peer = PeerId::random();
1726        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1727        let config = PeersConfig::test();
1728        let mut peers = PeersManager::new(config.clone());
1729        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1730        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1731
1732        // Simulate a peer that was already backed off once
1733        peer_struct.severe_backoff_counter = config.max_backoff_count;
1734
1735        match event!(peers) {
1736            PeerAction::PeerAdded(peer_id) => {
1737                assert_eq!(peer_id, peer);
1738            }
1739            _ => unreachable!(),
1740        }
1741        match event!(peers) {
1742            PeerAction::Connect { peer_id, .. } => {
1743                assert_eq!(peer_id, peer);
1744            }
1745            _ => unreachable!(),
1746        }
1747
1748        poll_fn(|cx| {
1749            assert!(peers.poll(cx).is_pending());
1750            Poll::Ready(())
1751        })
1752        .await;
1753
1754        peers.on_outgoing_pending_session_dropped(
1755            &socket_addr,
1756            &peer,
1757            &PendingSessionHandshakeError::Eth(
1758                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1759            ),
1760        );
1761
1762        match event!(peers) {
1763            PeerAction::PeerRemoved(peer_id) => {
1764                assert_eq!(peer_id, peer);
1765            }
1766            _ => unreachable!(),
1767        }
1768
1769        poll_fn(|cx| {
1770            assert!(peers.poll(cx).is_pending());
1771            Poll::Ready(())
1772        })
1773        .await;
1774
1775        assert!(!peers.peers.contains_key(&peer));
1776    }
1777
1778    #[tokio::test]
1779    async fn test_ban_on_pending_drop() {
1780        let peer = PeerId::random();
1781        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1782        let mut peers = PeersManager::default();
1783        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1784
1785        match event!(peers) {
1786            PeerAction::PeerAdded(peer_id) => {
1787                assert_eq!(peer_id, peer);
1788            }
1789            _ => unreachable!(),
1790        }
1791        match event!(peers) {
1792            PeerAction::Connect { peer_id, .. } => {
1793                assert_eq!(peer_id, peer);
1794            }
1795            _ => unreachable!(),
1796        }
1797
1798        poll_fn(|cx| {
1799            assert!(peers.poll(cx).is_pending());
1800            Poll::Ready(())
1801        })
1802        .await;
1803
1804        peers.on_outgoing_pending_session_dropped(
1805            &socket_addr,
1806            &peer,
1807            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1808                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1809            )),
1810        );
1811
1812        match event!(peers) {
1813            PeerAction::PeerRemoved(peer_id) => {
1814                assert_eq!(peer_id, peer);
1815            }
1816            _ => unreachable!(),
1817        }
1818        match event!(peers) {
1819            PeerAction::BanPeer { peer_id } => {
1820                assert_eq!(peer_id, peer);
1821            }
1822            _ => unreachable!(),
1823        }
1824
1825        poll_fn(|cx| {
1826            assert!(peers.poll(cx).is_pending());
1827            Poll::Ready(())
1828        })
1829        .await;
1830
1831        assert!(!peers.peers.contains_key(&peer));
1832    }
1833
1834    #[tokio::test]
1835    async fn test_internally_closed_incoming() {
1836        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1837        let mut peers = PeersManager::default();
1838
1839        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1840        assert_eq!(peers.connection_info.num_pending_in, 1);
1841        peers.on_incoming_pending_session_rejected_internally();
1842        assert_eq!(peers.connection_info.num_pending_in, 0);
1843    }
1844
1845    #[tokio::test]
1846    async fn test_reject_incoming_at_pending_capacity() {
1847        let mut peers = PeersManager::default();
1848
1849        for count in 1..=peers.connection_info.config.max_inbound {
1850            let socket_addr =
1851                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1852            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1853            assert_eq!(peers.connection_info.num_pending_in, count);
1854        }
1855        assert!(peers.connection_info.has_in_capacity());
1856        assert!(!peers.connection_info.has_in_pending_capacity());
1857
1858        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1859        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1860    }
1861
1862    #[tokio::test]
1863    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1864        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1865        let trusted = PeerId::random();
1866        peers.add_trusted_peer_id(trusted);
1867
1868        // connect the trusted peer
1869        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1870        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1871        peers.on_incoming_session_established(trusted, addr);
1872
1873        match event!(peers) {
1874            PeerAction::PeerAdded(id) => {
1875                assert_eq!(id, trusted);
1876            }
1877            _ => unreachable!(),
1878        }
1879
1880        // saturate the remaining inbound slots with untrusted peers
1881        let mut connected_untrusted_peer_ids = Vec::new();
1882        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1883            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1884            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1885            let peer_id = PeerId::random();
1886            peers.on_incoming_session_established(peer_id, addr);
1887            connected_untrusted_peer_ids.push(peer_id);
1888
1889            match event!(peers) {
1890                PeerAction::PeerAdded(id) => {
1891                    assert_eq!(id, peer_id);
1892                }
1893                _ => unreachable!(),
1894            }
1895        }
1896
1897        let mut pending_addrs = Vec::new();
1898
1899        // saturate available slots
1900        for i in 0..2 {
1901            let socket_addr =
1902                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1903            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1904
1905            pending_addrs.push(socket_addr);
1906        }
1907
1908        assert_eq!(peers.connection_info.num_pending_in, 2);
1909
1910        // try to handle additional incoming connections at capacity
1911        for i in 0..2 {
1912            let socket_addr =
1913                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1914            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1915        }
1916
1917        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1918            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1919                DisconnectReason::UselessPeer,
1920            )),
1921        ));
1922
1923        // Remove all pending peers
1924        for pending_addr in pending_addrs {
1925            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1926        }
1927
1928        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1929
1930        println!(
1931            "num_inbound: {}, has_in_capacity: {}",
1932            peers.connection_info.num_inbound,
1933            peers.connection_info.has_in_capacity()
1934        );
1935
1936        // disconnect a connected peer
1937        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1938
1939        println!(
1940            "num_inbound: {}, has_in_capacity: {}",
1941            peers.connection_info.num_inbound,
1942            peers.connection_info.has_in_capacity()
1943        );
1944
1945        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1946        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1947    }
1948
1949    #[tokio::test]
1950    async fn test_closed_incoming() {
1951        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1952        let mut peers = PeersManager::default();
1953
1954        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1955        assert_eq!(peers.connection_info.num_pending_in, 1);
1956        peers.on_incoming_pending_session_gracefully_closed();
1957        assert_eq!(peers.connection_info.num_pending_in, 0);
1958    }
1959
1960    #[tokio::test]
1961    async fn test_dropped_incoming() {
1962        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1963        let ban_duration = Duration::from_millis(500);
1964        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1965        let mut peers = PeersManager::new(config);
1966
1967        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1968        assert_eq!(peers.connection_info.num_pending_in, 1);
1969        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1970            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1971                DisconnectReason::UselessPeer,
1972            )),
1973        ));
1974
1975        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1976        assert_eq!(peers.connection_info.num_pending_in, 0);
1977        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1978
1979        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1980
1981        // unbanned after timeout
1982        tokio::time::sleep(ban_duration).await;
1983
1984        poll_fn(|cx| {
1985            let _ = peers.poll(cx);
1986            Poll::Ready(())
1987        })
1988        .await;
1989
1990        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1991        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1992    }
1993
1994    #[tokio::test]
1995    async fn test_reputation_change_connected() {
1996        let peer = PeerId::random();
1997        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1998        let mut peers = PeersManager::default();
1999        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2000
2001        match event!(peers) {
2002            PeerAction::PeerAdded(peer_id) => {
2003                assert_eq!(peer_id, peer);
2004            }
2005            _ => unreachable!(),
2006        }
2007        match event!(peers) {
2008            PeerAction::Connect { peer_id, remote_addr } => {
2009                assert_eq!(peer_id, peer);
2010                assert_eq!(remote_addr, socket_addr);
2011            }
2012            _ => unreachable!(),
2013        }
2014
2015        let p = peers.peers.get_mut(&peer).unwrap();
2016        assert_eq!(p.state, PeerConnectionState::PendingOut);
2017
2018        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
2019
2020        let p = peers.peers.get(&peer).unwrap();
2021        assert_eq!(p.state, PeerConnectionState::PendingOut);
2022        assert!(p.is_banned());
2023
2024        peers.on_active_session_gracefully_closed(peer);
2025
2026        let p = peers.peers.get(&peer).unwrap();
2027        assert_eq!(p.state, PeerConnectionState::Idle);
2028        assert!(p.is_banned());
2029
2030        match event!(peers) {
2031            PeerAction::Disconnect { peer_id, .. } => {
2032                assert_eq!(peer_id, peer);
2033            }
2034            _ => unreachable!(),
2035        }
2036    }
2037
2038    #[tokio::test]
2039    async fn retain_trusted_status() {
2040        let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2041        let trusted = PeerId::random();
2042        let mut peers =
2043            PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2044                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2045                tcp_port: 8008,
2046                udp_port: 8008,
2047                id: trusted,
2048            }]));
2049        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2050        peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2051        assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2052    }
2053
2054    #[tokio::test]
2055    async fn accept_incoming_trusted_unknown_peer_address() {
2056        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2057        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2058        // try to connect trusted peer
2059        let trusted = PeerId::random();
2060        peers.add_trusted_peer_id(trusted);
2061
2062        // saturate the inbound slots
2063        for i in 0..peers.connection_info.config.max_inbound {
2064            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2065            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2066            let peer_id = PeerId::random();
2067            peers.on_incoming_session_established(peer_id, addr);
2068
2069            match event!(peers) {
2070                PeerAction::PeerAdded(id) => {
2071                    assert_eq!(id, peer_id);
2072                }
2073                _ => unreachable!(),
2074            }
2075        }
2076
2077        // try to connect untrusted peer
2078        let untrusted = PeerId::random();
2079        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2080        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2081        peers.on_incoming_session_established(untrusted, socket_addr);
2082
2083        match event!(peers) {
2084            PeerAction::PeerAdded(id) => {
2085                assert_eq!(id, untrusted);
2086            }
2087            _ => unreachable!(),
2088        }
2089
2090        match event!(peers) {
2091            PeerAction::Disconnect { peer_id, reason } => {
2092                assert_eq!(peer_id, untrusted);
2093                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2094            }
2095            _ => unreachable!(),
2096        }
2097
2098        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2099        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2100        peers.on_incoming_session_established(trusted, socket_addr);
2101
2102        match event!(peers) {
2103            PeerAction::PeerAdded(id) => {
2104                assert_eq!(id, trusted);
2105            }
2106            _ => unreachable!(),
2107        }
2108
2109        poll_fn(|cx| {
2110            assert!(peers.poll(cx).is_pending());
2111            Poll::Ready(())
2112        })
2113        .await;
2114
2115        let peer = peers.peers.get(&trusted).unwrap();
2116        assert_eq!(peer.state, PeerConnectionState::In);
2117    }
2118
2119    #[tokio::test]
2120    async fn test_already_connected() {
2121        let peer = PeerId::random();
2122        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2123        let mut peers = PeersManager::default();
2124
2125        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
2126        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2127        assert_eq!(peers.connection_info.num_pending_in, 1);
2128
2129        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
2130        // to increase by 1
2131        peers.on_incoming_session_established(peer, socket_addr);
2132        let p = peers.peers.get_mut(&peer).expect("peer not found");
2133        assert_eq!(p.addr.tcp(), socket_addr);
2134        assert_eq!(peers.connection_info.num_pending_in, 0);
2135        assert_eq!(peers.connection_info.num_inbound, 1);
2136
2137        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
2138        // by 1
2139        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2140        assert_eq!(peers.connection_info.num_pending_in, 1);
2141
2142        // Simulate a rejection due to an already established connection, expecting the
2143        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
2144        // should not be changed.
2145        peers.on_already_connected(Direction::Incoming);
2146
2147        let p = peers.peers.get_mut(&peer).expect("peer not found");
2148        assert_eq!(p.addr.tcp(), socket_addr);
2149        assert_eq!(peers.connection_info.num_pending_in, 0);
2150        assert_eq!(peers.connection_info.num_inbound, 1);
2151    }
2152
2153    #[tokio::test]
2154    async fn test_reputation_change_trusted_peer() {
2155        let peer = PeerId::random();
2156        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2157        let mut peers = PeersManager::default();
2158        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2159
2160        match event!(peers) {
2161            PeerAction::PeerAdded(peer_id) => {
2162                assert_eq!(peer_id, peer);
2163            }
2164            _ => unreachable!(),
2165        }
2166        match event!(peers) {
2167            PeerAction::Connect { peer_id, remote_addr } => {
2168                assert_eq!(peer_id, peer);
2169                assert_eq!(remote_addr, socket_addr);
2170            }
2171            _ => unreachable!(),
2172        }
2173
2174        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2175        peers.on_active_outgoing_established(peer);
2176        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2177
2178        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2179
2180        {
2181            let p = peers.peers.get(&peer).unwrap();
2182            assert_eq!(p.state, PeerConnectionState::Out);
2183            // not banned yet
2184            assert!(!p.is_banned());
2185        }
2186
2187        // ensure peer is banned eventually
2188        loop {
2189            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2190
2191            let p = peers.peers.get(&peer).unwrap();
2192            if p.is_banned() {
2193                break
2194            }
2195        }
2196
2197        match event!(peers) {
2198            PeerAction::Disconnect { peer_id, .. } => {
2199                assert_eq!(peer_id, peer);
2200            }
2201            _ => unreachable!(),
2202        }
2203    }
2204
2205    #[tokio::test]
2206    async fn test_reputation_management() {
2207        let peer = PeerId::random();
2208        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2209        let mut peers = PeersManager::default();
2210        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2211        assert_eq!(peers.get_reputation(&peer), Some(0));
2212
2213        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2214        assert_eq!(peers.get_reputation(&peer), Some(1024));
2215
2216        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2217        assert_eq!(peers.get_reputation(&peer), Some(0));
2218    }
2219
2220    #[tokio::test]
2221    async fn test_remove_discovered_active() {
2222        let peer = PeerId::random();
2223        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2224        let mut peers = PeersManager::default();
2225        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2226
2227        match event!(peers) {
2228            PeerAction::PeerAdded(peer_id) => {
2229                assert_eq!(peer_id, peer);
2230            }
2231            _ => unreachable!(),
2232        }
2233        match event!(peers) {
2234            PeerAction::Connect { peer_id, remote_addr } => {
2235                assert_eq!(peer_id, peer);
2236                assert_eq!(remote_addr, socket_addr);
2237            }
2238            _ => unreachable!(),
2239        }
2240
2241        let p = peers.peers.get(&peer).unwrap();
2242        assert_eq!(p.state, PeerConnectionState::PendingOut);
2243
2244        peers.remove_peer(peer);
2245
2246        match event!(peers) {
2247            PeerAction::PeerRemoved(peer_id) => {
2248                assert_eq!(peer_id, peer);
2249            }
2250            _ => unreachable!(),
2251        }
2252        match event!(peers) {
2253            PeerAction::Disconnect { peer_id, .. } => {
2254                assert_eq!(peer_id, peer);
2255            }
2256            _ => unreachable!(),
2257        }
2258
2259        let p = peers.peers.get(&peer).unwrap();
2260        assert_eq!(p.state, PeerConnectionState::PendingOut);
2261
2262        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2263        let p = peers.peers.get(&peer).unwrap();
2264        assert_eq!(p.state, PeerConnectionState::PendingOut);
2265
2266        peers.on_active_session_gracefully_closed(peer);
2267        assert!(!peers.peers.contains_key(&peer));
2268    }
2269
2270    #[tokio::test]
2271    async fn test_fatal_outgoing_connection_error_trusted() {
2272        let peer = PeerId::random();
2273        let config = PeersConfig::test()
2274            .with_trusted_nodes(vec![TrustedPeer {
2275                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2276                tcp_port: 8008,
2277                udp_port: 8008,
2278                id: peer,
2279            }])
2280            .with_trusted_nodes_only(true);
2281        let mut peers = PeersManager::new(config);
2282        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2283
2284        match event!(peers) {
2285            PeerAction::Connect { peer_id, remote_addr } => {
2286                assert_eq!(peer_id, peer);
2287                assert_eq!(remote_addr, socket_addr);
2288            }
2289            _ => unreachable!(),
2290        }
2291
2292        let p = peers.peers.get(&peer).unwrap();
2293        assert_eq!(p.state, PeerConnectionState::PendingOut);
2294
2295        assert_eq!(peers.num_outbound_connections(), 0);
2296
2297        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2298            EthHandshakeError::NonStatusMessageInHandshake,
2299        ));
2300        assert!(err.is_fatal_protocol_error());
2301
2302        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2303        assert_eq!(peers.num_outbound_connections(), 0);
2304
2305        // try tmp ban peer
2306        match event!(peers) {
2307            PeerAction::BanPeer { peer_id } => {
2308                assert_eq!(peer_id, peer);
2309            }
2310            err => unreachable!("{err:?}"),
2311        }
2312
2313        // ensure we still have trusted peer
2314        assert!(peers.peers.contains_key(&peer));
2315
2316        // await for the ban to expire
2317        tokio::time::sleep(peers.backoff_durations.medium).await;
2318
2319        match event!(peers) {
2320            PeerAction::Connect { peer_id, remote_addr } => {
2321                assert_eq!(peer_id, peer);
2322                assert_eq!(remote_addr, socket_addr);
2323            }
2324            err => unreachable!("{err:?}"),
2325        }
2326    }
2327
2328    #[tokio::test]
2329    async fn test_outgoing_connection_error() {
2330        let peer = PeerId::random();
2331        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2332        let mut peers = PeersManager::default();
2333        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2334
2335        match event!(peers) {
2336            PeerAction::PeerAdded(peer_id) => {
2337                assert_eq!(peer_id, peer);
2338            }
2339            _ => unreachable!(),
2340        }
2341        match event!(peers) {
2342            PeerAction::Connect { peer_id, remote_addr } => {
2343                assert_eq!(peer_id, peer);
2344                assert_eq!(remote_addr, socket_addr);
2345            }
2346            _ => unreachable!(),
2347        }
2348
2349        let p = peers.peers.get(&peer).unwrap();
2350        assert_eq!(p.state, PeerConnectionState::PendingOut);
2351
2352        assert_eq!(peers.num_outbound_connections(), 0);
2353
2354        peers.on_outgoing_connection_failure(
2355            &socket_addr,
2356            &peer,
2357            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2358        );
2359
2360        assert_eq!(peers.num_outbound_connections(), 0);
2361    }
2362
2363    #[tokio::test]
2364    async fn test_outgoing_connection_gracefully_closed() {
2365        let peer = PeerId::random();
2366        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2367        let mut peers = PeersManager::default();
2368        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2369
2370        match event!(peers) {
2371            PeerAction::PeerAdded(peer_id) => {
2372                assert_eq!(peer_id, peer);
2373            }
2374            _ => unreachable!(),
2375        }
2376        match event!(peers) {
2377            PeerAction::Connect { peer_id, remote_addr } => {
2378                assert_eq!(peer_id, peer);
2379                assert_eq!(remote_addr, socket_addr);
2380            }
2381            _ => unreachable!(),
2382        }
2383
2384        let p = peers.peers.get(&peer).unwrap();
2385        assert_eq!(p.state, PeerConnectionState::PendingOut);
2386
2387        assert_eq!(peers.num_outbound_connections(), 0);
2388
2389        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2390
2391        assert_eq!(peers.num_outbound_connections(), 0);
2392        assert_eq!(peers.connection_info.num_pending_out, 0);
2393    }
2394
2395    #[tokio::test]
2396    async fn test_discovery_ban_list() {
2397        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2398        let socket_addr = SocketAddr::new(ip, 8008);
2399        let ban_list = BanList::new(vec![], vec![ip]);
2400        let config = PeersConfig::default().with_ban_list(ban_list);
2401        let mut peer_manager = PeersManager::new(config);
2402        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2403
2404        assert!(peer_manager.peers.is_empty());
2405    }
2406
2407    #[tokio::test]
2408    async fn test_on_pending_ban_list() {
2409        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2410        let socket_addr = SocketAddr::new(ip, 8008);
2411        let ban_list = BanList::new(vec![], vec![ip]);
2412        let config = PeersConfig::test().with_ban_list(ban_list);
2413        let mut peer_manager = PeersManager::new(config);
2414        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2415        // because we have no active peers this should be fine for testings
2416        match a {
2417            Ok(_) => panic!(),
2418            Err(err) => match err {
2419                InboundConnectionError::IpBanned => {
2420                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2421                }
2422                _ => unreachable!(),
2423            },
2424        }
2425    }
2426
2427    #[tokio::test]
2428    async fn test_on_active_inbound_ban_list() {
2429        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2430        let socket_addr = SocketAddr::new(ip, 8008);
2431        let given_peer_id = PeerId::random();
2432        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2433        let config = PeersConfig::test().with_ban_list(ban_list);
2434        let mut peer_manager = PeersManager::new(config);
2435        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2436        // non-trusted nodes should also increase pending_in
2437        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2438        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2439        // after the connection is established, the peer should be removed, the num_pending_in
2440        // should be decreased, and the num_inbound should not be increased
2441        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2442        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2443
2444        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2445            peer_manager.queued_actions.pop_front()
2446        else {
2447            panic!()
2448        };
2449
2450        assert_eq!(peer_id, given_peer_id)
2451    }
2452
2453    #[test]
2454    fn test_connection_limits() {
2455        let mut info = ConnectionInfo::default();
2456        info.inc_in();
2457        assert_eq!(info.num_inbound, 1);
2458        assert_eq!(info.num_outbound, 0);
2459        assert!(info.has_in_capacity());
2460
2461        info.decr_in();
2462        assert_eq!(info.num_inbound, 0);
2463        assert_eq!(info.num_outbound, 0);
2464
2465        info.inc_out();
2466        assert_eq!(info.num_inbound, 0);
2467        assert_eq!(info.num_outbound, 1);
2468        assert!(info.has_out_capacity());
2469
2470        info.decr_out();
2471        assert_eq!(info.num_inbound, 0);
2472        assert_eq!(info.num_outbound, 0);
2473    }
2474
2475    #[test]
2476    fn test_connection_peer_state() {
2477        let mut info = ConnectionInfo::default();
2478        info.inc_in();
2479
2480        info.decr_state(PeerConnectionState::In);
2481        assert_eq!(info.num_inbound, 0);
2482        assert_eq!(info.num_outbound, 0);
2483
2484        info.inc_out();
2485
2486        info.decr_state(PeerConnectionState::Out);
2487        assert_eq!(info.num_inbound, 0);
2488        assert_eq!(info.num_outbound, 0);
2489    }
2490
2491    #[tokio::test]
2492    async fn test_trusted_peers_are_prioritized() {
2493        let trusted_peer = PeerId::random();
2494        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2495        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2496            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2497            tcp_port: 8008,
2498            udp_port: 8008,
2499            id: trusted_peer,
2500        }]);
2501        let mut peers = PeersManager::new(config);
2502
2503        let basic_peer = PeerId::random();
2504        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2505        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2506
2507        match event!(peers) {
2508            PeerAction::PeerAdded(peer_id) => {
2509                assert_eq!(peer_id, basic_peer);
2510            }
2511            _ => unreachable!(),
2512        }
2513        match event!(peers) {
2514            PeerAction::Connect { peer_id, remote_addr } => {
2515                assert_eq!(peer_id, trusted_peer);
2516                assert_eq!(remote_addr, trusted_sock);
2517            }
2518            _ => unreachable!(),
2519        }
2520        match event!(peers) {
2521            PeerAction::Connect { peer_id, remote_addr } => {
2522                assert_eq!(peer_id, basic_peer);
2523                assert_eq!(remote_addr, basic_sock);
2524            }
2525            _ => unreachable!(),
2526        }
2527    }
2528
2529    #[tokio::test]
2530    async fn test_connect_trusted_nodes_only() {
2531        let trusted_peer = PeerId::random();
2532        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2533        let config = PeersConfig::test()
2534            .with_trusted_nodes(vec![TrustedPeer {
2535                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2536                tcp_port: 8008,
2537                udp_port: 8008,
2538                id: trusted_peer,
2539            }])
2540            .with_trusted_nodes_only(true);
2541        let mut peers = PeersManager::new(config);
2542
2543        let basic_peer = PeerId::random();
2544        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2545        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2546
2547        match event!(peers) {
2548            PeerAction::PeerAdded(peer_id) => {
2549                assert_eq!(peer_id, basic_peer);
2550            }
2551            _ => unreachable!(),
2552        }
2553        match event!(peers) {
2554            PeerAction::Connect { peer_id, remote_addr } => {
2555                assert_eq!(peer_id, trusted_peer);
2556                assert_eq!(remote_addr, trusted_sock);
2557            }
2558            _ => unreachable!(),
2559        }
2560        poll_fn(|cx| {
2561            assert!(peers.poll(cx).is_pending());
2562            Poll::Ready(())
2563        })
2564        .await;
2565    }
2566
2567    #[tokio::test]
2568    async fn test_incoming_with_trusted_nodes_only() {
2569        let trusted_peer = PeerId::random();
2570        let config = PeersConfig::test()
2571            .with_trusted_nodes(vec![TrustedPeer {
2572                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2573                tcp_port: 8008,
2574                udp_port: 8008,
2575                id: trusted_peer,
2576            }])
2577            .with_trusted_nodes_only(true);
2578        let mut peers = PeersManager::new(config);
2579
2580        let basic_peer = PeerId::random();
2581        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2582        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2583        // non-trusted nodes should also increase pending_in
2584        assert_eq!(peers.connection_info.num_pending_in, 1);
2585        peers.on_incoming_session_established(basic_peer, basic_sock);
2586        // after the connection is established, the peer should be removed, the num_pending_in
2587        // should be decreased, and the num_inbound mut not be increased
2588        assert_eq!(peers.connection_info.num_pending_in, 0);
2589        assert_eq!(peers.connection_info.num_inbound, 0);
2590
2591        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2592            peers.queued_actions.pop_front()
2593        else {
2594            panic!()
2595        };
2596        assert_eq!(basic_peer, peer_id);
2597        assert!(!peers.peers.contains_key(&basic_peer));
2598    }
2599
2600    #[tokio::test]
2601    async fn test_incoming_without_trusted_nodes_only() {
2602        let trusted_peer = PeerId::random();
2603        let config = PeersConfig::test()
2604            .with_trusted_nodes(vec![TrustedPeer {
2605                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2606                tcp_port: 8008,
2607                udp_port: 8008,
2608                id: trusted_peer,
2609            }])
2610            .with_trusted_nodes_only(false);
2611        let mut peers = PeersManager::new(config);
2612
2613        let basic_peer = PeerId::random();
2614        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2615        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2616
2617        // non-trusted nodes should also increase pending_in
2618        assert_eq!(peers.connection_info.num_pending_in, 1);
2619        peers.on_incoming_session_established(basic_peer, basic_sock);
2620        // after the connection is established, the peer should be removed, the num_pending_in
2621        // should be decreased, and the num_inbound must be increased
2622        assert_eq!(peers.connection_info.num_pending_in, 0);
2623        assert_eq!(peers.connection_info.num_inbound, 1);
2624        assert!(peers.peers.contains_key(&basic_peer));
2625    }
2626
2627    #[tokio::test]
2628    async fn test_incoming_at_capacity() {
2629        let mut config = PeersConfig::test();
2630        config.connection_info.max_inbound = 1;
2631        let mut peers = PeersManager::new(config);
2632
2633        let peer = PeerId::random();
2634        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2635        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2636
2637        peers.on_incoming_session_established(peer, addr);
2638
2639        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2640        assert_eq!(
2641            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2642            InboundConnectionError::ExceedsCapacity
2643        );
2644    }
2645
2646    #[tokio::test]
2647    async fn test_incoming_rate_limit() {
2648        let config = PeersConfig {
2649            incoming_ip_throttle_duration: Duration::from_millis(100),
2650            ..PeersConfig::test()
2651        };
2652        let mut peers = PeersManager::new(config);
2653
2654        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2655        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2656        assert_eq!(
2657            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2658            InboundConnectionError::IpBanned
2659        );
2660
2661        peers.release_interval.reset_immediately();
2662        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2663
2664        // await unban
2665        poll_fn(|cx| loop {
2666            if peers.poll(cx).is_pending() {
2667                return Poll::Ready(());
2668            }
2669        })
2670        .await;
2671
2672        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2673        assert_eq!(
2674            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2675            InboundConnectionError::IpBanned
2676        );
2677    }
2678
2679    #[tokio::test]
2680    async fn test_tick() {
2681        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2682        let socket_addr = SocketAddr::new(ip, 8008);
2683        let config = PeersConfig::test();
2684        let mut peer_manager = PeersManager::new(config);
2685        let peer_id = PeerId::random();
2686        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2687
2688        tokio::time::sleep(Duration::from_secs(1)).await;
2689        peer_manager.tick();
2690
2691        // still unconnected
2692        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2693
2694        // mark as connected
2695        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2696
2697        tokio::time::sleep(Duration::from_secs(1)).await;
2698        peer_manager.tick();
2699
2700        // still at default reputation
2701        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2702
2703        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2704
2705        tokio::time::sleep(Duration::from_secs(1)).await;
2706        peer_manager.tick();
2707
2708        // tick applied
2709        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2710    }
2711
2712    #[tokio::test]
2713    async fn test_remove_incoming_after_disconnect() {
2714        let peer_id = PeerId::random();
2715        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2716        let mut peers = PeersManager::default();
2717
2718        peers.on_incoming_pending_session(addr.ip()).unwrap();
2719        peers.on_incoming_session_established(peer_id, addr);
2720        let peer = peers.peers.get(&peer_id).unwrap();
2721        assert_eq!(peer.state, PeerConnectionState::In);
2722        assert!(peer.remove_after_disconnect);
2723
2724        peers.on_active_session_gracefully_closed(peer_id);
2725        assert!(!peers.peers.contains_key(&peer_id))
2726    }
2727
2728    #[tokio::test]
2729    async fn test_keep_incoming_after_disconnect_if_discovered() {
2730        let peer_id = PeerId::random();
2731        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2732        let mut peers = PeersManager::default();
2733
2734        peers.on_incoming_pending_session(addr.ip()).unwrap();
2735        peers.on_incoming_session_established(peer_id, addr);
2736        let peer = peers.peers.get(&peer_id).unwrap();
2737        assert_eq!(peer.state, PeerConnectionState::In);
2738        assert!(peer.remove_after_disconnect);
2739
2740        // trigger discovery manually while the peer is still connected
2741        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2742
2743        peers.on_active_session_gracefully_closed(peer_id);
2744
2745        let peer = peers.peers.get(&peer_id).unwrap();
2746        assert_eq!(peer.state, PeerConnectionState::Idle);
2747        assert!(!peer.remove_after_disconnect);
2748    }
2749
2750    #[tokio::test]
2751    async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2752        let throttle_duration = Duration::from_millis(100);
2753        let config =
2754            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2755        let mut peers = PeersManager::new(config);
2756
2757        let peer_id = PeerId::random();
2758        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2759
2760        // Add as regular peer
2761        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2762
2763        match event!(peers) {
2764            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2765            _ => unreachable!(),
2766        }
2767
2768        match event!(peers) {
2769            PeerAction::Connect { .. } => {}
2770            _ => unreachable!(),
2771        }
2772
2773        // Simulate outbound connection established
2774        peers.on_active_outgoing_established(peer_id);
2775        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2776
2777        // Gracefully close the session
2778        peers.on_active_session_gracefully_closed(peer_id);
2779
2780        let peer = peers.peers.get(&peer_id).unwrap();
2781        assert_eq!(peer.state, PeerConnectionState::Idle);
2782        assert!(peer.backed_off);
2783
2784        // Verify the peer is in the backed_off_peers set
2785        assert!(peers.backed_off_peers.contains_key(&peer_id));
2786
2787        // Immediately try to poll - should not trigger any actions yet
2788        poll_fn(|cx| {
2789            assert!(peers.poll(cx).is_pending());
2790            Poll::Ready(())
2791        })
2792        .await;
2793
2794        // Peer should still be backed off
2795        assert!(peers.backed_off_peers.contains_key(&peer_id));
2796        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2797
2798        // Sleep for the throttle duration
2799        tokio::time::sleep(throttle_duration).await;
2800
2801        // After throttle duration, event! will poll until we get a Connect action
2802        match event!(peers) {
2803            PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2804            _ => unreachable!(),
2805        }
2806
2807        // After connection is initiated, peer should no longer be backed off
2808        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2809        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2810    }
2811
2812    #[tokio::test]
2813    async fn test_backed_off_peer_can_accept_incoming_connection() {
2814        let throttle_duration = Duration::from_millis(100);
2815        let config =
2816            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2817        let mut peers = PeersManager::new(config);
2818
2819        let peer_id = PeerId::random();
2820        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2821
2822        // Add as regular peer
2823        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2824
2825        match event!(peers) {
2826            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2827            _ => unreachable!(),
2828        }
2829
2830        match event!(peers) {
2831            PeerAction::Connect { .. } => {}
2832            _ => unreachable!(),
2833        }
2834
2835        // Simulate outbound connection established
2836        peers.on_active_outgoing_established(peer_id);
2837        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2838
2839        // Gracefully close the session - this will back off the peer
2840        peers.on_active_session_gracefully_closed(peer_id);
2841
2842        let peer = peers.peers.get(&peer_id).unwrap();
2843        assert_eq!(peer.state, PeerConnectionState::Idle);
2844        assert!(peer.backed_off);
2845        assert!(peers.backed_off_peers.contains_key(&peer_id));
2846
2847        // Now simulate an incoming connection from the backed-off peer
2848        // First, handle the incoming pending session
2849        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2850        assert_eq!(peers.connection_info.num_pending_in, 1);
2851
2852        // Establish the incoming session
2853        peers.on_incoming_session_established(peer_id, addr);
2854
2855        // Peer should have been added to incoming connections
2856        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2857        assert_eq!(peers.connection_info.num_inbound, 1);
2858
2859        // Peer should still be backed off for outbound connections
2860        assert!(peers.backed_off_peers.contains_key(&peer_id));
2861        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2862
2863        // Verify we don't try to reconnect outbound while peer is backed off
2864        poll_fn(|cx| {
2865            assert!(peers.poll(cx).is_pending());
2866            Poll::Ready(())
2867        })
2868        .await;
2869
2870        // No outbound connection should be attempted while backed off
2871        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2872
2873        // After throttle duration, the backoff should be cleared
2874        tokio::time::sleep(throttle_duration).await;
2875
2876        poll_fn(|cx| {
2877            let _ = peers.poll(cx);
2878            Poll::Ready(())
2879        })
2880        .await;
2881
2882        // Backoff should be cleared now
2883        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2884        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2885
2886        // Peer should still be in incoming state
2887        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2888    }
2889
2890    #[tokio::test]
2891    async fn test_incoming_outgoing_already_connected() {
2892        let peer_id = PeerId::random();
2893        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2894        let mut peers = PeersManager::default();
2895
2896        peers.on_incoming_pending_session(addr.ip()).unwrap();
2897        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2898
2899        match event!(peers) {
2900            PeerAction::PeerAdded(_) => {}
2901            _ => unreachable!(),
2902        }
2903
2904        match event!(peers) {
2905            PeerAction::Connect { .. } => {}
2906            _ => unreachable!(),
2907        }
2908
2909        peers.on_incoming_session_established(peer_id, addr);
2910        peers.on_already_connected(Direction::Outgoing(peer_id));
2911        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2912        assert_eq!(peers.connection_info.num_inbound, 1);
2913        assert_eq!(peers.connection_info.num_pending_out, 0);
2914        assert_eq!(peers.connection_info.num_pending_in, 0);
2915        assert_eq!(peers.connection_info.num_outbound, 0);
2916    }
2917
2918    #[tokio::test]
2919    async fn test_already_connected_incoming_outgoing_connection_error() {
2920        let peer_id = PeerId::random();
2921        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2922        let mut peers = PeersManager::default();
2923
2924        peers.on_incoming_pending_session(addr.ip()).unwrap();
2925        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2926
2927        match event!(peers) {
2928            PeerAction::PeerAdded(_) => {}
2929            _ => unreachable!(),
2930        }
2931
2932        match event!(peers) {
2933            PeerAction::Connect { .. } => {}
2934            _ => unreachable!(),
2935        }
2936
2937        peers.on_incoming_session_established(peer_id, addr);
2938
2939        peers.on_outgoing_connection_failure(
2940            &addr,
2941            &peer_id,
2942            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2943        );
2944        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2945        assert_eq!(peers.connection_info.num_inbound, 1);
2946        assert_eq!(peers.connection_info.num_pending_out, 0);
2947        assert_eq!(peers.connection_info.num_pending_in, 0);
2948        assert_eq!(peers.connection_info.num_outbound, 0);
2949    }
2950
2951    #[tokio::test]
2952    async fn test_max_concurrent_dials() {
2953        let config = PeersConfig::default();
2954        let mut peer_manager = PeersManager::new(config);
2955        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2956        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2957        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2958            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2959        }
2960
2961        peer_manager.fill_outbound_slots();
2962        let dials = peer_manager
2963            .queued_actions
2964            .iter()
2965            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2966            .count();
2967        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2968    }
2969
2970    #[tokio::test]
2971    async fn test_max_num_of_pending_dials() {
2972        let config = PeersConfig::default();
2973        let mut peer_manager = PeersManager::new(config);
2974        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2975        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2976
2977        // add more peers than allowed
2978        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2979            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2980        }
2981
2982        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2983            match event!(peer_manager) {
2984                PeerAction::PeerAdded(_) => {}
2985                _ => unreachable!(),
2986            }
2987        }
2988
2989        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2990            match event!(peer_manager) {
2991                PeerAction::Connect { .. } => {}
2992                _ => unreachable!(),
2993            }
2994        }
2995
2996        // generate 'Connect' actions
2997        peer_manager.fill_outbound_slots();
2998
2999        // all dialed connections should be in 'PendingOut' state
3000        let dials = peer_manager.connection_info.num_pending_out;
3001        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
3002
3003        let num_pendingout_states = peer_manager
3004            .peers
3005            .iter()
3006            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
3007            .map(|(peer_id, _)| *peer_id)
3008            .collect::<Vec<PeerId>>();
3009        assert_eq!(
3010            num_pendingout_states.len(),
3011            peer_manager.connection_info.config.max_concurrent_outbound_dials
3012        );
3013
3014        // establish dialed connections
3015        for peer_id in &num_pendingout_states {
3016            peer_manager.on_active_outgoing_established(*peer_id);
3017        }
3018
3019        // all dialed connections should now be in 'Out' state
3020        for peer_id in &num_pendingout_states {
3021            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
3022        }
3023
3024        // no more pending outbound connections
3025        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
3026    }
3027
3028    #[tokio::test]
3029    async fn test_connect() {
3030        let peer = PeerId::random();
3031        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3032        let mut peers = PeersManager::default();
3033        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3034        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3035
3036        match event!(peers) {
3037            PeerAction::Connect { peer_id, remote_addr } => {
3038                assert_eq!(peer_id, peer);
3039                assert_eq!(remote_addr, socket_addr);
3040            }
3041            _ => unreachable!(),
3042        }
3043
3044        let (record, _) = peers.peer_by_id(peer).unwrap();
3045        assert_eq!(record.tcp_addr(), socket_addr);
3046        assert_eq!(record.udp_addr(), socket_addr);
3047
3048        // connect again
3049        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3050
3051        let (record, _) = peers.peer_by_id(peer).unwrap();
3052        assert_eq!(record.tcp_addr(), socket_addr);
3053        assert_eq!(record.udp_addr(), socket_addr);
3054    }
3055
3056    #[tokio::test]
3057    async fn test_incoming_connection_from_banned() {
3058        let peer = PeerId::random();
3059        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3060        let config = PeersConfig::test().with_max_inbound(3);
3061        let mut peers = PeersManager::new(config);
3062        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3063
3064        match event!(peers) {
3065            PeerAction::PeerAdded(peer_id) => {
3066                assert_eq!(peer_id, peer);
3067            }
3068            _ => unreachable!(),
3069        }
3070        match event!(peers) {
3071            PeerAction::Connect { peer_id, .. } => {
3072                assert_eq!(peer_id, peer);
3073            }
3074            _ => unreachable!(),
3075        }
3076
3077        poll_fn(|cx| {
3078            assert!(peers.poll(cx).is_pending());
3079            Poll::Ready(())
3080        })
3081        .await;
3082
3083        // simulate new connection drops with error
3084        loop {
3085            peers.on_active_session_dropped(
3086                &socket_addr,
3087                &peer,
3088                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3089                    reth_eth_wire::EthVersion::Eth68,
3090                    reth_eth_wire::EthMessageID::Status,
3091                )),
3092            );
3093
3094            if peers.peers.get(&peer).unwrap().is_banned() {
3095                break;
3096            }
3097
3098            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3099            peers.on_incoming_session_established(peer, socket_addr);
3100
3101            match event!(peers) {
3102                PeerAction::Connect { peer_id, .. } => {
3103                    assert_eq!(peer_id, peer);
3104                }
3105                _ => unreachable!(),
3106            }
3107        }
3108
3109        assert!(peers.peers.get(&peer).unwrap().is_banned());
3110
3111        // fill all incoming slots
3112        for _ in 0..peers.connection_info.config.max_inbound {
3113            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3114            peers.on_incoming_session_established(peer, socket_addr);
3115
3116            match event!(peers) {
3117                PeerAction::DisconnectBannedIncoming { peer_id } => {
3118                    assert_eq!(peer_id, peer);
3119                }
3120                _ => unreachable!(),
3121            }
3122        }
3123
3124        poll_fn(|cx| {
3125            assert!(peers.poll(cx).is_pending());
3126            Poll::Ready(())
3127        })
3128        .await;
3129
3130        assert_eq!(peers.connection_info.num_inbound, 0);
3131
3132        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3133
3134        // Assert we can still accept new connections
3135        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3136        assert_eq!(peers.connection_info.num_pending_in, 1);
3137
3138        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
3139        // connection info is updated via the peer's state which would be a noop here since the
3140        // banned peer's state is idle
3141        peers.on_active_session_gracefully_closed(peer);
3142        assert_eq!(peers.connection_info.num_inbound, 0);
3143    }
3144
3145    #[tokio::test]
3146    async fn test_add_pending_connect() {
3147        let peer = PeerId::random();
3148        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3149        let mut peers = PeersManager::default();
3150        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3151        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3152        assert_eq!(peers.connection_info.num_pending_out, 1);
3153    }
3154
3155    #[tokio::test]
3156    async fn test_dns_updates_peer_address() {
3157        let peer_id = PeerId::random();
3158        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3159        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3160
3161        let trusted = TrustedPeer {
3162            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3163            tcp_port: 8008,
3164            udp_port: 8008,
3165            id: peer_id,
3166        };
3167
3168        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3169        let mut manager = PeersManager::new(config);
3170        manager
3171            .trusted_peers_resolver
3172            .set_interval(tokio::time::interval(Duration::from_millis(1)));
3173
3174        manager.peers.insert(
3175            peer_id,
3176            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3177        );
3178
3179        for _ in 0..100 {
3180            let _ = event!(manager);
3181            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3182                break;
3183            }
3184            tokio::time::sleep(Duration::from_millis(10)).await;
3185        }
3186
3187        let updated_peer = manager.peers.get(&peer_id).unwrap();
3188        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3189    }
3190
3191    #[tokio::test]
3192    async fn test_ip_filter_blocks_inbound_connection() {
3193        use reth_net_banlist::IpFilter;
3194        use std::net::IpAddr;
3195
3196        // Create a filter that only allows 192.168.0.0/16
3197        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3198        let config = PeersConfig::test().with_ip_filter(ip_filter);
3199        let mut peers = PeersManager::new(config);
3200
3201        // Try to connect from an allowed IP
3202        let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3203        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3204
3205        // Try to connect from a disallowed IP
3206        let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3207        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3208    }
3209
3210    #[tokio::test]
3211    async fn test_ip_filter_blocks_outbound_connection() {
3212        use reth_net_banlist::IpFilter;
3213        use std::net::SocketAddr;
3214
3215        // Create a filter that only allows 192.168.0.0/16
3216        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3217        let config = PeersConfig::test().with_ip_filter(ip_filter);
3218        let mut peers = PeersManager::new(config);
3219
3220        let peer_id = PeerId::new([1; 64]);
3221
3222        // Try to add a peer with an allowed IP
3223        let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3224        peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3225        assert!(peers.peers.contains_key(&peer_id));
3226
3227        // Try to add a peer with a disallowed IP
3228        let peer_id2 = PeerId::new([2; 64]);
3229        let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3230        peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3231        assert!(!peers.peers.contains_key(&peer_id2));
3232    }
3233
3234    #[tokio::test]
3235    async fn test_ip_filter_ipv6() {
3236        use reth_net_banlist::IpFilter;
3237        use std::net::IpAddr;
3238
3239        // Create a filter that only allows IPv6 range 2001:db8::/32
3240        let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3241        let config = PeersConfig::test().with_ip_filter(ip_filter);
3242        let mut peers = PeersManager::new(config);
3243
3244        // Try to connect from an allowed IPv6 address
3245        let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3246        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3247
3248        // Try to connect from a disallowed IPv6 address
3249        let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3250        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3251    }
3252
3253    #[tokio::test]
3254    async fn test_ip_filter_multiple_ranges() {
3255        use reth_net_banlist::IpFilter;
3256        use std::net::IpAddr;
3257
3258        // Create a filter that allows multiple ranges
3259        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3260        let config = PeersConfig::test().with_ip_filter(ip_filter);
3261        let mut peers = PeersManager::new(config);
3262
3263        // Try IPs from both allowed ranges
3264        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3265        let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3266        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3267        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3268
3269        // Try IP from disallowed range
3270        let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3271        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3272    }
3273
3274    #[tokio::test]
3275    async fn test_ip_filter_no_restriction() {
3276        use reth_net_banlist::IpFilter;
3277        use std::net::IpAddr;
3278
3279        // Create a filter with no restrictions (allow all)
3280        let ip_filter = IpFilter::allow_all();
3281        let config = PeersConfig::test().with_ip_filter(ip_filter);
3282        let mut peers = PeersManager::new(config);
3283
3284        // All IPs should be allowed
3285        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3286        let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3287        let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3288        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3289        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3290        assert!(peers.on_incoming_pending_session(ip3).is_ok());
3291    }
3292
3293    #[tokio::test]
3294    async fn test_best_unconnected_prefers_fork_id_as_tiebreaker() {
3295        let mut peers = PeersManager::default();
3296        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8008);
3297
3298        let fork_id = ForkId { hash: ForkHash([0xaa, 0xbb, 0xcc, 0xdd]), next: 0 };
3299
3300        // add two peers with equal reputation, only one has a fork_id
3301        let no_fork = PeerId::random();
3302        peers.add_peer(no_fork, PeerAddr::from_tcp(addr), None);
3303
3304        let with_fork = PeerId::random();
3305        peers.add_peer(with_fork, PeerAddr::from_tcp(addr), None);
3306        peers.peers.get_mut(&with_fork).unwrap().fork_id = Some(Box::new(fork_id));
3307
3308        let (best_id, _) = peers.best_unconnected().unwrap();
3309        assert_eq!(best_id, with_fork, "fork_id should break tie when reputation is equal");
3310    }
3311}