Skip to main content

reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Buffered actions until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If non-trusted peers should be connected to, or the connection from non-trusted
83    /// incoming peers should be accepted.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and dropping.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban ip on an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93    /// IP address filter for restricting network connections to specific IP ranges.
94    ip_filter: reth_net_banlist::IpFilter,
95    /// If true, discovered peers without a confirmed ENR fork ID will not be added until their
96    /// fork ID is verified via EIP-868.
97    enforce_enr_fork_id: bool,
98}
99
100impl PeersManager {
101    /// Create a new instance with the given config
102    pub fn new(config: PeersConfig) -> Self {
103        let PeersConfig {
104            refill_slots_interval,
105            connection_info,
106            reputation_weights,
107            ban_list,
108            ban_duration,
109            backoff_durations,
110            trusted_nodes,
111            trusted_nodes_only,
112            trusted_nodes_resolution_interval,
113            basic_nodes,
114            max_backoff_count,
115            incoming_ip_throttle_duration,
116            ip_filter,
117            enforce_enr_fork_id,
118        } = config;
119        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
120        let now = Instant::now();
121
122        // We use half of the interval to decrease the max duration to `150%` in worst case
123        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
124
125        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
126        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
127
128        for trusted_peer in &trusted_nodes {
129            match trusted_peer.resolve_blocking() {
130                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
131                    trusted_peer_ids.insert(id);
132                    peers.entry(id).or_insert_with(|| {
133                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
134                    });
135                }
136                Err(err) => {
137                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
138                }
139            }
140        }
141
142        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
143            peers.entry(id).or_insert_with(|| {
144                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
145            });
146        }
147
148        trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
149
150        Self {
151            peers,
152            trusted_peer_ids,
153            trusted_peers_resolver: TrustedPeersResolver::new(
154                trusted_nodes,
155                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
156            ),
157            manager_tx,
158            handle_rx: UnboundedReceiverStream::new(handle_rx),
159            queued_actions: Default::default(),
160            reputation_weights,
161            refill_slots_interval: tokio::time::interval(refill_slots_interval),
162            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
163            connection_info: ConnectionInfo::new(connection_info),
164            ban_list,
165            backed_off_peers: Default::default(),
166            ban_duration,
167            backoff_durations,
168            trusted_nodes_only,
169            last_tick: Instant::now(),
170            max_backoff_count,
171            net_connection_state: NetworkConnectionState::default(),
172            incoming_ip_throttle_duration,
173            ip_filter,
174            enforce_enr_fork_id,
175        }
176    }
177
178    /// Returns a new [`PeersHandle`] that can send commands to this type.
179    pub(crate) fn handle(&self) -> PeersHandle {
180        PeersHandle::new(self.manager_tx.clone())
181    }
182
183    /// Returns `true` if discovered peers must have a confirmed ENR fork ID before being added.
184    pub(crate) const fn enforce_enr_fork_id(&self) -> bool {
185        self.enforce_enr_fork_id
186    }
187
188    /// Returns the number of peers in the peer set
189    #[inline]
190    pub(crate) fn num_known_peers(&self) -> usize {
191        self.peers.len()
192    }
193
194    /// Returns an iterator over all peers
195    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
196        self.peers.iter().map(|(peer_id, v)| {
197            NodeRecord::new_with_ports(
198                v.addr.tcp().ip(),
199                v.addr.tcp().port(),
200                v.addr.udp().map(|addr| addr.port()),
201                *peer_id,
202            )
203        })
204    }
205
206    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
207    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
208        self.peers.get(&peer_id).map(|v| {
209            (
210                NodeRecord::new_with_ports(
211                    v.addr.tcp().ip(),
212                    v.addr.tcp().port(),
213                    v.addr.udp().map(|addr| addr.port()),
214                    peer_id,
215                ),
216                v.kind,
217            )
218        })
219    }
220
221    /// Returns `true` if the given peer is connected via an inbound session.
222    pub(crate) fn is_inbound_peer(&self, peer_id: &PeerId) -> bool {
223        self.peers.get(peer_id).is_some_and(|p| {
224            matches!(p.state, PeerConnectionState::In | PeerConnectionState::DisconnectingIn)
225        })
226    }
227
228    /// Returns an iterator over all peer ids for peers with the given kind
229    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
230        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
231    }
232
233    /// Returns the number of currently active inbound connections.
234    #[inline]
235    pub(crate) const fn num_inbound_connections(&self) -> usize {
236        self.connection_info.num_inbound
237    }
238
239    /// Returns the number of currently __active__ outbound connections.
240    #[inline]
241    pub(crate) const fn num_outbound_connections(&self) -> usize {
242        self.connection_info.num_outbound
243    }
244
245    /// Returns the number of currently pending outbound connections.
246    #[inline]
247    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
248        self.connection_info.num_pending_out
249    }
250
251    /// Returns the number of currently backed off peers.
252    #[inline]
253    pub(crate) fn num_backed_off_peers(&self) -> usize {
254        self.backed_off_peers.len()
255    }
256
257    /// Returns the number of idle trusted peers.
258    fn num_idle_trusted_peers(&self) -> usize {
259        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
260    }
261
262    /// Invoked when a new _incoming_ tcp connection is accepted.
263    ///
264    /// returns an error if the inbound ip address is on the ban list
265    pub(crate) fn on_incoming_pending_session(
266        &mut self,
267        addr: IpAddr,
268    ) -> Result<(), InboundConnectionError> {
269        // Check if the IP is in the allowed ranges (netrestrict)
270        if !self.ip_filter.is_allowed(&addr) {
271            trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
272            return Err(InboundConnectionError::IpBanned)
273        }
274
275        if self.ban_list.is_banned_ip(&addr) {
276            return Err(InboundConnectionError::IpBanned)
277        }
278
279        // check if we even have slots for a new incoming connection
280        if !self.connection_info.has_in_capacity() {
281            if self.trusted_peer_ids.is_empty() {
282                // if we don't have any incoming slots and no trusted peers, we don't accept any new
283                // connections
284                return Err(InboundConnectionError::ExceedsCapacity)
285            }
286
287            // there's an edge case here where no incoming connections besides from trusted peers
288            // are allowed (max_inbound == 0), in which case we still need to allow new pending
289            // incoming connections until all trusted peers are connected.
290            let num_idle_trusted_peers = self.num_idle_trusted_peers();
291            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
292                // we still want to limit concurrent pending connections
293                let max_inbound =
294                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
295                if self.connection_info.num_pending_in < max_inbound {
296                    self.connection_info.inc_pending_in();
297                    return Ok(())
298                }
299            }
300
301            // all trusted peers are either connected or connecting
302            return Err(InboundConnectionError::ExceedsCapacity)
303        }
304
305        // also cap the incoming connections we can process at once
306        if !self.connection_info.has_in_pending_capacity() {
307            return Err(InboundConnectionError::ExceedsCapacity)
308        }
309
310        // apply the rate limit
311        self.throttle_incoming_ip(addr);
312
313        self.connection_info.inc_pending_in();
314        Ok(())
315    }
316
317    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
318    /// rejected.
319    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
320        self.connection_info.decr_pending_in();
321    }
322
323    /// Invoked when a pending session was closed.
324    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
325        self.connection_info.decr_pending_in()
326    }
327
328    /// Invoked when a pending session was closed.
329    pub(crate) fn on_incoming_pending_session_dropped(
330        &mut self,
331        remote_addr: SocketAddr,
332        err: &PendingSessionHandshakeError,
333    ) {
334        if err.is_fatal_protocol_error() {
335            self.ban_ip(remote_addr.ip());
336
337            if err.merits_discovery_ban() {
338                self.queued_actions
339                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
340            }
341        }
342
343        self.connection_info.decr_pending_in();
344    }
345
346    /// Called when a new _incoming_ active session was established to the given peer.
347    ///
348    /// This will update the state of the peer if not yet tracked.
349    ///
350    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
351    /// be scheduled.
352    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
353        self.connection_info.decr_pending_in();
354
355        // we only need to check the peer id here as the ip address will have been checked at
356        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
357        if self.ban_list.is_banned_peer(&peer_id) {
358            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
359            return
360        }
361
362        // check if the peer is trustable or not
363        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
364        if self.trusted_nodes_only && !is_trusted {
365            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
366            return
367        }
368
369        // start a new tick, so the peer is not immediately rewarded for the time since last tick
370        self.tick();
371
372        match self.peers.entry(peer_id) {
373            Entry::Occupied(mut entry) => {
374                let peer = entry.get_mut();
375                if peer.is_banned() {
376                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
377                    return
378                }
379                // it might be the case that we're also trying to connect to this peer at the same
380                // time, so we need to adjust the state here
381                if peer.state.is_pending_out() {
382                    self.connection_info.decr_state(peer.state);
383                }
384
385                peer.state = PeerConnectionState::In;
386
387                is_trusted = is_trusted || peer.is_trusted();
388            }
389            Entry::Vacant(entry) => {
390                // peer is missing in the table, we add it but mark it as to be removed after
391                // disconnect, because we only know the outgoing port
392                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
393                peer.remove_after_disconnect = true;
394                entry.insert(peer);
395                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
396            }
397        }
398
399        let has_in_capacity = self.connection_info.has_in_capacity();
400        // increment new incoming connection
401        self.connection_info.inc_in();
402
403        // disconnect the peer if we don't have capacity for more inbound connections
404        if !is_trusted && !has_in_capacity {
405            self.queued_actions.push_back(PeerAction::Disconnect {
406                peer_id,
407                reason: Some(DisconnectReason::TooManyPeers),
408            });
409        }
410    }
411
412    /// Bans the peer temporarily with the configured ban timeout
413    fn ban_peer(&mut self, peer_id: PeerId) {
414        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
415            (peer.is_trusted() || peer.is_static())
416        {
417            // For misbehaving trusted or static peers, we provide a bit more leeway when
418            // penalizing them.
419            self.backoff_durations.low / 2
420        } else {
421            self.ban_duration
422        };
423
424        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
425        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
426    }
427
428    /// Bans the IP temporarily with the configured ban timeout
429    fn ban_ip(&mut self, ip: IpAddr) {
430        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
431    }
432
433    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
434    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
435        self.ban_list
436            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
437    }
438
439    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
440    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
441        trace!(target: "net::peers", ?peer_id, "backing off");
442
443        if let Some(peer) = self.peers.get_mut(&peer_id) {
444            peer.backed_off = true;
445            self.backed_off_peers.insert(peer_id, until);
446        }
447    }
448
449    /// Unbans the peer
450    fn unban_peer(&mut self, peer_id: PeerId) {
451        self.ban_list.unban_peer(&peer_id);
452        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
453    }
454
455    /// Tick function to update reputation of all connected peers.
456    /// Peers are rewarded with reputation increases for the time they are connected since the last
457    /// tick. This is to prevent peers from being disconnected eventually due to slashed
458    /// reputation because of some bad messages (most likely transaction related)
459    fn tick(&mut self) {
460        let now = Instant::now();
461        // Determine the number of seconds since the last tick.
462        // Ensuring that now is always greater than last_tick to account for issues with system
463        // time.
464        let secs_since_last_tick =
465            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
466        self.last_tick = now;
467
468        // update reputation via seconds connected
469        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
470            // update reputation via seconds connected, but keep the target _around_ the default
471            // reputation.
472            if peer.1.reputation < DEFAULT_REPUTATION {
473                peer.1.reputation += secs_since_last_tick;
474            }
475        }
476    }
477
478    /// Returns the tracked reputation for a peer.
479    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
480        self.peers.get(peer_id).map(|peer| peer.reputation)
481    }
482
483    /// Apply the corresponding reputation change to the given peer.
484    ///
485    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
486    /// reputation changes that can be attributed to network conditions. If the peer is a
487    /// trusted peer, it will also be less strict with the reputation slashing.
488    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
489        trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
490
491        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
492            // First check if we should reset the reputation
493            if rep.is_reset() {
494                peer.reset_reputation()
495            } else {
496                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
497                if peer.is_trusted() || peer.is_static() {
498                    // exempt trusted and static peers from reputation slashing for
499                    if matches!(
500                        rep,
501                        ReputationChangeKind::Dropped |
502                            ReputationChangeKind::BadAnnouncement |
503                            ReputationChangeKind::Timeout |
504                            ReputationChangeKind::AlreadySeenTransaction
505                    ) {
506                        return
507                    }
508
509                    // also be less strict with the reputation slashing for trusted peers
510                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
511                        // this caps the reputation change to the maximum allowed for trusted peers
512                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
513                    }
514                }
515                peer.apply_reputation(reputation_change, rep)
516            }
517        } else {
518            return
519        };
520
521        match outcome {
522            ReputationChangeOutcome::None => {}
523            ReputationChangeOutcome::Ban => {
524                self.ban_peer(*peer_id);
525            }
526            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
527            ReputationChangeOutcome::DisconnectAndBan => {
528                self.queued_actions.push_back(PeerAction::Disconnect {
529                    peer_id: *peer_id,
530                    reason: Some(DisconnectReason::DisconnectRequested),
531                });
532                self.ban_peer(*peer_id);
533            }
534        }
535    }
536
537    /// Gracefully disconnected a pending _outgoing_ session
538    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
539        if let Some(peer) = self.peers.get_mut(peer_id) {
540            self.connection_info.decr_state(peer.state);
541            peer.state = PeerConnectionState::Idle;
542        }
543    }
544
545    /// Invoked when an _outgoing_ pending session was closed during authentication or the
546    /// handshake.
547    pub(crate) fn on_outgoing_pending_session_dropped(
548        &mut self,
549        remote_addr: &SocketAddr,
550        peer_id: &PeerId,
551        err: &PendingSessionHandshakeError,
552    ) {
553        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
554    }
555
556    /// Gracefully disconnected an active session
557    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
558        match self.peers.entry(peer_id) {
559            Entry::Occupied(mut entry) => {
560                trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
561                self.connection_info.decr_state(entry.get().state);
562
563                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
564                    // this peer should be removed from the set
565                    entry.remove();
566                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
567                } else {
568                    let peer = entry.get_mut();
569                    // reset the peer's state
570                    // we reset the backoff counter since we're able to establish a successful
571                    // session to that peer
572                    peer.severe_backoff_counter = 0;
573                    peer.state = PeerConnectionState::Idle;
574
575                    // but we're backing off slightly to avoid dialing the peer again right away, to
576                    // give the remote time to also properly register the closed session and clean
577                    // up and to avoid any issues with ip throttling on the remote in case this
578                    // session was terminated right away.
579                    peer.backed_off = true;
580                    self.backed_off_peers.insert(
581                        peer_id,
582                        std::time::Instant::now() + self.incoming_ip_throttle_duration,
583                    );
584                    trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
585                }
586            }
587            Entry::Vacant(_) => return,
588        }
589
590        self.fill_outbound_slots();
591    }
592
593    /// Called when a _pending_ outbound connection is successful.
594    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
595        if let Some(peer) = self.peers.get_mut(&peer_id) {
596            trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
597            self.connection_info.decr_state(peer.state);
598            self.connection_info.inc_out();
599            peer.state = PeerConnectionState::Out;
600        }
601    }
602
603    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
604    ///
605    /// Depending on whether the error is fatal, the peer will be removed from the peer set
606    /// otherwise its reputation is slashed.
607    pub(crate) fn on_active_session_dropped(
608        &mut self,
609        remote_addr: &SocketAddr,
610        peer_id: &PeerId,
611        err: &EthStreamError,
612    ) {
613        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
614    }
615
616    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
617    /// connection.
618    pub(crate) fn on_outgoing_connection_failure(
619        &mut self,
620        remote_addr: &SocketAddr,
621        peer_id: &PeerId,
622        err: &io::Error,
623    ) {
624        // there's a race condition where we accepted an incoming connection while we were trying to
625        // connect to the same peer at the same time. if the outgoing connection failed
626        // after the incoming connection was accepted, we can ignore this error
627        if let Some(peer) = self.peers.get(peer_id) {
628            if peer.state.is_incoming() {
629                // we already have an active connection to the peer, so we can ignore this error
630                return
631            }
632
633            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
634                // trigger resolution task for trusted peer since multiple connection failures
635                // occurred
636                self.trusted_peers_resolver.interval.reset_immediately();
637            }
638        }
639
640        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
641    }
642
643    fn on_connection_failure(
644        &mut self,
645        remote_addr: &SocketAddr,
646        peer_id: &PeerId,
647        err: impl SessionError,
648        reputation_change: ReputationChangeKind,
649    ) {
650        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
651
652        if err.is_fatal_protocol_error() {
653            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
654            // remove the peer to which we can't establish a connection due to protocol related
655            // issues.
656            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
657                self.connection_info.decr_state(entry.get().state);
658                // only remove if the peer is not trusted
659                if entry.get().is_trusted() {
660                    entry.get_mut().state = PeerConnectionState::Idle;
661                } else {
662                    entry.remove();
663                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
664                    // If the error is caused by a peer that should be banned from discovery
665                    if err.merits_discovery_ban() {
666                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
667                            peer_id: *peer_id,
668                            ip_addr: remote_addr.ip(),
669                        })
670                    }
671                }
672            }
673
674            // ban the peer
675            self.ban_peer(*peer_id);
676        } else {
677            let mut backoff_until = None;
678            let mut remove_peer = false;
679
680            if let Some(peer) = self.peers.get_mut(peer_id) {
681                if let Some(kind) = err.should_backoff() {
682                    if peer.is_trusted() || peer.is_static() {
683                        // provide a bit more leeway for trusted peers and use a lower backoff so
684                        // that we keep re-trying them after backing off shortly, but we should at
685                        // least backoff for the low duration to not violate the ip based inbound
686                        // connection throttle that peer has in place, because this peer might not
687                        // have us registered as a trusted peer.
688                        let backoff = self.backoff_durations.low;
689                        backoff_until = Some(std::time::Instant::now() + backoff);
690                        trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
691                    } else {
692                        // Increment peer.backoff_counter
693                        if kind.is_severe() {
694                            peer.severe_backoff_counter =
695                                peer.severe_backoff_counter.saturating_add(1);
696                        }
697                        trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
698
699                        let backoff_time =
700                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
701
702                        // The peer has signaled that it is currently unable to process any more
703                        // connections, so we will hold off on attempting any new connections for a
704                        // while
705                        backoff_until = Some(backoff_time);
706                    }
707                } else {
708                    // If the error was not a backoff error, we reduce the peer's reputation
709                    let reputation_change = self.reputation_weights.change(reputation_change);
710                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
711                };
712
713                self.connection_info.decr_state(peer.state);
714                peer.state = PeerConnectionState::Idle;
715
716                if peer.severe_backoff_counter > self.max_backoff_count &&
717                    !peer.is_trusted() &&
718                    !peer.is_static()
719                {
720                    // mark peer for removal if it has been backoff too many times and is _not_
721                    // trusted or static
722                    remove_peer = true;
723                }
724            }
725
726            // remove peer if it has been marked for removal
727            if remove_peer {
728                trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
729                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
730                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
731            } else if let Some(backoff_until) = backoff_until {
732                // otherwise, backoff the peer if marked as such
733                self.backoff_peer_until(*peer_id, backoff_until);
734            }
735        }
736
737        self.fill_outbound_slots();
738    }
739
740    /// Invoked if a pending session was disconnected because there's already a connection to the
741    /// peer.
742    ///
743    /// If the session was an outgoing connection, this means that the peer initiated a connection
744    /// to us at the same time and this connection is already established.
745    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
746        match direction {
747            Direction::Incoming => {
748                // need to decrement the ingoing counter
749                self.connection_info.decr_pending_in();
750            }
751            Direction::Outgoing(_) => {
752                // cleanup is handled when the incoming active session is activated in
753                // `on_incoming_session_established`
754            }
755        }
756    }
757
758    /// Called for a newly discovered peer.
759    ///
760    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
761    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
762        self.add_peer_kind(peer_id, None, addr, fork_id)
763    }
764
765    /// Marks the given peer as trusted.
766    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
767        self.trusted_peer_ids.insert(peer_id);
768        if let Some(peer) = self.peers.get_mut(&peer_id) {
769            peer.kind = PeerKind::Trusted;
770        }
771    }
772
773    /// Called for a newly discovered trusted peer.
774    ///
775    /// If the peer already exists, then the address and kind will be updated.
776    #[cfg_attr(not(test), expect(dead_code))]
777    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
778        self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
779    }
780
781    /// Called for a newly discovered peer.
782    ///
783    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
784    /// If the peer exists and a [`PeerKind`] is provided then the peer's kind is updated
785    pub(crate) fn add_peer_kind(
786        &mut self,
787        peer_id: PeerId,
788        kind: Option<PeerKind>,
789        addr: PeerAddr,
790        fork_id: Option<ForkId>,
791    ) {
792        let ip_addr = addr.tcp().ip();
793
794        // Check if the IP is in the allowed ranges (netrestrict)
795        if !self.ip_filter.is_allowed(&ip_addr) {
796            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
797            return
798        }
799
800        if self.ban_list.is_banned(&peer_id, &ip_addr) {
801            return
802        }
803
804        match self.peers.entry(peer_id) {
805            Entry::Occupied(mut entry) => {
806                let peer = entry.get_mut();
807                peer.fork_id = fork_id.map(Box::new);
808                peer.addr = addr;
809
810                if let Some(kind) = kind {
811                    peer.kind = kind;
812                }
813
814                if peer.state.is_incoming() {
815                    // now that we have an actual discovered address, for that peer and not just the
816                    // ip of the incoming connection, we don't need to remove the peer after
817                    // disconnecting, See `on_incoming_session_established`
818                    peer.remove_after_disconnect = false;
819                }
820            }
821            Entry::Vacant(entry) => {
822                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
823                let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
824                peer.fork_id = fork_id.map(Box::new);
825                entry.insert(peer);
826                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
827            }
828        }
829
830        if kind.filter(|kind| kind.is_trusted()).is_some() {
831            // also track the peer in the peer id set
832            self.trusted_peer_ids.insert(peer_id);
833        }
834    }
835
836    /// Removes the tracked node from the set.
837    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
838        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
839        if entry.get().is_trusted() {
840            return
841        }
842        let mut peer = entry.remove();
843
844        trace!(target: "net::peers", ?peer_id, "remove discovered node");
845        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
846
847        if peer.state.is_connected() {
848            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
849            // we terminate the active session here, but only remove the peer after the session
850            // was disconnected, this prevents the case where the session is scheduled for
851            // disconnect but the node is immediately rediscovered, See also
852            // [`Self::on_disconnected()`]
853            peer.remove_after_disconnect = true;
854            peer.state.disconnect();
855            self.peers.insert(peer_id, peer);
856            self.queued_actions.push_back(PeerAction::Disconnect {
857                peer_id,
858                reason: Some(DisconnectReason::DisconnectRequested),
859            })
860        }
861    }
862
863    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
864    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
865    #[cfg_attr(not(test), expect(dead_code))]
866    pub(crate) fn add_and_connect(
867        &mut self,
868        peer_id: PeerId,
869        addr: PeerAddr,
870        fork_id: Option<ForkId>,
871    ) {
872        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
873    }
874
875    /// Connects a peer and its address with the given kind.
876    ///
877    /// Note: This is invoked on demand via an external command received by the manager
878    pub(crate) fn add_and_connect_kind(
879        &mut self,
880        peer_id: PeerId,
881        kind: PeerKind,
882        addr: PeerAddr,
883        fork_id: Option<ForkId>,
884    ) {
885        let ip_addr = addr.tcp().ip();
886
887        // Check if the IP is in the allowed ranges (netrestrict)
888        if !self.ip_filter.is_allowed(&ip_addr) {
889            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
890            return
891        }
892
893        if self.ban_list.is_banned(&peer_id, &ip_addr) {
894            return
895        }
896
897        match self.peers.entry(peer_id) {
898            Entry::Occupied(mut entry) => {
899                let peer = entry.get_mut();
900                peer.kind = kind;
901                peer.fork_id = fork_id.map(Box::new);
902                peer.addr = addr;
903
904                if peer.state == PeerConnectionState::Idle {
905                    // Try connecting again.
906                    peer.state = PeerConnectionState::PendingOut;
907                    self.connection_info.inc_pending_out();
908                    self.queued_actions
909                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
910                }
911            }
912            Entry::Vacant(entry) => {
913                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
914                let mut peer = Peer::with_kind(addr, kind);
915                peer.state = PeerConnectionState::PendingOut;
916                peer.fork_id = fork_id.map(Box::new);
917                entry.insert(peer);
918                self.connection_info.inc_pending_out();
919                self.queued_actions
920                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
921            }
922        }
923
924        if kind.is_trusted() {
925            self.trusted_peer_ids.insert(peer_id);
926        }
927    }
928
929    /// Removes the tracked node from the trusted set.
930    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
931        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
932        if !entry.get().is_trusted() {
933            return
934        }
935
936        let peer = entry.get_mut();
937        peer.kind = PeerKind::Basic;
938
939        self.trusted_peer_ids.remove(&peer_id);
940    }
941
942    /// Returns the idle peer with the highest reputation.
943    ///
944    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
945    /// not currently marked as banned or backed off.
946    ///
947    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
948    /// `trusted` peers.
949    ///
950    /// Returns `None` if no peer is available.
951    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
952        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
953            !peer.is_backed_off() &&
954                !peer.is_banned() &&
955                peer.state.is_unconnected() &&
956                (!self.trusted_nodes_only || peer.is_trusted())
957        });
958
959        // keep track of the best peer, if there's one
960        let mut best_peer = unconnected.next()?;
961
962        if best_peer.1.is_trusted() || best_peer.1.is_static() {
963            return Some((*best_peer.0, best_peer.1))
964        }
965
966        for maybe_better in unconnected {
967            // if the peer is trusted or static, return it immediately
968            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
969                return Some((*maybe_better.0, maybe_better.1))
970            }
971
972            // otherwise we keep track of the best peer using the reputation
973            if maybe_better.1.reputation > best_peer.1.reputation {
974                best_peer = maybe_better;
975            }
976        }
977        Some((*best_peer.0, best_peer.1))
978    }
979
980    /// If there's capacity for new outbound connections, this will queue new
981    /// [`PeerAction::Connect`] actions.
982    ///
983    /// New connections are only initiated, if slots are available and appropriate peers are
984    /// available.
985    fn fill_outbound_slots(&mut self) {
986        self.tick();
987
988        if !self.net_connection_state.is_active() {
989            // nothing to fill
990            return
991        }
992
993        // as long as there are slots available fill them with the best peers
994        while self.connection_info.has_out_capacity() {
995            let action = {
996                let (peer_id, peer) = match self.best_unconnected() {
997                    Some(peer) => peer,
998                    _ => break,
999                };
1000
1001                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
1002
1003                peer.state = PeerConnectionState::PendingOut;
1004                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
1005            };
1006
1007            self.connection_info.inc_pending_out();
1008
1009            self.queued_actions.push_back(action);
1010        }
1011    }
1012
1013    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1014        if let Some(peer) = self.peers.get_mut(&peer_id) {
1015            let new_addr = PeerAddr::new_with_ports(
1016                new_record.address,
1017                new_record.tcp_port,
1018                Some(new_record.udp_port),
1019            );
1020
1021            if peer.addr != new_addr {
1022                peer.addr = new_addr;
1023                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1024            }
1025        }
1026    }
1027
1028    /// Keeps track of network state changes.
1029    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1030        self.net_connection_state = state;
1031    }
1032
1033    /// Returns the current network connection state.
1034    pub const fn connection_state(&self) -> &NetworkConnectionState {
1035        &self.net_connection_state
1036    }
1037
1038    /// Sets `net_connection_state` to `ShuttingDown`.
1039    pub const fn on_shutdown(&mut self) {
1040        self.net_connection_state = NetworkConnectionState::ShuttingDown;
1041    }
1042
1043    /// Advances the state.
1044    ///
1045    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
1046    /// [`PeersManager`] is polled.
1047    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1048        loop {
1049            // drain buffered actions
1050            if let Some(action) = self.queued_actions.pop_front() {
1051                return Poll::Ready(action)
1052            }
1053
1054            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1055                match cmd {
1056                    PeerCommand::Add(peer_id, addr) => {
1057                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1058                    }
1059                    PeerCommand::Remove(peer) => self.remove_peer(peer),
1060                    PeerCommand::ReputationChange(peer_id, rep) => {
1061                        self.apply_reputation_change(&peer_id, rep)
1062                    }
1063                    PeerCommand::GetPeer(peer, tx) => {
1064                        let _ = tx.send(self.peers.get(&peer).cloned());
1065                    }
1066                    PeerCommand::GetPeers(tx) => {
1067                        let _ = tx.send(self.iter_peers().collect());
1068                    }
1069                }
1070            }
1071
1072            if self.release_interval.poll_tick(cx).is_ready() {
1073                let now = std::time::Instant::now();
1074                let (_, unbanned_peers) = self.ban_list.evict(now);
1075
1076                for peer_id in unbanned_peers {
1077                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1078                        peer.unban();
1079                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1080                    }
1081                }
1082
1083                // clear the backoff list of expired backoffs, and mark the relevant peers as
1084                // ready to be dialed
1085                self.backed_off_peers.retain(|peer_id, until| {
1086                    if now > *until {
1087                        if let Some(peer) = self.peers.get_mut(peer_id) {
1088                            peer.backed_off = false;
1089                        }
1090                        return false
1091                    }
1092                    true
1093                })
1094            }
1095
1096            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1097                self.fill_outbound_slots();
1098            }
1099
1100            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1101                self.on_resolved_peer(peer_id, new_record);
1102            }
1103
1104            if self.queued_actions.is_empty() {
1105                return Poll::Pending
1106            }
1107        }
1108    }
1109}
1110
1111impl Default for PeersManager {
1112    fn default() -> Self {
1113        Self::new(Default::default())
1114    }
1115}
1116
1117/// Tracks stats about connected nodes
1118#[derive(Debug, Clone, PartialEq, Eq, Default)]
1119pub struct ConnectionInfo {
1120    /// Counter for currently occupied slots for active outbound connections.
1121    num_outbound: usize,
1122    /// Counter for pending outbound connections.
1123    num_pending_out: usize,
1124    /// Counter for currently occupied slots for active inbound connections.
1125    num_inbound: usize,
1126    /// Counter for pending inbound connections.
1127    num_pending_in: usize,
1128    /// Restrictions on number of connections.
1129    config: ConnectionsConfig,
1130}
1131
1132// === impl ConnectionInfo ===
1133
1134impl ConnectionInfo {
1135    /// Returns a new [`ConnectionInfo`] with the given config.
1136    const fn new(config: ConnectionsConfig) -> Self {
1137        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1138    }
1139
1140    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1141    const fn has_out_capacity(&self) -> bool {
1142        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1143            self.num_outbound < self.config.max_outbound
1144    }
1145
1146    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1147    const fn has_in_capacity(&self) -> bool {
1148        self.num_inbound < self.config.max_inbound
1149    }
1150
1151    /// Returns `true` if we can handle an additional incoming pending connection.
1152    const fn has_in_pending_capacity(&self) -> bool {
1153        self.num_pending_in < self.config.max_inbound
1154    }
1155
1156    const fn decr_state(&mut self, state: PeerConnectionState) {
1157        match state {
1158            PeerConnectionState::Idle => {}
1159            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1160            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1161            PeerConnectionState::PendingOut => self.decr_pending_out(),
1162        }
1163    }
1164
1165    const fn decr_out(&mut self) {
1166        self.num_outbound -= 1;
1167    }
1168
1169    const fn inc_out(&mut self) {
1170        self.num_outbound += 1;
1171    }
1172
1173    const fn inc_pending_out(&mut self) {
1174        self.num_pending_out += 1;
1175    }
1176
1177    const fn inc_in(&mut self) {
1178        self.num_inbound += 1;
1179    }
1180
1181    const fn inc_pending_in(&mut self) {
1182        self.num_pending_in += 1;
1183    }
1184
1185    const fn decr_in(&mut self) {
1186        self.num_inbound -= 1;
1187    }
1188
1189    const fn decr_pending_out(&mut self) {
1190        self.num_pending_out -= 1;
1191    }
1192
1193    const fn decr_pending_in(&mut self) {
1194        self.num_pending_in -= 1;
1195    }
1196}
1197
1198/// Actions the peer manager can trigger.
1199#[derive(Debug)]
1200pub enum PeerAction {
1201    /// Start a new connection to a peer.
1202    Connect {
1203        /// The peer to connect to.
1204        peer_id: PeerId,
1205        /// Where to reach the node
1206        remote_addr: SocketAddr,
1207    },
1208    /// Disconnect an existing connection.
1209    Disconnect {
1210        /// The peer ID of the established connection.
1211        peer_id: PeerId,
1212        /// An optional reason for the disconnect.
1213        reason: Option<DisconnectReason>,
1214    },
1215    /// Disconnect an existing incoming connection, because the peers reputation is below the
1216    /// banned threshold or is on the [`BanList`]
1217    DisconnectBannedIncoming {
1218        /// The peer ID of the established connection.
1219        peer_id: PeerId,
1220    },
1221    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1222    DisconnectUntrustedIncoming {
1223        /// The peer ID.
1224        peer_id: PeerId,
1225    },
1226    /// Ban the peer in discovery.
1227    DiscoveryBanPeerId {
1228        /// The peer ID.
1229        peer_id: PeerId,
1230        /// The IP address.
1231        ip_addr: IpAddr,
1232    },
1233    /// Ban the IP in discovery.
1234    DiscoveryBanIp {
1235        /// The IP address.
1236        ip_addr: IpAddr,
1237    },
1238    /// Ban the peer temporarily
1239    BanPeer {
1240        /// The peer ID.
1241        peer_id: PeerId,
1242    },
1243    /// Unban the peer temporarily
1244    UnBanPeer {
1245        /// The peer ID.
1246        peer_id: PeerId,
1247    },
1248    /// Emit peerAdded event
1249    PeerAdded(PeerId),
1250    /// Emit peerRemoved event
1251    PeerRemoved(PeerId),
1252}
1253
1254/// Error thrown when an incoming connection is rejected right away
1255#[derive(Debug, Error, PartialEq, Eq)]
1256pub enum InboundConnectionError {
1257    /// The remote's ip address is banned
1258    IpBanned,
1259    /// No capacity for new inbound connections
1260    ExceedsCapacity,
1261}
1262
1263impl Display for InboundConnectionError {
1264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1265        write!(f, "{self:?}")
1266    }
1267}
1268
1269/// The reason a peer was backed off.
1270#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1271pub enum BackoffReason {
1272    /// The remote peer responded with `TooManyPeers` (0x04).
1273    TooManyPeers,
1274    /// The session was gracefully closed and we're backing off briefly.
1275    GracefulClose,
1276    /// A connection or protocol-level error occurred.
1277    ConnectionError,
1278}
1279
1280impl BackoffReason {
1281    /// Derives the backoff reason from an optional [`DisconnectReason`].
1282    pub const fn from_disconnect(reason: Option<DisconnectReason>) -> Self {
1283        match reason {
1284            Some(DisconnectReason::TooManyPeers) => Self::TooManyPeers,
1285            _ => Self::ConnectionError,
1286        }
1287    }
1288}
1289
1290#[cfg(test)]
1291mod tests {
1292    use alloy_primitives::B512;
1293    use reth_eth_wire::{
1294        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1295        DisconnectReason,
1296    };
1297    use reth_net_banlist::BanList;
1298    use reth_network_api::Direction;
1299    use reth_network_peers::{PeerId, TrustedPeer};
1300    use reth_network_types::{
1301        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1302    };
1303    use std::{
1304        future::{poll_fn, Future},
1305        io,
1306        net::{IpAddr, Ipv4Addr, SocketAddr},
1307        pin::Pin,
1308        task::{Context, Poll},
1309        time::Duration,
1310    };
1311    use url::Host;
1312
1313    use super::PeersManager;
1314    use crate::{
1315        error::SessionError,
1316        peers::{
1317            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1318            PeerConnectionState,
1319        },
1320        session::PendingSessionHandshakeError,
1321        PeersConfig,
1322    };
1323
1324    struct PeerActionFuture<'a> {
1325        peers: &'a mut PeersManager,
1326    }
1327
1328    impl Future for PeerActionFuture<'_> {
1329        type Output = PeerAction;
1330
1331        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1332            self.get_mut().peers.poll(cx)
1333        }
1334    }
1335
1336    macro_rules! event {
1337        ($peers:expr) => {
1338            PeerActionFuture { peers: &mut $peers }.await
1339        };
1340    }
1341
1342    #[tokio::test]
1343    async fn test_insert() {
1344        let peer = PeerId::random();
1345        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1346        let mut peers = PeersManager::default();
1347        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1348
1349        match event!(peers) {
1350            PeerAction::PeerAdded(peer_id) => {
1351                assert_eq!(peer_id, peer);
1352            }
1353            _ => unreachable!(),
1354        }
1355        match event!(peers) {
1356            PeerAction::Connect { peer_id, remote_addr } => {
1357                assert_eq!(peer_id, peer);
1358                assert_eq!(remote_addr, socket_addr);
1359            }
1360            _ => unreachable!(),
1361        }
1362
1363        let (record, _) = peers.peer_by_id(peer).unwrap();
1364        assert_eq!(record.tcp_addr(), socket_addr);
1365        assert_eq!(record.udp_addr(), socket_addr);
1366    }
1367
1368    #[tokio::test]
1369    async fn test_insert_udp() {
1370        let peer = PeerId::random();
1371        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1372        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1373        let mut peers = PeersManager::default();
1374        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1375
1376        match event!(peers) {
1377            PeerAction::PeerAdded(peer_id) => {
1378                assert_eq!(peer_id, peer);
1379            }
1380            _ => unreachable!(),
1381        }
1382        match event!(peers) {
1383            PeerAction::Connect { peer_id, remote_addr } => {
1384                assert_eq!(peer_id, peer);
1385                assert_eq!(remote_addr, tcp_addr);
1386            }
1387            _ => unreachable!(),
1388        }
1389
1390        let (record, _) = peers.peer_by_id(peer).unwrap();
1391        assert_eq!(record.tcp_addr(), tcp_addr);
1392        assert_eq!(record.udp_addr(), udp_addr);
1393    }
1394
1395    #[tokio::test]
1396    async fn test_ban() {
1397        let peer = PeerId::random();
1398        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1399        let mut peers = PeersManager::default();
1400        peers.ban_peer(peer);
1401        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1402
1403        match event!(peers) {
1404            PeerAction::BanPeer { peer_id } => {
1405                assert_eq!(peer_id, peer);
1406            }
1407            _ => unreachable!(),
1408        }
1409
1410        poll_fn(|cx| {
1411            assert!(peers.poll(cx).is_pending());
1412            Poll::Ready(())
1413        })
1414        .await;
1415    }
1416
1417    #[tokio::test]
1418    async fn test_unban() {
1419        let peer = PeerId::random();
1420        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1421        let mut peers = PeersManager::default();
1422        peers.ban_peer(peer);
1423        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1424
1425        match event!(peers) {
1426            PeerAction::BanPeer { peer_id } => {
1427                assert_eq!(peer_id, peer);
1428            }
1429            _ => unreachable!(),
1430        }
1431
1432        poll_fn(|cx| {
1433            assert!(peers.poll(cx).is_pending());
1434            Poll::Ready(())
1435        })
1436        .await;
1437
1438        peers.unban_peer(peer);
1439
1440        match event!(peers) {
1441            PeerAction::UnBanPeer { peer_id } => {
1442                assert_eq!(peer_id, peer);
1443            }
1444            _ => unreachable!(),
1445        }
1446
1447        poll_fn(|cx| {
1448            assert!(peers.poll(cx).is_pending());
1449            Poll::Ready(())
1450        })
1451        .await;
1452    }
1453
1454    #[tokio::test]
1455    async fn test_backoff_on_busy() {
1456        let peer = PeerId::random();
1457        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1458
1459        let mut peers = PeersManager::new(PeersConfig::test());
1460        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1461
1462        match event!(peers) {
1463            PeerAction::PeerAdded(peer_id) => {
1464                assert_eq!(peer_id, peer);
1465            }
1466            _ => unreachable!(),
1467        }
1468        match event!(peers) {
1469            PeerAction::Connect { peer_id, .. } => {
1470                assert_eq!(peer_id, peer);
1471            }
1472            _ => unreachable!(),
1473        }
1474
1475        poll_fn(|cx| {
1476            assert!(peers.poll(cx).is_pending());
1477            Poll::Ready(())
1478        })
1479        .await;
1480
1481        peers.on_active_session_dropped(
1482            &socket_addr,
1483            &peer,
1484            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1485                DisconnectReason::TooManyPeers,
1486            )),
1487        );
1488
1489        poll_fn(|cx| {
1490            assert!(peers.poll(cx).is_pending());
1491            Poll::Ready(())
1492        })
1493        .await;
1494
1495        assert!(peers.backed_off_peers.contains_key(&peer));
1496        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1497
1498        tokio::time::sleep(peers.backoff_durations.low).await;
1499
1500        match event!(peers) {
1501            PeerAction::Connect { peer_id, .. } => {
1502                assert_eq!(peer_id, peer);
1503            }
1504            _ => unreachable!(),
1505        }
1506
1507        assert!(!peers.backed_off_peers.contains_key(&peer));
1508        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1509    }
1510
1511    #[tokio::test]
1512    async fn test_backoff_on_no_response() {
1513        let peer = PeerId::random();
1514        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1515
1516        let backoff_durations = PeerBackoffDurations::test();
1517        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1518        let mut peers = PeersManager::new(config);
1519        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1520
1521        match event!(peers) {
1522            PeerAction::PeerAdded(peer_id) => {
1523                assert_eq!(peer_id, peer);
1524            }
1525            _ => unreachable!(),
1526        }
1527        match event!(peers) {
1528            PeerAction::Connect { peer_id, .. } => {
1529                assert_eq!(peer_id, peer);
1530            }
1531            _ => unreachable!(),
1532        }
1533
1534        poll_fn(|cx| {
1535            assert!(peers.poll(cx).is_pending());
1536            Poll::Ready(())
1537        })
1538        .await;
1539
1540        peers.on_outgoing_pending_session_dropped(
1541            &socket_addr,
1542            &peer,
1543            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1544                EthHandshakeError::NoResponse,
1545            )),
1546        );
1547
1548        poll_fn(|cx| {
1549            assert!(peers.poll(cx).is_pending());
1550            Poll::Ready(())
1551        })
1552        .await;
1553
1554        assert!(peers.backed_off_peers.contains_key(&peer));
1555        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1556
1557        tokio::time::sleep(backoff_durations.high).await;
1558
1559        match event!(peers) {
1560            PeerAction::Connect { peer_id, .. } => {
1561                assert_eq!(peer_id, peer);
1562            }
1563            _ => unreachable!(),
1564        }
1565
1566        assert!(!peers.backed_off_peers.contains_key(&peer));
1567        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1568    }
1569
1570    #[tokio::test]
1571    async fn test_low_backoff() {
1572        let peer = PeerId::random();
1573        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1574        let config = PeersConfig::test();
1575        let mut peers = PeersManager::new(config);
1576        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1577        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1578
1579        let backoff_timestamp = peers
1580            .backoff_durations
1581            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1582
1583        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1584        assert!(backoff_timestamp <= expected);
1585    }
1586
1587    #[tokio::test]
1588    async fn test_multiple_backoff_calculations() {
1589        let peer = PeerId::random();
1590        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1591        let config = PeersConfig::default();
1592        let mut peers = PeersManager::new(config);
1593        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1594        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1595
1596        // Simulate a peer that was already backed off once
1597        peer_struct.severe_backoff_counter = 1;
1598
1599        let now = std::time::Instant::now();
1600
1601        // Simulate the increment that happens in on_connection_failure
1602        peer_struct.severe_backoff_counter += 1;
1603        // Get official backoff time
1604        let backoff_time = peers
1605            .backoff_durations
1606            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1607
1608        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1609        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1610
1611        // We can't use assert_eq! since there is a very small diff in the nano secs
1612        // Usually it is 1800s != 1799.9999996s
1613        assert!(backoff_time.duration_since(now) > backoff_duration);
1614    }
1615
1616    #[tokio::test]
1617    async fn test_ban_on_active_drop() {
1618        let peer = PeerId::random();
1619        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1620        let mut peers = PeersManager::default();
1621        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1622
1623        match event!(peers) {
1624            PeerAction::PeerAdded(peer_id) => {
1625                assert_eq!(peer_id, peer);
1626            }
1627            _ => unreachable!(),
1628        }
1629        match event!(peers) {
1630            PeerAction::Connect { peer_id, .. } => {
1631                assert_eq!(peer_id, peer);
1632            }
1633            _ => unreachable!(),
1634        }
1635
1636        poll_fn(|cx| {
1637            assert!(peers.poll(cx).is_pending());
1638            Poll::Ready(())
1639        })
1640        .await;
1641
1642        peers.on_active_session_dropped(
1643            &socket_addr,
1644            &peer,
1645            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1646                DisconnectReason::UselessPeer,
1647            )),
1648        );
1649
1650        match event!(peers) {
1651            PeerAction::PeerRemoved(peer_id) => {
1652                assert_eq!(peer_id, peer);
1653            }
1654            _ => unreachable!(),
1655        }
1656        match event!(peers) {
1657            PeerAction::BanPeer { peer_id } => {
1658                assert_eq!(peer_id, peer);
1659            }
1660            _ => unreachable!(),
1661        }
1662
1663        poll_fn(|cx| {
1664            assert!(peers.poll(cx).is_pending());
1665            Poll::Ready(())
1666        })
1667        .await;
1668
1669        assert!(!peers.peers.contains_key(&peer));
1670    }
1671
1672    #[tokio::test]
1673    async fn test_remove_on_max_backoff_count() {
1674        let peer = PeerId::random();
1675        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1676        let config = PeersConfig::test();
1677        let mut peers = PeersManager::new(config.clone());
1678        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1679        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1680
1681        // Simulate a peer that was already backed off once
1682        peer_struct.severe_backoff_counter = config.max_backoff_count;
1683
1684        match event!(peers) {
1685            PeerAction::PeerAdded(peer_id) => {
1686                assert_eq!(peer_id, peer);
1687            }
1688            _ => unreachable!(),
1689        }
1690        match event!(peers) {
1691            PeerAction::Connect { peer_id, .. } => {
1692                assert_eq!(peer_id, peer);
1693            }
1694            _ => unreachable!(),
1695        }
1696
1697        poll_fn(|cx| {
1698            assert!(peers.poll(cx).is_pending());
1699            Poll::Ready(())
1700        })
1701        .await;
1702
1703        peers.on_outgoing_pending_session_dropped(
1704            &socket_addr,
1705            &peer,
1706            &PendingSessionHandshakeError::Eth(
1707                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1708            ),
1709        );
1710
1711        match event!(peers) {
1712            PeerAction::PeerRemoved(peer_id) => {
1713                assert_eq!(peer_id, peer);
1714            }
1715            _ => unreachable!(),
1716        }
1717
1718        poll_fn(|cx| {
1719            assert!(peers.poll(cx).is_pending());
1720            Poll::Ready(())
1721        })
1722        .await;
1723
1724        assert!(!peers.peers.contains_key(&peer));
1725    }
1726
1727    #[tokio::test]
1728    async fn test_ban_on_pending_drop() {
1729        let peer = PeerId::random();
1730        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1731        let mut peers = PeersManager::default();
1732        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1733
1734        match event!(peers) {
1735            PeerAction::PeerAdded(peer_id) => {
1736                assert_eq!(peer_id, peer);
1737            }
1738            _ => unreachable!(),
1739        }
1740        match event!(peers) {
1741            PeerAction::Connect { peer_id, .. } => {
1742                assert_eq!(peer_id, peer);
1743            }
1744            _ => unreachable!(),
1745        }
1746
1747        poll_fn(|cx| {
1748            assert!(peers.poll(cx).is_pending());
1749            Poll::Ready(())
1750        })
1751        .await;
1752
1753        peers.on_outgoing_pending_session_dropped(
1754            &socket_addr,
1755            &peer,
1756            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1757                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1758            )),
1759        );
1760
1761        match event!(peers) {
1762            PeerAction::PeerRemoved(peer_id) => {
1763                assert_eq!(peer_id, peer);
1764            }
1765            _ => unreachable!(),
1766        }
1767        match event!(peers) {
1768            PeerAction::BanPeer { peer_id } => {
1769                assert_eq!(peer_id, peer);
1770            }
1771            _ => unreachable!(),
1772        }
1773
1774        poll_fn(|cx| {
1775            assert!(peers.poll(cx).is_pending());
1776            Poll::Ready(())
1777        })
1778        .await;
1779
1780        assert!(!peers.peers.contains_key(&peer));
1781    }
1782
1783    #[tokio::test]
1784    async fn test_internally_closed_incoming() {
1785        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1786        let mut peers = PeersManager::default();
1787
1788        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1789        assert_eq!(peers.connection_info.num_pending_in, 1);
1790        peers.on_incoming_pending_session_rejected_internally();
1791        assert_eq!(peers.connection_info.num_pending_in, 0);
1792    }
1793
1794    #[tokio::test]
1795    async fn test_reject_incoming_at_pending_capacity() {
1796        let mut peers = PeersManager::default();
1797
1798        for count in 1..=peers.connection_info.config.max_inbound {
1799            let socket_addr =
1800                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1801            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1802            assert_eq!(peers.connection_info.num_pending_in, count);
1803        }
1804        assert!(peers.connection_info.has_in_capacity());
1805        assert!(!peers.connection_info.has_in_pending_capacity());
1806
1807        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1808        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1809    }
1810
1811    #[tokio::test]
1812    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1813        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1814        let trusted = PeerId::random();
1815        peers.add_trusted_peer_id(trusted);
1816
1817        // connect the trusted peer
1818        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1819        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1820        peers.on_incoming_session_established(trusted, addr);
1821
1822        match event!(peers) {
1823            PeerAction::PeerAdded(id) => {
1824                assert_eq!(id, trusted);
1825            }
1826            _ => unreachable!(),
1827        }
1828
1829        // saturate the remaining inbound slots with untrusted peers
1830        let mut connected_untrusted_peer_ids = Vec::new();
1831        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1832            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1833            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1834            let peer_id = PeerId::random();
1835            peers.on_incoming_session_established(peer_id, addr);
1836            connected_untrusted_peer_ids.push(peer_id);
1837
1838            match event!(peers) {
1839                PeerAction::PeerAdded(id) => {
1840                    assert_eq!(id, peer_id);
1841                }
1842                _ => unreachable!(),
1843            }
1844        }
1845
1846        let mut pending_addrs = Vec::new();
1847
1848        // saturate available slots
1849        for i in 0..2 {
1850            let socket_addr =
1851                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1852            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1853
1854            pending_addrs.push(socket_addr);
1855        }
1856
1857        assert_eq!(peers.connection_info.num_pending_in, 2);
1858
1859        // try to handle additional incoming connections at capacity
1860        for i in 0..2 {
1861            let socket_addr =
1862                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1863            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1864        }
1865
1866        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1867            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1868                DisconnectReason::UselessPeer,
1869            )),
1870        ));
1871
1872        // Remove all pending peers
1873        for pending_addr in pending_addrs {
1874            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1875        }
1876
1877        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1878
1879        println!(
1880            "num_inbound: {}, has_in_capacity: {}",
1881            peers.connection_info.num_inbound,
1882            peers.connection_info.has_in_capacity()
1883        );
1884
1885        // disconnect a connected peer
1886        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1887
1888        println!(
1889            "num_inbound: {}, has_in_capacity: {}",
1890            peers.connection_info.num_inbound,
1891            peers.connection_info.has_in_capacity()
1892        );
1893
1894        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1895        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1896    }
1897
1898    #[tokio::test]
1899    async fn test_closed_incoming() {
1900        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1901        let mut peers = PeersManager::default();
1902
1903        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1904        assert_eq!(peers.connection_info.num_pending_in, 1);
1905        peers.on_incoming_pending_session_gracefully_closed();
1906        assert_eq!(peers.connection_info.num_pending_in, 0);
1907    }
1908
1909    #[tokio::test]
1910    async fn test_dropped_incoming() {
1911        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1912        let ban_duration = Duration::from_millis(500);
1913        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1914        let mut peers = PeersManager::new(config);
1915
1916        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1917        assert_eq!(peers.connection_info.num_pending_in, 1);
1918        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1919            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1920                DisconnectReason::UselessPeer,
1921            )),
1922        ));
1923
1924        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1925        assert_eq!(peers.connection_info.num_pending_in, 0);
1926        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1927
1928        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1929
1930        // unbanned after timeout
1931        tokio::time::sleep(ban_duration).await;
1932
1933        poll_fn(|cx| {
1934            let _ = peers.poll(cx);
1935            Poll::Ready(())
1936        })
1937        .await;
1938
1939        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1940        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1941    }
1942
1943    #[tokio::test]
1944    async fn test_reputation_change_connected() {
1945        let peer = PeerId::random();
1946        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1947        let mut peers = PeersManager::default();
1948        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1949
1950        match event!(peers) {
1951            PeerAction::PeerAdded(peer_id) => {
1952                assert_eq!(peer_id, peer);
1953            }
1954            _ => unreachable!(),
1955        }
1956        match event!(peers) {
1957            PeerAction::Connect { peer_id, remote_addr } => {
1958                assert_eq!(peer_id, peer);
1959                assert_eq!(remote_addr, socket_addr);
1960            }
1961            _ => unreachable!(),
1962        }
1963
1964        let p = peers.peers.get_mut(&peer).unwrap();
1965        assert_eq!(p.state, PeerConnectionState::PendingOut);
1966
1967        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1968
1969        let p = peers.peers.get(&peer).unwrap();
1970        assert_eq!(p.state, PeerConnectionState::PendingOut);
1971        assert!(p.is_banned());
1972
1973        peers.on_active_session_gracefully_closed(peer);
1974
1975        let p = peers.peers.get(&peer).unwrap();
1976        assert_eq!(p.state, PeerConnectionState::Idle);
1977        assert!(p.is_banned());
1978
1979        match event!(peers) {
1980            PeerAction::Disconnect { peer_id, .. } => {
1981                assert_eq!(peer_id, peer);
1982            }
1983            _ => unreachable!(),
1984        }
1985    }
1986
1987    #[tokio::test]
1988    async fn retain_trusted_status() {
1989        let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1990        let trusted = PeerId::random();
1991        let mut peers =
1992            PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
1993                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1994                tcp_port: 8008,
1995                udp_port: 8008,
1996                id: trusted,
1997            }]));
1998        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1999        peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
2000        assert!(peers.peers.get(&trusted).unwrap().is_trusted());
2001    }
2002
2003    #[tokio::test]
2004    async fn accept_incoming_trusted_unknown_peer_address() {
2005        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2006        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
2007        // try to connect trusted peer
2008        let trusted = PeerId::random();
2009        peers.add_trusted_peer_id(trusted);
2010
2011        // saturate the inbound slots
2012        for i in 0..peers.connection_info.config.max_inbound {
2013            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
2014            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2015            let peer_id = PeerId::random();
2016            peers.on_incoming_session_established(peer_id, addr);
2017
2018            match event!(peers) {
2019                PeerAction::PeerAdded(id) => {
2020                    assert_eq!(id, peer_id);
2021                }
2022                _ => unreachable!(),
2023            }
2024        }
2025
2026        // try to connect untrusted peer
2027        let untrusted = PeerId::random();
2028        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2029        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2030        peers.on_incoming_session_established(untrusted, socket_addr);
2031
2032        match event!(peers) {
2033            PeerAction::PeerAdded(id) => {
2034                assert_eq!(id, untrusted);
2035            }
2036            _ => unreachable!(),
2037        }
2038
2039        match event!(peers) {
2040            PeerAction::Disconnect { peer_id, reason } => {
2041                assert_eq!(peer_id, untrusted);
2042                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2043            }
2044            _ => unreachable!(),
2045        }
2046
2047        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2048        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2049        peers.on_incoming_session_established(trusted, socket_addr);
2050
2051        match event!(peers) {
2052            PeerAction::PeerAdded(id) => {
2053                assert_eq!(id, trusted);
2054            }
2055            _ => unreachable!(),
2056        }
2057
2058        poll_fn(|cx| {
2059            assert!(peers.poll(cx).is_pending());
2060            Poll::Ready(())
2061        })
2062        .await;
2063
2064        let peer = peers.peers.get(&trusted).unwrap();
2065        assert_eq!(peer.state, PeerConnectionState::In);
2066    }
2067
2068    #[tokio::test]
2069    async fn test_already_connected() {
2070        let peer = PeerId::random();
2071        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2072        let mut peers = PeersManager::default();
2073
2074        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
2075        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2076        assert_eq!(peers.connection_info.num_pending_in, 1);
2077
2078        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
2079        // to increase by 1
2080        peers.on_incoming_session_established(peer, socket_addr);
2081        let p = peers.peers.get_mut(&peer).expect("peer not found");
2082        assert_eq!(p.addr.tcp(), socket_addr);
2083        assert_eq!(peers.connection_info.num_pending_in, 0);
2084        assert_eq!(peers.connection_info.num_inbound, 1);
2085
2086        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
2087        // by 1
2088        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2089        assert_eq!(peers.connection_info.num_pending_in, 1);
2090
2091        // Simulate a rejection due to an already established connection, expecting the
2092        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
2093        // should not be changed.
2094        peers.on_already_connected(Direction::Incoming);
2095
2096        let p = peers.peers.get_mut(&peer).expect("peer not found");
2097        assert_eq!(p.addr.tcp(), socket_addr);
2098        assert_eq!(peers.connection_info.num_pending_in, 0);
2099        assert_eq!(peers.connection_info.num_inbound, 1);
2100    }
2101
2102    #[tokio::test]
2103    async fn test_reputation_change_trusted_peer() {
2104        let peer = PeerId::random();
2105        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2106        let mut peers = PeersManager::default();
2107        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2108
2109        match event!(peers) {
2110            PeerAction::PeerAdded(peer_id) => {
2111                assert_eq!(peer_id, peer);
2112            }
2113            _ => unreachable!(),
2114        }
2115        match event!(peers) {
2116            PeerAction::Connect { peer_id, remote_addr } => {
2117                assert_eq!(peer_id, peer);
2118                assert_eq!(remote_addr, socket_addr);
2119            }
2120            _ => unreachable!(),
2121        }
2122
2123        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2124        peers.on_active_outgoing_established(peer);
2125        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2126
2127        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2128
2129        {
2130            let p = peers.peers.get(&peer).unwrap();
2131            assert_eq!(p.state, PeerConnectionState::Out);
2132            // not banned yet
2133            assert!(!p.is_banned());
2134        }
2135
2136        // ensure peer is banned eventually
2137        loop {
2138            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2139
2140            let p = peers.peers.get(&peer).unwrap();
2141            if p.is_banned() {
2142                break
2143            }
2144        }
2145
2146        match event!(peers) {
2147            PeerAction::Disconnect { peer_id, .. } => {
2148                assert_eq!(peer_id, peer);
2149            }
2150            _ => unreachable!(),
2151        }
2152    }
2153
2154    #[tokio::test]
2155    async fn test_reputation_management() {
2156        let peer = PeerId::random();
2157        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2158        let mut peers = PeersManager::default();
2159        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2160        assert_eq!(peers.get_reputation(&peer), Some(0));
2161
2162        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2163        assert_eq!(peers.get_reputation(&peer), Some(1024));
2164
2165        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2166        assert_eq!(peers.get_reputation(&peer), Some(0));
2167    }
2168
2169    #[tokio::test]
2170    async fn test_remove_discovered_active() {
2171        let peer = PeerId::random();
2172        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2173        let mut peers = PeersManager::default();
2174        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2175
2176        match event!(peers) {
2177            PeerAction::PeerAdded(peer_id) => {
2178                assert_eq!(peer_id, peer);
2179            }
2180            _ => unreachable!(),
2181        }
2182        match event!(peers) {
2183            PeerAction::Connect { peer_id, remote_addr } => {
2184                assert_eq!(peer_id, peer);
2185                assert_eq!(remote_addr, socket_addr);
2186            }
2187            _ => unreachable!(),
2188        }
2189
2190        let p = peers.peers.get(&peer).unwrap();
2191        assert_eq!(p.state, PeerConnectionState::PendingOut);
2192
2193        peers.remove_peer(peer);
2194
2195        match event!(peers) {
2196            PeerAction::PeerRemoved(peer_id) => {
2197                assert_eq!(peer_id, peer);
2198            }
2199            _ => unreachable!(),
2200        }
2201        match event!(peers) {
2202            PeerAction::Disconnect { peer_id, .. } => {
2203                assert_eq!(peer_id, peer);
2204            }
2205            _ => unreachable!(),
2206        }
2207
2208        let p = peers.peers.get(&peer).unwrap();
2209        assert_eq!(p.state, PeerConnectionState::PendingOut);
2210
2211        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2212        let p = peers.peers.get(&peer).unwrap();
2213        assert_eq!(p.state, PeerConnectionState::PendingOut);
2214
2215        peers.on_active_session_gracefully_closed(peer);
2216        assert!(!peers.peers.contains_key(&peer));
2217    }
2218
2219    #[tokio::test]
2220    async fn test_fatal_outgoing_connection_error_trusted() {
2221        let peer = PeerId::random();
2222        let config = PeersConfig::test()
2223            .with_trusted_nodes(vec![TrustedPeer {
2224                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2225                tcp_port: 8008,
2226                udp_port: 8008,
2227                id: peer,
2228            }])
2229            .with_trusted_nodes_only(true);
2230        let mut peers = PeersManager::new(config);
2231        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2232
2233        match event!(peers) {
2234            PeerAction::Connect { peer_id, remote_addr } => {
2235                assert_eq!(peer_id, peer);
2236                assert_eq!(remote_addr, socket_addr);
2237            }
2238            _ => unreachable!(),
2239        }
2240
2241        let p = peers.peers.get(&peer).unwrap();
2242        assert_eq!(p.state, PeerConnectionState::PendingOut);
2243
2244        assert_eq!(peers.num_outbound_connections(), 0);
2245
2246        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2247            EthHandshakeError::NonStatusMessageInHandshake,
2248        ));
2249        assert!(err.is_fatal_protocol_error());
2250
2251        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2252        assert_eq!(peers.num_outbound_connections(), 0);
2253
2254        // try tmp ban peer
2255        match event!(peers) {
2256            PeerAction::BanPeer { peer_id } => {
2257                assert_eq!(peer_id, peer);
2258            }
2259            err => unreachable!("{err:?}"),
2260        }
2261
2262        // ensure we still have trusted peer
2263        assert!(peers.peers.contains_key(&peer));
2264
2265        // await for the ban to expire
2266        tokio::time::sleep(peers.backoff_durations.medium).await;
2267
2268        match event!(peers) {
2269            PeerAction::Connect { peer_id, remote_addr } => {
2270                assert_eq!(peer_id, peer);
2271                assert_eq!(remote_addr, socket_addr);
2272            }
2273            err => unreachable!("{err:?}"),
2274        }
2275    }
2276
2277    #[tokio::test]
2278    async fn test_outgoing_connection_error() {
2279        let peer = PeerId::random();
2280        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2281        let mut peers = PeersManager::default();
2282        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2283
2284        match event!(peers) {
2285            PeerAction::PeerAdded(peer_id) => {
2286                assert_eq!(peer_id, peer);
2287            }
2288            _ => unreachable!(),
2289        }
2290        match event!(peers) {
2291            PeerAction::Connect { peer_id, remote_addr } => {
2292                assert_eq!(peer_id, peer);
2293                assert_eq!(remote_addr, socket_addr);
2294            }
2295            _ => unreachable!(),
2296        }
2297
2298        let p = peers.peers.get(&peer).unwrap();
2299        assert_eq!(p.state, PeerConnectionState::PendingOut);
2300
2301        assert_eq!(peers.num_outbound_connections(), 0);
2302
2303        peers.on_outgoing_connection_failure(
2304            &socket_addr,
2305            &peer,
2306            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2307        );
2308
2309        assert_eq!(peers.num_outbound_connections(), 0);
2310    }
2311
2312    #[tokio::test]
2313    async fn test_outgoing_connection_gracefully_closed() {
2314        let peer = PeerId::random();
2315        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2316        let mut peers = PeersManager::default();
2317        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2318
2319        match event!(peers) {
2320            PeerAction::PeerAdded(peer_id) => {
2321                assert_eq!(peer_id, peer);
2322            }
2323            _ => unreachable!(),
2324        }
2325        match event!(peers) {
2326            PeerAction::Connect { peer_id, remote_addr } => {
2327                assert_eq!(peer_id, peer);
2328                assert_eq!(remote_addr, socket_addr);
2329            }
2330            _ => unreachable!(),
2331        }
2332
2333        let p = peers.peers.get(&peer).unwrap();
2334        assert_eq!(p.state, PeerConnectionState::PendingOut);
2335
2336        assert_eq!(peers.num_outbound_connections(), 0);
2337
2338        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2339
2340        assert_eq!(peers.num_outbound_connections(), 0);
2341        assert_eq!(peers.connection_info.num_pending_out, 0);
2342    }
2343
2344    #[tokio::test]
2345    async fn test_discovery_ban_list() {
2346        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2347        let socket_addr = SocketAddr::new(ip, 8008);
2348        let ban_list = BanList::new(vec![], vec![ip]);
2349        let config = PeersConfig::default().with_ban_list(ban_list);
2350        let mut peer_manager = PeersManager::new(config);
2351        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2352
2353        assert!(peer_manager.peers.is_empty());
2354    }
2355
2356    #[tokio::test]
2357    async fn test_on_pending_ban_list() {
2358        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2359        let socket_addr = SocketAddr::new(ip, 8008);
2360        let ban_list = BanList::new(vec![], vec![ip]);
2361        let config = PeersConfig::test().with_ban_list(ban_list);
2362        let mut peer_manager = PeersManager::new(config);
2363        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2364        // because we have no active peers this should be fine for testings
2365        match a {
2366            Ok(_) => panic!(),
2367            Err(err) => match err {
2368                InboundConnectionError::IpBanned => {
2369                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2370                }
2371                _ => unreachable!(),
2372            },
2373        }
2374    }
2375
2376    #[tokio::test]
2377    async fn test_on_active_inbound_ban_list() {
2378        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2379        let socket_addr = SocketAddr::new(ip, 8008);
2380        let given_peer_id = PeerId::random();
2381        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2382        let config = PeersConfig::test().with_ban_list(ban_list);
2383        let mut peer_manager = PeersManager::new(config);
2384        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2385        // non-trusted nodes should also increase pending_in
2386        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2387        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2388        // after the connection is established, the peer should be removed, the num_pending_in
2389        // should be decreased, and the num_inbound should not be increased
2390        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2391        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2392
2393        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2394            peer_manager.queued_actions.pop_front()
2395        else {
2396            panic!()
2397        };
2398
2399        assert_eq!(peer_id, given_peer_id)
2400    }
2401
2402    #[test]
2403    fn test_connection_limits() {
2404        let mut info = ConnectionInfo::default();
2405        info.inc_in();
2406        assert_eq!(info.num_inbound, 1);
2407        assert_eq!(info.num_outbound, 0);
2408        assert!(info.has_in_capacity());
2409
2410        info.decr_in();
2411        assert_eq!(info.num_inbound, 0);
2412        assert_eq!(info.num_outbound, 0);
2413
2414        info.inc_out();
2415        assert_eq!(info.num_inbound, 0);
2416        assert_eq!(info.num_outbound, 1);
2417        assert!(info.has_out_capacity());
2418
2419        info.decr_out();
2420        assert_eq!(info.num_inbound, 0);
2421        assert_eq!(info.num_outbound, 0);
2422    }
2423
2424    #[test]
2425    fn test_connection_peer_state() {
2426        let mut info = ConnectionInfo::default();
2427        info.inc_in();
2428
2429        info.decr_state(PeerConnectionState::In);
2430        assert_eq!(info.num_inbound, 0);
2431        assert_eq!(info.num_outbound, 0);
2432
2433        info.inc_out();
2434
2435        info.decr_state(PeerConnectionState::Out);
2436        assert_eq!(info.num_inbound, 0);
2437        assert_eq!(info.num_outbound, 0);
2438    }
2439
2440    #[tokio::test]
2441    async fn test_trusted_peers_are_prioritized() {
2442        let trusted_peer = PeerId::random();
2443        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2444        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2445            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2446            tcp_port: 8008,
2447            udp_port: 8008,
2448            id: trusted_peer,
2449        }]);
2450        let mut peers = PeersManager::new(config);
2451
2452        let basic_peer = PeerId::random();
2453        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2454        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2455
2456        match event!(peers) {
2457            PeerAction::PeerAdded(peer_id) => {
2458                assert_eq!(peer_id, basic_peer);
2459            }
2460            _ => unreachable!(),
2461        }
2462        match event!(peers) {
2463            PeerAction::Connect { peer_id, remote_addr } => {
2464                assert_eq!(peer_id, trusted_peer);
2465                assert_eq!(remote_addr, trusted_sock);
2466            }
2467            _ => unreachable!(),
2468        }
2469        match event!(peers) {
2470            PeerAction::Connect { peer_id, remote_addr } => {
2471                assert_eq!(peer_id, basic_peer);
2472                assert_eq!(remote_addr, basic_sock);
2473            }
2474            _ => unreachable!(),
2475        }
2476    }
2477
2478    #[tokio::test]
2479    async fn test_connect_trusted_nodes_only() {
2480        let trusted_peer = PeerId::random();
2481        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2482        let config = PeersConfig::test()
2483            .with_trusted_nodes(vec![TrustedPeer {
2484                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2485                tcp_port: 8008,
2486                udp_port: 8008,
2487                id: trusted_peer,
2488            }])
2489            .with_trusted_nodes_only(true);
2490        let mut peers = PeersManager::new(config);
2491
2492        let basic_peer = PeerId::random();
2493        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2494        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2495
2496        match event!(peers) {
2497            PeerAction::PeerAdded(peer_id) => {
2498                assert_eq!(peer_id, basic_peer);
2499            }
2500            _ => unreachable!(),
2501        }
2502        match event!(peers) {
2503            PeerAction::Connect { peer_id, remote_addr } => {
2504                assert_eq!(peer_id, trusted_peer);
2505                assert_eq!(remote_addr, trusted_sock);
2506            }
2507            _ => unreachable!(),
2508        }
2509        poll_fn(|cx| {
2510            assert!(peers.poll(cx).is_pending());
2511            Poll::Ready(())
2512        })
2513        .await;
2514    }
2515
2516    #[tokio::test]
2517    async fn test_incoming_with_trusted_nodes_only() {
2518        let trusted_peer = PeerId::random();
2519        let config = PeersConfig::test()
2520            .with_trusted_nodes(vec![TrustedPeer {
2521                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2522                tcp_port: 8008,
2523                udp_port: 8008,
2524                id: trusted_peer,
2525            }])
2526            .with_trusted_nodes_only(true);
2527        let mut peers = PeersManager::new(config);
2528
2529        let basic_peer = PeerId::random();
2530        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2531        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2532        // non-trusted nodes should also increase pending_in
2533        assert_eq!(peers.connection_info.num_pending_in, 1);
2534        peers.on_incoming_session_established(basic_peer, basic_sock);
2535        // after the connection is established, the peer should be removed, the num_pending_in
2536        // should be decreased, and the num_inbound mut not be increased
2537        assert_eq!(peers.connection_info.num_pending_in, 0);
2538        assert_eq!(peers.connection_info.num_inbound, 0);
2539
2540        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2541            peers.queued_actions.pop_front()
2542        else {
2543            panic!()
2544        };
2545        assert_eq!(basic_peer, peer_id);
2546        assert!(!peers.peers.contains_key(&basic_peer));
2547    }
2548
2549    #[tokio::test]
2550    async fn test_incoming_without_trusted_nodes_only() {
2551        let trusted_peer = PeerId::random();
2552        let config = PeersConfig::test()
2553            .with_trusted_nodes(vec![TrustedPeer {
2554                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2555                tcp_port: 8008,
2556                udp_port: 8008,
2557                id: trusted_peer,
2558            }])
2559            .with_trusted_nodes_only(false);
2560        let mut peers = PeersManager::new(config);
2561
2562        let basic_peer = PeerId::random();
2563        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2564        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2565
2566        // non-trusted nodes should also increase pending_in
2567        assert_eq!(peers.connection_info.num_pending_in, 1);
2568        peers.on_incoming_session_established(basic_peer, basic_sock);
2569        // after the connection is established, the peer should be removed, the num_pending_in
2570        // should be decreased, and the num_inbound must be increased
2571        assert_eq!(peers.connection_info.num_pending_in, 0);
2572        assert_eq!(peers.connection_info.num_inbound, 1);
2573        assert!(peers.peers.contains_key(&basic_peer));
2574    }
2575
2576    #[tokio::test]
2577    async fn test_incoming_at_capacity() {
2578        let mut config = PeersConfig::test();
2579        config.connection_info.max_inbound = 1;
2580        let mut peers = PeersManager::new(config);
2581
2582        let peer = PeerId::random();
2583        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2584        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2585
2586        peers.on_incoming_session_established(peer, addr);
2587
2588        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2589        assert_eq!(
2590            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2591            InboundConnectionError::ExceedsCapacity
2592        );
2593    }
2594
2595    #[tokio::test]
2596    async fn test_incoming_rate_limit() {
2597        let config = PeersConfig {
2598            incoming_ip_throttle_duration: Duration::from_millis(100),
2599            ..PeersConfig::test()
2600        };
2601        let mut peers = PeersManager::new(config);
2602
2603        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2604        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2605        assert_eq!(
2606            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2607            InboundConnectionError::IpBanned
2608        );
2609
2610        peers.release_interval.reset_immediately();
2611        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2612
2613        // await unban
2614        poll_fn(|cx| loop {
2615            if peers.poll(cx).is_pending() {
2616                return Poll::Ready(());
2617            }
2618        })
2619        .await;
2620
2621        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2622        assert_eq!(
2623            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2624            InboundConnectionError::IpBanned
2625        );
2626    }
2627
2628    #[tokio::test]
2629    async fn test_tick() {
2630        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2631        let socket_addr = SocketAddr::new(ip, 8008);
2632        let config = PeersConfig::test();
2633        let mut peer_manager = PeersManager::new(config);
2634        let peer_id = PeerId::random();
2635        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2636
2637        tokio::time::sleep(Duration::from_secs(1)).await;
2638        peer_manager.tick();
2639
2640        // still unconnected
2641        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2642
2643        // mark as connected
2644        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2645
2646        tokio::time::sleep(Duration::from_secs(1)).await;
2647        peer_manager.tick();
2648
2649        // still at default reputation
2650        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2651
2652        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2653
2654        tokio::time::sleep(Duration::from_secs(1)).await;
2655        peer_manager.tick();
2656
2657        // tick applied
2658        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2659    }
2660
2661    #[tokio::test]
2662    async fn test_remove_incoming_after_disconnect() {
2663        let peer_id = PeerId::random();
2664        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2665        let mut peers = PeersManager::default();
2666
2667        peers.on_incoming_pending_session(addr.ip()).unwrap();
2668        peers.on_incoming_session_established(peer_id, addr);
2669        let peer = peers.peers.get(&peer_id).unwrap();
2670        assert_eq!(peer.state, PeerConnectionState::In);
2671        assert!(peer.remove_after_disconnect);
2672
2673        peers.on_active_session_gracefully_closed(peer_id);
2674        assert!(!peers.peers.contains_key(&peer_id))
2675    }
2676
2677    #[tokio::test]
2678    async fn test_keep_incoming_after_disconnect_if_discovered() {
2679        let peer_id = PeerId::random();
2680        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2681        let mut peers = PeersManager::default();
2682
2683        peers.on_incoming_pending_session(addr.ip()).unwrap();
2684        peers.on_incoming_session_established(peer_id, addr);
2685        let peer = peers.peers.get(&peer_id).unwrap();
2686        assert_eq!(peer.state, PeerConnectionState::In);
2687        assert!(peer.remove_after_disconnect);
2688
2689        // trigger discovery manually while the peer is still connected
2690        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2691
2692        peers.on_active_session_gracefully_closed(peer_id);
2693
2694        let peer = peers.peers.get(&peer_id).unwrap();
2695        assert_eq!(peer.state, PeerConnectionState::Idle);
2696        assert!(!peer.remove_after_disconnect);
2697    }
2698
2699    #[tokio::test]
2700    async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2701        let throttle_duration = Duration::from_millis(100);
2702        let config =
2703            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2704        let mut peers = PeersManager::new(config);
2705
2706        let peer_id = PeerId::random();
2707        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2708
2709        // Add as regular peer
2710        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2711
2712        match event!(peers) {
2713            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2714            _ => unreachable!(),
2715        }
2716
2717        match event!(peers) {
2718            PeerAction::Connect { .. } => {}
2719            _ => unreachable!(),
2720        }
2721
2722        // Simulate outbound connection established
2723        peers.on_active_outgoing_established(peer_id);
2724        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2725
2726        // Gracefully close the session
2727        peers.on_active_session_gracefully_closed(peer_id);
2728
2729        let peer = peers.peers.get(&peer_id).unwrap();
2730        assert_eq!(peer.state, PeerConnectionState::Idle);
2731        assert!(peer.backed_off);
2732
2733        // Verify the peer is in the backed_off_peers set
2734        assert!(peers.backed_off_peers.contains_key(&peer_id));
2735
2736        // Immediately try to poll - should not trigger any actions yet
2737        poll_fn(|cx| {
2738            assert!(peers.poll(cx).is_pending());
2739            Poll::Ready(())
2740        })
2741        .await;
2742
2743        // Peer should still be backed off
2744        assert!(peers.backed_off_peers.contains_key(&peer_id));
2745        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2746
2747        // Sleep for the throttle duration
2748        tokio::time::sleep(throttle_duration).await;
2749
2750        // After throttle duration, event! will poll until we get a Connect action
2751        match event!(peers) {
2752            PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2753            _ => unreachable!(),
2754        }
2755
2756        // After connection is initiated, peer should no longer be backed off
2757        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2758        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2759    }
2760
2761    #[tokio::test]
2762    async fn test_backed_off_peer_can_accept_incoming_connection() {
2763        let throttle_duration = Duration::from_millis(100);
2764        let config =
2765            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2766        let mut peers = PeersManager::new(config);
2767
2768        let peer_id = PeerId::random();
2769        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2770
2771        // Add as regular peer
2772        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2773
2774        match event!(peers) {
2775            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2776            _ => unreachable!(),
2777        }
2778
2779        match event!(peers) {
2780            PeerAction::Connect { .. } => {}
2781            _ => unreachable!(),
2782        }
2783
2784        // Simulate outbound connection established
2785        peers.on_active_outgoing_established(peer_id);
2786        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2787
2788        // Gracefully close the session - this will back off the peer
2789        peers.on_active_session_gracefully_closed(peer_id);
2790
2791        let peer = peers.peers.get(&peer_id).unwrap();
2792        assert_eq!(peer.state, PeerConnectionState::Idle);
2793        assert!(peer.backed_off);
2794        assert!(peers.backed_off_peers.contains_key(&peer_id));
2795
2796        // Now simulate an incoming connection from the backed-off peer
2797        // First, handle the incoming pending session
2798        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2799        assert_eq!(peers.connection_info.num_pending_in, 1);
2800
2801        // Establish the incoming session
2802        peers.on_incoming_session_established(peer_id, addr);
2803
2804        // Peer should have been added to incoming connections
2805        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2806        assert_eq!(peers.connection_info.num_inbound, 1);
2807
2808        // Peer should still be backed off for outbound connections
2809        assert!(peers.backed_off_peers.contains_key(&peer_id));
2810        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2811
2812        // Verify we don't try to reconnect outbound while peer is backed off
2813        poll_fn(|cx| {
2814            assert!(peers.poll(cx).is_pending());
2815            Poll::Ready(())
2816        })
2817        .await;
2818
2819        // No outbound connection should be attempted while backed off
2820        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2821
2822        // After throttle duration, the backoff should be cleared
2823        tokio::time::sleep(throttle_duration).await;
2824
2825        poll_fn(|cx| {
2826            let _ = peers.poll(cx);
2827            Poll::Ready(())
2828        })
2829        .await;
2830
2831        // Backoff should be cleared now
2832        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2833        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2834
2835        // Peer should still be in incoming state
2836        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2837    }
2838
2839    #[tokio::test]
2840    async fn test_incoming_outgoing_already_connected() {
2841        let peer_id = PeerId::random();
2842        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2843        let mut peers = PeersManager::default();
2844
2845        peers.on_incoming_pending_session(addr.ip()).unwrap();
2846        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2847
2848        match event!(peers) {
2849            PeerAction::PeerAdded(_) => {}
2850            _ => unreachable!(),
2851        }
2852
2853        match event!(peers) {
2854            PeerAction::Connect { .. } => {}
2855            _ => unreachable!(),
2856        }
2857
2858        peers.on_incoming_session_established(peer_id, addr);
2859        peers.on_already_connected(Direction::Outgoing(peer_id));
2860        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2861        assert_eq!(peers.connection_info.num_inbound, 1);
2862        assert_eq!(peers.connection_info.num_pending_out, 0);
2863        assert_eq!(peers.connection_info.num_pending_in, 0);
2864        assert_eq!(peers.connection_info.num_outbound, 0);
2865    }
2866
2867    #[tokio::test]
2868    async fn test_already_connected_incoming_outgoing_connection_error() {
2869        let peer_id = PeerId::random();
2870        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2871        let mut peers = PeersManager::default();
2872
2873        peers.on_incoming_pending_session(addr.ip()).unwrap();
2874        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2875
2876        match event!(peers) {
2877            PeerAction::PeerAdded(_) => {}
2878            _ => unreachable!(),
2879        }
2880
2881        match event!(peers) {
2882            PeerAction::Connect { .. } => {}
2883            _ => unreachable!(),
2884        }
2885
2886        peers.on_incoming_session_established(peer_id, addr);
2887
2888        peers.on_outgoing_connection_failure(
2889            &addr,
2890            &peer_id,
2891            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2892        );
2893        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2894        assert_eq!(peers.connection_info.num_inbound, 1);
2895        assert_eq!(peers.connection_info.num_pending_out, 0);
2896        assert_eq!(peers.connection_info.num_pending_in, 0);
2897        assert_eq!(peers.connection_info.num_outbound, 0);
2898    }
2899
2900    #[tokio::test]
2901    async fn test_max_concurrent_dials() {
2902        let config = PeersConfig::default();
2903        let mut peer_manager = PeersManager::new(config);
2904        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2905        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2906        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2907            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2908        }
2909
2910        peer_manager.fill_outbound_slots();
2911        let dials = peer_manager
2912            .queued_actions
2913            .iter()
2914            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2915            .count();
2916        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2917    }
2918
2919    #[tokio::test]
2920    async fn test_max_num_of_pending_dials() {
2921        let config = PeersConfig::default();
2922        let mut peer_manager = PeersManager::new(config);
2923        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2924        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2925
2926        // add more peers than allowed
2927        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2928            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2929        }
2930
2931        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2932            match event!(peer_manager) {
2933                PeerAction::PeerAdded(_) => {}
2934                _ => unreachable!(),
2935            }
2936        }
2937
2938        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2939            match event!(peer_manager) {
2940                PeerAction::Connect { .. } => {}
2941                _ => unreachable!(),
2942            }
2943        }
2944
2945        // generate 'Connect' actions
2946        peer_manager.fill_outbound_slots();
2947
2948        // all dialed connections should be in 'PendingOut' state
2949        let dials = peer_manager.connection_info.num_pending_out;
2950        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2951
2952        let num_pendingout_states = peer_manager
2953            .peers
2954            .iter()
2955            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2956            .map(|(peer_id, _)| *peer_id)
2957            .collect::<Vec<PeerId>>();
2958        assert_eq!(
2959            num_pendingout_states.len(),
2960            peer_manager.connection_info.config.max_concurrent_outbound_dials
2961        );
2962
2963        // establish dialed connections
2964        for peer_id in &num_pendingout_states {
2965            peer_manager.on_active_outgoing_established(*peer_id);
2966        }
2967
2968        // all dialed connections should now be in 'Out' state
2969        for peer_id in &num_pendingout_states {
2970            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2971        }
2972
2973        // no more pending outbound connections
2974        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2975    }
2976
2977    #[tokio::test]
2978    async fn test_connect() {
2979        let peer = PeerId::random();
2980        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2981        let mut peers = PeersManager::default();
2982        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2983        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2984
2985        match event!(peers) {
2986            PeerAction::Connect { peer_id, remote_addr } => {
2987                assert_eq!(peer_id, peer);
2988                assert_eq!(remote_addr, socket_addr);
2989            }
2990            _ => unreachable!(),
2991        }
2992
2993        let (record, _) = peers.peer_by_id(peer).unwrap();
2994        assert_eq!(record.tcp_addr(), socket_addr);
2995        assert_eq!(record.udp_addr(), socket_addr);
2996
2997        // connect again
2998        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2999
3000        let (record, _) = peers.peer_by_id(peer).unwrap();
3001        assert_eq!(record.tcp_addr(), socket_addr);
3002        assert_eq!(record.udp_addr(), socket_addr);
3003    }
3004
3005    #[tokio::test]
3006    async fn test_incoming_connection_from_banned() {
3007        let peer = PeerId::random();
3008        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3009        let config = PeersConfig::test().with_max_inbound(3);
3010        let mut peers = PeersManager::new(config);
3011        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
3012
3013        match event!(peers) {
3014            PeerAction::PeerAdded(peer_id) => {
3015                assert_eq!(peer_id, peer);
3016            }
3017            _ => unreachable!(),
3018        }
3019        match event!(peers) {
3020            PeerAction::Connect { peer_id, .. } => {
3021                assert_eq!(peer_id, peer);
3022            }
3023            _ => unreachable!(),
3024        }
3025
3026        poll_fn(|cx| {
3027            assert!(peers.poll(cx).is_pending());
3028            Poll::Ready(())
3029        })
3030        .await;
3031
3032        // simulate new connection drops with error
3033        loop {
3034            peers.on_active_session_dropped(
3035                &socket_addr,
3036                &peer,
3037                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3038                    reth_eth_wire::EthVersion::Eth68,
3039                    reth_eth_wire::EthMessageID::Status,
3040                )),
3041            );
3042
3043            if peers.peers.get(&peer).unwrap().is_banned() {
3044                break;
3045            }
3046
3047            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3048            peers.on_incoming_session_established(peer, socket_addr);
3049
3050            match event!(peers) {
3051                PeerAction::Connect { peer_id, .. } => {
3052                    assert_eq!(peer_id, peer);
3053                }
3054                _ => unreachable!(),
3055            }
3056        }
3057
3058        assert!(peers.peers.get(&peer).unwrap().is_banned());
3059
3060        // fill all incoming slots
3061        for _ in 0..peers.connection_info.config.max_inbound {
3062            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3063            peers.on_incoming_session_established(peer, socket_addr);
3064
3065            match event!(peers) {
3066                PeerAction::DisconnectBannedIncoming { peer_id } => {
3067                    assert_eq!(peer_id, peer);
3068                }
3069                _ => unreachable!(),
3070            }
3071        }
3072
3073        poll_fn(|cx| {
3074            assert!(peers.poll(cx).is_pending());
3075            Poll::Ready(())
3076        })
3077        .await;
3078
3079        assert_eq!(peers.connection_info.num_inbound, 0);
3080
3081        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3082
3083        // Assert we can still accept new connections
3084        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3085        assert_eq!(peers.connection_info.num_pending_in, 1);
3086
3087        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
3088        // connection info is updated via the peer's state which would be a noop here since the
3089        // banned peer's state is idle
3090        peers.on_active_session_gracefully_closed(peer);
3091        assert_eq!(peers.connection_info.num_inbound, 0);
3092    }
3093
3094    #[tokio::test]
3095    async fn test_add_pending_connect() {
3096        let peer = PeerId::random();
3097        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3098        let mut peers = PeersManager::default();
3099        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3100        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3101        assert_eq!(peers.connection_info.num_pending_out, 1);
3102    }
3103
3104    #[tokio::test]
3105    async fn test_dns_updates_peer_address() {
3106        let peer_id = PeerId::random();
3107        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3108        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3109
3110        let trusted = TrustedPeer {
3111            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3112            tcp_port: 8008,
3113            udp_port: 8008,
3114            id: peer_id,
3115        };
3116
3117        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3118        let mut manager = PeersManager::new(config);
3119        manager
3120            .trusted_peers_resolver
3121            .set_interval(tokio::time::interval(Duration::from_millis(1)));
3122
3123        manager.peers.insert(
3124            peer_id,
3125            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3126        );
3127
3128        for _ in 0..100 {
3129            let _ = event!(manager);
3130            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3131                break;
3132            }
3133            tokio::time::sleep(Duration::from_millis(10)).await;
3134        }
3135
3136        let updated_peer = manager.peers.get(&peer_id).unwrap();
3137        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3138    }
3139
3140    #[tokio::test]
3141    async fn test_ip_filter_blocks_inbound_connection() {
3142        use reth_net_banlist::IpFilter;
3143        use std::net::IpAddr;
3144
3145        // Create a filter that only allows 192.168.0.0/16
3146        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3147        let config = PeersConfig::test().with_ip_filter(ip_filter);
3148        let mut peers = PeersManager::new(config);
3149
3150        // Try to connect from an allowed IP
3151        let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3152        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3153
3154        // Try to connect from a disallowed IP
3155        let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3156        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3157    }
3158
3159    #[tokio::test]
3160    async fn test_ip_filter_blocks_outbound_connection() {
3161        use reth_net_banlist::IpFilter;
3162        use std::net::SocketAddr;
3163
3164        // Create a filter that only allows 192.168.0.0/16
3165        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3166        let config = PeersConfig::test().with_ip_filter(ip_filter);
3167        let mut peers = PeersManager::new(config);
3168
3169        let peer_id = PeerId::new([1; 64]);
3170
3171        // Try to add a peer with an allowed IP
3172        let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3173        peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3174        assert!(peers.peers.contains_key(&peer_id));
3175
3176        // Try to add a peer with a disallowed IP
3177        let peer_id2 = PeerId::new([2; 64]);
3178        let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3179        peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3180        assert!(!peers.peers.contains_key(&peer_id2));
3181    }
3182
3183    #[tokio::test]
3184    async fn test_ip_filter_ipv6() {
3185        use reth_net_banlist::IpFilter;
3186        use std::net::IpAddr;
3187
3188        // Create a filter that only allows IPv6 range 2001:db8::/32
3189        let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3190        let config = PeersConfig::test().with_ip_filter(ip_filter);
3191        let mut peers = PeersManager::new(config);
3192
3193        // Try to connect from an allowed IPv6 address
3194        let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3195        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3196
3197        // Try to connect from a disallowed IPv6 address
3198        let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3199        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3200    }
3201
3202    #[tokio::test]
3203    async fn test_ip_filter_multiple_ranges() {
3204        use reth_net_banlist::IpFilter;
3205        use std::net::IpAddr;
3206
3207        // Create a filter that allows multiple ranges
3208        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3209        let config = PeersConfig::test().with_ip_filter(ip_filter);
3210        let mut peers = PeersManager::new(config);
3211
3212        // Try IPs from both allowed ranges
3213        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3214        let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3215        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3216        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3217
3218        // Try IP from disallowed range
3219        let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3220        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3221    }
3222
3223    #[tokio::test]
3224    async fn test_ip_filter_no_restriction() {
3225        use reth_net_banlist::IpFilter;
3226        use std::net::IpAddr;
3227
3228        // Create a filter with no restrictions (allow all)
3229        let ip_filter = IpFilter::allow_all();
3230        let config = PeersConfig::test().with_ip_filter(ip_filter);
3231        let mut peers = PeersManager::new(config);
3232
3233        // All IPs should be allowed
3234        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3235        let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3236        let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3237        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3238        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3239        assert!(peers.on_incoming_pending_session(ip3).is_ok());
3240    }
3241}