reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Buffered actions until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If non-trusted peers should be connected to, or the connection from non-trusted
83    /// incoming peers should be accepted.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and dropping.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban ip on an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93    /// IP address filter for restricting network connections to specific IP ranges.
94    ip_filter: reth_net_banlist::IpFilter,
95}
96
97impl PeersManager {
98    /// Create a new instance with the given config
99    pub fn new(config: PeersConfig) -> Self {
100        let PeersConfig {
101            refill_slots_interval,
102            connection_info,
103            reputation_weights,
104            ban_list,
105            ban_duration,
106            backoff_durations,
107            trusted_nodes,
108            trusted_nodes_only,
109            trusted_nodes_resolution_interval,
110            basic_nodes,
111            max_backoff_count,
112            incoming_ip_throttle_duration,
113            ip_filter,
114        } = config;
115        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
116        let now = Instant::now();
117
118        // We use half of the interval to decrease the max duration to `150%` in worst case
119        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
120
121        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
122        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
123
124        for trusted_peer in &trusted_nodes {
125            match trusted_peer.resolve_blocking() {
126                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
127                    trusted_peer_ids.insert(id);
128                    peers.entry(id).or_insert_with(|| {
129                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
130                    });
131                }
132                Err(err) => {
133                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
134                }
135            }
136        }
137
138        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
139            peers.entry(id).or_insert_with(|| {
140                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
141            });
142        }
143
144        Self {
145            peers,
146            trusted_peer_ids,
147            trusted_peers_resolver: TrustedPeersResolver::new(
148                trusted_nodes,
149                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
150            ),
151            manager_tx,
152            handle_rx: UnboundedReceiverStream::new(handle_rx),
153            queued_actions: Default::default(),
154            reputation_weights,
155            refill_slots_interval: tokio::time::interval(refill_slots_interval),
156            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
157            connection_info: ConnectionInfo::new(connection_info),
158            ban_list,
159            backed_off_peers: Default::default(),
160            ban_duration,
161            backoff_durations,
162            trusted_nodes_only,
163            last_tick: Instant::now(),
164            max_backoff_count,
165            net_connection_state: NetworkConnectionState::default(),
166            incoming_ip_throttle_duration,
167            ip_filter,
168        }
169    }
170
171    /// Returns a new [`PeersHandle`] that can send commands to this type.
172    pub(crate) fn handle(&self) -> PeersHandle {
173        PeersHandle::new(self.manager_tx.clone())
174    }
175
176    /// Returns the number of peers in the peer set
177    #[inline]
178    pub(crate) fn num_known_peers(&self) -> usize {
179        self.peers.len()
180    }
181
182    /// Returns an iterator over all peers
183    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
184        self.peers.iter().map(|(peer_id, v)| {
185            NodeRecord::new_with_ports(
186                v.addr.tcp().ip(),
187                v.addr.tcp().port(),
188                v.addr.udp().map(|addr| addr.port()),
189                *peer_id,
190            )
191        })
192    }
193
194    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
195    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
196        self.peers.get(&peer_id).map(|v| {
197            (
198                NodeRecord::new_with_ports(
199                    v.addr.tcp().ip(),
200                    v.addr.tcp().port(),
201                    v.addr.udp().map(|addr| addr.port()),
202                    peer_id,
203                ),
204                v.kind,
205            )
206        })
207    }
208
209    /// Returns an iterator over all peer ids for peers with the given kind
210    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
211        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
212    }
213
214    /// Returns the number of currently active inbound connections.
215    #[inline]
216    pub(crate) const fn num_inbound_connections(&self) -> usize {
217        self.connection_info.num_inbound
218    }
219
220    /// Returns the number of currently __active__ outbound connections.
221    #[inline]
222    pub(crate) const fn num_outbound_connections(&self) -> usize {
223        self.connection_info.num_outbound
224    }
225
226    /// Returns the number of currently pending outbound connections.
227    #[inline]
228    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
229        self.connection_info.num_pending_out
230    }
231
232    /// Returns the number of currently backed off peers.
233    #[inline]
234    pub(crate) fn num_backed_off_peers(&self) -> usize {
235        self.backed_off_peers.len()
236    }
237
238    /// Returns the number of idle trusted peers.
239    fn num_idle_trusted_peers(&self) -> usize {
240        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
241    }
242
243    /// Invoked when a new _incoming_ tcp connection is accepted.
244    ///
245    /// returns an error if the inbound ip address is on the ban list
246    pub(crate) fn on_incoming_pending_session(
247        &mut self,
248        addr: IpAddr,
249    ) -> Result<(), InboundConnectionError> {
250        // Check if the IP is in the allowed ranges (netrestrict)
251        if !self.ip_filter.is_allowed(&addr) {
252            trace!(target: "net", ?addr, "Rejecting connection from IP not in allowed ranges");
253            return Err(InboundConnectionError::IpBanned)
254        }
255
256        if self.ban_list.is_banned_ip(&addr) {
257            return Err(InboundConnectionError::IpBanned)
258        }
259
260        // check if we even have slots for a new incoming connection
261        if !self.connection_info.has_in_capacity() {
262            if self.trusted_peer_ids.is_empty() {
263                // if we don't have any incoming slots and no trusted peers, we don't accept any new
264                // connections
265                return Err(InboundConnectionError::ExceedsCapacity)
266            }
267
268            // there's an edge case here where no incoming connections besides from trusted peers
269            // are allowed (max_inbound == 0), in which case we still need to allow new pending
270            // incoming connections until all trusted peers are connected.
271            let num_idle_trusted_peers = self.num_idle_trusted_peers();
272            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
273                // we still want to limit concurrent pending connections
274                let max_inbound =
275                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
276                if self.connection_info.num_pending_in < max_inbound {
277                    self.connection_info.inc_pending_in();
278                    return Ok(())
279                }
280            }
281
282            // all trusted peers are either connected or connecting
283            return Err(InboundConnectionError::ExceedsCapacity)
284        }
285
286        // also cap the incoming connections we can process at once
287        if !self.connection_info.has_in_pending_capacity() {
288            return Err(InboundConnectionError::ExceedsCapacity)
289        }
290
291        // apply the rate limit
292        self.throttle_incoming_ip(addr);
293
294        self.connection_info.inc_pending_in();
295        Ok(())
296    }
297
298    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
299    /// rejected.
300    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
301        self.connection_info.decr_pending_in();
302    }
303
304    /// Invoked when a pending session was closed.
305    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
306        self.connection_info.decr_pending_in()
307    }
308
309    /// Invoked when a pending session was closed.
310    pub(crate) fn on_incoming_pending_session_dropped(
311        &mut self,
312        remote_addr: SocketAddr,
313        err: &PendingSessionHandshakeError,
314    ) {
315        if err.is_fatal_protocol_error() {
316            self.ban_ip(remote_addr.ip());
317
318            if err.merits_discovery_ban() {
319                self.queued_actions
320                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
321            }
322        }
323
324        self.connection_info.decr_pending_in();
325    }
326
327    /// Called when a new _incoming_ active session was established to the given peer.
328    ///
329    /// This will update the state of the peer if not yet tracked.
330    ///
331    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
332    /// be scheduled.
333    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
334        self.connection_info.decr_pending_in();
335
336        // we only need to check the peer id here as the ip address will have been checked at
337        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
338        if self.ban_list.is_banned_peer(&peer_id) {
339            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
340            return
341        }
342
343        // check if the peer is trustable or not
344        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
345        if self.trusted_nodes_only && !is_trusted {
346            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
347            return
348        }
349
350        // start a new tick, so the peer is not immediately rewarded for the time since last tick
351        self.tick();
352
353        match self.peers.entry(peer_id) {
354            Entry::Occupied(mut entry) => {
355                let peer = entry.get_mut();
356                if peer.is_banned() {
357                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
358                    return
359                }
360                // it might be the case that we're also trying to connect to this peer at the same
361                // time, so we need to adjust the state here
362                if peer.state.is_pending_out() {
363                    self.connection_info.decr_state(peer.state);
364                }
365
366                peer.state = PeerConnectionState::In;
367
368                is_trusted = is_trusted || peer.is_trusted();
369            }
370            Entry::Vacant(entry) => {
371                // peer is missing in the table, we add it but mark it as to be removed after
372                // disconnect, because we only know the outgoing port
373                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
374                peer.remove_after_disconnect = true;
375                entry.insert(peer);
376                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
377            }
378        }
379
380        let has_in_capacity = self.connection_info.has_in_capacity();
381        // increment new incoming connection
382        self.connection_info.inc_in();
383
384        // disconnect the peer if we don't have capacity for more inbound connections
385        if !is_trusted && !has_in_capacity {
386            self.queued_actions.push_back(PeerAction::Disconnect {
387                peer_id,
388                reason: Some(DisconnectReason::TooManyPeers),
389            });
390        }
391    }
392
393    /// Bans the peer temporarily with the configured ban timeout
394    fn ban_peer(&mut self, peer_id: PeerId) {
395        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
396            (peer.is_trusted() || peer.is_static())
397        {
398            // For misbehaving trusted or static peers, we provide a bit more leeway when
399            // penalizing them.
400            self.backoff_durations.low / 2
401        } else {
402            self.ban_duration
403        };
404
405        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
406        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
407    }
408
409    /// Bans the IP temporarily with the configured ban timeout
410    fn ban_ip(&mut self, ip: IpAddr) {
411        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
412    }
413
414    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
415    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
416        self.ban_list
417            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
418    }
419
420    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
421    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
422        trace!(target: "net::peers", ?peer_id, "backing off");
423
424        if let Some(peer) = self.peers.get_mut(&peer_id) {
425            peer.backed_off = true;
426            self.backed_off_peers.insert(peer_id, until);
427        }
428    }
429
430    /// Unbans the peer
431    fn unban_peer(&mut self, peer_id: PeerId) {
432        self.ban_list.unban_peer(&peer_id);
433        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
434    }
435
436    /// Tick function to update reputation of all connected peers.
437    /// Peers are rewarded with reputation increases for the time they are connected since the last
438    /// tick. This is to prevent peers from being disconnected eventually due to slashed
439    /// reputation because of some bad messages (most likely transaction related)
440    fn tick(&mut self) {
441        let now = Instant::now();
442        // Determine the number of seconds since the last tick.
443        // Ensuring that now is always greater than last_tick to account for issues with system
444        // time.
445        let secs_since_last_tick =
446            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
447        self.last_tick = now;
448
449        // update reputation via seconds connected
450        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
451            // update reputation via seconds connected, but keep the target _around_ the default
452            // reputation.
453            if peer.1.reputation < DEFAULT_REPUTATION {
454                peer.1.reputation += secs_since_last_tick;
455            }
456        }
457    }
458
459    /// Returns the tracked reputation for a peer.
460    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
461        self.peers.get(peer_id).map(|peer| peer.reputation)
462    }
463
464    /// Apply the corresponding reputation change to the given peer.
465    ///
466    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
467    /// reputation changes that can be attributed to network conditions. If the peer is a
468    /// trusted peer, it will also be less strict with the reputation slashing.
469    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
470        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
471            // First check if we should reset the reputation
472            if rep.is_reset() {
473                peer.reset_reputation()
474            } else {
475                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
476                if peer.is_trusted() || peer.is_static() {
477                    // exempt trusted and static peers from reputation slashing for
478                    if matches!(
479                        rep,
480                        ReputationChangeKind::Dropped |
481                            ReputationChangeKind::BadAnnouncement |
482                            ReputationChangeKind::Timeout |
483                            ReputationChangeKind::AlreadySeenTransaction
484                    ) {
485                        return
486                    }
487
488                    // also be less strict with the reputation slashing for trusted peers
489                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
490                        // this caps the reputation change to the maximum allowed for trusted peers
491                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
492                    }
493                }
494                peer.apply_reputation(reputation_change, rep)
495            }
496        } else {
497            return
498        };
499
500        match outcome {
501            ReputationChangeOutcome::None => {}
502            ReputationChangeOutcome::Ban => {
503                self.ban_peer(*peer_id);
504            }
505            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
506            ReputationChangeOutcome::DisconnectAndBan => {
507                self.queued_actions.push_back(PeerAction::Disconnect {
508                    peer_id: *peer_id,
509                    reason: Some(DisconnectReason::DisconnectRequested),
510                });
511                self.ban_peer(*peer_id);
512            }
513        }
514    }
515
516    /// Gracefully disconnected a pending _outgoing_ session
517    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
518        if let Some(peer) = self.peers.get_mut(peer_id) {
519            self.connection_info.decr_state(peer.state);
520            peer.state = PeerConnectionState::Idle;
521        }
522    }
523
524    /// Invoked when an _outgoing_ pending session was closed during authentication or the
525    /// handshake.
526    pub(crate) fn on_outgoing_pending_session_dropped(
527        &mut self,
528        remote_addr: &SocketAddr,
529        peer_id: &PeerId,
530        err: &PendingSessionHandshakeError,
531    ) {
532        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
533    }
534
535    /// Gracefully disconnected an active session
536    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
537        match self.peers.entry(peer_id) {
538            Entry::Occupied(mut entry) => {
539                self.connection_info.decr_state(entry.get().state);
540
541                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
542                    // this peer should be removed from the set
543                    entry.remove();
544                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
545                } else {
546                    // reset the peer's state
547                    // we reset the backoff counter since we're able to establish a successful
548                    // session to that peer
549                    entry.get_mut().severe_backoff_counter = 0;
550                    entry.get_mut().state = PeerConnectionState::Idle;
551                    return
552                }
553            }
554            Entry::Vacant(_) => return,
555        }
556
557        self.fill_outbound_slots();
558    }
559
560    /// Called when a _pending_ outbound connection is successful.
561    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
562        if let Some(peer) = self.peers.get_mut(&peer_id) {
563            self.connection_info.decr_state(peer.state);
564            self.connection_info.inc_out();
565            peer.state = PeerConnectionState::Out;
566        }
567    }
568
569    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
570    ///
571    /// Depending on whether the error is fatal, the peer will be removed from the peer set
572    /// otherwise its reputation is slashed.
573    pub(crate) fn on_active_session_dropped(
574        &mut self,
575        remote_addr: &SocketAddr,
576        peer_id: &PeerId,
577        err: &EthStreamError,
578    ) {
579        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
580    }
581
582    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
583    /// connection.
584    pub(crate) fn on_outgoing_connection_failure(
585        &mut self,
586        remote_addr: &SocketAddr,
587        peer_id: &PeerId,
588        err: &io::Error,
589    ) {
590        // there's a race condition where we accepted an incoming connection while we were trying to
591        // connect to the same peer at the same time. if the outgoing connection failed
592        // after the incoming connection was accepted, we can ignore this error
593        if let Some(peer) = self.peers.get(peer_id) {
594            if peer.state.is_incoming() {
595                // we already have an active connection to the peer, so we can ignore this error
596                return
597            }
598
599            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
600                // trigger resolution task for trusted peer since multiple connection failures
601                // occurred
602                self.trusted_peers_resolver.interval.reset_immediately();
603            }
604        }
605
606        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
607    }
608
609    fn on_connection_failure(
610        &mut self,
611        remote_addr: &SocketAddr,
612        peer_id: &PeerId,
613        err: impl SessionError,
614        reputation_change: ReputationChangeKind,
615    ) {
616        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
617
618        if err.is_fatal_protocol_error() {
619            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
620            // remove the peer to which we can't establish a connection due to protocol related
621            // issues.
622            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
623                self.connection_info.decr_state(entry.get().state);
624                // only remove if the peer is not trusted
625                if entry.get().is_trusted() {
626                    entry.get_mut().state = PeerConnectionState::Idle;
627                } else {
628                    entry.remove();
629                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
630                    // If the error is caused by a peer that should be banned from discovery
631                    if err.merits_discovery_ban() {
632                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
633                            peer_id: *peer_id,
634                            ip_addr: remote_addr.ip(),
635                        })
636                    }
637                }
638            }
639
640            // ban the peer
641            self.ban_peer(*peer_id);
642        } else {
643            let mut backoff_until = None;
644            let mut remove_peer = false;
645
646            if let Some(peer) = self.peers.get_mut(peer_id) {
647                if let Some(kind) = err.should_backoff() {
648                    if peer.is_trusted() || peer.is_static() {
649                        // provide a bit more leeway for trusted peers and use a lower backoff so
650                        // that we keep re-trying them after backing off shortly, but we should at
651                        // least backoff for the low duration to not violate the ip based inbound
652                        // connection throttle that peer has in place, because this peer might not
653                        // have us registered as a trusted peer.
654                        let backoff = self.backoff_durations.low;
655                        backoff_until = Some(std::time::Instant::now() + backoff);
656                    } else {
657                        // Increment peer.backoff_counter
658                        if kind.is_severe() {
659                            peer.severe_backoff_counter =
660                                peer.severe_backoff_counter.saturating_add(1);
661                        }
662
663                        let backoff_time =
664                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
665
666                        // The peer has signaled that it is currently unable to process any more
667                        // connections, so we will hold off on attempting any new connections for a
668                        // while
669                        backoff_until = Some(backoff_time);
670                    }
671                } else {
672                    // If the error was not a backoff error, we reduce the peer's reputation
673                    let reputation_change = self.reputation_weights.change(reputation_change);
674                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
675                };
676
677                self.connection_info.decr_state(peer.state);
678                peer.state = PeerConnectionState::Idle;
679
680                if peer.severe_backoff_counter > self.max_backoff_count &&
681                    !peer.is_trusted() &&
682                    !peer.is_static()
683                {
684                    // mark peer for removal if it has been backoff too many times and is _not_
685                    // trusted or static
686                    remove_peer = true;
687                }
688            }
689
690            // remove peer if it has been marked for removal
691            if remove_peer {
692                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
693                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
694            } else if let Some(backoff_until) = backoff_until {
695                // otherwise, backoff the peer if marked as such
696                self.backoff_peer_until(*peer_id, backoff_until);
697            }
698        }
699
700        self.fill_outbound_slots();
701    }
702
703    /// Invoked if a pending session was disconnected because there's already a connection to the
704    /// peer.
705    ///
706    /// If the session was an outgoing connection, this means that the peer initiated a connection
707    /// to us at the same time and this connection is already established.
708    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
709        match direction {
710            Direction::Incoming => {
711                // need to decrement the ingoing counter
712                self.connection_info.decr_pending_in();
713            }
714            Direction::Outgoing(_) => {
715                // cleanup is handled when the incoming active session is activated in
716                // `on_incoming_session_established`
717            }
718        }
719    }
720
721    /// Called as follow-up for a discovered peer.
722    ///
723    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
724    /// protocol
725    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
726        if let Some(peer) = self.peers.get_mut(&peer_id) {
727            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
728            peer.fork_id = Some(Box::new(fork_id));
729        }
730    }
731
732    /// Called for a newly discovered peer.
733    ///
734    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
735    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
736        self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
737    }
738
739    /// Marks the given peer as trusted.
740    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
741        self.trusted_peer_ids.insert(peer_id);
742    }
743
744    /// Called for a newly discovered trusted peer.
745    ///
746    /// If the peer already exists, then the address and kind will be updated.
747    #[cfg_attr(not(test), expect(dead_code))]
748    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
749        self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
750    }
751
752    /// Called for a newly discovered peer.
753    ///
754    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
755    pub(crate) fn add_peer_kind(
756        &mut self,
757        peer_id: PeerId,
758        kind: PeerKind,
759        addr: PeerAddr,
760        fork_id: Option<ForkId>,
761    ) {
762        let ip_addr = addr.tcp().ip();
763
764        // Check if the IP is in the allowed ranges (netrestrict)
765        if !self.ip_filter.is_allowed(&ip_addr) {
766            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping peer from IP not in allowed ranges");
767            return
768        }
769
770        if self.ban_list.is_banned(&peer_id, &ip_addr) {
771            return
772        }
773
774        match self.peers.entry(peer_id) {
775            Entry::Occupied(mut entry) => {
776                let peer = entry.get_mut();
777                peer.kind = kind;
778                peer.fork_id = fork_id.map(Box::new);
779                peer.addr = addr;
780
781                if peer.state.is_incoming() {
782                    // now that we have an actual discovered address, for that peer and not just the
783                    // ip of the incoming connection, we don't need to remove the peer after
784                    // disconnecting, See `on_incoming_session_established`
785                    peer.remove_after_disconnect = false;
786                }
787            }
788            Entry::Vacant(entry) => {
789                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
790                let mut peer = Peer::with_kind(addr, kind);
791                peer.fork_id = fork_id.map(Box::new);
792                entry.insert(peer);
793                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
794            }
795        }
796
797        if kind.is_trusted() {
798            self.trusted_peer_ids.insert(peer_id);
799        }
800    }
801
802    /// Removes the tracked node from the set.
803    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
804        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
805        if entry.get().is_trusted() {
806            return
807        }
808        let mut peer = entry.remove();
809
810        trace!(target: "net::peers", ?peer_id, "remove discovered node");
811        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
812
813        if peer.state.is_connected() {
814            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
815            // we terminate the active session here, but only remove the peer after the session
816            // was disconnected, this prevents the case where the session is scheduled for
817            // disconnect but the node is immediately rediscovered, See also
818            // [`Self::on_disconnected()`]
819            peer.remove_after_disconnect = true;
820            peer.state.disconnect();
821            self.peers.insert(peer_id, peer);
822            self.queued_actions.push_back(PeerAction::Disconnect {
823                peer_id,
824                reason: Some(DisconnectReason::DisconnectRequested),
825            })
826        }
827    }
828
829    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
830    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
831    #[cfg_attr(not(test), expect(dead_code))]
832    pub(crate) fn add_and_connect(
833        &mut self,
834        peer_id: PeerId,
835        addr: PeerAddr,
836        fork_id: Option<ForkId>,
837    ) {
838        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
839    }
840
841    /// Connects a peer and its address with the given kind.
842    ///
843    /// Note: This is invoked on demand via an external command received by the manager
844    pub(crate) fn add_and_connect_kind(
845        &mut self,
846        peer_id: PeerId,
847        kind: PeerKind,
848        addr: PeerAddr,
849        fork_id: Option<ForkId>,
850    ) {
851        let ip_addr = addr.tcp().ip();
852
853        // Check if the IP is in the allowed ranges (netrestrict)
854        if !self.ip_filter.is_allowed(&ip_addr) {
855            trace!(target: "net", ?peer_id, ?ip_addr, "Skipping outbound connection to IP not in allowed ranges");
856            return
857        }
858
859        if self.ban_list.is_banned(&peer_id, &ip_addr) {
860            return
861        }
862
863        match self.peers.entry(peer_id) {
864            Entry::Occupied(mut entry) => {
865                let peer = entry.get_mut();
866                peer.kind = kind;
867                peer.fork_id = fork_id.map(Box::new);
868                peer.addr = addr;
869
870                if peer.state == PeerConnectionState::Idle {
871                    // Try connecting again.
872                    peer.state = PeerConnectionState::PendingOut;
873                    self.connection_info.inc_pending_out();
874                    self.queued_actions
875                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
876                }
877            }
878            Entry::Vacant(entry) => {
879                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
880                let mut peer = Peer::with_kind(addr, kind);
881                peer.state = PeerConnectionState::PendingOut;
882                peer.fork_id = fork_id.map(Box::new);
883                entry.insert(peer);
884                self.connection_info.inc_pending_out();
885                self.queued_actions
886                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
887            }
888        }
889
890        if kind.is_trusted() {
891            self.trusted_peer_ids.insert(peer_id);
892        }
893    }
894
895    /// Removes the tracked node from the trusted set.
896    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
897        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
898        if !entry.get().is_trusted() {
899            return
900        }
901
902        let peer = entry.get_mut();
903        peer.kind = PeerKind::Basic;
904
905        self.trusted_peer_ids.remove(&peer_id);
906    }
907
908    /// Returns the idle peer with the highest reputation.
909    ///
910    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
911    /// not currently marked as banned or backed off.
912    ///
913    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
914    /// `trusted` peers.
915    ///
916    /// Returns `None` if no peer is available.
917    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
918        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
919            !peer.is_backed_off() &&
920                !peer.is_banned() &&
921                peer.state.is_unconnected() &&
922                (!self.trusted_nodes_only || peer.is_trusted())
923        });
924
925        // keep track of the best peer, if there's one
926        let mut best_peer = unconnected.next()?;
927
928        if best_peer.1.is_trusted() || best_peer.1.is_static() {
929            return Some((*best_peer.0, best_peer.1))
930        }
931
932        for maybe_better in unconnected {
933            // if the peer is trusted or static, return it immediately
934            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
935                return Some((*maybe_better.0, maybe_better.1))
936            }
937
938            // otherwise we keep track of the best peer using the reputation
939            if maybe_better.1.reputation > best_peer.1.reputation {
940                best_peer = maybe_better;
941            }
942        }
943        Some((*best_peer.0, best_peer.1))
944    }
945
946    /// If there's capacity for new outbound connections, this will queue new
947    /// [`PeerAction::Connect`] actions.
948    ///
949    /// New connections are only initiated, if slots are available and appropriate peers are
950    /// available.
951    fn fill_outbound_slots(&mut self) {
952        self.tick();
953
954        if !self.net_connection_state.is_active() {
955            // nothing to fill
956            return
957        }
958
959        // as long as there are slots available fill them with the best peers
960        while self.connection_info.has_out_capacity() {
961            let action = {
962                let (peer_id, peer) = match self.best_unconnected() {
963                    Some(peer) => peer,
964                    _ => break,
965                };
966
967                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
968
969                peer.state = PeerConnectionState::PendingOut;
970                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
971            };
972
973            self.connection_info.inc_pending_out();
974
975            self.queued_actions.push_back(action);
976        }
977    }
978
979    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
980        if let Some(peer) = self.peers.get_mut(&peer_id) {
981            let new_addr = PeerAddr::new_with_ports(
982                new_record.address,
983                new_record.tcp_port,
984                Some(new_record.udp_port),
985            );
986
987            if peer.addr != new_addr {
988                peer.addr = new_addr;
989                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
990            }
991        }
992    }
993
994    /// Keeps track of network state changes.
995    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
996        self.net_connection_state = state;
997    }
998
999    /// Returns the current network connection state.
1000    pub const fn connection_state(&self) -> &NetworkConnectionState {
1001        &self.net_connection_state
1002    }
1003
1004    /// Sets `net_connection_state` to `ShuttingDown`.
1005    pub const fn on_shutdown(&mut self) {
1006        self.net_connection_state = NetworkConnectionState::ShuttingDown;
1007    }
1008
1009    /// Advances the state.
1010    ///
1011    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
1012    /// [`PeersManager`] is polled.
1013    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
1014        loop {
1015            // drain buffered actions
1016            if let Some(action) = self.queued_actions.pop_front() {
1017                return Poll::Ready(action)
1018            }
1019
1020            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
1021                match cmd {
1022                    PeerCommand::Add(peer_id, addr) => {
1023                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
1024                    }
1025                    PeerCommand::Remove(peer) => self.remove_peer(peer),
1026                    PeerCommand::ReputationChange(peer_id, rep) => {
1027                        self.apply_reputation_change(&peer_id, rep)
1028                    }
1029                    PeerCommand::GetPeer(peer, tx) => {
1030                        let _ = tx.send(self.peers.get(&peer).cloned());
1031                    }
1032                    PeerCommand::GetPeers(tx) => {
1033                        let _ = tx.send(self.iter_peers().collect());
1034                    }
1035                }
1036            }
1037
1038            if self.release_interval.poll_tick(cx).is_ready() {
1039                let now = std::time::Instant::now();
1040                let (_, unbanned_peers) = self.ban_list.evict(now);
1041
1042                for peer_id in unbanned_peers {
1043                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1044                        peer.unban();
1045                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1046                    }
1047                }
1048
1049                // clear the backoff list of expired backoffs, and mark the relevant peers as
1050                // ready to be dialed
1051                self.backed_off_peers.retain(|peer_id, until| {
1052                    if now > *until {
1053                        if let Some(peer) = self.peers.get_mut(peer_id) {
1054                            peer.backed_off = false;
1055                        }
1056                        return false
1057                    }
1058                    true
1059                })
1060            }
1061
1062            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1063                self.fill_outbound_slots();
1064            }
1065
1066            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1067                self.on_resolved_peer(peer_id, new_record);
1068            }
1069
1070            if self.queued_actions.is_empty() {
1071                return Poll::Pending
1072            }
1073        }
1074    }
1075}
1076
1077impl Default for PeersManager {
1078    fn default() -> Self {
1079        Self::new(Default::default())
1080    }
1081}
1082
1083/// Tracks stats about connected nodes
1084#[derive(Debug, Clone, PartialEq, Eq, Default)]
1085pub struct ConnectionInfo {
1086    /// Counter for currently occupied slots for active outbound connections.
1087    num_outbound: usize,
1088    /// Counter for pending outbound connections.
1089    num_pending_out: usize,
1090    /// Counter for currently occupied slots for active inbound connections.
1091    num_inbound: usize,
1092    /// Counter for pending inbound connections.
1093    num_pending_in: usize,
1094    /// Restrictions on number of connections.
1095    config: ConnectionsConfig,
1096}
1097
1098// === impl ConnectionInfo ===
1099
1100impl ConnectionInfo {
1101    /// Returns a new [`ConnectionInfo`] with the given config.
1102    const fn new(config: ConnectionsConfig) -> Self {
1103        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1104    }
1105
1106    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1107    const fn has_out_capacity(&self) -> bool {
1108        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1109            self.num_outbound < self.config.max_outbound
1110    }
1111
1112    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1113    const fn has_in_capacity(&self) -> bool {
1114        self.num_inbound < self.config.max_inbound
1115    }
1116
1117    /// Returns `true` if we can handle an additional incoming pending connection.
1118    const fn has_in_pending_capacity(&self) -> bool {
1119        self.num_pending_in < self.config.max_inbound
1120    }
1121
1122    const fn decr_state(&mut self, state: PeerConnectionState) {
1123        match state {
1124            PeerConnectionState::Idle => {}
1125            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1126            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1127            PeerConnectionState::PendingOut => self.decr_pending_out(),
1128        }
1129    }
1130
1131    const fn decr_out(&mut self) {
1132        self.num_outbound -= 1;
1133    }
1134
1135    const fn inc_out(&mut self) {
1136        self.num_outbound += 1;
1137    }
1138
1139    const fn inc_pending_out(&mut self) {
1140        self.num_pending_out += 1;
1141    }
1142
1143    const fn inc_in(&mut self) {
1144        self.num_inbound += 1;
1145    }
1146
1147    const fn inc_pending_in(&mut self) {
1148        self.num_pending_in += 1;
1149    }
1150
1151    const fn decr_in(&mut self) {
1152        self.num_inbound -= 1;
1153    }
1154
1155    const fn decr_pending_out(&mut self) {
1156        self.num_pending_out -= 1;
1157    }
1158
1159    const fn decr_pending_in(&mut self) {
1160        self.num_pending_in -= 1;
1161    }
1162}
1163
1164/// Actions the peer manager can trigger.
1165#[derive(Debug)]
1166pub enum PeerAction {
1167    /// Start a new connection to a peer.
1168    Connect {
1169        /// The peer to connect to.
1170        peer_id: PeerId,
1171        /// Where to reach the node
1172        remote_addr: SocketAddr,
1173    },
1174    /// Disconnect an existing connection.
1175    Disconnect {
1176        /// The peer ID of the established connection.
1177        peer_id: PeerId,
1178        /// An optional reason for the disconnect.
1179        reason: Option<DisconnectReason>,
1180    },
1181    /// Disconnect an existing incoming connection, because the peers reputation is below the
1182    /// banned threshold or is on the [`BanList`]
1183    DisconnectBannedIncoming {
1184        /// The peer ID of the established connection.
1185        peer_id: PeerId,
1186    },
1187    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1188    DisconnectUntrustedIncoming {
1189        /// The peer ID.
1190        peer_id: PeerId,
1191    },
1192    /// Ban the peer in discovery.
1193    DiscoveryBanPeerId {
1194        /// The peer ID.
1195        peer_id: PeerId,
1196        /// The IP address.
1197        ip_addr: IpAddr,
1198    },
1199    /// Ban the IP in discovery.
1200    DiscoveryBanIp {
1201        /// The IP address.
1202        ip_addr: IpAddr,
1203    },
1204    /// Ban the peer temporarily
1205    BanPeer {
1206        /// The peer ID.
1207        peer_id: PeerId,
1208    },
1209    /// Unban the peer temporarily
1210    UnBanPeer {
1211        /// The peer ID.
1212        peer_id: PeerId,
1213    },
1214    /// Emit peerAdded event
1215    PeerAdded(PeerId),
1216    /// Emit peerRemoved event
1217    PeerRemoved(PeerId),
1218}
1219
1220/// Error thrown when a incoming connection is rejected right away
1221#[derive(Debug, Error, PartialEq, Eq)]
1222pub enum InboundConnectionError {
1223    /// The remote's ip address is banned
1224    IpBanned,
1225    /// No capacity for new inbound connections
1226    ExceedsCapacity,
1227}
1228
1229impl Display for InboundConnectionError {
1230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1231        write!(f, "{self:?}")
1232    }
1233}
1234
1235#[cfg(test)]
1236mod tests {
1237    use alloy_primitives::B512;
1238    use reth_eth_wire::{
1239        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1240        DisconnectReason,
1241    };
1242    use reth_net_banlist::BanList;
1243    use reth_network_api::Direction;
1244    use reth_network_peers::{PeerId, TrustedPeer};
1245    use reth_network_types::{
1246        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1247    };
1248    use std::{
1249        future::{poll_fn, Future},
1250        io,
1251        net::{IpAddr, Ipv4Addr, SocketAddr},
1252        pin::Pin,
1253        task::{Context, Poll},
1254        time::Duration,
1255    };
1256    use url::Host;
1257
1258    use super::PeersManager;
1259    use crate::{
1260        error::SessionError,
1261        peers::{
1262            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1263            PeerConnectionState,
1264        },
1265        session::PendingSessionHandshakeError,
1266        PeersConfig,
1267    };
1268
1269    struct PeerActionFuture<'a> {
1270        peers: &'a mut PeersManager,
1271    }
1272
1273    impl Future for PeerActionFuture<'_> {
1274        type Output = PeerAction;
1275
1276        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1277            self.get_mut().peers.poll(cx)
1278        }
1279    }
1280
1281    macro_rules! event {
1282        ($peers:expr) => {
1283            PeerActionFuture { peers: &mut $peers }.await
1284        };
1285    }
1286
1287    #[tokio::test]
1288    async fn test_insert() {
1289        let peer = PeerId::random();
1290        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1291        let mut peers = PeersManager::default();
1292        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1293
1294        match event!(peers) {
1295            PeerAction::PeerAdded(peer_id) => {
1296                assert_eq!(peer_id, peer);
1297            }
1298            _ => unreachable!(),
1299        }
1300        match event!(peers) {
1301            PeerAction::Connect { peer_id, remote_addr } => {
1302                assert_eq!(peer_id, peer);
1303                assert_eq!(remote_addr, socket_addr);
1304            }
1305            _ => unreachable!(),
1306        }
1307
1308        let (record, _) = peers.peer_by_id(peer).unwrap();
1309        assert_eq!(record.tcp_addr(), socket_addr);
1310        assert_eq!(record.udp_addr(), socket_addr);
1311    }
1312
1313    #[tokio::test]
1314    async fn test_insert_udp() {
1315        let peer = PeerId::random();
1316        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1317        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1318        let mut peers = PeersManager::default();
1319        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1320
1321        match event!(peers) {
1322            PeerAction::PeerAdded(peer_id) => {
1323                assert_eq!(peer_id, peer);
1324            }
1325            _ => unreachable!(),
1326        }
1327        match event!(peers) {
1328            PeerAction::Connect { peer_id, remote_addr } => {
1329                assert_eq!(peer_id, peer);
1330                assert_eq!(remote_addr, tcp_addr);
1331            }
1332            _ => unreachable!(),
1333        }
1334
1335        let (record, _) = peers.peer_by_id(peer).unwrap();
1336        assert_eq!(record.tcp_addr(), tcp_addr);
1337        assert_eq!(record.udp_addr(), udp_addr);
1338    }
1339
1340    #[tokio::test]
1341    async fn test_ban() {
1342        let peer = PeerId::random();
1343        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1344        let mut peers = PeersManager::default();
1345        peers.ban_peer(peer);
1346        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1347
1348        match event!(peers) {
1349            PeerAction::BanPeer { peer_id } => {
1350                assert_eq!(peer_id, peer);
1351            }
1352            _ => unreachable!(),
1353        }
1354
1355        poll_fn(|cx| {
1356            assert!(peers.poll(cx).is_pending());
1357            Poll::Ready(())
1358        })
1359        .await;
1360    }
1361
1362    #[tokio::test]
1363    async fn test_unban() {
1364        let peer = PeerId::random();
1365        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1366        let mut peers = PeersManager::default();
1367        peers.ban_peer(peer);
1368        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1369
1370        match event!(peers) {
1371            PeerAction::BanPeer { peer_id } => {
1372                assert_eq!(peer_id, peer);
1373            }
1374            _ => unreachable!(),
1375        }
1376
1377        poll_fn(|cx| {
1378            assert!(peers.poll(cx).is_pending());
1379            Poll::Ready(())
1380        })
1381        .await;
1382
1383        peers.unban_peer(peer);
1384
1385        match event!(peers) {
1386            PeerAction::UnBanPeer { peer_id } => {
1387                assert_eq!(peer_id, peer);
1388            }
1389            _ => unreachable!(),
1390        }
1391
1392        poll_fn(|cx| {
1393            assert!(peers.poll(cx).is_pending());
1394            Poll::Ready(())
1395        })
1396        .await;
1397    }
1398
1399    #[tokio::test]
1400    async fn test_backoff_on_busy() {
1401        let peer = PeerId::random();
1402        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1403
1404        let mut peers = PeersManager::new(PeersConfig::test());
1405        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1406
1407        match event!(peers) {
1408            PeerAction::PeerAdded(peer_id) => {
1409                assert_eq!(peer_id, peer);
1410            }
1411            _ => unreachable!(),
1412        }
1413        match event!(peers) {
1414            PeerAction::Connect { peer_id, .. } => {
1415                assert_eq!(peer_id, peer);
1416            }
1417            _ => unreachable!(),
1418        }
1419
1420        poll_fn(|cx| {
1421            assert!(peers.poll(cx).is_pending());
1422            Poll::Ready(())
1423        })
1424        .await;
1425
1426        peers.on_active_session_dropped(
1427            &socket_addr,
1428            &peer,
1429            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1430                DisconnectReason::TooManyPeers,
1431            )),
1432        );
1433
1434        poll_fn(|cx| {
1435            assert!(peers.poll(cx).is_pending());
1436            Poll::Ready(())
1437        })
1438        .await;
1439
1440        assert!(peers.backed_off_peers.contains_key(&peer));
1441        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1442
1443        tokio::time::sleep(peers.backoff_durations.low).await;
1444
1445        match event!(peers) {
1446            PeerAction::Connect { peer_id, .. } => {
1447                assert_eq!(peer_id, peer);
1448            }
1449            _ => unreachable!(),
1450        }
1451
1452        assert!(!peers.backed_off_peers.contains_key(&peer));
1453        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1454    }
1455
1456    #[tokio::test]
1457    async fn test_backoff_on_no_response() {
1458        let peer = PeerId::random();
1459        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1460
1461        let backoff_durations = PeerBackoffDurations::test();
1462        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1463        let mut peers = PeersManager::new(config);
1464        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1465
1466        match event!(peers) {
1467            PeerAction::PeerAdded(peer_id) => {
1468                assert_eq!(peer_id, peer);
1469            }
1470            _ => unreachable!(),
1471        }
1472        match event!(peers) {
1473            PeerAction::Connect { peer_id, .. } => {
1474                assert_eq!(peer_id, peer);
1475            }
1476            _ => unreachable!(),
1477        }
1478
1479        poll_fn(|cx| {
1480            assert!(peers.poll(cx).is_pending());
1481            Poll::Ready(())
1482        })
1483        .await;
1484
1485        peers.on_outgoing_pending_session_dropped(
1486            &socket_addr,
1487            &peer,
1488            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1489                EthHandshakeError::NoResponse,
1490            )),
1491        );
1492
1493        poll_fn(|cx| {
1494            assert!(peers.poll(cx).is_pending());
1495            Poll::Ready(())
1496        })
1497        .await;
1498
1499        assert!(peers.backed_off_peers.contains_key(&peer));
1500        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1501
1502        tokio::time::sleep(backoff_durations.high).await;
1503
1504        match event!(peers) {
1505            PeerAction::Connect { peer_id, .. } => {
1506                assert_eq!(peer_id, peer);
1507            }
1508            _ => unreachable!(),
1509        }
1510
1511        assert!(!peers.backed_off_peers.contains_key(&peer));
1512        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1513    }
1514
1515    #[tokio::test]
1516    async fn test_low_backoff() {
1517        let peer = PeerId::random();
1518        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1519        let config = PeersConfig::test();
1520        let mut peers = PeersManager::new(config);
1521        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1522        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1523
1524        let backoff_timestamp = peers
1525            .backoff_durations
1526            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1527
1528        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1529        assert!(backoff_timestamp <= expected);
1530    }
1531
1532    #[tokio::test]
1533    async fn test_multiple_backoff_calculations() {
1534        let peer = PeerId::random();
1535        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1536        let config = PeersConfig::default();
1537        let mut peers = PeersManager::new(config);
1538        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1539        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1540
1541        // Simulate a peer that was already backed off once
1542        peer_struct.severe_backoff_counter = 1;
1543
1544        let now = std::time::Instant::now();
1545
1546        // Simulate the increment that happens in on_connection_failure
1547        peer_struct.severe_backoff_counter += 1;
1548        // Get official backoff time
1549        let backoff_time = peers
1550            .backoff_durations
1551            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1552
1553        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1554        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1555
1556        // We can't use assert_eq! since there is a very small diff in the nano secs
1557        // Usually it is 1800s != 1799.9999996s
1558        assert!(backoff_time.duration_since(now) > backoff_duration);
1559    }
1560
1561    #[tokio::test]
1562    async fn test_ban_on_active_drop() {
1563        let peer = PeerId::random();
1564        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1565        let mut peers = PeersManager::default();
1566        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1567
1568        match event!(peers) {
1569            PeerAction::PeerAdded(peer_id) => {
1570                assert_eq!(peer_id, peer);
1571            }
1572            _ => unreachable!(),
1573        }
1574        match event!(peers) {
1575            PeerAction::Connect { peer_id, .. } => {
1576                assert_eq!(peer_id, peer);
1577            }
1578            _ => unreachable!(),
1579        }
1580
1581        poll_fn(|cx| {
1582            assert!(peers.poll(cx).is_pending());
1583            Poll::Ready(())
1584        })
1585        .await;
1586
1587        peers.on_active_session_dropped(
1588            &socket_addr,
1589            &peer,
1590            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1591                DisconnectReason::UselessPeer,
1592            )),
1593        );
1594
1595        match event!(peers) {
1596            PeerAction::PeerRemoved(peer_id) => {
1597                assert_eq!(peer_id, peer);
1598            }
1599            _ => unreachable!(),
1600        }
1601        match event!(peers) {
1602            PeerAction::BanPeer { peer_id } => {
1603                assert_eq!(peer_id, peer);
1604            }
1605            _ => unreachable!(),
1606        }
1607
1608        poll_fn(|cx| {
1609            assert!(peers.poll(cx).is_pending());
1610            Poll::Ready(())
1611        })
1612        .await;
1613
1614        assert!(!peers.peers.contains_key(&peer));
1615    }
1616
1617    #[tokio::test]
1618    async fn test_remove_on_max_backoff_count() {
1619        let peer = PeerId::random();
1620        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1621        let config = PeersConfig::test();
1622        let mut peers = PeersManager::new(config.clone());
1623        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1624        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1625
1626        // Simulate a peer that was already backed off once
1627        peer_struct.severe_backoff_counter = config.max_backoff_count;
1628
1629        match event!(peers) {
1630            PeerAction::PeerAdded(peer_id) => {
1631                assert_eq!(peer_id, peer);
1632            }
1633            _ => unreachable!(),
1634        }
1635        match event!(peers) {
1636            PeerAction::Connect { peer_id, .. } => {
1637                assert_eq!(peer_id, peer);
1638            }
1639            _ => unreachable!(),
1640        }
1641
1642        poll_fn(|cx| {
1643            assert!(peers.poll(cx).is_pending());
1644            Poll::Ready(())
1645        })
1646        .await;
1647
1648        peers.on_outgoing_pending_session_dropped(
1649            &socket_addr,
1650            &peer,
1651            &PendingSessionHandshakeError::Eth(
1652                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1653            ),
1654        );
1655
1656        match event!(peers) {
1657            PeerAction::PeerRemoved(peer_id) => {
1658                assert_eq!(peer_id, peer);
1659            }
1660            _ => unreachable!(),
1661        }
1662
1663        poll_fn(|cx| {
1664            assert!(peers.poll(cx).is_pending());
1665            Poll::Ready(())
1666        })
1667        .await;
1668
1669        assert!(!peers.peers.contains_key(&peer));
1670    }
1671
1672    #[tokio::test]
1673    async fn test_ban_on_pending_drop() {
1674        let peer = PeerId::random();
1675        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1676        let mut peers = PeersManager::default();
1677        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1678
1679        match event!(peers) {
1680            PeerAction::PeerAdded(peer_id) => {
1681                assert_eq!(peer_id, peer);
1682            }
1683            _ => unreachable!(),
1684        }
1685        match event!(peers) {
1686            PeerAction::Connect { peer_id, .. } => {
1687                assert_eq!(peer_id, peer);
1688            }
1689            _ => unreachable!(),
1690        }
1691
1692        poll_fn(|cx| {
1693            assert!(peers.poll(cx).is_pending());
1694            Poll::Ready(())
1695        })
1696        .await;
1697
1698        peers.on_outgoing_pending_session_dropped(
1699            &socket_addr,
1700            &peer,
1701            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1702                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1703            )),
1704        );
1705
1706        match event!(peers) {
1707            PeerAction::PeerRemoved(peer_id) => {
1708                assert_eq!(peer_id, peer);
1709            }
1710            _ => unreachable!(),
1711        }
1712        match event!(peers) {
1713            PeerAction::BanPeer { peer_id } => {
1714                assert_eq!(peer_id, peer);
1715            }
1716            _ => unreachable!(),
1717        }
1718
1719        poll_fn(|cx| {
1720            assert!(peers.poll(cx).is_pending());
1721            Poll::Ready(())
1722        })
1723        .await;
1724
1725        assert!(!peers.peers.contains_key(&peer));
1726    }
1727
1728    #[tokio::test]
1729    async fn test_internally_closed_incoming() {
1730        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1731        let mut peers = PeersManager::default();
1732
1733        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1734        assert_eq!(peers.connection_info.num_pending_in, 1);
1735        peers.on_incoming_pending_session_rejected_internally();
1736        assert_eq!(peers.connection_info.num_pending_in, 0);
1737    }
1738
1739    #[tokio::test]
1740    async fn test_reject_incoming_at_pending_capacity() {
1741        let mut peers = PeersManager::default();
1742
1743        for count in 1..=peers.connection_info.config.max_inbound {
1744            let socket_addr =
1745                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1746            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1747            assert_eq!(peers.connection_info.num_pending_in, count);
1748        }
1749        assert!(peers.connection_info.has_in_capacity());
1750        assert!(!peers.connection_info.has_in_pending_capacity());
1751
1752        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1753        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1754    }
1755
1756    #[tokio::test]
1757    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1758        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1759        let trusted = PeerId::random();
1760        peers.add_trusted_peer_id(trusted);
1761
1762        // connect the trusted peer
1763        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1764        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1765        peers.on_incoming_session_established(trusted, addr);
1766
1767        match event!(peers) {
1768            PeerAction::PeerAdded(id) => {
1769                assert_eq!(id, trusted);
1770            }
1771            _ => unreachable!(),
1772        }
1773
1774        // saturate the remaining inbound slots with untrusted peers
1775        let mut connected_untrusted_peer_ids = Vec::new();
1776        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1777            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1778            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1779            let peer_id = PeerId::random();
1780            peers.on_incoming_session_established(peer_id, addr);
1781            connected_untrusted_peer_ids.push(peer_id);
1782
1783            match event!(peers) {
1784                PeerAction::PeerAdded(id) => {
1785                    assert_eq!(id, peer_id);
1786                }
1787                _ => unreachable!(),
1788            }
1789        }
1790
1791        let mut pending_addrs = Vec::new();
1792
1793        // saturate available slots
1794        for i in 0..2 {
1795            let socket_addr =
1796                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1797            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1798
1799            pending_addrs.push(socket_addr);
1800        }
1801
1802        assert_eq!(peers.connection_info.num_pending_in, 2);
1803
1804        // try to handle additional incoming connections at capacity
1805        for i in 0..2 {
1806            let socket_addr =
1807                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1808            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1809        }
1810
1811        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1812            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1813                DisconnectReason::UselessPeer,
1814            )),
1815        ));
1816
1817        // Remove all pending peers
1818        for pending_addr in pending_addrs {
1819            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1820        }
1821
1822        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1823
1824        println!(
1825            "num_inbound: {}, has_in_capacity: {}",
1826            peers.connection_info.num_inbound,
1827            peers.connection_info.has_in_capacity()
1828        );
1829
1830        // disconnect a connected peer
1831        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1832
1833        println!(
1834            "num_inbound: {}, has_in_capacity: {}",
1835            peers.connection_info.num_inbound,
1836            peers.connection_info.has_in_capacity()
1837        );
1838
1839        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1840        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1841    }
1842
1843    #[tokio::test]
1844    async fn test_closed_incoming() {
1845        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1846        let mut peers = PeersManager::default();
1847
1848        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1849        assert_eq!(peers.connection_info.num_pending_in, 1);
1850        peers.on_incoming_pending_session_gracefully_closed();
1851        assert_eq!(peers.connection_info.num_pending_in, 0);
1852    }
1853
1854    #[tokio::test]
1855    async fn test_dropped_incoming() {
1856        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1857        let ban_duration = Duration::from_millis(500);
1858        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1859        let mut peers = PeersManager::new(config);
1860
1861        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1862        assert_eq!(peers.connection_info.num_pending_in, 1);
1863        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1864            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1865                DisconnectReason::UselessPeer,
1866            )),
1867        ));
1868
1869        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1870        assert_eq!(peers.connection_info.num_pending_in, 0);
1871        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1872
1873        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1874
1875        // unbanned after timeout
1876        tokio::time::sleep(ban_duration).await;
1877
1878        poll_fn(|cx| {
1879            let _ = peers.poll(cx);
1880            Poll::Ready(())
1881        })
1882        .await;
1883
1884        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1885        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1886    }
1887
1888    #[tokio::test]
1889    async fn test_reputation_change_connected() {
1890        let peer = PeerId::random();
1891        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1892        let mut peers = PeersManager::default();
1893        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1894
1895        match event!(peers) {
1896            PeerAction::PeerAdded(peer_id) => {
1897                assert_eq!(peer_id, peer);
1898            }
1899            _ => unreachable!(),
1900        }
1901        match event!(peers) {
1902            PeerAction::Connect { peer_id, remote_addr } => {
1903                assert_eq!(peer_id, peer);
1904                assert_eq!(remote_addr, socket_addr);
1905            }
1906            _ => unreachable!(),
1907        }
1908
1909        let p = peers.peers.get_mut(&peer).unwrap();
1910        assert_eq!(p.state, PeerConnectionState::PendingOut);
1911
1912        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1913
1914        let p = peers.peers.get(&peer).unwrap();
1915        assert_eq!(p.state, PeerConnectionState::PendingOut);
1916        assert!(p.is_banned());
1917
1918        peers.on_active_session_gracefully_closed(peer);
1919
1920        let p = peers.peers.get(&peer).unwrap();
1921        assert_eq!(p.state, PeerConnectionState::Idle);
1922        assert!(p.is_banned());
1923
1924        match event!(peers) {
1925            PeerAction::Disconnect { peer_id, .. } => {
1926                assert_eq!(peer_id, peer);
1927            }
1928            _ => unreachable!(),
1929        }
1930    }
1931
1932    #[tokio::test]
1933    async fn accept_incoming_trusted_unknown_peer_address() {
1934        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1935        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1936        // try to connect trusted peer
1937        let trusted = PeerId::random();
1938        peers.add_trusted_peer_id(trusted);
1939
1940        // saturate the inbound slots
1941        for i in 0..peers.connection_info.config.max_inbound {
1942            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1943            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1944            let peer_id = PeerId::random();
1945            peers.on_incoming_session_established(peer_id, addr);
1946
1947            match event!(peers) {
1948                PeerAction::PeerAdded(id) => {
1949                    assert_eq!(id, peer_id);
1950                }
1951                _ => unreachable!(),
1952            }
1953        }
1954
1955        // try to connect untrusted peer
1956        let untrusted = PeerId::random();
1957        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1958        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1959        peers.on_incoming_session_established(untrusted, socket_addr);
1960
1961        match event!(peers) {
1962            PeerAction::PeerAdded(id) => {
1963                assert_eq!(id, untrusted);
1964            }
1965            _ => unreachable!(),
1966        }
1967
1968        match event!(peers) {
1969            PeerAction::Disconnect { peer_id, reason } => {
1970                assert_eq!(peer_id, untrusted);
1971                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1972            }
1973            _ => unreachable!(),
1974        }
1975
1976        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1977        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1978        peers.on_incoming_session_established(trusted, socket_addr);
1979
1980        match event!(peers) {
1981            PeerAction::PeerAdded(id) => {
1982                assert_eq!(id, trusted);
1983            }
1984            _ => unreachable!(),
1985        }
1986
1987        poll_fn(|cx| {
1988            assert!(peers.poll(cx).is_pending());
1989            Poll::Ready(())
1990        })
1991        .await;
1992
1993        let peer = peers.peers.get(&trusted).unwrap();
1994        assert_eq!(peer.state, PeerConnectionState::In);
1995    }
1996
1997    #[tokio::test]
1998    async fn test_already_connected() {
1999        let peer = PeerId::random();
2000        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2001        let mut peers = PeersManager::default();
2002
2003        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
2004        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2005        assert_eq!(peers.connection_info.num_pending_in, 1);
2006
2007        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
2008        // to increase by 1
2009        peers.on_incoming_session_established(peer, socket_addr);
2010        let p = peers.peers.get_mut(&peer).expect("peer not found");
2011        assert_eq!(p.addr.tcp(), socket_addr);
2012        assert_eq!(peers.connection_info.num_pending_in, 0);
2013        assert_eq!(peers.connection_info.num_inbound, 1);
2014
2015        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
2016        // by 1
2017        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2018        assert_eq!(peers.connection_info.num_pending_in, 1);
2019
2020        // Simulate a rejection due to an already established connection, expecting the
2021        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
2022        // should not be changed.
2023        peers.on_already_connected(Direction::Incoming);
2024
2025        let p = peers.peers.get_mut(&peer).expect("peer not found");
2026        assert_eq!(p.addr.tcp(), socket_addr);
2027        assert_eq!(peers.connection_info.num_pending_in, 0);
2028        assert_eq!(peers.connection_info.num_inbound, 1);
2029    }
2030
2031    #[tokio::test]
2032    async fn test_reputation_change_trusted_peer() {
2033        let peer = PeerId::random();
2034        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2035        let mut peers = PeersManager::default();
2036        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2037
2038        match event!(peers) {
2039            PeerAction::PeerAdded(peer_id) => {
2040                assert_eq!(peer_id, peer);
2041            }
2042            _ => unreachable!(),
2043        }
2044        match event!(peers) {
2045            PeerAction::Connect { peer_id, remote_addr } => {
2046                assert_eq!(peer_id, peer);
2047                assert_eq!(remote_addr, socket_addr);
2048            }
2049            _ => unreachable!(),
2050        }
2051
2052        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2053        peers.on_active_outgoing_established(peer);
2054        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2055
2056        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2057
2058        {
2059            let p = peers.peers.get(&peer).unwrap();
2060            assert_eq!(p.state, PeerConnectionState::Out);
2061            // not banned yet
2062            assert!(!p.is_banned());
2063        }
2064
2065        // ensure peer is banned eventually
2066        loop {
2067            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2068
2069            let p = peers.peers.get(&peer).unwrap();
2070            if p.is_banned() {
2071                break
2072            }
2073        }
2074
2075        match event!(peers) {
2076            PeerAction::Disconnect { peer_id, .. } => {
2077                assert_eq!(peer_id, peer);
2078            }
2079            _ => unreachable!(),
2080        }
2081    }
2082
2083    #[tokio::test]
2084    async fn test_reputation_management() {
2085        let peer = PeerId::random();
2086        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2087        let mut peers = PeersManager::default();
2088        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2089        assert_eq!(peers.get_reputation(&peer), Some(0));
2090
2091        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2092        assert_eq!(peers.get_reputation(&peer), Some(1024));
2093
2094        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2095        assert_eq!(peers.get_reputation(&peer), Some(0));
2096    }
2097
2098    #[tokio::test]
2099    async fn test_remove_discovered_active() {
2100        let peer = PeerId::random();
2101        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2102        let mut peers = PeersManager::default();
2103        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2104
2105        match event!(peers) {
2106            PeerAction::PeerAdded(peer_id) => {
2107                assert_eq!(peer_id, peer);
2108            }
2109            _ => unreachable!(),
2110        }
2111        match event!(peers) {
2112            PeerAction::Connect { peer_id, remote_addr } => {
2113                assert_eq!(peer_id, peer);
2114                assert_eq!(remote_addr, socket_addr);
2115            }
2116            _ => unreachable!(),
2117        }
2118
2119        let p = peers.peers.get(&peer).unwrap();
2120        assert_eq!(p.state, PeerConnectionState::PendingOut);
2121
2122        peers.remove_peer(peer);
2123
2124        match event!(peers) {
2125            PeerAction::PeerRemoved(peer_id) => {
2126                assert_eq!(peer_id, peer);
2127            }
2128            _ => unreachable!(),
2129        }
2130        match event!(peers) {
2131            PeerAction::Disconnect { peer_id, .. } => {
2132                assert_eq!(peer_id, peer);
2133            }
2134            _ => unreachable!(),
2135        }
2136
2137        let p = peers.peers.get(&peer).unwrap();
2138        assert_eq!(p.state, PeerConnectionState::PendingOut);
2139
2140        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2141        let p = peers.peers.get(&peer).unwrap();
2142        assert_eq!(p.state, PeerConnectionState::PendingOut);
2143
2144        peers.on_active_session_gracefully_closed(peer);
2145        assert!(!peers.peers.contains_key(&peer));
2146    }
2147
2148    #[tokio::test]
2149    async fn test_fatal_outgoing_connection_error_trusted() {
2150        let peer = PeerId::random();
2151        let config = PeersConfig::test()
2152            .with_trusted_nodes(vec![TrustedPeer {
2153                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2154                tcp_port: 8008,
2155                udp_port: 8008,
2156                id: peer,
2157            }])
2158            .with_trusted_nodes_only(true);
2159        let mut peers = PeersManager::new(config);
2160        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2161
2162        match event!(peers) {
2163            PeerAction::Connect { peer_id, remote_addr } => {
2164                assert_eq!(peer_id, peer);
2165                assert_eq!(remote_addr, socket_addr);
2166            }
2167            _ => unreachable!(),
2168        }
2169
2170        let p = peers.peers.get(&peer).unwrap();
2171        assert_eq!(p.state, PeerConnectionState::PendingOut);
2172
2173        assert_eq!(peers.num_outbound_connections(), 0);
2174
2175        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2176            EthHandshakeError::NonStatusMessageInHandshake,
2177        ));
2178        assert!(err.is_fatal_protocol_error());
2179
2180        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2181        assert_eq!(peers.num_outbound_connections(), 0);
2182
2183        // try tmp ban peer
2184        match event!(peers) {
2185            PeerAction::BanPeer { peer_id } => {
2186                assert_eq!(peer_id, peer);
2187            }
2188            err => unreachable!("{err:?}"),
2189        }
2190
2191        // ensure we still have trusted peer
2192        assert!(peers.peers.contains_key(&peer));
2193
2194        // await for the ban to expire
2195        tokio::time::sleep(peers.backoff_durations.medium).await;
2196
2197        match event!(peers) {
2198            PeerAction::Connect { peer_id, remote_addr } => {
2199                assert_eq!(peer_id, peer);
2200                assert_eq!(remote_addr, socket_addr);
2201            }
2202            err => unreachable!("{err:?}"),
2203        }
2204    }
2205
2206    #[tokio::test]
2207    async fn test_outgoing_connection_error() {
2208        let peer = PeerId::random();
2209        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2210        let mut peers = PeersManager::default();
2211        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2212
2213        match event!(peers) {
2214            PeerAction::PeerAdded(peer_id) => {
2215                assert_eq!(peer_id, peer);
2216            }
2217            _ => unreachable!(),
2218        }
2219        match event!(peers) {
2220            PeerAction::Connect { peer_id, remote_addr } => {
2221                assert_eq!(peer_id, peer);
2222                assert_eq!(remote_addr, socket_addr);
2223            }
2224            _ => unreachable!(),
2225        }
2226
2227        let p = peers.peers.get(&peer).unwrap();
2228        assert_eq!(p.state, PeerConnectionState::PendingOut);
2229
2230        assert_eq!(peers.num_outbound_connections(), 0);
2231
2232        peers.on_outgoing_connection_failure(
2233            &socket_addr,
2234            &peer,
2235            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2236        );
2237
2238        assert_eq!(peers.num_outbound_connections(), 0);
2239    }
2240
2241    #[tokio::test]
2242    async fn test_outgoing_connection_gracefully_closed() {
2243        let peer = PeerId::random();
2244        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2245        let mut peers = PeersManager::default();
2246        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2247
2248        match event!(peers) {
2249            PeerAction::PeerAdded(peer_id) => {
2250                assert_eq!(peer_id, peer);
2251            }
2252            _ => unreachable!(),
2253        }
2254        match event!(peers) {
2255            PeerAction::Connect { peer_id, remote_addr } => {
2256                assert_eq!(peer_id, peer);
2257                assert_eq!(remote_addr, socket_addr);
2258            }
2259            _ => unreachable!(),
2260        }
2261
2262        let p = peers.peers.get(&peer).unwrap();
2263        assert_eq!(p.state, PeerConnectionState::PendingOut);
2264
2265        assert_eq!(peers.num_outbound_connections(), 0);
2266
2267        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2268
2269        assert_eq!(peers.num_outbound_connections(), 0);
2270        assert_eq!(peers.connection_info.num_pending_out, 0);
2271    }
2272
2273    #[tokio::test]
2274    async fn test_discovery_ban_list() {
2275        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2276        let socket_addr = SocketAddr::new(ip, 8008);
2277        let ban_list = BanList::new(vec![], vec![ip]);
2278        let config = PeersConfig::default().with_ban_list(ban_list);
2279        let mut peer_manager = PeersManager::new(config);
2280        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2281
2282        assert!(peer_manager.peers.is_empty());
2283    }
2284
2285    #[tokio::test]
2286    async fn test_on_pending_ban_list() {
2287        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2288        let socket_addr = SocketAddr::new(ip, 8008);
2289        let ban_list = BanList::new(vec![], vec![ip]);
2290        let config = PeersConfig::test().with_ban_list(ban_list);
2291        let mut peer_manager = PeersManager::new(config);
2292        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2293        // because we have no active peers this should be fine for testings
2294        match a {
2295            Ok(_) => panic!(),
2296            Err(err) => match err {
2297                InboundConnectionError::IpBanned => {
2298                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2299                }
2300                _ => unreachable!(),
2301            },
2302        }
2303    }
2304
2305    #[tokio::test]
2306    async fn test_on_active_inbound_ban_list() {
2307        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2308        let socket_addr = SocketAddr::new(ip, 8008);
2309        let given_peer_id = PeerId::random();
2310        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2311        let config = PeersConfig::test().with_ban_list(ban_list);
2312        let mut peer_manager = PeersManager::new(config);
2313        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2314        // non-trusted nodes should also increase pending_in
2315        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2316        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2317        // after the connection is established, the peer should be removed, the num_pending_in
2318        // should be decreased, and the num_inbound should not be increased
2319        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2320        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2321
2322        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2323            peer_manager.queued_actions.pop_front()
2324        else {
2325            panic!()
2326        };
2327
2328        assert_eq!(peer_id, given_peer_id)
2329    }
2330
2331    #[test]
2332    fn test_connection_limits() {
2333        let mut info = ConnectionInfo::default();
2334        info.inc_in();
2335        assert_eq!(info.num_inbound, 1);
2336        assert_eq!(info.num_outbound, 0);
2337        assert!(info.has_in_capacity());
2338
2339        info.decr_in();
2340        assert_eq!(info.num_inbound, 0);
2341        assert_eq!(info.num_outbound, 0);
2342
2343        info.inc_out();
2344        assert_eq!(info.num_inbound, 0);
2345        assert_eq!(info.num_outbound, 1);
2346        assert!(info.has_out_capacity());
2347
2348        info.decr_out();
2349        assert_eq!(info.num_inbound, 0);
2350        assert_eq!(info.num_outbound, 0);
2351    }
2352
2353    #[test]
2354    fn test_connection_peer_state() {
2355        let mut info = ConnectionInfo::default();
2356        info.inc_in();
2357
2358        info.decr_state(PeerConnectionState::In);
2359        assert_eq!(info.num_inbound, 0);
2360        assert_eq!(info.num_outbound, 0);
2361
2362        info.inc_out();
2363
2364        info.decr_state(PeerConnectionState::Out);
2365        assert_eq!(info.num_inbound, 0);
2366        assert_eq!(info.num_outbound, 0);
2367    }
2368
2369    #[tokio::test]
2370    async fn test_trusted_peers_are_prioritized() {
2371        let trusted_peer = PeerId::random();
2372        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2373        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2374            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2375            tcp_port: 8008,
2376            udp_port: 8008,
2377            id: trusted_peer,
2378        }]);
2379        let mut peers = PeersManager::new(config);
2380
2381        let basic_peer = PeerId::random();
2382        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2383        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2384
2385        match event!(peers) {
2386            PeerAction::PeerAdded(peer_id) => {
2387                assert_eq!(peer_id, basic_peer);
2388            }
2389            _ => unreachable!(),
2390        }
2391        match event!(peers) {
2392            PeerAction::Connect { peer_id, remote_addr } => {
2393                assert_eq!(peer_id, trusted_peer);
2394                assert_eq!(remote_addr, trusted_sock);
2395            }
2396            _ => unreachable!(),
2397        }
2398        match event!(peers) {
2399            PeerAction::Connect { peer_id, remote_addr } => {
2400                assert_eq!(peer_id, basic_peer);
2401                assert_eq!(remote_addr, basic_sock);
2402            }
2403            _ => unreachable!(),
2404        }
2405    }
2406
2407    #[tokio::test]
2408    async fn test_connect_trusted_nodes_only() {
2409        let trusted_peer = PeerId::random();
2410        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2411        let config = PeersConfig::test()
2412            .with_trusted_nodes(vec![TrustedPeer {
2413                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2414                tcp_port: 8008,
2415                udp_port: 8008,
2416                id: trusted_peer,
2417            }])
2418            .with_trusted_nodes_only(true);
2419        let mut peers = PeersManager::new(config);
2420
2421        let basic_peer = PeerId::random();
2422        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2423        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2424
2425        match event!(peers) {
2426            PeerAction::PeerAdded(peer_id) => {
2427                assert_eq!(peer_id, basic_peer);
2428            }
2429            _ => unreachable!(),
2430        }
2431        match event!(peers) {
2432            PeerAction::Connect { peer_id, remote_addr } => {
2433                assert_eq!(peer_id, trusted_peer);
2434                assert_eq!(remote_addr, trusted_sock);
2435            }
2436            _ => unreachable!(),
2437        }
2438        poll_fn(|cx| {
2439            assert!(peers.poll(cx).is_pending());
2440            Poll::Ready(())
2441        })
2442        .await;
2443    }
2444
2445    #[tokio::test]
2446    async fn test_incoming_with_trusted_nodes_only() {
2447        let trusted_peer = PeerId::random();
2448        let config = PeersConfig::test()
2449            .with_trusted_nodes(vec![TrustedPeer {
2450                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2451                tcp_port: 8008,
2452                udp_port: 8008,
2453                id: trusted_peer,
2454            }])
2455            .with_trusted_nodes_only(true);
2456        let mut peers = PeersManager::new(config);
2457
2458        let basic_peer = PeerId::random();
2459        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2460        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2461        // non-trusted nodes should also increase pending_in
2462        assert_eq!(peers.connection_info.num_pending_in, 1);
2463        peers.on_incoming_session_established(basic_peer, basic_sock);
2464        // after the connection is established, the peer should be removed, the num_pending_in
2465        // should be decreased, and the num_inbound mut not be increased
2466        assert_eq!(peers.connection_info.num_pending_in, 0);
2467        assert_eq!(peers.connection_info.num_inbound, 0);
2468
2469        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2470            peers.queued_actions.pop_front()
2471        else {
2472            panic!()
2473        };
2474        assert_eq!(basic_peer, peer_id);
2475        assert!(!peers.peers.contains_key(&basic_peer));
2476    }
2477
2478    #[tokio::test]
2479    async fn test_incoming_without_trusted_nodes_only() {
2480        let trusted_peer = PeerId::random();
2481        let config = PeersConfig::test()
2482            .with_trusted_nodes(vec![TrustedPeer {
2483                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2484                tcp_port: 8008,
2485                udp_port: 8008,
2486                id: trusted_peer,
2487            }])
2488            .with_trusted_nodes_only(false);
2489        let mut peers = PeersManager::new(config);
2490
2491        let basic_peer = PeerId::random();
2492        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2493        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2494
2495        // non-trusted nodes should also increase pending_in
2496        assert_eq!(peers.connection_info.num_pending_in, 1);
2497        peers.on_incoming_session_established(basic_peer, basic_sock);
2498        // after the connection is established, the peer should be removed, the num_pending_in
2499        // should be decreased, and the num_inbound must be increased
2500        assert_eq!(peers.connection_info.num_pending_in, 0);
2501        assert_eq!(peers.connection_info.num_inbound, 1);
2502        assert!(peers.peers.contains_key(&basic_peer));
2503    }
2504
2505    #[tokio::test]
2506    async fn test_incoming_at_capacity() {
2507        let mut config = PeersConfig::test();
2508        config.connection_info.max_inbound = 1;
2509        let mut peers = PeersManager::new(config);
2510
2511        let peer = PeerId::random();
2512        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2513        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2514
2515        peers.on_incoming_session_established(peer, addr);
2516
2517        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2518        assert_eq!(
2519            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2520            InboundConnectionError::ExceedsCapacity
2521        );
2522    }
2523
2524    #[tokio::test]
2525    async fn test_incoming_rate_limit() {
2526        let config = PeersConfig {
2527            incoming_ip_throttle_duration: Duration::from_millis(100),
2528            ..PeersConfig::test()
2529        };
2530        let mut peers = PeersManager::new(config);
2531
2532        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2533        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2534        assert_eq!(
2535            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2536            InboundConnectionError::IpBanned
2537        );
2538
2539        peers.release_interval.reset_immediately();
2540        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2541
2542        // await unban
2543        poll_fn(|cx| loop {
2544            if peers.poll(cx).is_pending() {
2545                return Poll::Ready(());
2546            }
2547        })
2548        .await;
2549
2550        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2551        assert_eq!(
2552            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2553            InboundConnectionError::IpBanned
2554        );
2555    }
2556
2557    #[tokio::test]
2558    async fn test_tick() {
2559        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2560        let socket_addr = SocketAddr::new(ip, 8008);
2561        let config = PeersConfig::test();
2562        let mut peer_manager = PeersManager::new(config);
2563        let peer_id = PeerId::random();
2564        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2565
2566        tokio::time::sleep(Duration::from_secs(1)).await;
2567        peer_manager.tick();
2568
2569        // still unconnected
2570        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2571
2572        // mark as connected
2573        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2574
2575        tokio::time::sleep(Duration::from_secs(1)).await;
2576        peer_manager.tick();
2577
2578        // still at default reputation
2579        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2580
2581        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2582
2583        tokio::time::sleep(Duration::from_secs(1)).await;
2584        peer_manager.tick();
2585
2586        // tick applied
2587        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2588    }
2589
2590    #[tokio::test]
2591    async fn test_remove_incoming_after_disconnect() {
2592        let peer_id = PeerId::random();
2593        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2594        let mut peers = PeersManager::default();
2595
2596        peers.on_incoming_pending_session(addr.ip()).unwrap();
2597        peers.on_incoming_session_established(peer_id, addr);
2598        let peer = peers.peers.get(&peer_id).unwrap();
2599        assert_eq!(peer.state, PeerConnectionState::In);
2600        assert!(peer.remove_after_disconnect);
2601
2602        peers.on_active_session_gracefully_closed(peer_id);
2603        assert!(!peers.peers.contains_key(&peer_id))
2604    }
2605
2606    #[tokio::test]
2607    async fn test_keep_incoming_after_disconnect_if_discovered() {
2608        let peer_id = PeerId::random();
2609        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2610        let mut peers = PeersManager::default();
2611
2612        peers.on_incoming_pending_session(addr.ip()).unwrap();
2613        peers.on_incoming_session_established(peer_id, addr);
2614        let peer = peers.peers.get(&peer_id).unwrap();
2615        assert_eq!(peer.state, PeerConnectionState::In);
2616        assert!(peer.remove_after_disconnect);
2617
2618        // trigger discovery manually while the peer is still connected
2619        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2620
2621        peers.on_active_session_gracefully_closed(peer_id);
2622
2623        let peer = peers.peers.get(&peer_id).unwrap();
2624        assert_eq!(peer.state, PeerConnectionState::Idle);
2625        assert!(!peer.remove_after_disconnect);
2626    }
2627
2628    #[tokio::test]
2629    async fn test_incoming_outgoing_already_connected() {
2630        let peer_id = PeerId::random();
2631        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2632        let mut peers = PeersManager::default();
2633
2634        peers.on_incoming_pending_session(addr.ip()).unwrap();
2635        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2636
2637        match event!(peers) {
2638            PeerAction::PeerAdded(_) => {}
2639            _ => unreachable!(),
2640        }
2641
2642        match event!(peers) {
2643            PeerAction::Connect { .. } => {}
2644            _ => unreachable!(),
2645        }
2646
2647        peers.on_incoming_session_established(peer_id, addr);
2648        peers.on_already_connected(Direction::Outgoing(peer_id));
2649        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2650        assert_eq!(peers.connection_info.num_inbound, 1);
2651        assert_eq!(peers.connection_info.num_pending_out, 0);
2652        assert_eq!(peers.connection_info.num_pending_in, 0);
2653        assert_eq!(peers.connection_info.num_outbound, 0);
2654    }
2655
2656    #[tokio::test]
2657    async fn test_already_connected_incoming_outgoing_connection_error() {
2658        let peer_id = PeerId::random();
2659        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2660        let mut peers = PeersManager::default();
2661
2662        peers.on_incoming_pending_session(addr.ip()).unwrap();
2663        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2664
2665        match event!(peers) {
2666            PeerAction::PeerAdded(_) => {}
2667            _ => unreachable!(),
2668        }
2669
2670        match event!(peers) {
2671            PeerAction::Connect { .. } => {}
2672            _ => unreachable!(),
2673        }
2674
2675        peers.on_incoming_session_established(peer_id, addr);
2676
2677        peers.on_outgoing_connection_failure(
2678            &addr,
2679            &peer_id,
2680            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2681        );
2682        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2683        assert_eq!(peers.connection_info.num_inbound, 1);
2684        assert_eq!(peers.connection_info.num_pending_out, 0);
2685        assert_eq!(peers.connection_info.num_pending_in, 0);
2686        assert_eq!(peers.connection_info.num_outbound, 0);
2687    }
2688
2689    #[tokio::test]
2690    async fn test_max_concurrent_dials() {
2691        let config = PeersConfig::default();
2692        let mut peer_manager = PeersManager::new(config);
2693        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2694        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2695        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2696            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2697        }
2698
2699        peer_manager.fill_outbound_slots();
2700        let dials = peer_manager
2701            .queued_actions
2702            .iter()
2703            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2704            .count();
2705        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2706    }
2707
2708    #[tokio::test]
2709    async fn test_max_num_of_pending_dials() {
2710        let config = PeersConfig::default();
2711        let mut peer_manager = PeersManager::new(config);
2712        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2713        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2714
2715        // add more peers than allowed
2716        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2717            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2718        }
2719
2720        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2721            match event!(peer_manager) {
2722                PeerAction::PeerAdded(_) => {}
2723                _ => unreachable!(),
2724            }
2725        }
2726
2727        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2728            match event!(peer_manager) {
2729                PeerAction::Connect { .. } => {}
2730                _ => unreachable!(),
2731            }
2732        }
2733
2734        // generate 'Connect' actions
2735        peer_manager.fill_outbound_slots();
2736
2737        // all dialed connections should be in 'PendingOut' state
2738        let dials = peer_manager.connection_info.num_pending_out;
2739        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2740
2741        let num_pendingout_states = peer_manager
2742            .peers
2743            .iter()
2744            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2745            .map(|(peer_id, _)| *peer_id)
2746            .collect::<Vec<PeerId>>();
2747        assert_eq!(
2748            num_pendingout_states.len(),
2749            peer_manager.connection_info.config.max_concurrent_outbound_dials
2750        );
2751
2752        // establish dialed connections
2753        for peer_id in &num_pendingout_states {
2754            peer_manager.on_active_outgoing_established(*peer_id);
2755        }
2756
2757        // all dialed connections should now be in 'Out' state
2758        for peer_id in &num_pendingout_states {
2759            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2760        }
2761
2762        // no more pending outbound connections
2763        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2764    }
2765
2766    #[tokio::test]
2767    async fn test_connect() {
2768        let peer = PeerId::random();
2769        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2770        let mut peers = PeersManager::default();
2771        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2772        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2773
2774        match event!(peers) {
2775            PeerAction::Connect { peer_id, remote_addr } => {
2776                assert_eq!(peer_id, peer);
2777                assert_eq!(remote_addr, socket_addr);
2778            }
2779            _ => unreachable!(),
2780        }
2781
2782        let (record, _) = peers.peer_by_id(peer).unwrap();
2783        assert_eq!(record.tcp_addr(), socket_addr);
2784        assert_eq!(record.udp_addr(), socket_addr);
2785
2786        // connect again
2787        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2788
2789        let (record, _) = peers.peer_by_id(peer).unwrap();
2790        assert_eq!(record.tcp_addr(), socket_addr);
2791        assert_eq!(record.udp_addr(), socket_addr);
2792    }
2793
2794    #[tokio::test]
2795    async fn test_incoming_connection_from_banned() {
2796        let peer = PeerId::random();
2797        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2798        let config = PeersConfig::test().with_max_inbound(3);
2799        let mut peers = PeersManager::new(config);
2800        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2801
2802        match event!(peers) {
2803            PeerAction::PeerAdded(peer_id) => {
2804                assert_eq!(peer_id, peer);
2805            }
2806            _ => unreachable!(),
2807        }
2808        match event!(peers) {
2809            PeerAction::Connect { peer_id, .. } => {
2810                assert_eq!(peer_id, peer);
2811            }
2812            _ => unreachable!(),
2813        }
2814
2815        poll_fn(|cx| {
2816            assert!(peers.poll(cx).is_pending());
2817            Poll::Ready(())
2818        })
2819        .await;
2820
2821        // simulate new connection drops with error
2822        loop {
2823            peers.on_active_session_dropped(
2824                &socket_addr,
2825                &peer,
2826                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2827                    reth_eth_wire::EthVersion::Eth68,
2828                    reth_eth_wire::EthMessageID::Status,
2829                )),
2830            );
2831
2832            if peers.peers.get(&peer).unwrap().is_banned() {
2833                break;
2834            }
2835
2836            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2837            peers.on_incoming_session_established(peer, socket_addr);
2838
2839            match event!(peers) {
2840                PeerAction::Connect { peer_id, .. } => {
2841                    assert_eq!(peer_id, peer);
2842                }
2843                _ => unreachable!(),
2844            }
2845        }
2846
2847        assert!(peers.peers.get(&peer).unwrap().is_banned());
2848
2849        // fill all incoming slots
2850        for _ in 0..peers.connection_info.config.max_inbound {
2851            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2852            peers.on_incoming_session_established(peer, socket_addr);
2853
2854            match event!(peers) {
2855                PeerAction::DisconnectBannedIncoming { peer_id } => {
2856                    assert_eq!(peer_id, peer);
2857                }
2858                _ => unreachable!(),
2859            }
2860        }
2861
2862        poll_fn(|cx| {
2863            assert!(peers.poll(cx).is_pending());
2864            Poll::Ready(())
2865        })
2866        .await;
2867
2868        assert_eq!(peers.connection_info.num_inbound, 0);
2869
2870        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2871
2872        // Assert we can still accept new connections
2873        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2874        assert_eq!(peers.connection_info.num_pending_in, 1);
2875
2876        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
2877        // connection info is updated via the peer's state which would be a noop here since the
2878        // banned peer's state is idle
2879        peers.on_active_session_gracefully_closed(peer);
2880        assert_eq!(peers.connection_info.num_inbound, 0);
2881    }
2882
2883    #[tokio::test]
2884    async fn test_add_pending_connect() {
2885        let peer = PeerId::random();
2886        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2887        let mut peers = PeersManager::default();
2888        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2889        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2890        assert_eq!(peers.connection_info.num_pending_out, 1);
2891    }
2892
2893    #[tokio::test]
2894    async fn test_dns_updates_peer_address() {
2895        let peer_id = PeerId::random();
2896        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2897        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2898
2899        let trusted = TrustedPeer {
2900            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2901            tcp_port: 8008,
2902            udp_port: 8008,
2903            id: peer_id,
2904        };
2905
2906        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2907        let mut manager = PeersManager::new(config);
2908        manager
2909            .trusted_peers_resolver
2910            .set_interval(tokio::time::interval(Duration::from_millis(1)));
2911
2912        manager.peers.insert(
2913            peer_id,
2914            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2915        );
2916
2917        for _ in 0..100 {
2918            let _ = event!(manager);
2919            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2920                break;
2921            }
2922            tokio::time::sleep(Duration::from_millis(10)).await;
2923        }
2924
2925        let updated_peer = manager.peers.get(&peer_id).unwrap();
2926        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2927    }
2928
2929    #[tokio::test]
2930    async fn test_ip_filter_blocks_inbound_connection() {
2931        use reth_net_banlist::IpFilter;
2932        use std::net::IpAddr;
2933
2934        // Create a filter that only allows 192.168.0.0/16
2935        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
2936        let config = PeersConfig::test().with_ip_filter(ip_filter);
2937        let mut peers = PeersManager::new(config);
2938
2939        // Try to connect from an allowed IP
2940        let allowed_ip: IpAddr = "192.168.1.100".parse().unwrap();
2941        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
2942
2943        // Try to connect from a disallowed IP
2944        let disallowed_ip: IpAddr = "10.0.0.1".parse().unwrap();
2945        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
2946    }
2947
2948    #[tokio::test]
2949    async fn test_ip_filter_blocks_outbound_connection() {
2950        use reth_net_banlist::IpFilter;
2951        use std::net::SocketAddr;
2952
2953        // Create a filter that only allows 192.168.0.0/16
2954        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16").unwrap();
2955        let config = PeersConfig::test().with_ip_filter(ip_filter);
2956        let mut peers = PeersManager::new(config);
2957
2958        let peer_id = PeerId::new([1; 64]);
2959
2960        // Try to add a peer with an allowed IP
2961        let allowed_addr: SocketAddr = "192.168.1.100:30303".parse().unwrap();
2962        peers.add_peer(peer_id, PeerAddr::from_tcp(allowed_addr), None);
2963        assert!(peers.peers.contains_key(&peer_id));
2964
2965        // Try to add a peer with a disallowed IP
2966        let peer_id2 = PeerId::new([2; 64]);
2967        let disallowed_addr: SocketAddr = "10.0.0.1:30303".parse().unwrap();
2968        peers.add_peer(peer_id2, PeerAddr::from_tcp(disallowed_addr), None);
2969        assert!(!peers.peers.contains_key(&peer_id2));
2970    }
2971
2972    #[tokio::test]
2973    async fn test_ip_filter_ipv6() {
2974        use reth_net_banlist::IpFilter;
2975        use std::net::IpAddr;
2976
2977        // Create a filter that only allows IPv6 range 2001:db8::/32
2978        let ip_filter = IpFilter::from_cidr_string("2001:db8::/32").unwrap();
2979        let config = PeersConfig::test().with_ip_filter(ip_filter);
2980        let mut peers = PeersManager::new(config);
2981
2982        // Try to connect from an allowed IPv6 address
2983        let allowed_ip: IpAddr = "2001:db8::1".parse().unwrap();
2984        assert!(peers.on_incoming_pending_session(allowed_ip).is_ok());
2985
2986        // Try to connect from a disallowed IPv6 address
2987        let disallowed_ip: IpAddr = "2001:db9::1".parse().unwrap();
2988        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
2989    }
2990
2991    #[tokio::test]
2992    async fn test_ip_filter_multiple_ranges() {
2993        use reth_net_banlist::IpFilter;
2994        use std::net::IpAddr;
2995
2996        // Create a filter that allows multiple ranges
2997        let ip_filter = IpFilter::from_cidr_string("192.168.0.0/16,10.0.0.0/8").unwrap();
2998        let config = PeersConfig::test().with_ip_filter(ip_filter);
2999        let mut peers = PeersManager::new(config);
3000
3001        // Try IPs from both allowed ranges
3002        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3003        let ip2: IpAddr = "10.5.10.20".parse().unwrap();
3004        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3005        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3006
3007        // Try IP from disallowed range
3008        let disallowed_ip: IpAddr = "172.16.0.1".parse().unwrap();
3009        assert!(peers.on_incoming_pending_session(disallowed_ip).is_err());
3010    }
3011
3012    #[tokio::test]
3013    async fn test_ip_filter_no_restriction() {
3014        use reth_net_banlist::IpFilter;
3015        use std::net::IpAddr;
3016
3017        // Create a filter with no restrictions (allow all)
3018        let ip_filter = IpFilter::allow_all();
3019        let config = PeersConfig::test().with_ip_filter(ip_filter);
3020        let mut peers = PeersManager::new(config);
3021
3022        // All IPs should be allowed
3023        let ip1: IpAddr = "192.168.1.1".parse().unwrap();
3024        let ip2: IpAddr = "10.0.0.1".parse().unwrap();
3025        let ip3: IpAddr = "8.8.8.8".parse().unwrap();
3026        assert!(peers.on_incoming_pending_session(ip1).is_ok());
3027        assert!(peers.on_incoming_pending_session(ip2).is_ok());
3028        assert!(peers.on_incoming_pending_session(ip3).is_ok());
3029    }
3030}