reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Buffered actions until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If non-trusted peers should be connected to, or the connection from non-trusted
83    /// incoming peers should be accepted.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and dropping.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban ip on an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93    /// IP address filter for restricting network connections to specific IP ranges.
94    ip_filter: reth_net_banlist::IpFilter,
95}
96
97impl PeersManager {
98    /// Create a new instance with the given config
99    pub fn new(config: PeersConfig) -> Self {
100        let PeersConfig {
101            refill_slots_interval,
102            connection_info,
103            reputation_weights,
104            ban_list,
105            ban_duration,
106            backoff_durations,
107            trusted_nodes,
108            trusted_nodes_only,
109            trusted_nodes_resolution_interval,
110            basic_nodes,
111            max_backoff_count,
112            incoming_ip_throttle_duration,
113            ip_filter,
114        } = config;
115        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
116        let now = Instant::now();
117
118        // We use half of the interval to decrease the max duration to `150%` in worst case
119        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
120
121        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
122        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
123
124        for trusted_peer in &trusted_nodes {
125            match trusted_peer.resolve_blocking() {
126                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
127                    trusted_peer_ids.insert(id);
128                    peers.entry(id).or_insert_with(|| {
129                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
130                    });
131                }
132                Err(err) => {
133                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
134                }
135            }
136        }
137
138        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
139            peers.entry(id).or_insert_with(|| {
140                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
141            });
142        }
143
144        trace!(target: "net::peers", trusted_peers=?trusted_peer_ids, "Initialized peers manager");
145
146        Self {
147            peers,
148            trusted_peer_ids,
149            trusted_peers_resolver: TrustedPeersResolver::new(
150                trusted_nodes,
151                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
152            ),
153            manager_tx,
154            handle_rx: UnboundedReceiverStream::new(handle_rx),
155            queued_actions: Default::default(),
156            reputation_weights,
157            refill_slots_interval: tokio::time::interval(refill_slots_interval),
158            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
159            connection_info: ConnectionInfo::new(connection_info),
160            ban_list,
161            backed_off_peers: Default::default(),
162            ban_duration,
163            backoff_durations,
164            trusted_nodes_only,
165            last_tick: Instant::now(),
166            max_backoff_count,
167            net_connection_state: NetworkConnectionState::default(),
168            incoming_ip_throttle_duration,
169            ip_filter,
170        }
171    }
172
173    /// Returns a new [`PeersHandle`] that can send commands to this type.
174    pub(crate) fn handle(&self) -> PeersHandle {
175        PeersHandle::new(self.manager_tx.clone())
176    }
177
178    /// Returns the number of peers in the peer set
179    #[inline]
180    pub(crate) fn num_known_peers(&self) -> usize {
181        self.peers.len()
182    }
183
184    /// Returns an iterator over all peers
185    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
186        self.peers.iter().map(|(peer_id, v)| {
187            NodeRecord::new_with_ports(
188                v.addr.tcp().ip(),
189                v.addr.tcp().port(),
190                v.addr.udp().map(|addr| addr.port()),
191                *peer_id,
192            )
193        })
194    }
195
196    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
197    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
198        self.peers.get(&peer_id).map(|v| {
199            (
200                NodeRecord::new_with_ports(
201                    v.addr.tcp().ip(),
202                    v.addr.tcp().port(),
203                    v.addr.udp().map(|addr| addr.port()),
204                    peer_id,
205                ),
206                v.kind,
207            )
208        })
209    }
210
211    /// Returns an iterator over all peer ids for peers with the given kind
212    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
213        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
214    }
215
216    /// Returns the number of currently active inbound connections.
217    #[inline]
218    pub(crate) const fn num_inbound_connections(&self) -> usize {
219        self.connection_info.num_inbound
220    }
221
222    /// Returns the number of currently __active__ outbound connections.
223    #[inline]
224    pub(crate) const fn num_outbound_connections(&self) -> usize {
225        self.connection_info.num_outbound
226    }
227
228    /// Returns the number of currently pending outbound connections.
229    #[inline]
230    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
231        self.connection_info.num_pending_out
232    }
233
234    /// Returns the number of currently backed off peers.
235    #[inline]
236    pub(crate) fn num_backed_off_peers(&self) -> usize {
237        self.backed_off_peers.len()
238    }
239
240    /// Returns the number of idle trusted peers.
241    fn num_idle_trusted_peers(&self) -> usize {
242        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
243    }
244
245    /// Invoked when a new _incoming_ tcp connection is accepted.
246    ///
247    /// returns an error if the inbound ip address is on the ban list
248    pub(crate) fn on_incoming_pending_session(
249        &mut self,
250        addr: IpAddr,
251    ) -> Result<(), InboundConnectionError> {
252        // Check if the IP is in the allowed ranges (netrestrict)
253        if !self.ip_filter.is_allowed(&addr) {
254            trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
255            return Err(InboundConnectionError::IpBanned)
256        }
257
258        if self.ban_list.is_banned_ip(&addr) {
259            return Err(InboundConnectionError::IpBanned)
260        }
261
262        // check if we even have slots for a new incoming connection
263        if !self.connection_info.has_in_capacity() {
264            if self.trusted_peer_ids.is_empty() {
265                // if we don't have any incoming slots and no trusted peers, we don't accept any new
266                // connections
267                return Err(InboundConnectionError::ExceedsCapacity)
268            }
269
270            // there's an edge case here where no incoming connections besides from trusted peers
271            // are allowed (max_inbound == 0), in which case we still need to allow new pending
272            // incoming connections until all trusted peers are connected.
273            let num_idle_trusted_peers = self.num_idle_trusted_peers();
274            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
275                // we still want to limit concurrent pending connections
276                let max_inbound =
277                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
278                if self.connection_info.num_pending_in < max_inbound {
279                    self.connection_info.inc_pending_in();
280                    return Ok(())
281                }
282            }
283
284            // all trusted peers are either connected or connecting
285            return Err(InboundConnectionError::ExceedsCapacity)
286        }
287
288        // also cap the incoming connections we can process at once
289        if !self.connection_info.has_in_pending_capacity() {
290            return Err(InboundConnectionError::ExceedsCapacity)
291        }
292
293        // apply the rate limit
294        self.throttle_incoming_ip(addr);
295
296        self.connection_info.inc_pending_in();
297        Ok(())
298    }
299
300    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
301    /// rejected.
302    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
303        self.connection_info.decr_pending_in();
304    }
305
306    /// Invoked when a pending session was closed.
307    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
308        self.connection_info.decr_pending_in()
309    }
310
311    /// Invoked when a pending session was closed.
312    pub(crate) fn on_incoming_pending_session_dropped(
313        &mut self,
314        remote_addr: SocketAddr,
315        err: &PendingSessionHandshakeError,
316    ) {
317        if err.is_fatal_protocol_error() {
318            self.ban_ip(remote_addr.ip());
319
320            if err.merits_discovery_ban() {
321                self.queued_actions
322                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
323            }
324        }
325
326        self.connection_info.decr_pending_in();
327    }
328
329    /// Called when a new _incoming_ active session was established to the given peer.
330    ///
331    /// This will update the state of the peer if not yet tracked.
332    ///
333    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
334    /// be scheduled.
335    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
336        self.connection_info.decr_pending_in();
337
338        // we only need to check the peer id here as the ip address will have been checked at
339        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
340        if self.ban_list.is_banned_peer(&peer_id) {
341            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
342            return
343        }
344
345        // check if the peer is trustable or not
346        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
347        if self.trusted_nodes_only && !is_trusted {
348            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
349            return
350        }
351
352        // start a new tick, so the peer is not immediately rewarded for the time since last tick
353        self.tick();
354
355        match self.peers.entry(peer_id) {
356            Entry::Occupied(mut entry) => {
357                let peer = entry.get_mut();
358                if peer.is_banned() {
359                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
360                    return
361                }
362                // it might be the case that we're also trying to connect to this peer at the same
363                // time, so we need to adjust the state here
364                if peer.state.is_pending_out() {
365                    self.connection_info.decr_state(peer.state);
366                }
367
368                peer.state = PeerConnectionState::In;
369
370                is_trusted = is_trusted || peer.is_trusted();
371            }
372            Entry::Vacant(entry) => {
373                // peer is missing in the table, we add it but mark it as to be removed after
374                // disconnect, because we only know the outgoing port
375                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
376                peer.remove_after_disconnect = true;
377                entry.insert(peer);
378                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
379            }
380        }
381
382        let has_in_capacity = self.connection_info.has_in_capacity();
383        // increment new incoming connection
384        self.connection_info.inc_in();
385
386        // disconnect the peer if we don't have capacity for more inbound connections
387        if !is_trusted && !has_in_capacity {
388            self.queued_actions.push_back(PeerAction::Disconnect {
389                peer_id,
390                reason: Some(DisconnectReason::TooManyPeers),
391            });
392        }
393    }
394
395    /// Bans the peer temporarily with the configured ban timeout
396    fn ban_peer(&mut self, peer_id: PeerId) {
397        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
398            (peer.is_trusted() || peer.is_static())
399        {
400            // For misbehaving trusted or static peers, we provide a bit more leeway when
401            // penalizing them.
402            self.backoff_durations.low / 2
403        } else {
404            self.ban_duration
405        };
406
407        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
408        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
409    }
410
411    /// Bans the IP temporarily with the configured ban timeout
412    fn ban_ip(&mut self, ip: IpAddr) {
413        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
414    }
415
416    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
417    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
418        self.ban_list
419            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
420    }
421
422    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
423    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
424        trace!(target: "net::peers", ?peer_id, "backing off");
425
426        if let Some(peer) = self.peers.get_mut(&peer_id) {
427            peer.backed_off = true;
428            self.backed_off_peers.insert(peer_id, until);
429        }
430    }
431
432    /// Unbans the peer
433    fn unban_peer(&mut self, peer_id: PeerId) {
434        self.ban_list.unban_peer(&peer_id);
435        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
436    }
437
438    /// Tick function to update reputation of all connected peers.
439    /// Peers are rewarded with reputation increases for the time they are connected since the last
440    /// tick. This is to prevent peers from being disconnected eventually due to slashed
441    /// reputation because of some bad messages (most likely transaction related)
442    fn tick(&mut self) {
443        let now = Instant::now();
444        // Determine the number of seconds since the last tick.
445        // Ensuring that now is always greater than last_tick to account for issues with system
446        // time.
447        let secs_since_last_tick =
448            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
449        self.last_tick = now;
450
451        // update reputation via seconds connected
452        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
453            // update reputation via seconds connected, but keep the target _around_ the default
454            // reputation.
455            if peer.1.reputation < DEFAULT_REPUTATION {
456                peer.1.reputation += secs_since_last_tick;
457            }
458        }
459    }
460
461    /// Returns the tracked reputation for a peer.
462    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
463        self.peers.get(peer_id).map(|peer| peer.reputation)
464    }
465
466    /// Apply the corresponding reputation change to the given peer.
467    ///
468    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
469    /// reputation changes that can be attributed to network conditions. If the peer is a
470    /// trusted peer, it will also be less strict with the reputation slashing.
471    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
472        trace!(target: "net::peers", ?peer_id, reputation=?rep, "applying reputation change");
473
474        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
475            // First check if we should reset the reputation
476            if rep.is_reset() {
477                peer.reset_reputation()
478            } else {
479                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
480                if peer.is_trusted() || peer.is_static() {
481                    // exempt trusted and static peers from reputation slashing for
482                    if matches!(
483                        rep,
484                        ReputationChangeKind::Dropped |
485                            ReputationChangeKind::BadAnnouncement |
486                            ReputationChangeKind::Timeout |
487                            ReputationChangeKind::AlreadySeenTransaction
488                    ) {
489                        return
490                    }
491
492                    // also be less strict with the reputation slashing for trusted peers
493                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
494                        // this caps the reputation change to the maximum allowed for trusted peers
495                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
496                    }
497                }
498                peer.apply_reputation(reputation_change, rep)
499            }
500        } else {
501            return
502        };
503
504        match outcome {
505            ReputationChangeOutcome::None => {}
506            ReputationChangeOutcome::Ban => {
507                self.ban_peer(*peer_id);
508            }
509            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
510            ReputationChangeOutcome::DisconnectAndBan => {
511                self.queued_actions.push_back(PeerAction::Disconnect {
512                    peer_id: *peer_id,
513                    reason: Some(DisconnectReason::DisconnectRequested),
514                });
515                self.ban_peer(*peer_id);
516            }
517        }
518    }
519
520    /// Gracefully disconnected a pending _outgoing_ session
521    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
522        if let Some(peer) = self.peers.get_mut(peer_id) {
523            self.connection_info.decr_state(peer.state);
524            peer.state = PeerConnectionState::Idle;
525        }
526    }
527
528    /// Invoked when an _outgoing_ pending session was closed during authentication or the
529    /// handshake.
530    pub(crate) fn on_outgoing_pending_session_dropped(
531        &mut self,
532        remote_addr: &SocketAddr,
533        peer_id: &PeerId,
534        err: &PendingSessionHandshakeError,
535    ) {
536        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
537    }
538
539    /// Gracefully disconnected an active session
540    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
541        match self.peers.entry(peer_id) {
542            Entry::Occupied(mut entry) => {
543                trace!(target: "net::peers", ?peer_id, direction=?entry.get().state, "active session gracefully closed");
544                self.connection_info.decr_state(entry.get().state);
545
546                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
547                    // this peer should be removed from the set
548                    entry.remove();
549                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
550                } else {
551                    let peer = entry.get_mut();
552                    // reset the peer's state
553                    // we reset the backoff counter since we're able to establish a successful
554                    // session to that peer
555                    peer.severe_backoff_counter = 0;
556                    peer.state = PeerConnectionState::Idle;
557
558                    // but we're backing off slightly to avoid dialing the peer again right away, to
559                    // give the remote time to also properly register the closed session and clean
560                    // up and to avoid any issues with ip throttling on the remote in case this
561                    // session was terminated right away.
562                    peer.backed_off = true;
563                    self.backed_off_peers.insert(
564                        peer_id,
565                        std::time::Instant::now() + self.incoming_ip_throttle_duration,
566                    );
567                    trace!(target: "net::peers", ?peer_id, kind=?peer.kind, duration=?self.incoming_ip_throttle_duration, "backing off on gracefully closed session");
568                }
569            }
570            Entry::Vacant(_) => return,
571        }
572
573        self.fill_outbound_slots();
574    }
575
576    /// Called when a _pending_ outbound connection is successful.
577    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
578        if let Some(peer) = self.peers.get_mut(&peer_id) {
579            trace!(target: "net::peers", ?peer_id, "established active outgoing connection");
580            self.connection_info.decr_state(peer.state);
581            self.connection_info.inc_out();
582            peer.state = PeerConnectionState::Out;
583        }
584    }
585
586    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
587    ///
588    /// Depending on whether the error is fatal, the peer will be removed from the peer set
589    /// otherwise its reputation is slashed.
590    pub(crate) fn on_active_session_dropped(
591        &mut self,
592        remote_addr: &SocketAddr,
593        peer_id: &PeerId,
594        err: &EthStreamError,
595    ) {
596        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
597    }
598
599    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
600    /// connection.
601    pub(crate) fn on_outgoing_connection_failure(
602        &mut self,
603        remote_addr: &SocketAddr,
604        peer_id: &PeerId,
605        err: &io::Error,
606    ) {
607        // there's a race condition where we accepted an incoming connection while we were trying to
608        // connect to the same peer at the same time. if the outgoing connection failed
609        // after the incoming connection was accepted, we can ignore this error
610        if let Some(peer) = self.peers.get(peer_id) {
611            if peer.state.is_incoming() {
612                // we already have an active connection to the peer, so we can ignore this error
613                return
614            }
615
616            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
617                // trigger resolution task for trusted peer since multiple connection failures
618                // occurred
619                self.trusted_peers_resolver.interval.reset_immediately();
620            }
621        }
622
623        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
624    }
625
626    fn on_connection_failure(
627        &mut self,
628        remote_addr: &SocketAddr,
629        peer_id: &PeerId,
630        err: impl SessionError,
631        reputation_change: ReputationChangeKind,
632    ) {
633        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
634
635        if err.is_fatal_protocol_error() {
636            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
637            // remove the peer to which we can't establish a connection due to protocol related
638            // issues.
639            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
640                self.connection_info.decr_state(entry.get().state);
641                // only remove if the peer is not trusted
642                if entry.get().is_trusted() {
643                    entry.get_mut().state = PeerConnectionState::Idle;
644                } else {
645                    entry.remove();
646                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
647                    // If the error is caused by a peer that should be banned from discovery
648                    if err.merits_discovery_ban() {
649                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
650                            peer_id: *peer_id,
651                            ip_addr: remote_addr.ip(),
652                        })
653                    }
654                }
655            }
656
657            // ban the peer
658            self.ban_peer(*peer_id);
659        } else {
660            let mut backoff_until = None;
661            let mut remove_peer = false;
662
663            if let Some(peer) = self.peers.get_mut(peer_id) {
664                if let Some(kind) = err.should_backoff() {
665                    if peer.is_trusted() || peer.is_static() {
666                        // provide a bit more leeway for trusted peers and use a lower backoff so
667                        // that we keep re-trying them after backing off shortly, but we should at
668                        // least backoff for the low duration to not violate the ip based inbound
669                        // connection throttle that peer has in place, because this peer might not
670                        // have us registered as a trusted peer.
671                        let backoff = self.backoff_durations.low;
672                        backoff_until = Some(std::time::Instant::now() + backoff);
673                        trace!(target: "net::peers", ?peer_id, ?backoff, "backing off trusted peer");
674                    } else {
675                        // Increment peer.backoff_counter
676                        if kind.is_severe() {
677                            peer.severe_backoff_counter =
678                                peer.severe_backoff_counter.saturating_add(1);
679                        }
680                        trace!(target: "net::peers", ?peer_id, ?kind, severe_backoff_counter=peer.severe_backoff_counter, "backing off basic peer");
681
682                        let backoff_time =
683                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
684
685                        // The peer has signaled that it is currently unable to process any more
686                        // connections, so we will hold off on attempting any new connections for a
687                        // while
688                        backoff_until = Some(backoff_time);
689                    }
690                } else {
691                    // If the error was not a backoff error, we reduce the peer's reputation
692                    let reputation_change = self.reputation_weights.change(reputation_change);
693                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
694                };
695
696                self.connection_info.decr_state(peer.state);
697                peer.state = PeerConnectionState::Idle;
698
699                if peer.severe_backoff_counter > self.max_backoff_count &&
700                    !peer.is_trusted() &&
701                    !peer.is_static()
702                {
703                    // mark peer for removal if it has been backoff too many times and is _not_
704                    // trusted or static
705                    remove_peer = true;
706                }
707            }
708
709            // remove peer if it has been marked for removal
710            if remove_peer {
711                trace!(target: "net", ?peer_id, "removed peer after exceeding backoff counter");
712                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
713                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
714            } else if let Some(backoff_until) = backoff_until {
715                // otherwise, backoff the peer if marked as such
716                self.backoff_peer_until(*peer_id, backoff_until);
717            }
718        }
719
720        self.fill_outbound_slots();
721    }
722
723    /// Invoked if a pending session was disconnected because there's already a connection to the
724    /// peer.
725    ///
726    /// If the session was an outgoing connection, this means that the peer initiated a connection
727    /// to us at the same time and this connection is already established.
728    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
729        match direction {
730            Direction::Incoming => {
731                // need to decrement the ingoing counter
732                self.connection_info.decr_pending_in();
733            }
734            Direction::Outgoing(_) => {
735                // cleanup is handled when the incoming active session is activated in
736                // `on_incoming_session_established`
737            }
738        }
739    }
740
741    /// Called as follow-up for a discovered peer.
742    ///
743    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
744    /// protocol
745    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
746        if let Some(peer) = self.peers.get_mut(&peer_id) {
747            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
748            peer.fork_id = Some(Box::new(fork_id));
749        }
750    }
751
752    /// Called for a newly discovered peer.
753    ///
754    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
755    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
756        self.add_peer_kind(peer_id, None, addr, fork_id)
757    }
758
759    /// Marks the given peer as trusted.
760    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
761        self.trusted_peer_ids.insert(peer_id);
762        if let Some(peer) = self.peers.get_mut(&peer_id) {
763            peer.kind = PeerKind::Trusted;
764        }
765    }
766
767    /// Called for a newly discovered trusted peer.
768    ///
769    /// If the peer already exists, then the address and kind will be updated.
770    #[cfg_attr(not(test), expect(dead_code))]
771    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
772        self.add_peer_kind(peer_id, Some(PeerKind::Trusted), addr, None)
773    }
774
775    /// Called for a newly discovered peer.
776    ///
777    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
778    /// If the peer exists and a [`PeerKind`] is provided then the peer's kind is updated
779    pub(crate) fn add_peer_kind(
780        &mut self,
781        peer_id: PeerId,
782        kind: Option<PeerKind>,
783        addr: PeerAddr,
784        fork_id: Option<ForkId>,
785    ) {
786        let ip_addr = addr.tcp().ip();
787
788        // Check if the IP is in the allowed ranges (netrestrict)
789        if !self.ip_filter.is_allowed(&ip_addr) {
790            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
791            return
792        }
793
794        if self.ban_list.is_banned(&peer_id, &ip_addr) {
795            return
796        }
797
798        match self.peers.entry(peer_id) {
799            Entry::Occupied(mut entry) => {
800                let peer = entry.get_mut();
801                peer.fork_id = fork_id.map(Box::new);
802                peer.addr = addr;
803
804                if let Some(kind) = kind {
805                    peer.kind = kind;
806                }
807
808                if peer.state.is_incoming() {
809                    // now that we have an actual discovered address, for that peer and not just the
810                    // ip of the incoming connection, we don't need to remove the peer after
811                    // disconnecting, See `on_incoming_session_established`
812                    peer.remove_after_disconnect = false;
813                }
814            }
815            Entry::Vacant(entry) => {
816                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
817                let mut peer = Peer::with_kind(addr, kind.unwrap_or(PeerKind::Basic));
818                peer.fork_id = fork_id.map(Box::new);
819                entry.insert(peer);
820                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
821            }
822        }
823
824        if kind.filter(|kind| kind.is_trusted()).is_some() {
825            // also track the peer in the peer id set
826            self.trusted_peer_ids.insert(peer_id);
827        }
828    }
829
830    /// Removes the tracked node from the set.
831    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
832        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
833        if entry.get().is_trusted() {
834            return
835        }
836        let mut peer = entry.remove();
837
838        trace!(target: "net::peers", ?peer_id, "remove discovered node");
839        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
840
841        if peer.state.is_connected() {
842            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
843            // we terminate the active session here, but only remove the peer after the session
844            // was disconnected, this prevents the case where the session is scheduled for
845            // disconnect but the node is immediately rediscovered, See also
846            // [`Self::on_disconnected()`]
847            peer.remove_after_disconnect = true;
848            peer.state.disconnect();
849            self.peers.insert(peer_id, peer);
850            self.queued_actions.push_back(PeerAction::Disconnect {
851                peer_id,
852                reason: Some(DisconnectReason::DisconnectRequested),
853            })
854        }
855    }
856
857    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
858    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
859    #[cfg_attr(not(test), expect(dead_code))]
860    pub(crate) fn add_and_connect(
861        &mut self,
862        peer_id: PeerId,
863        addr: PeerAddr,
864        fork_id: Option<ForkId>,
865    ) {
866        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
867    }
868
869    /// Connects a peer and its address with the given kind.
870    ///
871    /// Note: This is invoked on demand via an external command received by the manager
872    pub(crate) fn add_and_connect_kind(
873        &mut self,
874        peer_id: PeerId,
875        kind: PeerKind,
876        addr: PeerAddr,
877        fork_id: Option<ForkId>,
878    ) {
879        let ip_addr = addr.tcp().ip();
880
881        // Check if the IP is in the allowed ranges (netrestrict)
882        if !self.ip_filter.is_allowed(&ip_addr) {
883            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
884            return
885        }
886
887        if self.ban_list.is_banned(&peer_id, &ip_addr) {
888            return
889        }
890
891        match self.peers.entry(peer_id) {
892            Entry::Occupied(mut entry) => {
893                let peer = entry.get_mut();
894                peer.kind = kind;
895                peer.fork_id = fork_id.map(Box::new);
896                peer.addr = addr;
897
898                if peer.state == PeerConnectionState::Idle {
899                    // Try connecting again.
900                    peer.state = PeerConnectionState::PendingOut;
901                    self.connection_info.inc_pending_out();
902                    self.queued_actions
903                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
904                }
905            }
906            Entry::Vacant(entry) => {
907                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
908                let mut peer = Peer::with_kind(addr, kind);
909                peer.state = PeerConnectionState::PendingOut;
910                peer.fork_id = fork_id.map(Box::new);
911                entry.insert(peer);
912                self.connection_info.inc_pending_out();
913                self.queued_actions
914                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
915            }
916        }
917
918        if kind.is_trusted() {
919            self.trusted_peer_ids.insert(peer_id);
920        }
921    }
922
923    /// Removes the tracked node from the trusted set.
924    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
925        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
926        if !entry.get().is_trusted() {
927            return
928        }
929
930        let peer = entry.get_mut();
931        peer.kind = PeerKind::Basic;
932
933        self.trusted_peer_ids.remove(&peer_id);
934    }
935
936    /// Returns the idle peer with the highest reputation.
937    ///
938    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
939    /// not currently marked as banned or backed off.
940    ///
941    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
942    /// `trusted` peers.
943    ///
944    /// Returns `None` if no peer is available.
945    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
946        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
947            !peer.is_backed_off() &&
948                !peer.is_banned() &&
949                peer.state.is_unconnected() &&
950                (!self.trusted_nodes_only || peer.is_trusted())
951        });
952
953        // keep track of the best peer, if there's one
954        let mut best_peer = unconnected.next()?;
955
956        if best_peer.1.is_trusted() || best_peer.1.is_static() {
957            return Some((*best_peer.0, best_peer.1))
958        }
959
960        for maybe_better in unconnected {
961            // if the peer is trusted or static, return it immediately
962            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
963                return Some((*maybe_better.0, maybe_better.1))
964            }
965
966            // otherwise we keep track of the best peer using the reputation
967            if maybe_better.1.reputation > best_peer.1.reputation {
968                best_peer = maybe_better;
969            }
970        }
971        Some((*best_peer.0, best_peer.1))
972    }
973
974    /// If there's capacity for new outbound connections, this will queue new
975    /// [`PeerAction::Connect`] actions.
976    ///
977    /// New connections are only initiated, if slots are available and appropriate peers are
978    /// available.
979    fn fill_outbound_slots(&mut self) {
980        self.tick();
981
982        if !self.net_connection_state.is_active() {
983            // nothing to fill
984            return
985        }
986
987        // as long as there are slots available fill them with the best peers
988        while self.connection_info.has_out_capacity() {
989            let action = {
990                let (peer_id, peer) = match self.best_unconnected() {
991                    Some(peer) => peer,
992                    _ => break,
993                };
994
995                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
996
997                peer.state = PeerConnectionState::PendingOut;
998                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
999            };
1000
1001            self.connection_info.inc_pending_out();
1002
1003            self.queued_actions.push_back(action);
1004        }
1005    }
1006
1007    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
1008        if let Some(peer) = self.peers.get_mut(&peer_id) {
1009            let new_addr = PeerAddr::new_with_ports(
1010                new_record.address,
1011                new_record.tcp_port,
1012                Some(new_record.udp_port),
1013            );
1014
1015            if peer.addr != new_addr {
1016                peer.addr = new_addr;
1017                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
1018            }
1019        }
1020    }
1021
1022    /// Keeps track of network state changes.
1023    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
1024        self.net_connection_state = state;
1025    }
1026
1027    /// Returns the current network connection state.
1028    pub const fn connection_state(&self) -> &NetworkConnectionState {
1029        &self.net_connection_state
1030    }
1031
1032    /// Sets `net_connection_state` to `ShuttingDown`.
1033    pub const fn on_shutdown(&mut self) {
1034        self.net_connection_state = NetworkConnectionState::ShuttingDown;
1035    }
1036
1037    /// Advances the state.
1038    ///
1039    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
1040    /// [`PeersManager`] is polled.
1041    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1042        loop {
1043            // drain buffered actions
1044            if let Some(action) = self.queued_actions.pop_front() {
1045                return Poll::Ready(action)
1046            }
1047
1048            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1049                match cmd {
1050                    PeerCommand::Add(peer_id, addr) => {
1051                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1052                    }
1053                    PeerCommand::Remove(peer) => self.remove_peer(peer),
1054                    PeerCommand::ReputationChange(peer_id, rep) => {
1055                        self.apply_reputation_change(&peer_id, rep)
1056                    }
1057                    PeerCommand::GetPeer(peer, tx) => {
1058                        let _ = tx.send(self.peers.get(&peer).cloned());
1059                    }
1060                    PeerCommand::GetPeers(tx) => {
1061                        let _ = tx.send(self.iter_peers().collect());
1062                    }
1063                }
1064            }
1065
1066            if self.release_interval.poll_tick(cx).is_ready() {
1067                let now = std::time::Instant::now();
1068                let (_, unbanned_peers) = self.ban_list.evict(now);
1069
1070                for peer_id in unbanned_peers {
1071                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1072                        peer.unban();
1073                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1074                    }
1075                }
1076
1077                // clear the backoff list of expired backoffs, and mark the relevant peers as
1078                // ready to be dialed
1079                self.backed_off_peers.retain(|peer_id, until| {
1080                    if now > *until {
1081                        if let Some(peer) = self.peers.get_mut(peer_id) {
1082                            peer.backed_off = false;
1083                        }
1084                        return false
1085                    }
1086                    true
1087                })
1088            }
1089
1090            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1091                self.fill_outbound_slots();
1092            }
1093
1094            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1095                self.on_resolved_peer(peer_id, new_record);
1096            }
1097
1098            if self.queued_actions.is_empty() {
1099                return Poll::Pending
1100            }
1101        }
1102    }
1103}
1104
1105impl Default for PeersManager {
1106    fn default() -> Self {
1107        Self::new(Default::default())
1108    }
1109}
1110
1111/// Tracks stats about connected nodes
1112#[derive(Debug, Clone, PartialEq, Eq, Default)]
1113pub struct ConnectionInfo {
1114    /// Counter for currently occupied slots for active outbound connections.
1115    num_outbound: usize,
1116    /// Counter for pending outbound connections.
1117    num_pending_out: usize,
1118    /// Counter for currently occupied slots for active inbound connections.
1119    num_inbound: usize,
1120    /// Counter for pending inbound connections.
1121    num_pending_in: usize,
1122    /// Restrictions on number of connections.
1123    config: ConnectionsConfig,
1124}
1125
1126// === impl ConnectionInfo ===
1127
1128impl ConnectionInfo {
1129    /// Returns a new [`ConnectionInfo`] with the given config.
1130    const fn new(config: ConnectionsConfig) -> Self {
1131        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1132    }
1133
1134    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1135    const fn has_out_capacity(&self) -> bool {
1136        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1137            self.num_outbound < self.config.max_outbound
1138    }
1139
1140    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1141    const fn has_in_capacity(&self) -> bool {
1142        self.num_inbound < self.config.max_inbound
1143    }
1144
1145    /// Returns `true` if we can handle an additional incoming pending connection.
1146    const fn has_in_pending_capacity(&self) -> bool {
1147        self.num_pending_in < self.config.max_inbound
1148    }
1149
1150    const fn decr_state(&mut self, state: PeerConnectionState) {
1151        match state {
1152            PeerConnectionState::Idle => {}
1153            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1154            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1155            PeerConnectionState::PendingOut => self.decr_pending_out(),
1156        }
1157    }
1158
1159    const fn decr_out(&mut self) {
1160        self.num_outbound -= 1;
1161    }
1162
1163    const fn inc_out(&mut self) {
1164        self.num_outbound += 1;
1165    }
1166
1167    const fn inc_pending_out(&mut self) {
1168        self.num_pending_out += 1;
1169    }
1170
1171    const fn inc_in(&mut self) {
1172        self.num_inbound += 1;
1173    }
1174
1175    const fn inc_pending_in(&mut self) {
1176        self.num_pending_in += 1;
1177    }
1178
1179    const fn decr_in(&mut self) {
1180        self.num_inbound -= 1;
1181    }
1182
1183    const fn decr_pending_out(&mut self) {
1184        self.num_pending_out -= 1;
1185    }
1186
1187    const fn decr_pending_in(&mut self) {
1188        self.num_pending_in -= 1;
1189    }
1190}
1191
1192/// Actions the peer manager can trigger.
1193#[derive(Debug)]
1194pub enum PeerAction {
1195    /// Start a new connection to a peer.
1196    Connect {
1197        /// The peer to connect to.
1198        peer_id: PeerId,
1199        /// Where to reach the node
1200        remote_addr: SocketAddr,
1201    },
1202    /// Disconnect an existing connection.
1203    Disconnect {
1204        /// The peer ID of the established connection.
1205        peer_id: PeerId,
1206        /// An optional reason for the disconnect.
1207        reason: Option<DisconnectReason>,
1208    },
1209    /// Disconnect an existing incoming connection, because the peers reputation is below the
1210    /// banned threshold or is on the [`BanList`]
1211    DisconnectBannedIncoming {
1212        /// The peer ID of the established connection.
1213        peer_id: PeerId,
1214    },
1215    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1216    DisconnectUntrustedIncoming {
1217        /// The peer ID.
1218        peer_id: PeerId,
1219    },
1220    /// Ban the peer in discovery.
1221    DiscoveryBanPeerId {
1222        /// The peer ID.
1223        peer_id: PeerId,
1224        /// The IP address.
1225        ip_addr: IpAddr,
1226    },
1227    /// Ban the IP in discovery.
1228    DiscoveryBanIp {
1229        /// The IP address.
1230        ip_addr: IpAddr,
1231    },
1232    /// Ban the peer temporarily
1233    BanPeer {
1234        /// The peer ID.
1235        peer_id: PeerId,
1236    },
1237    /// Unban the peer temporarily
1238    UnBanPeer {
1239        /// The peer ID.
1240        peer_id: PeerId,
1241    },
1242    /// Emit peerAdded event
1243    PeerAdded(PeerId),
1244    /// Emit peerRemoved event
1245    PeerRemoved(PeerId),
1246}
1247
1248/// Error thrown when a incoming connection is rejected right away
1249#[derive(Debug, Error, PartialEq, Eq)]
1250pub enum InboundConnectionError {
1251    /// The remote's ip address is banned
1252    IpBanned,
1253    /// No capacity for new inbound connections
1254    ExceedsCapacity,
1255}
1256
1257impl Display for InboundConnectionError {
1258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1259        write!(f, "{self:?}")
1260    }
1261}
1262
1263#[cfg(test)]
1264mod tests {
1265    use alloy_primitives::B512;
1266    use reth_eth_wire::{
1267        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1268        DisconnectReason,
1269    };
1270    use reth_net_banlist::BanList;
1271    use reth_network_api::Direction;
1272    use reth_network_peers::{PeerId, TrustedPeer};
1273    use reth_network_types::{
1274        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1275    };
1276    use std::{
1277        future::{poll_fn, Future},
1278        io,
1279        net::{IpAddr, Ipv4Addr, SocketAddr},
1280        pin::Pin,
1281        task::{Context, Poll},
1282        time::Duration,
1283    };
1284    use url::Host;
1285
1286    use super::PeersManager;
1287    use crate::{
1288        error::SessionError,
1289        peers::{
1290            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1291            PeerConnectionState,
1292        },
1293        session::PendingSessionHandshakeError,
1294        PeersConfig,
1295    };
1296
1297    struct PeerActionFuture<'a> {
1298        peers: &'a mut PeersManager,
1299    }
1300
1301    impl Future for PeerActionFuture<'_> {
1302        type Output = PeerAction;
1303
1304        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1305            self.get_mut().peers.poll(cx)
1306        }
1307    }
1308
1309    macro_rules! event {
1310        ($peers:expr) => {
1311            PeerActionFuture { peers: &mut $peers }.await
1312        };
1313    }
1314
1315    #[tokio::test]
1316    async fn test_insert() {
1317        let peer = PeerId::random();
1318        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1319        let mut peers = PeersManager::default();
1320        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1321
1322        match event!(peers) {
1323            PeerAction::PeerAdded(peer_id) => {
1324                assert_eq!(peer_id, peer);
1325            }
1326            _ => unreachable!(),
1327        }
1328        match event!(peers) {
1329            PeerAction::Connect { peer_id, remote_addr } => {
1330                assert_eq!(peer_id, peer);
1331                assert_eq!(remote_addr, socket_addr);
1332            }
1333            _ => unreachable!(),
1334        }
1335
1336        let (record, _) = peers.peer_by_id(peer).unwrap();
1337        assert_eq!(record.tcp_addr(), socket_addr);
1338        assert_eq!(record.udp_addr(), socket_addr);
1339    }
1340
1341    #[tokio::test]
1342    async fn test_insert_udp() {
1343        let peer = PeerId::random();
1344        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1345        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1346        let mut peers = PeersManager::default();
1347        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1348
1349        match event!(peers) {
1350            PeerAction::PeerAdded(peer_id) => {
1351                assert_eq!(peer_id, peer);
1352            }
1353            _ => unreachable!(),
1354        }
1355        match event!(peers) {
1356            PeerAction::Connect { peer_id, remote_addr } => {
1357                assert_eq!(peer_id, peer);
1358                assert_eq!(remote_addr, tcp_addr);
1359            }
1360            _ => unreachable!(),
1361        }
1362
1363        let (record, _) = peers.peer_by_id(peer).unwrap();
1364        assert_eq!(record.tcp_addr(), tcp_addr);
1365        assert_eq!(record.udp_addr(), udp_addr);
1366    }
1367
1368    #[tokio::test]
1369    async fn test_ban() {
1370        let peer = PeerId::random();
1371        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1372        let mut peers = PeersManager::default();
1373        peers.ban_peer(peer);
1374        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1375
1376        match event!(peers) {
1377            PeerAction::BanPeer { peer_id } => {
1378                assert_eq!(peer_id, peer);
1379            }
1380            _ => unreachable!(),
1381        }
1382
1383        poll_fn(|cx| {
1384            assert!(peers.poll(cx).is_pending());
1385            Poll::Ready(())
1386        })
1387        .await;
1388    }
1389
1390    #[tokio::test]
1391    async fn test_unban() {
1392        let peer = PeerId::random();
1393        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1394        let mut peers = PeersManager::default();
1395        peers.ban_peer(peer);
1396        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1397
1398        match event!(peers) {
1399            PeerAction::BanPeer { peer_id } => {
1400                assert_eq!(peer_id, peer);
1401            }
1402            _ => unreachable!(),
1403        }
1404
1405        poll_fn(|cx| {
1406            assert!(peers.poll(cx).is_pending());
1407            Poll::Ready(())
1408        })
1409        .await;
1410
1411        peers.unban_peer(peer);
1412
1413        match event!(peers) {
1414            PeerAction::UnBanPeer { peer_id } => {
1415                assert_eq!(peer_id, peer);
1416            }
1417            _ => unreachable!(),
1418        }
1419
1420        poll_fn(|cx| {
1421            assert!(peers.poll(cx).is_pending());
1422            Poll::Ready(())
1423        })
1424        .await;
1425    }
1426
1427    #[tokio::test]
1428    async fn test_backoff_on_busy() {
1429        let peer = PeerId::random();
1430        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1431
1432        let mut peers = PeersManager::new(PeersConfig::test());
1433        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1434
1435        match event!(peers) {
1436            PeerAction::PeerAdded(peer_id) => {
1437                assert_eq!(peer_id, peer);
1438            }
1439            _ => unreachable!(),
1440        }
1441        match event!(peers) {
1442            PeerAction::Connect { peer_id, .. } => {
1443                assert_eq!(peer_id, peer);
1444            }
1445            _ => unreachable!(),
1446        }
1447
1448        poll_fn(|cx| {
1449            assert!(peers.poll(cx).is_pending());
1450            Poll::Ready(())
1451        })
1452        .await;
1453
1454        peers.on_active_session_dropped(
1455            &socket_addr,
1456            &peer,
1457            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1458                DisconnectReason::TooManyPeers,
1459            )),
1460        );
1461
1462        poll_fn(|cx| {
1463            assert!(peers.poll(cx).is_pending());
1464            Poll::Ready(())
1465        })
1466        .await;
1467
1468        assert!(peers.backed_off_peers.contains_key(&peer));
1469        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1470
1471        tokio::time::sleep(peers.backoff_durations.low).await;
1472
1473        match event!(peers) {
1474            PeerAction::Connect { peer_id, .. } => {
1475                assert_eq!(peer_id, peer);
1476            }
1477            _ => unreachable!(),
1478        }
1479
1480        assert!(!peers.backed_off_peers.contains_key(&peer));
1481        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1482    }
1483
1484    #[tokio::test]
1485    async fn test_backoff_on_no_response() {
1486        let peer = PeerId::random();
1487        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1488
1489        let backoff_durations = PeerBackoffDurations::test();
1490        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1491        let mut peers = PeersManager::new(config);
1492        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1493
1494        match event!(peers) {
1495            PeerAction::PeerAdded(peer_id) => {
1496                assert_eq!(peer_id, peer);
1497            }
1498            _ => unreachable!(),
1499        }
1500        match event!(peers) {
1501            PeerAction::Connect { peer_id, .. } => {
1502                assert_eq!(peer_id, peer);
1503            }
1504            _ => unreachable!(),
1505        }
1506
1507        poll_fn(|cx| {
1508            assert!(peers.poll(cx).is_pending());
1509            Poll::Ready(())
1510        })
1511        .await;
1512
1513        peers.on_outgoing_pending_session_dropped(
1514            &socket_addr,
1515            &peer,
1516            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1517                EthHandshakeError::NoResponse,
1518            )),
1519        );
1520
1521        poll_fn(|cx| {
1522            assert!(peers.poll(cx).is_pending());
1523            Poll::Ready(())
1524        })
1525        .await;
1526
1527        assert!(peers.backed_off_peers.contains_key(&peer));
1528        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1529
1530        tokio::time::sleep(backoff_durations.high).await;
1531
1532        match event!(peers) {
1533            PeerAction::Connect { peer_id, .. } => {
1534                assert_eq!(peer_id, peer);
1535            }
1536            _ => unreachable!(),
1537        }
1538
1539        assert!(!peers.backed_off_peers.contains_key(&peer));
1540        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1541    }
1542
1543    #[tokio::test]
1544    async fn test_low_backoff() {
1545        let peer = PeerId::random();
1546        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1547        let config = PeersConfig::test();
1548        let mut peers = PeersManager::new(config);
1549        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1550        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1551
1552        let backoff_timestamp = peers
1553            .backoff_durations
1554            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1555
1556        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1557        assert!(backoff_timestamp <= expected);
1558    }
1559
1560    #[tokio::test]
1561    async fn test_multiple_backoff_calculations() {
1562        let peer = PeerId::random();
1563        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1564        let config = PeersConfig::default();
1565        let mut peers = PeersManager::new(config);
1566        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1567        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1568
1569        // Simulate a peer that was already backed off once
1570        peer_struct.severe_backoff_counter = 1;
1571
1572        let now = std::time::Instant::now();
1573
1574        // Simulate the increment that happens in on_connection_failure
1575        peer_struct.severe_backoff_counter += 1;
1576        // Get official backoff time
1577        let backoff_time = peers
1578            .backoff_durations
1579            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1580
1581        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1582        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1583
1584        // We can't use assert_eq! since there is a very small diff in the nano secs
1585        // Usually it is 1800s != 1799.9999996s
1586        assert!(backoff_time.duration_since(now) > backoff_duration);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_ban_on_active_drop() {
1591        let peer = PeerId::random();
1592        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1593        let mut peers = PeersManager::default();
1594        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1595
1596        match event!(peers) {
1597            PeerAction::PeerAdded(peer_id) => {
1598                assert_eq!(peer_id, peer);
1599            }
1600            _ => unreachable!(),
1601        }
1602        match event!(peers) {
1603            PeerAction::Connect { peer_id, .. } => {
1604                assert_eq!(peer_id, peer);
1605            }
1606            _ => unreachable!(),
1607        }
1608
1609        poll_fn(|cx| {
1610            assert!(peers.poll(cx).is_pending());
1611            Poll::Ready(())
1612        })
1613        .await;
1614
1615        peers.on_active_session_dropped(
1616            &socket_addr,
1617            &peer,
1618            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1619                DisconnectReason::UselessPeer,
1620            )),
1621        );
1622
1623        match event!(peers) {
1624            PeerAction::PeerRemoved(peer_id) => {
1625                assert_eq!(peer_id, peer);
1626            }
1627            _ => unreachable!(),
1628        }
1629        match event!(peers) {
1630            PeerAction::BanPeer { peer_id } => {
1631                assert_eq!(peer_id, peer);
1632            }
1633            _ => unreachable!(),
1634        }
1635
1636        poll_fn(|cx| {
1637            assert!(peers.poll(cx).is_pending());
1638            Poll::Ready(())
1639        })
1640        .await;
1641
1642        assert!(!peers.peers.contains_key(&peer));
1643    }
1644
1645    #[tokio::test]
1646    async fn test_remove_on_max_backoff_count() {
1647        let peer = PeerId::random();
1648        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1649        let config = PeersConfig::test();
1650        let mut peers = PeersManager::new(config.clone());
1651        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1652        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1653
1654        // Simulate a peer that was already backed off once
1655        peer_struct.severe_backoff_counter = config.max_backoff_count;
1656
1657        match event!(peers) {
1658            PeerAction::PeerAdded(peer_id) => {
1659                assert_eq!(peer_id, peer);
1660            }
1661            _ => unreachable!(),
1662        }
1663        match event!(peers) {
1664            PeerAction::Connect { peer_id, .. } => {
1665                assert_eq!(peer_id, peer);
1666            }
1667            _ => unreachable!(),
1668        }
1669
1670        poll_fn(|cx| {
1671            assert!(peers.poll(cx).is_pending());
1672            Poll::Ready(())
1673        })
1674        .await;
1675
1676        peers.on_outgoing_pending_session_dropped(
1677            &socket_addr,
1678            &peer,
1679            &PendingSessionHandshakeError::Eth(
1680                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1681            ),
1682        );
1683
1684        match event!(peers) {
1685            PeerAction::PeerRemoved(peer_id) => {
1686                assert_eq!(peer_id, peer);
1687            }
1688            _ => unreachable!(),
1689        }
1690
1691        poll_fn(|cx| {
1692            assert!(peers.poll(cx).is_pending());
1693            Poll::Ready(())
1694        })
1695        .await;
1696
1697        assert!(!peers.peers.contains_key(&peer));
1698    }
1699
1700    #[tokio::test]
1701    async fn test_ban_on_pending_drop() {
1702        let peer = PeerId::random();
1703        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1704        let mut peers = PeersManager::default();
1705        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1706
1707        match event!(peers) {
1708            PeerAction::PeerAdded(peer_id) => {
1709                assert_eq!(peer_id, peer);
1710            }
1711            _ => unreachable!(),
1712        }
1713        match event!(peers) {
1714            PeerAction::Connect { peer_id, .. } => {
1715                assert_eq!(peer_id, peer);
1716            }
1717            _ => unreachable!(),
1718        }
1719
1720        poll_fn(|cx| {
1721            assert!(peers.poll(cx).is_pending());
1722            Poll::Ready(())
1723        })
1724        .await;
1725
1726        peers.on_outgoing_pending_session_dropped(
1727            &socket_addr,
1728            &peer,
1729            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1730                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1731            )),
1732        );
1733
1734        match event!(peers) {
1735            PeerAction::PeerRemoved(peer_id) => {
1736                assert_eq!(peer_id, peer);
1737            }
1738            _ => unreachable!(),
1739        }
1740        match event!(peers) {
1741            PeerAction::BanPeer { peer_id } => {
1742                assert_eq!(peer_id, peer);
1743            }
1744            _ => unreachable!(),
1745        }
1746
1747        poll_fn(|cx| {
1748            assert!(peers.poll(cx).is_pending());
1749            Poll::Ready(())
1750        })
1751        .await;
1752
1753        assert!(!peers.peers.contains_key(&peer));
1754    }
1755
1756    #[tokio::test]
1757    async fn test_internally_closed_incoming() {
1758        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1759        let mut peers = PeersManager::default();
1760
1761        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1762        assert_eq!(peers.connection_info.num_pending_in, 1);
1763        peers.on_incoming_pending_session_rejected_internally();
1764        assert_eq!(peers.connection_info.num_pending_in, 0);
1765    }
1766
1767    #[tokio::test]
1768    async fn test_reject_incoming_at_pending_capacity() {
1769        let mut peers = PeersManager::default();
1770
1771        for count in 1..=peers.connection_info.config.max_inbound {
1772            let socket_addr =
1773                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1774            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1775            assert_eq!(peers.connection_info.num_pending_in, count);
1776        }
1777        assert!(peers.connection_info.has_in_capacity());
1778        assert!(!peers.connection_info.has_in_pending_capacity());
1779
1780        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1781        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1782    }
1783
1784    #[tokio::test]
1785    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1786        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1787        let trusted = PeerId::random();
1788        peers.add_trusted_peer_id(trusted);
1789
1790        // connect the trusted peer
1791        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1792        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1793        peers.on_incoming_session_established(trusted, addr);
1794
1795        match event!(peers) {
1796            PeerAction::PeerAdded(id) => {
1797                assert_eq!(id, trusted);
1798            }
1799            _ => unreachable!(),
1800        }
1801
1802        // saturate the remaining inbound slots with untrusted peers
1803        let mut connected_untrusted_peer_ids = Vec::new();
1804        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1805            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1806            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1807            let peer_id = PeerId::random();
1808            peers.on_incoming_session_established(peer_id, addr);
1809            connected_untrusted_peer_ids.push(peer_id);
1810
1811            match event!(peers) {
1812                PeerAction::PeerAdded(id) => {
1813                    assert_eq!(id, peer_id);
1814                }
1815                _ => unreachable!(),
1816            }
1817        }
1818
1819        let mut pending_addrs = Vec::new();
1820
1821        // saturate available slots
1822        for i in 0..2 {
1823            let socket_addr =
1824                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1825            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1826
1827            pending_addrs.push(socket_addr);
1828        }
1829
1830        assert_eq!(peers.connection_info.num_pending_in, 2);
1831
1832        // try to handle additional incoming connections at capacity
1833        for i in 0..2 {
1834            let socket_addr =
1835                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1836            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1837        }
1838
1839        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1840            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1841                DisconnectReason::UselessPeer,
1842            )),
1843        ));
1844
1845        // Remove all pending peers
1846        for pending_addr in pending_addrs {
1847            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1848        }
1849
1850        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1851
1852        println!(
1853            "num_inbound: {}, has_in_capacity: {}",
1854            peers.connection_info.num_inbound,
1855            peers.connection_info.has_in_capacity()
1856        );
1857
1858        // disconnect a connected peer
1859        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1860
1861        println!(
1862            "num_inbound: {}, has_in_capacity: {}",
1863            peers.connection_info.num_inbound,
1864            peers.connection_info.has_in_capacity()
1865        );
1866
1867        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1868        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1869    }
1870
1871    #[tokio::test]
1872    async fn test_closed_incoming() {
1873        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1874        let mut peers = PeersManager::default();
1875
1876        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1877        assert_eq!(peers.connection_info.num_pending_in, 1);
1878        peers.on_incoming_pending_session_gracefully_closed();
1879        assert_eq!(peers.connection_info.num_pending_in, 0);
1880    }
1881
1882    #[tokio::test]
1883    async fn test_dropped_incoming() {
1884        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1885        let ban_duration = Duration::from_millis(500);
1886        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1887        let mut peers = PeersManager::new(config);
1888
1889        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1890        assert_eq!(peers.connection_info.num_pending_in, 1);
1891        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1892            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1893                DisconnectReason::UselessPeer,
1894            )),
1895        ));
1896
1897        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1898        assert_eq!(peers.connection_info.num_pending_in, 0);
1899        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1900
1901        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1902
1903        // unbanned after timeout
1904        tokio::time::sleep(ban_duration).await;
1905
1906        poll_fn(|cx| {
1907            let _ = peers.poll(cx);
1908            Poll::Ready(())
1909        })
1910        .await;
1911
1912        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1913        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1914    }
1915
1916    #[tokio::test]
1917    async fn test_reputation_change_connected() {
1918        let peer = PeerId::random();
1919        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1920        let mut peers = PeersManager::default();
1921        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1922
1923        match event!(peers) {
1924            PeerAction::PeerAdded(peer_id) => {
1925                assert_eq!(peer_id, peer);
1926            }
1927            _ => unreachable!(),
1928        }
1929        match event!(peers) {
1930            PeerAction::Connect { peer_id, remote_addr } => {
1931                assert_eq!(peer_id, peer);
1932                assert_eq!(remote_addr, socket_addr);
1933            }
1934            _ => unreachable!(),
1935        }
1936
1937        let p = peers.peers.get_mut(&peer).unwrap();
1938        assert_eq!(p.state, PeerConnectionState::PendingOut);
1939
1940        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1941
1942        let p = peers.peers.get(&peer).unwrap();
1943        assert_eq!(p.state, PeerConnectionState::PendingOut);
1944        assert!(p.is_banned());
1945
1946        peers.on_active_session_gracefully_closed(peer);
1947
1948        let p = peers.peers.get(&peer).unwrap();
1949        assert_eq!(p.state, PeerConnectionState::Idle);
1950        assert!(p.is_banned());
1951
1952        match event!(peers) {
1953            PeerAction::Disconnect { peer_id, .. } => {
1954                assert_eq!(peer_id, peer);
1955            }
1956            _ => unreachable!(),
1957        }
1958    }
1959
1960    #[tokio::test]
1961    async fn retain_trusted_status() {
1962        let _socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1963        let trusted = PeerId::random();
1964        let mut peers =
1965            PeersManager::new(PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
1966                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
1967                tcp_port: 8008,
1968                udp_port: 8008,
1969                id: trusted,
1970            }]));
1971        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1972        peers.add_peer(trusted, PeerAddr::from_tcp(socket_addr), None);
1973        assert!(peers.peers.get(&trusted).unwrap().is_trusted());
1974    }
1975
1976    #[tokio::test]
1977    async fn accept_incoming_trusted_unknown_peer_address() {
1978        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1979        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1980        // try to connect trusted peer
1981        let trusted = PeerId::random();
1982        peers.add_trusted_peer_id(trusted);
1983
1984        // saturate the inbound slots
1985        for i in 0..peers.connection_info.config.max_inbound {
1986            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1987            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1988            let peer_id = PeerId::random();
1989            peers.on_incoming_session_established(peer_id, addr);
1990
1991            match event!(peers) {
1992                PeerAction::PeerAdded(id) => {
1993                    assert_eq!(id, peer_id);
1994                }
1995                _ => unreachable!(),
1996            }
1997        }
1998
1999        // try to connect untrusted peer
2000        let untrusted = PeerId::random();
2001        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
2002        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2003        peers.on_incoming_session_established(untrusted, socket_addr);
2004
2005        match event!(peers) {
2006            PeerAction::PeerAdded(id) => {
2007                assert_eq!(id, untrusted);
2008            }
2009            _ => unreachable!(),
2010        }
2011
2012        match event!(peers) {
2013            PeerAction::Disconnect { peer_id, reason } => {
2014                assert_eq!(peer_id, untrusted);
2015                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
2016            }
2017            _ => unreachable!(),
2018        }
2019
2020        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
2021        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2022        peers.on_incoming_session_established(trusted, socket_addr);
2023
2024        match event!(peers) {
2025            PeerAction::PeerAdded(id) => {
2026                assert_eq!(id, trusted);
2027            }
2028            _ => unreachable!(),
2029        }
2030
2031        poll_fn(|cx| {
2032            assert!(peers.poll(cx).is_pending());
2033            Poll::Ready(())
2034        })
2035        .await;
2036
2037        let peer = peers.peers.get(&trusted).unwrap();
2038        assert_eq!(peer.state, PeerConnectionState::In);
2039    }
2040
2041    #[tokio::test]
2042    async fn test_already_connected() {
2043        let peer = PeerId::random();
2044        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2045        let mut peers = PeersManager::default();
2046
2047        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
2048        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2049        assert_eq!(peers.connection_info.num_pending_in, 1);
2050
2051        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
2052        // to increase by 1
2053        peers.on_incoming_session_established(peer, socket_addr);
2054        let p = peers.peers.get_mut(&peer).expect("peer not found");
2055        assert_eq!(p.addr.tcp(), socket_addr);
2056        assert_eq!(peers.connection_info.num_pending_in, 0);
2057        assert_eq!(peers.connection_info.num_inbound, 1);
2058
2059        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
2060        // by 1
2061        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2062        assert_eq!(peers.connection_info.num_pending_in, 1);
2063
2064        // Simulate a rejection due to an already established connection, expecting the
2065        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
2066        // should not be changed.
2067        peers.on_already_connected(Direction::Incoming);
2068
2069        let p = peers.peers.get_mut(&peer).expect("peer not found");
2070        assert_eq!(p.addr.tcp(), socket_addr);
2071        assert_eq!(peers.connection_info.num_pending_in, 0);
2072        assert_eq!(peers.connection_info.num_inbound, 1);
2073    }
2074
2075    #[tokio::test]
2076    async fn test_reputation_change_trusted_peer() {
2077        let peer = PeerId::random();
2078        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2079        let mut peers = PeersManager::default();
2080        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2081
2082        match event!(peers) {
2083            PeerAction::PeerAdded(peer_id) => {
2084                assert_eq!(peer_id, peer);
2085            }
2086            _ => unreachable!(),
2087        }
2088        match event!(peers) {
2089            PeerAction::Connect { peer_id, remote_addr } => {
2090                assert_eq!(peer_id, peer);
2091                assert_eq!(remote_addr, socket_addr);
2092            }
2093            _ => unreachable!(),
2094        }
2095
2096        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2097        peers.on_active_outgoing_established(peer);
2098        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2099
2100        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2101
2102        {
2103            let p = peers.peers.get(&peer).unwrap();
2104            assert_eq!(p.state, PeerConnectionState::Out);
2105            // not banned yet
2106            assert!(!p.is_banned());
2107        }
2108
2109        // ensure peer is banned eventually
2110        loop {
2111            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2112
2113            let p = peers.peers.get(&peer).unwrap();
2114            if p.is_banned() {
2115                break
2116            }
2117        }
2118
2119        match event!(peers) {
2120            PeerAction::Disconnect { peer_id, .. } => {
2121                assert_eq!(peer_id, peer);
2122            }
2123            _ => unreachable!(),
2124        }
2125    }
2126
2127    #[tokio::test]
2128    async fn test_reputation_management() {
2129        let peer = PeerId::random();
2130        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2131        let mut peers = PeersManager::default();
2132        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2133        assert_eq!(peers.get_reputation(&peer), Some(0));
2134
2135        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2136        assert_eq!(peers.get_reputation(&peer), Some(1024));
2137
2138        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2139        assert_eq!(peers.get_reputation(&peer), Some(0));
2140    }
2141
2142    #[tokio::test]
2143    async fn test_remove_discovered_active() {
2144        let peer = PeerId::random();
2145        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2146        let mut peers = PeersManager::default();
2147        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2148
2149        match event!(peers) {
2150            PeerAction::PeerAdded(peer_id) => {
2151                assert_eq!(peer_id, peer);
2152            }
2153            _ => unreachable!(),
2154        }
2155        match event!(peers) {
2156            PeerAction::Connect { peer_id, remote_addr } => {
2157                assert_eq!(peer_id, peer);
2158                assert_eq!(remote_addr, socket_addr);
2159            }
2160            _ => unreachable!(),
2161        }
2162
2163        let p = peers.peers.get(&peer).unwrap();
2164        assert_eq!(p.state, PeerConnectionState::PendingOut);
2165
2166        peers.remove_peer(peer);
2167
2168        match event!(peers) {
2169            PeerAction::PeerRemoved(peer_id) => {
2170                assert_eq!(peer_id, peer);
2171            }
2172            _ => unreachable!(),
2173        }
2174        match event!(peers) {
2175            PeerAction::Disconnect { peer_id, .. } => {
2176                assert_eq!(peer_id, peer);
2177            }
2178            _ => unreachable!(),
2179        }
2180
2181        let p = peers.peers.get(&peer).unwrap();
2182        assert_eq!(p.state, PeerConnectionState::PendingOut);
2183
2184        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2185        let p = peers.peers.get(&peer).unwrap();
2186        assert_eq!(p.state, PeerConnectionState::PendingOut);
2187
2188        peers.on_active_session_gracefully_closed(peer);
2189        assert!(!peers.peers.contains_key(&peer));
2190    }
2191
2192    #[tokio::test]
2193    async fn test_fatal_outgoing_connection_error_trusted() {
2194        let peer = PeerId::random();
2195        let config = PeersConfig::test()
2196            .with_trusted_nodes(vec![TrustedPeer {
2197                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2198                tcp_port: 8008,
2199                udp_port: 8008,
2200                id: peer,
2201            }])
2202            .with_trusted_nodes_only(true);
2203        let mut peers = PeersManager::new(config);
2204        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2205
2206        match event!(peers) {
2207            PeerAction::Connect { peer_id, remote_addr } => {
2208                assert_eq!(peer_id, peer);
2209                assert_eq!(remote_addr, socket_addr);
2210            }
2211            _ => unreachable!(),
2212        }
2213
2214        let p = peers.peers.get(&peer).unwrap();
2215        assert_eq!(p.state, PeerConnectionState::PendingOut);
2216
2217        assert_eq!(peers.num_outbound_connections(), 0);
2218
2219        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2220            EthHandshakeError::NonStatusMessageInHandshake,
2221        ));
2222        assert!(err.is_fatal_protocol_error());
2223
2224        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2225        assert_eq!(peers.num_outbound_connections(), 0);
2226
2227        // try tmp ban peer
2228        match event!(peers) {
2229            PeerAction::BanPeer { peer_id } => {
2230                assert_eq!(peer_id, peer);
2231            }
2232            err => unreachable!("{err:?}"),
2233        }
2234
2235        // ensure we still have trusted peer
2236        assert!(peers.peers.contains_key(&peer));
2237
2238        // await for the ban to expire
2239        tokio::time::sleep(peers.backoff_durations.medium).await;
2240
2241        match event!(peers) {
2242            PeerAction::Connect { peer_id, remote_addr } => {
2243                assert_eq!(peer_id, peer);
2244                assert_eq!(remote_addr, socket_addr);
2245            }
2246            err => unreachable!("{err:?}"),
2247        }
2248    }
2249
2250    #[tokio::test]
2251    async fn test_outgoing_connection_error() {
2252        let peer = PeerId::random();
2253        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2254        let mut peers = PeersManager::default();
2255        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2256
2257        match event!(peers) {
2258            PeerAction::PeerAdded(peer_id) => {
2259                assert_eq!(peer_id, peer);
2260            }
2261            _ => unreachable!(),
2262        }
2263        match event!(peers) {
2264            PeerAction::Connect { peer_id, remote_addr } => {
2265                assert_eq!(peer_id, peer);
2266                assert_eq!(remote_addr, socket_addr);
2267            }
2268            _ => unreachable!(),
2269        }
2270
2271        let p = peers.peers.get(&peer).unwrap();
2272        assert_eq!(p.state, PeerConnectionState::PendingOut);
2273
2274        assert_eq!(peers.num_outbound_connections(), 0);
2275
2276        peers.on_outgoing_connection_failure(
2277            &socket_addr,
2278            &peer,
2279            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2280        );
2281
2282        assert_eq!(peers.num_outbound_connections(), 0);
2283    }
2284
2285    #[tokio::test]
2286    async fn test_outgoing_connection_gracefully_closed() {
2287        let peer = PeerId::random();
2288        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2289        let mut peers = PeersManager::default();
2290        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2291
2292        match event!(peers) {
2293            PeerAction::PeerAdded(peer_id) => {
2294                assert_eq!(peer_id, peer);
2295            }
2296            _ => unreachable!(),
2297        }
2298        match event!(peers) {
2299            PeerAction::Connect { peer_id, remote_addr } => {
2300                assert_eq!(peer_id, peer);
2301                assert_eq!(remote_addr, socket_addr);
2302            }
2303            _ => unreachable!(),
2304        }
2305
2306        let p = peers.peers.get(&peer).unwrap();
2307        assert_eq!(p.state, PeerConnectionState::PendingOut);
2308
2309        assert_eq!(peers.num_outbound_connections(), 0);
2310
2311        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2312
2313        assert_eq!(peers.num_outbound_connections(), 0);
2314        assert_eq!(peers.connection_info.num_pending_out, 0);
2315    }
2316
2317    #[tokio::test]
2318    async fn test_discovery_ban_list() {
2319        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2320        let socket_addr = SocketAddr::new(ip, 8008);
2321        let ban_list = BanList::new(vec![], vec![ip]);
2322        let config = PeersConfig::default().with_ban_list(ban_list);
2323        let mut peer_manager = PeersManager::new(config);
2324        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2325
2326        assert!(peer_manager.peers.is_empty());
2327    }
2328
2329    #[tokio::test]
2330    async fn test_on_pending_ban_list() {
2331        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2332        let socket_addr = SocketAddr::new(ip, 8008);
2333        let ban_list = BanList::new(vec![], vec![ip]);
2334        let config = PeersConfig::test().with_ban_list(ban_list);
2335        let mut peer_manager = PeersManager::new(config);
2336        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2337        // because we have no active peers this should be fine for testings
2338        match a {
2339            Ok(_) => panic!(),
2340            Err(err) => match err {
2341                InboundConnectionError::IpBanned => {
2342                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2343                }
2344                _ => unreachable!(),
2345            },
2346        }
2347    }
2348
2349    #[tokio::test]
2350    async fn test_on_active_inbound_ban_list() {
2351        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2352        let socket_addr = SocketAddr::new(ip, 8008);
2353        let given_peer_id = PeerId::random();
2354        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2355        let config = PeersConfig::test().with_ban_list(ban_list);
2356        let mut peer_manager = PeersManager::new(config);
2357        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2358        // non-trusted nodes should also increase pending_in
2359        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2360        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2361        // after the connection is established, the peer should be removed, the num_pending_in
2362        // should be decreased, and the num_inbound should not be increased
2363        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2364        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2365
2366        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2367            peer_manager.queued_actions.pop_front()
2368        else {
2369            panic!()
2370        };
2371
2372        assert_eq!(peer_id, given_peer_id)
2373    }
2374
2375    #[test]
2376    fn test_connection_limits() {
2377        let mut info = ConnectionInfo::default();
2378        info.inc_in();
2379        assert_eq!(info.num_inbound, 1);
2380        assert_eq!(info.num_outbound, 0);
2381        assert!(info.has_in_capacity());
2382
2383        info.decr_in();
2384        assert_eq!(info.num_inbound, 0);
2385        assert_eq!(info.num_outbound, 0);
2386
2387        info.inc_out();
2388        assert_eq!(info.num_inbound, 0);
2389        assert_eq!(info.num_outbound, 1);
2390        assert!(info.has_out_capacity());
2391
2392        info.decr_out();
2393        assert_eq!(info.num_inbound, 0);
2394        assert_eq!(info.num_outbound, 0);
2395    }
2396
2397    #[test]
2398    fn test_connection_peer_state() {
2399        let mut info = ConnectionInfo::default();
2400        info.inc_in();
2401
2402        info.decr_state(PeerConnectionState::In);
2403        assert_eq!(info.num_inbound, 0);
2404        assert_eq!(info.num_outbound, 0);
2405
2406        info.inc_out();
2407
2408        info.decr_state(PeerConnectionState::Out);
2409        assert_eq!(info.num_inbound, 0);
2410        assert_eq!(info.num_outbound, 0);
2411    }
2412
2413    #[tokio::test]
2414    async fn test_trusted_peers_are_prioritized() {
2415        let trusted_peer = PeerId::random();
2416        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2417        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2418            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2419            tcp_port: 8008,
2420            udp_port: 8008,
2421            id: trusted_peer,
2422        }]);
2423        let mut peers = PeersManager::new(config);
2424
2425        let basic_peer = PeerId::random();
2426        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2427        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2428
2429        match event!(peers) {
2430            PeerAction::PeerAdded(peer_id) => {
2431                assert_eq!(peer_id, basic_peer);
2432            }
2433            _ => unreachable!(),
2434        }
2435        match event!(peers) {
2436            PeerAction::Connect { peer_id, remote_addr } => {
2437                assert_eq!(peer_id, trusted_peer);
2438                assert_eq!(remote_addr, trusted_sock);
2439            }
2440            _ => unreachable!(),
2441        }
2442        match event!(peers) {
2443            PeerAction::Connect { peer_id, remote_addr } => {
2444                assert_eq!(peer_id, basic_peer);
2445                assert_eq!(remote_addr, basic_sock);
2446            }
2447            _ => unreachable!(),
2448        }
2449    }
2450
2451    #[tokio::test]
2452    async fn test_connect_trusted_nodes_only() {
2453        let trusted_peer = PeerId::random();
2454        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2455        let config = PeersConfig::test()
2456            .with_trusted_nodes(vec![TrustedPeer {
2457                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2458                tcp_port: 8008,
2459                udp_port: 8008,
2460                id: trusted_peer,
2461            }])
2462            .with_trusted_nodes_only(true);
2463        let mut peers = PeersManager::new(config);
2464
2465        let basic_peer = PeerId::random();
2466        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2467        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2468
2469        match event!(peers) {
2470            PeerAction::PeerAdded(peer_id) => {
2471                assert_eq!(peer_id, basic_peer);
2472            }
2473            _ => unreachable!(),
2474        }
2475        match event!(peers) {
2476            PeerAction::Connect { peer_id, remote_addr } => {
2477                assert_eq!(peer_id, trusted_peer);
2478                assert_eq!(remote_addr, trusted_sock);
2479            }
2480            _ => unreachable!(),
2481        }
2482        poll_fn(|cx| {
2483            assert!(peers.poll(cx).is_pending());
2484            Poll::Ready(())
2485        })
2486        .await;
2487    }
2488
2489    #[tokio::test]
2490    async fn test_incoming_with_trusted_nodes_only() {
2491        let trusted_peer = PeerId::random();
2492        let config = PeersConfig::test()
2493            .with_trusted_nodes(vec![TrustedPeer {
2494                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2495                tcp_port: 8008,
2496                udp_port: 8008,
2497                id: trusted_peer,
2498            }])
2499            .with_trusted_nodes_only(true);
2500        let mut peers = PeersManager::new(config);
2501
2502        let basic_peer = PeerId::random();
2503        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2504        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2505        // non-trusted nodes should also increase pending_in
2506        assert_eq!(peers.connection_info.num_pending_in, 1);
2507        peers.on_incoming_session_established(basic_peer, basic_sock);
2508        // after the connection is established, the peer should be removed, the num_pending_in
2509        // should be decreased, and the num_inbound mut not be increased
2510        assert_eq!(peers.connection_info.num_pending_in, 0);
2511        assert_eq!(peers.connection_info.num_inbound, 0);
2512
2513        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2514            peers.queued_actions.pop_front()
2515        else {
2516            panic!()
2517        };
2518        assert_eq!(basic_peer, peer_id);
2519        assert!(!peers.peers.contains_key(&basic_peer));
2520    }
2521
2522    #[tokio::test]
2523    async fn test_incoming_without_trusted_nodes_only() {
2524        let trusted_peer = PeerId::random();
2525        let config = PeersConfig::test()
2526            .with_trusted_nodes(vec![TrustedPeer {
2527                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2528                tcp_port: 8008,
2529                udp_port: 8008,
2530                id: trusted_peer,
2531            }])
2532            .with_trusted_nodes_only(false);
2533        let mut peers = PeersManager::new(config);
2534
2535        let basic_peer = PeerId::random();
2536        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2537        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2538
2539        // non-trusted nodes should also increase pending_in
2540        assert_eq!(peers.connection_info.num_pending_in, 1);
2541        peers.on_incoming_session_established(basic_peer, basic_sock);
2542        // after the connection is established, the peer should be removed, the num_pending_in
2543        // should be decreased, and the num_inbound must be increased
2544        assert_eq!(peers.connection_info.num_pending_in, 0);
2545        assert_eq!(peers.connection_info.num_inbound, 1);
2546        assert!(peers.peers.contains_key(&basic_peer));
2547    }
2548
2549    #[tokio::test]
2550    async fn test_incoming_at_capacity() {
2551        let mut config = PeersConfig::test();
2552        config.connection_info.max_inbound = 1;
2553        let mut peers = PeersManager::new(config);
2554
2555        let peer = PeerId::random();
2556        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2557        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2558
2559        peers.on_incoming_session_established(peer, addr);
2560
2561        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2562        assert_eq!(
2563            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2564            InboundConnectionError::ExceedsCapacity
2565        );
2566    }
2567
2568    #[tokio::test]
2569    async fn test_incoming_rate_limit() {
2570        let config = PeersConfig {
2571            incoming_ip_throttle_duration: Duration::from_millis(100),
2572            ..PeersConfig::test()
2573        };
2574        let mut peers = PeersManager::new(config);
2575
2576        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2577        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2578        assert_eq!(
2579            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2580            InboundConnectionError::IpBanned
2581        );
2582
2583        peers.release_interval.reset_immediately();
2584        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2585
2586        // await unban
2587        poll_fn(|cx| loop {
2588            if peers.poll(cx).is_pending() {
2589                return Poll::Ready(());
2590            }
2591        })
2592        .await;
2593
2594        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2595        assert_eq!(
2596            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2597            InboundConnectionError::IpBanned
2598        );
2599    }
2600
2601    #[tokio::test]
2602    async fn test_tick() {
2603        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2604        let socket_addr = SocketAddr::new(ip, 8008);
2605        let config = PeersConfig::test();
2606        let mut peer_manager = PeersManager::new(config);
2607        let peer_id = PeerId::random();
2608        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2609
2610        tokio::time::sleep(Duration::from_secs(1)).await;
2611        peer_manager.tick();
2612
2613        // still unconnected
2614        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2615
2616        // mark as connected
2617        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2618
2619        tokio::time::sleep(Duration::from_secs(1)).await;
2620        peer_manager.tick();
2621
2622        // still at default reputation
2623        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2624
2625        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2626
2627        tokio::time::sleep(Duration::from_secs(1)).await;
2628        peer_manager.tick();
2629
2630        // tick applied
2631        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2632    }
2633
2634    #[tokio::test]
2635    async fn test_remove_incoming_after_disconnect() {
2636        let peer_id = PeerId::random();
2637        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2638        let mut peers = PeersManager::default();
2639
2640        peers.on_incoming_pending_session(addr.ip()).unwrap();
2641        peers.on_incoming_session_established(peer_id, addr);
2642        let peer = peers.peers.get(&peer_id).unwrap();
2643        assert_eq!(peer.state, PeerConnectionState::In);
2644        assert!(peer.remove_after_disconnect);
2645
2646        peers.on_active_session_gracefully_closed(peer_id);
2647        assert!(!peers.peers.contains_key(&peer_id))
2648    }
2649
2650    #[tokio::test]
2651    async fn test_keep_incoming_after_disconnect_if_discovered() {
2652        let peer_id = PeerId::random();
2653        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2654        let mut peers = PeersManager::default();
2655
2656        peers.on_incoming_pending_session(addr.ip()).unwrap();
2657        peers.on_incoming_session_established(peer_id, addr);
2658        let peer = peers.peers.get(&peer_id).unwrap();
2659        assert_eq!(peer.state, PeerConnectionState::In);
2660        assert!(peer.remove_after_disconnect);
2661
2662        // trigger discovery manually while the peer is still connected
2663        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2664
2665        peers.on_active_session_gracefully_closed(peer_id);
2666
2667        let peer = peers.peers.get(&peer_id).unwrap();
2668        assert_eq!(peer.state, PeerConnectionState::Idle);
2669        assert!(!peer.remove_after_disconnect);
2670    }
2671
2672    #[tokio::test]
2673    async fn test_peer_reconnect_after_graceful_close_respects_throttle() {
2674        let throttle_duration = Duration::from_millis(100);
2675        let config =
2676            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2677        let mut peers = PeersManager::new(config);
2678
2679        let peer_id = PeerId::random();
2680        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2681
2682        // Add as regular peer
2683        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2684
2685        match event!(peers) {
2686            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2687            _ => unreachable!(),
2688        }
2689
2690        match event!(peers) {
2691            PeerAction::Connect { .. } => {}
2692            _ => unreachable!(),
2693        }
2694
2695        // Simulate outbound connection established
2696        peers.on_active_outgoing_established(peer_id);
2697        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2698
2699        // Gracefully close the session
2700        peers.on_active_session_gracefully_closed(peer_id);
2701
2702        let peer = peers.peers.get(&peer_id).unwrap();
2703        assert_eq!(peer.state, PeerConnectionState::Idle);
2704        assert!(peer.backed_off);
2705
2706        // Verify the peer is in the backed_off_peers set
2707        assert!(peers.backed_off_peers.contains_key(&peer_id));
2708
2709        // Immediately try to poll - should not trigger any actions yet
2710        poll_fn(|cx| {
2711            assert!(peers.poll(cx).is_pending());
2712            Poll::Ready(())
2713        })
2714        .await;
2715
2716        // Peer should still be backed off
2717        assert!(peers.backed_off_peers.contains_key(&peer_id));
2718        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2719
2720        // Sleep for the throttle duration
2721        tokio::time::sleep(throttle_duration).await;
2722
2723        // After throttle duration, event! will poll until we get a Connect action
2724        match event!(peers) {
2725            PeerAction::Connect { peer_id: id, .. } => assert_eq!(id, peer_id),
2726            _ => unreachable!(),
2727        }
2728
2729        // After connection is initiated, peer should no longer be backed off
2730        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2731        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2732    }
2733
2734    #[tokio::test]
2735    async fn test_backed_off_peer_can_accept_incoming_connection() {
2736        let throttle_duration = Duration::from_millis(100);
2737        let config =
2738            PeersConfig { incoming_ip_throttle_duration: throttle_duration, ..PeersConfig::test() };
2739        let mut peers = PeersManager::new(config);
2740
2741        let peer_id = PeerId::random();
2742        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2743
2744        // Add as regular peer
2745        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2746
2747        match event!(peers) {
2748            PeerAction::PeerAdded(id) => assert_eq!(id, peer_id),
2749            _ => unreachable!(),
2750        }
2751
2752        match event!(peers) {
2753            PeerAction::Connect { .. } => {}
2754            _ => unreachable!(),
2755        }
2756
2757        // Simulate outbound connection established
2758        peers.on_active_outgoing_established(peer_id);
2759        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::Out);
2760
2761        // Gracefully close the session - this will back off the peer
2762        peers.on_active_session_gracefully_closed(peer_id);
2763
2764        let peer = peers.peers.get(&peer_id).unwrap();
2765        assert_eq!(peer.state, PeerConnectionState::Idle);
2766        assert!(peer.backed_off);
2767        assert!(peers.backed_off_peers.contains_key(&peer_id));
2768
2769        // Now simulate an incoming connection from the backed-off peer
2770        // First, handle the incoming pending session
2771        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2772        assert_eq!(peers.connection_info.num_pending_in, 1);
2773
2774        // Establish the incoming session
2775        peers.on_incoming_session_established(peer_id, addr);
2776
2777        // Peer should have been added to incoming connections
2778        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2779        assert_eq!(peers.connection_info.num_inbound, 1);
2780
2781        // Peer should still be backed off for outbound connections
2782        assert!(peers.backed_off_peers.contains_key(&peer_id));
2783        assert!(peers.peers.get(&peer_id).unwrap().backed_off);
2784
2785        // Verify we don't try to reconnect outbound while peer is backed off
2786        poll_fn(|cx| {
2787            assert!(peers.poll(cx).is_pending());
2788            Poll::Ready(())
2789        })
2790        .await;
2791
2792        // No outbound connection should be attempted while backed off
2793        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2794
2795        // After throttle duration, the backoff should be cleared
2796        tokio::time::sleep(throttle_duration).await;
2797
2798        poll_fn(|cx| {
2799            let _ = peers.poll(cx);
2800            Poll::Ready(())
2801        })
2802        .await;
2803
2804        // Backoff should be cleared now
2805        assert!(!peers.backed_off_peers.contains_key(&peer_id));
2806        assert!(!peers.peers.get(&peer_id).unwrap().backed_off);
2807
2808        // Peer should still be in incoming state
2809        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2810    }
2811
2812    #[tokio::test]
2813    async fn test_incoming_outgoing_already_connected() {
2814        let peer_id = PeerId::random();
2815        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2816        let mut peers = PeersManager::default();
2817
2818        peers.on_incoming_pending_session(addr.ip()).unwrap();
2819        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2820
2821        match event!(peers) {
2822            PeerAction::PeerAdded(_) => {}
2823            _ => unreachable!(),
2824        }
2825
2826        match event!(peers) {
2827            PeerAction::Connect { .. } => {}
2828            _ => unreachable!(),
2829        }
2830
2831        peers.on_incoming_session_established(peer_id, addr);
2832        peers.on_already_connected(Direction::Outgoing(peer_id));
2833        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2834        assert_eq!(peers.connection_info.num_inbound, 1);
2835        assert_eq!(peers.connection_info.num_pending_out, 0);
2836        assert_eq!(peers.connection_info.num_pending_in, 0);
2837        assert_eq!(peers.connection_info.num_outbound, 0);
2838    }
2839
2840    #[tokio::test]
2841    async fn test_already_connected_incoming_outgoing_connection_error() {
2842        let peer_id = PeerId::random();
2843        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2844        let mut peers = PeersManager::default();
2845
2846        peers.on_incoming_pending_session(addr.ip()).unwrap();
2847        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2848
2849        match event!(peers) {
2850            PeerAction::PeerAdded(_) => {}
2851            _ => unreachable!(),
2852        }
2853
2854        match event!(peers) {
2855            PeerAction::Connect { .. } => {}
2856            _ => unreachable!(),
2857        }
2858
2859        peers.on_incoming_session_established(peer_id, addr);
2860
2861        peers.on_outgoing_connection_failure(
2862            &addr,
2863            &peer_id,
2864            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2865        );
2866        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2867        assert_eq!(peers.connection_info.num_inbound, 1);
2868        assert_eq!(peers.connection_info.num_pending_out, 0);
2869        assert_eq!(peers.connection_info.num_pending_in, 0);
2870        assert_eq!(peers.connection_info.num_outbound, 0);
2871    }
2872
2873    #[tokio::test]
2874    async fn test_max_concurrent_dials() {
2875        let config = PeersConfig::default();
2876        let mut peer_manager = PeersManager::new(config);
2877        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2878        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2879        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2880            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2881        }
2882
2883        peer_manager.fill_outbound_slots();
2884        let dials = peer_manager
2885            .queued_actions
2886            .iter()
2887            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2888            .count();
2889        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2890    }
2891
2892    #[tokio::test]
2893    async fn test_max_num_of_pending_dials() {
2894        let config = PeersConfig::default();
2895        let mut peer_manager = PeersManager::new(config);
2896        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2897        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2898
2899        // add more peers than allowed
2900        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2901            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2902        }
2903
2904        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2905            match event!(peer_manager) {
2906                PeerAction::PeerAdded(_) => {}
2907                _ => unreachable!(),
2908            }
2909        }
2910
2911        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2912            match event!(peer_manager) {
2913                PeerAction::Connect { .. } => {}
2914                _ => unreachable!(),
2915            }
2916        }
2917
2918        // generate 'Connect' actions
2919        peer_manager.fill_outbound_slots();
2920
2921        // all dialed connections should be in 'PendingOut' state
2922        let dials = peer_manager.connection_info.num_pending_out;
2923        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2924
2925        let num_pendingout_states = peer_manager
2926            .peers
2927            .iter()
2928            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2929            .map(|(peer_id, _)| *peer_id)
2930            .collect::<Vec<PeerId>>();
2931        assert_eq!(
2932            num_pendingout_states.len(),
2933            peer_manager.connection_info.config.max_concurrent_outbound_dials
2934        );
2935
2936        // establish dialed connections
2937        for peer_id in &num_pendingout_states {
2938            peer_manager.on_active_outgoing_established(*peer_id);
2939        }
2940
2941        // all dialed connections should now be in 'Out' state
2942        for peer_id in &num_pendingout_states {
2943            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2944        }
2945
2946        // no more pending outbound connections
2947        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2948    }
2949
2950    #[tokio::test]
2951    async fn test_connect() {
2952        let peer = PeerId::random();
2953        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2954        let mut peers = PeersManager::default();
2955        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2956        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2957
2958        match event!(peers) {
2959            PeerAction::Connect { peer_id, remote_addr } => {
2960                assert_eq!(peer_id, peer);
2961                assert_eq!(remote_addr, socket_addr);
2962            }
2963            _ => unreachable!(),
2964        }
2965
2966        let (record, _) = peers.peer_by_id(peer).unwrap();
2967        assert_eq!(record.tcp_addr(), socket_addr);
2968        assert_eq!(record.udp_addr(), socket_addr);
2969
2970        // connect again
2971        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2972
2973        let (record, _) = peers.peer_by_id(peer).unwrap();
2974        assert_eq!(record.tcp_addr(), socket_addr);
2975        assert_eq!(record.udp_addr(), socket_addr);
2976    }
2977
2978    #[tokio::test]
2979    async fn test_incoming_connection_from_banned() {
2980        let peer = PeerId::random();
2981        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2982        let config = PeersConfig::test().with_max_inbound(3);
2983        let mut peers = PeersManager::new(config);
2984        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2985
2986        match event!(peers) {
2987            PeerAction::PeerAdded(peer_id) => {
2988                assert_eq!(peer_id, peer);
2989            }
2990            _ => unreachable!(),
2991        }
2992        match event!(peers) {
2993            PeerAction::Connect { peer_id, .. } => {
2994                assert_eq!(peer_id, peer);
2995            }
2996            _ => unreachable!(),
2997        }
2998
2999        poll_fn(|cx| {
3000            assert!(peers.poll(cx).is_pending());
3001            Poll::Ready(())
3002        })
3003        .await;
3004
3005        // simulate new connection drops with error
3006        loop {
3007            peers.on_active_session_dropped(
3008                &socket_addr,
3009                &peer,
3010                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
3011                    reth_eth_wire::EthVersion::Eth68,
3012                    reth_eth_wire::EthMessageID::Status,
3013                )),
3014            );
3015
3016            if peers.peers.get(&peer).unwrap().is_banned() {
3017                break;
3018            }
3019
3020            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3021            peers.on_incoming_session_established(peer, socket_addr);
3022
3023            match event!(peers) {
3024                PeerAction::Connect { peer_id, .. } => {
3025                    assert_eq!(peer_id, peer);
3026                }
3027                _ => unreachable!(),
3028            }
3029        }
3030
3031        assert!(peers.peers.get(&peer).unwrap().is_banned());
3032
3033        // fill all incoming slots
3034        for _ in 0..peers.connection_info.config.max_inbound {
3035            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
3036            peers.on_incoming_session_established(peer, socket_addr);
3037
3038            match event!(peers) {
3039                PeerAction::DisconnectBannedIncoming { peer_id } => {
3040                    assert_eq!(peer_id, peer);
3041                }
3042                _ => unreachable!(),
3043            }
3044        }
3045
3046        poll_fn(|cx| {
3047            assert!(peers.poll(cx).is_pending());
3048            Poll::Ready(())
3049        })
3050        .await;
3051
3052        assert_eq!(peers.connection_info.num_inbound, 0);
3053
3054        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
3055
3056        // Assert we can still accept new connections
3057        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
3058        assert_eq!(peers.connection_info.num_pending_in, 1);
3059
3060        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
3061        // connection info is updated via the peer's state which would be a noop here since the
3062        // banned peer's state is idle
3063        peers.on_active_session_gracefully_closed(peer);
3064        assert_eq!(peers.connection_info.num_inbound, 0);
3065    }
3066
3067    #[tokio::test]
3068    async fn test_add_pending_connect() {
3069        let peer = PeerId::random();
3070        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
3071        let mut peers = PeersManager::default();
3072        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
3073        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
3074        assert_eq!(peers.connection_info.num_pending_out, 1);
3075    }
3076
3077    #[tokio::test]
3078    async fn test_dns_updates_peer_address() {
3079        let peer_id = PeerId::random();
3080        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
3081        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
3082
3083        let trusted = TrustedPeer {
3084            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
3085            tcp_port: 8008,
3086            udp_port: 8008,
3087            id: peer_id,
3088        };
3089
3090        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
3091        let mut manager = PeersManager::new(config);
3092        manager
3093            .trusted_peers_resolver
3094            .set_interval(tokio::time::interval(Duration::from_millis(1)));
3095
3096        manager.peers.insert(
3097            peer_id,
3098            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
3099        );
3100
3101        for _ in 0..100 {
3102            let _ = event!(manager);
3103            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
3104                break;
3105            }
3106            tokio::time::sleep(Duration::from_millis(10)).await;
3107        }
3108
3109        let updated_peer = manager.peers.get(&peer_id).unwrap();
3110        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
3111    }
3112
3113    #[tokio::test]
3114    async fn test_ip_filter_blocks_inbound_connection() {
3115        use reth_net_banlist::IpFilter;
3116        use std::net::IpAddr;
3117
3118        // Create a filter that only allows 192.168.0.0/16
3119        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3120        let config = PeersConfig::test().with_ip_filter(ip_filter);
3121        let mut peers = PeersManager::new(config);
3122
3123        // Try to connect from an allowed IP
3124        let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
3125        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3126
3127        // Try to connect from a disallowed IP
3128        let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
3129        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3130    }
3131
3132    #[tokio::test]
3133    async fn test_ip_filter_blocks_outbound_connection() {
3134        use reth_net_banlist::IpFilter;
3135        use std::net::SocketAddr;
3136
3137        // Create a filter that only allows 192.168.0.0/16
3138        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
3139        let config = PeersConfig::test().with_ip_filter(ip_filter);
3140        let mut peers = PeersManager::new(config);
3141
3142        let peer_id = PeerId::new([1; 64]);
3143
3144        // Try to add a peer with an allowed IP
3145        let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
3146        peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
3147        assert!(peers.peers.contains_key(&peer_id));
3148
3149        // Try to add a peer with a disallowed IP
3150        let peer_id2 = PeerId::new([2; 64]);
3151        let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
3152        peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
3153        assert!(!peers.peers.contains_key(&peer_id2));
3154    }
3155
3156    #[tokio::test]
3157    async fn test_ip_filter_ipv6() {
3158        use reth_net_banlist::IpFilter;
3159        use std::net::IpAddr;
3160
3161        // Create a filter that only allows IPv6 range 2001:db8::/32
3162        let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
3163        let config = PeersConfig::test().with_ip_filter(ip_filter);
3164        let mut peers = PeersManager::new(config);
3165
3166        // Try to connect from an allowed IPv6 address
3167        let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
3168        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
3169
3170        // Try to connect from a disallowed IPv6 address
3171        let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
3172        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3173    }
3174
3175    #[tokio::test]
3176    async fn test_ip_filter_multiple_ranges() {
3177        use reth_net_banlist::IpFilter;
3178        use std::net::IpAddr;
3179
3180        // Create a filter that allows multiple ranges
3181        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
3182        let config = PeersConfig::test().with_ip_filter(ip_filter);
3183        let mut peers = PeersManager::new(config);
3184
3185        // Try IPs from both allowed ranges
3186        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3187        let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3188        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3189        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3190
3191        // Try IP from disallowed range
3192        let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3193        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3194    }
3195
3196    #[tokio::test]
3197    async fn test_ip_filter_no_restriction() {
3198        use reth_net_banlist::IpFilter;
3199        use std::net::IpAddr;
3200
3201        // Create a filter with no restrictions (allow all)
3202        let ip_filter = IpFilter::allow_all();
3203        let config = PeersConfig::test().with_ip_filter(ip_filter);
3204        let mut peers = PeersManager::new(config);
3205
3206        // All IPs should be allowed
3207        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3208        let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3209        let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3210        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3211        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3212        assert!(peers.on_incoming_pending_session(ip3).is_ok());
3213    }
3214}