reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Buffered actions until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If non-trusted peers should be connected to, or the connection from non-trusted
83    /// incoming peers should be accepted.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and dropping.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban ip on an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93}
94
95impl PeersManager {
96    /// Create a new instance with the given config
97    pub fn new(config: PeersConfig) -> Self {
98        let PeersConfig {
99            refill_slots_interval,
100            connection_info,
101            reputation_weights,
102            ban_list,
103            ban_duration,
104            backoff_durations,
105            trusted_nodes,
106            trusted_nodes_only,
107            trusted_nodes_resolution_interval,
108            basic_nodes,
109            max_backoff_count,
110            incoming_ip_throttle_duration,
111        } = config;
112        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
113        let now = Instant::now();
114
115        // We use half of the interval to decrease the max duration to `150%` in worst case
116        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
117
118        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
119        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
120
121        for trusted_peer in &trusted_nodes {
122            match trusted_peer.resolve_blocking() {
123                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
124                    trusted_peer_ids.insert(id);
125                    peers.entry(id).or_insert_with(|| {
126                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
127                    });
128                }
129                Err(err) => {
130                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
131                }
132            }
133        }
134
135        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
136            peers.entry(id).or_insert_with(|| {
137                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
138            });
139        }
140
141        Self {
142            peers,
143            trusted_peer_ids,
144            trusted_peers_resolver: TrustedPeersResolver::new(
145                trusted_nodes,
146                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
147            ),
148            manager_tx,
149            handle_rx: UnboundedReceiverStream::new(handle_rx),
150            queued_actions: Default::default(),
151            reputation_weights,
152            refill_slots_interval: tokio::time::interval(refill_slots_interval),
153            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
154            connection_info: ConnectionInfo::new(connection_info),
155            ban_list,
156            backed_off_peers: Default::default(),
157            ban_duration,
158            backoff_durations,
159            trusted_nodes_only,
160            last_tick: Instant::now(),
161            max_backoff_count,
162            net_connection_state: NetworkConnectionState::default(),
163            incoming_ip_throttle_duration,
164        }
165    }
166
167    /// Returns a new [`PeersHandle`] that can send commands to this type.
168    pub(crate) fn handle(&self) -> PeersHandle {
169        PeersHandle::new(self.manager_tx.clone())
170    }
171
172    /// Returns the number of peers in the peer set
173    #[inline]
174    pub(crate) fn num_known_peers(&self) -> usize {
175        self.peers.len()
176    }
177
178    /// Returns an iterator over all peers
179    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
180        self.peers.iter().map(|(peer_id, v)| {
181            NodeRecord::new_with_ports(
182                v.addr.tcp().ip(),
183                v.addr.tcp().port(),
184                v.addr.udp().map(|addr| addr.port()),
185                *peer_id,
186            )
187        })
188    }
189
190    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
191    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
192        self.peers.get(&peer_id).map(|v| {
193            (
194                NodeRecord::new_with_ports(
195                    v.addr.tcp().ip(),
196                    v.addr.tcp().port(),
197                    v.addr.udp().map(|addr| addr.port()),
198                    peer_id,
199                ),
200                v.kind,
201            )
202        })
203    }
204
205    /// Returns an iterator over all peer ids for peers with the given kind
206    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
207        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
208    }
209
210    /// Returns the number of currently active inbound connections.
211    #[inline]
212    pub(crate) const fn num_inbound_connections(&self) -> usize {
213        self.connection_info.num_inbound
214    }
215
216    /// Returns the number of currently __active__ outbound connections.
217    #[inline]
218    pub(crate) const fn num_outbound_connections(&self) -> usize {
219        self.connection_info.num_outbound
220    }
221
222    /// Returns the number of currently pending outbound connections.
223    #[inline]
224    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
225        self.connection_info.num_pending_out
226    }
227
228    /// Returns the number of currently backed off peers.
229    #[inline]
230    pub(crate) fn num_backed_off_peers(&self) -> usize {
231        self.backed_off_peers.len()
232    }
233
234    /// Returns the number of idle trusted peers.
235    fn num_idle_trusted_peers(&self) -> usize {
236        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
237    }
238
239    /// Invoked when a new _incoming_ tcp connection is accepted.
240    ///
241    /// returns an error if the inbound ip address is on the ban list
242    pub(crate) fn on_incoming_pending_session(
243        &mut self,
244        addr: IpAddr,
245    ) -> Result<(), InboundConnectionError> {
246        if self.ban_list.is_banned_ip(&addr) {
247            return Err(InboundConnectionError::IpBanned)
248        }
249
250        // check if we even have slots for a new incoming connection
251        if !self.connection_info.has_in_capacity() {
252            if self.trusted_peer_ids.is_empty() {
253                // if we don't have any incoming slots and no trusted peers, we don't accept any new
254                // connections
255                return Err(InboundConnectionError::ExceedsCapacity)
256            }
257
258            // there's an edge case here where no incoming connections besides from trusted peers
259            // are allowed (max_inbound == 0), in which case we still need to allow new pending
260            // incoming connections until all trusted peers are connected.
261            let num_idle_trusted_peers = self.num_idle_trusted_peers();
262            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
263                // we still want to limit concurrent pending connections
264                let max_inbound =
265                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
266                if self.connection_info.num_pending_in < max_inbound {
267                    self.connection_info.inc_pending_in();
268                    return Ok(())
269                }
270            }
271
272            // all trusted peers are either connected or connecting
273            return Err(InboundConnectionError::ExceedsCapacity)
274        }
275
276        // also cap the incoming connections we can process at once
277        if !self.connection_info.has_in_pending_capacity() {
278            return Err(InboundConnectionError::ExceedsCapacity)
279        }
280
281        // apply the rate limit
282        self.throttle_incoming_ip(addr);
283
284        self.connection_info.inc_pending_in();
285        Ok(())
286    }
287
288    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
289    /// rejected.
290    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
291        self.connection_info.decr_pending_in();
292    }
293
294    /// Invoked when a pending session was closed.
295    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
296        self.connection_info.decr_pending_in()
297    }
298
299    /// Invoked when a pending session was closed.
300    pub(crate) fn on_incoming_pending_session_dropped(
301        &mut self,
302        remote_addr: SocketAddr,
303        err: &PendingSessionHandshakeError,
304    ) {
305        if err.is_fatal_protocol_error() {
306            self.ban_ip(remote_addr.ip());
307
308            if err.merits_discovery_ban() {
309                self.queued_actions
310                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
311            }
312        }
313
314        self.connection_info.decr_pending_in();
315    }
316
317    /// Called when a new _incoming_ active session was established to the given peer.
318    ///
319    /// This will update the state of the peer if not yet tracked.
320    ///
321    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
322    /// be scheduled.
323    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
324        self.connection_info.decr_pending_in();
325
326        // we only need to check the peer id here as the ip address will have been checked at
327        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
328        if self.ban_list.is_banned_peer(&peer_id) {
329            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
330            return
331        }
332
333        // check if the peer is trustable or not
334        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
335        if self.trusted_nodes_only && !is_trusted {
336            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
337            return
338        }
339
340        // start a new tick, so the peer is not immediately rewarded for the time since last tick
341        self.tick();
342
343        match self.peers.entry(peer_id) {
344            Entry::Occupied(mut entry) => {
345                let peer = entry.get_mut();
346                if peer.is_banned() {
347                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
348                    return
349                }
350                // it might be the case that we're also trying to connect to this peer at the same
351                // time, so we need to adjust the state here
352                if peer.state.is_pending_out() {
353                    self.connection_info.decr_state(peer.state);
354                }
355
356                peer.state = PeerConnectionState::In;
357
358                is_trusted = is_trusted || peer.is_trusted();
359            }
360            Entry::Vacant(entry) => {
361                // peer is missing in the table, we add it but mark it as to be removed after
362                // disconnect, because we only know the outgoing port
363                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
364                peer.remove_after_disconnect = true;
365                entry.insert(peer);
366                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
367            }
368        }
369
370        let has_in_capacity = self.connection_info.has_in_capacity();
371        // increment new incoming connection
372        self.connection_info.inc_in();
373
374        // disconnect the peer if we don't have capacity for more inbound connections
375        if !is_trusted && !has_in_capacity {
376            self.queued_actions.push_back(PeerAction::Disconnect {
377                peer_id,
378                reason: Some(DisconnectReason::TooManyPeers),
379            });
380        }
381    }
382
383    /// Bans the peer temporarily with the configured ban timeout
384    fn ban_peer(&mut self, peer_id: PeerId) {
385        let mut ban_duration = self.ban_duration;
386        if let Some(peer) = self.peers.get(&peer_id) {
387            if peer.is_trusted() || peer.is_static() {
388                // For misbehaving trusted or static peers, we provide a bit more leeway when
389                // penalizing them.
390                ban_duration = self.backoff_durations.low / 2;
391            }
392        }
393
394        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
395        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
396    }
397
398    /// Bans the IP temporarily with the configured ban timeout
399    fn ban_ip(&mut self, ip: IpAddr) {
400        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
401    }
402
403    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
404    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
405        self.ban_list
406            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
407    }
408
409    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
410    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
411        trace!(target: "net::peers", ?peer_id, "backing off");
412
413        if let Some(peer) = self.peers.get_mut(&peer_id) {
414            peer.backed_off = true;
415            self.backed_off_peers.insert(peer_id, until);
416        }
417    }
418
419    /// Unbans the peer
420    fn unban_peer(&mut self, peer_id: PeerId) {
421        self.ban_list.unban_peer(&peer_id);
422        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
423    }
424
425    /// Tick function to update reputation of all connected peers.
426    /// Peers are rewarded with reputation increases for the time they are connected since the last
427    /// tick. This is to prevent peers from being disconnected eventually due to slashed
428    /// reputation because of some bad messages (most likely transaction related)
429    fn tick(&mut self) {
430        let now = Instant::now();
431        // Determine the number of seconds since the last tick.
432        // Ensuring that now is always greater than last_tick to account for issues with system
433        // time.
434        let secs_since_last_tick =
435            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
436        self.last_tick = now;
437
438        // update reputation via seconds connected
439        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
440            // update reputation via seconds connected, but keep the target _around_ the default
441            // reputation.
442            if peer.1.reputation < DEFAULT_REPUTATION {
443                peer.1.reputation += secs_since_last_tick;
444            }
445        }
446    }
447
448    /// Returns the tracked reputation for a peer.
449    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
450        self.peers.get(peer_id).map(|peer| peer.reputation)
451    }
452
453    /// Apply the corresponding reputation change to the given peer.
454    ///
455    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
456    /// reputation changes that can be attributed to network conditions. If the peer is a
457    /// trusted peer, it will also be less strict with the reputation slashing.
458    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
459        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
460            // First check if we should reset the reputation
461            if rep.is_reset() {
462                peer.reset_reputation()
463            } else {
464                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
465                if peer.is_trusted() || peer.is_static() {
466                    // exempt trusted and static peers from reputation slashing for
467                    if matches!(
468                        rep,
469                        ReputationChangeKind::Dropped |
470                            ReputationChangeKind::BadAnnouncement |
471                            ReputationChangeKind::Timeout |
472                            ReputationChangeKind::AlreadySeenTransaction
473                    ) {
474                        return
475                    }
476
477                    // also be less strict with the reputation slashing for trusted peers
478                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
479                        // this caps the reputation change to the maximum allowed for trusted peers
480                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
481                    }
482                }
483                peer.apply_reputation(reputation_change, rep)
484            }
485        } else {
486            return
487        };
488
489        match outcome {
490            ReputationChangeOutcome::None => {}
491            ReputationChangeOutcome::Ban => {
492                self.ban_peer(*peer_id);
493            }
494            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
495            ReputationChangeOutcome::DisconnectAndBan => {
496                self.queued_actions.push_back(PeerAction::Disconnect {
497                    peer_id: *peer_id,
498                    reason: Some(DisconnectReason::DisconnectRequested),
499                });
500                self.ban_peer(*peer_id);
501            }
502        }
503    }
504
505    /// Gracefully disconnected a pending _outgoing_ session
506    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
507        if let Some(peer) = self.peers.get_mut(peer_id) {
508            self.connection_info.decr_state(peer.state);
509            peer.state = PeerConnectionState::Idle;
510        }
511    }
512
513    /// Invoked when an _outgoing_ pending session was closed during authentication or the
514    /// handshake.
515    pub(crate) fn on_outgoing_pending_session_dropped(
516        &mut self,
517        remote_addr: &SocketAddr,
518        peer_id: &PeerId,
519        err: &PendingSessionHandshakeError,
520    ) {
521        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
522    }
523
524    /// Gracefully disconnected an active session
525    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
526        match self.peers.entry(peer_id) {
527            Entry::Occupied(mut entry) => {
528                self.connection_info.decr_state(entry.get().state);
529
530                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
531                    // this peer should be removed from the set
532                    entry.remove();
533                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
534                } else {
535                    // reset the peer's state
536                    // we reset the backoff counter since we're able to establish a successful
537                    // session to that peer
538                    entry.get_mut().severe_backoff_counter = 0;
539                    entry.get_mut().state = PeerConnectionState::Idle;
540                    return
541                }
542            }
543            Entry::Vacant(_) => return,
544        }
545
546        self.fill_outbound_slots();
547    }
548
549    /// Called when a _pending_ outbound connection is successful.
550    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
551        if let Some(peer) = self.peers.get_mut(&peer_id) {
552            self.connection_info.decr_state(peer.state);
553            self.connection_info.inc_out();
554            peer.state = PeerConnectionState::Out;
555        }
556    }
557
558    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
559    ///
560    /// Depending on whether the error is fatal, the peer will be removed from the peer set
561    /// otherwise its reputation is slashed.
562    pub(crate) fn on_active_session_dropped(
563        &mut self,
564        remote_addr: &SocketAddr,
565        peer_id: &PeerId,
566        err: &EthStreamError,
567    ) {
568        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
569    }
570
571    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
572    /// connection.
573    pub(crate) fn on_outgoing_connection_failure(
574        &mut self,
575        remote_addr: &SocketAddr,
576        peer_id: &PeerId,
577        err: &io::Error,
578    ) {
579        // there's a race condition where we accepted an incoming connection while we were trying to
580        // connect to the same peer at the same time. if the outgoing connection failed
581        // after the incoming connection was accepted, we can ignore this error
582        if let Some(peer) = self.peers.get(peer_id) {
583            if peer.state.is_incoming() {
584                // we already have an active connection to the peer, so we can ignore this error
585                return
586            }
587
588            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
589                // trigger resolution task for trusted peer since multiple connection failures
590                // occurred
591                self.trusted_peers_resolver.interval.reset_immediately();
592            }
593        }
594
595        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
596    }
597
598    fn on_connection_failure(
599        &mut self,
600        remote_addr: &SocketAddr,
601        peer_id: &PeerId,
602        err: impl SessionError,
603        reputation_change: ReputationChangeKind,
604    ) {
605        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
606
607        if err.is_fatal_protocol_error() {
608            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
609            // remove the peer to which we can't establish a connection due to protocol related
610            // issues.
611            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
612                self.connection_info.decr_state(entry.get().state);
613                // only remove if the peer is not trusted
614                if entry.get().is_trusted() {
615                    entry.get_mut().state = PeerConnectionState::Idle;
616                } else {
617                    entry.remove();
618                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
619                    // If the error is caused by a peer that should be banned from discovery
620                    if err.merits_discovery_ban() {
621                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
622                            peer_id: *peer_id,
623                            ip_addr: remote_addr.ip(),
624                        })
625                    }
626                }
627            }
628
629            // ban the peer
630            self.ban_peer(*peer_id);
631        } else {
632            let mut backoff_until = None;
633            let mut remove_peer = false;
634
635            if let Some(peer) = self.peers.get_mut(peer_id) {
636                if let Some(kind) = err.should_backoff() {
637                    if peer.is_trusted() || peer.is_static() {
638                        // provide a bit more leeway for trusted peers and use a lower backoff so
639                        // that we keep re-trying them after backing off shortly
640                        let backoff = self.backoff_durations.low / 2;
641                        backoff_until = Some(std::time::Instant::now() + backoff);
642                    } else {
643                        // Increment peer.backoff_counter
644                        if kind.is_severe() {
645                            peer.severe_backoff_counter =
646                                peer.severe_backoff_counter.saturating_add(1);
647                        }
648
649                        let backoff_time =
650                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
651
652                        // The peer has signaled that it is currently unable to process any more
653                        // connections, so we will hold off on attempting any new connections for a
654                        // while
655                        backoff_until = Some(backoff_time);
656                    }
657                } else {
658                    // If the error was not a backoff error, we reduce the peer's reputation
659                    let reputation_change = self.reputation_weights.change(reputation_change);
660                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
661                };
662
663                self.connection_info.decr_state(peer.state);
664                peer.state = PeerConnectionState::Idle;
665
666                if peer.severe_backoff_counter > self.max_backoff_count &&
667                    !peer.is_trusted() &&
668                    !peer.is_static()
669                {
670                    // mark peer for removal if it has been backoff too many times and is _not_
671                    // trusted or static
672                    remove_peer = true;
673                }
674            }
675
676            // remove peer if it has been marked for removal
677            if remove_peer {
678                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
679                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
680            } else if let Some(backoff_until) = backoff_until {
681                // otherwise, backoff the peer if marked as such
682                self.backoff_peer_until(*peer_id, backoff_until);
683            }
684        }
685
686        self.fill_outbound_slots();
687    }
688
689    /// Invoked if a pending session was disconnected because there's already a connection to the
690    /// peer.
691    ///
692    /// If the session was an outgoing connection, this means that the peer initiated a connection
693    /// to us at the same time and this connection is already established.
694    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
695        match direction {
696            Direction::Incoming => {
697                // need to decrement the ingoing counter
698                self.connection_info.decr_pending_in();
699            }
700            Direction::Outgoing(_) => {
701                // cleanup is handled when the incoming active session is activated in
702                // `on_incoming_session_established`
703            }
704        }
705    }
706
707    /// Called as follow-up for a discovered peer.
708    ///
709    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
710    /// protocol
711    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
712        if let Some(peer) = self.peers.get_mut(&peer_id) {
713            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
714            peer.fork_id = Some(fork_id);
715        }
716    }
717
718    /// Called for a newly discovered peer.
719    ///
720    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
721    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
722        self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
723    }
724
725    /// Marks the given peer as trusted.
726    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
727        self.trusted_peer_ids.insert(peer_id);
728    }
729
730    /// Called for a newly discovered trusted peer.
731    ///
732    /// If the peer already exists, then the address and kind will be updated.
733    #[cfg_attr(not(test), expect(dead_code))]
734    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
735        self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
736    }
737
738    /// Called for a newly discovered peer.
739    ///
740    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
741    pub(crate) fn add_peer_kind(
742        &mut self,
743        peer_id: PeerId,
744        kind: PeerKind,
745        addr: PeerAddr,
746        fork_id: Option<ForkId>,
747    ) {
748        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
749            return
750        }
751
752        match self.peers.entry(peer_id) {
753            Entry::Occupied(mut entry) => {
754                let peer = entry.get_mut();
755                peer.kind = kind;
756                peer.fork_id = fork_id;
757                peer.addr = addr;
758
759                if peer.state.is_incoming() {
760                    // now that we have an actual discovered address, for that peer and not just the
761                    // ip of the incoming connection, we don't need to remove the peer after
762                    // disconnecting, See `on_incoming_session_established`
763                    peer.remove_after_disconnect = false;
764                }
765            }
766            Entry::Vacant(entry) => {
767                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
768                let mut peer = Peer::with_kind(addr, kind);
769                peer.fork_id = fork_id;
770                entry.insert(peer);
771                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
772            }
773        }
774
775        if kind.is_trusted() {
776            self.trusted_peer_ids.insert(peer_id);
777        }
778    }
779
780    /// Removes the tracked node from the set.
781    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
782        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
783        if entry.get().is_trusted() {
784            return
785        }
786        let mut peer = entry.remove();
787
788        trace!(target: "net::peers", ?peer_id, "remove discovered node");
789        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
790
791        if peer.state.is_connected() {
792            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
793            // we terminate the active session here, but only remove the peer after the session
794            // was disconnected, this prevents the case where the session is scheduled for
795            // disconnect but the node is immediately rediscovered, See also
796            // [`Self::on_disconnected()`]
797            peer.remove_after_disconnect = true;
798            peer.state.disconnect();
799            self.peers.insert(peer_id, peer);
800            self.queued_actions.push_back(PeerAction::Disconnect {
801                peer_id,
802                reason: Some(DisconnectReason::DisconnectRequested),
803            })
804        }
805    }
806
807    /// Connect to the given peer. NOTE: if the maximum number out outbound sessions is reached,
808    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
809    #[cfg_attr(not(test), expect(dead_code))]
810    pub(crate) fn add_and_connect(
811        &mut self,
812        peer_id: PeerId,
813        addr: PeerAddr,
814        fork_id: Option<ForkId>,
815    ) {
816        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
817    }
818
819    /// Connects a peer and its address with the given kind.
820    ///
821    /// Note: This is invoked on demand via an external command received by the manager
822    pub(crate) fn add_and_connect_kind(
823        &mut self,
824        peer_id: PeerId,
825        kind: PeerKind,
826        addr: PeerAddr,
827        fork_id: Option<ForkId>,
828    ) {
829        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
830            return
831        }
832
833        match self.peers.entry(peer_id) {
834            Entry::Occupied(mut entry) => {
835                let peer = entry.get_mut();
836                peer.kind = kind;
837                peer.fork_id = fork_id;
838                peer.addr = addr;
839
840                if peer.state == PeerConnectionState::Idle {
841                    // Try connecting again.
842                    peer.state = PeerConnectionState::PendingOut;
843                    self.connection_info.inc_pending_out();
844                    self.queued_actions
845                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
846                }
847            }
848            Entry::Vacant(entry) => {
849                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
850                let mut peer = Peer::with_kind(addr, kind);
851                peer.state = PeerConnectionState::PendingOut;
852                peer.fork_id = fork_id;
853                entry.insert(peer);
854                self.connection_info.inc_pending_out();
855                self.queued_actions
856                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
857            }
858        }
859
860        if kind.is_trusted() {
861            self.trusted_peer_ids.insert(peer_id);
862        }
863    }
864
865    /// Removes the tracked node from the trusted set.
866    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
867        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
868        if !entry.get().is_trusted() {
869            return
870        }
871
872        let peer = entry.get_mut();
873        peer.kind = PeerKind::Basic;
874
875        self.trusted_peer_ids.remove(&peer_id);
876    }
877
878    /// Returns the idle peer with the highest reputation.
879    ///
880    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
881    /// not currently marked as banned or backed off.
882    ///
883    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
884    /// `trusted` peers.
885    ///
886    /// Returns `None` if no peer is available.
887    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
888        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
889            !peer.is_backed_off() &&
890                !peer.is_banned() &&
891                peer.state.is_unconnected() &&
892                (!self.trusted_nodes_only || peer.is_trusted())
893        });
894
895        // keep track of the best peer, if there's one
896        let mut best_peer = unconnected.next()?;
897
898        if best_peer.1.is_trusted() || best_peer.1.is_static() {
899            return Some((*best_peer.0, best_peer.1))
900        }
901
902        for maybe_better in unconnected {
903            // if the peer is trusted or static, return it immediately
904            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
905                return Some((*maybe_better.0, maybe_better.1))
906            }
907
908            // otherwise we keep track of the best peer using the reputation
909            if maybe_better.1.reputation > best_peer.1.reputation {
910                best_peer = maybe_better;
911            }
912        }
913        Some((*best_peer.0, best_peer.1))
914    }
915
916    /// If there's capacity for new outbound connections, this will queue new
917    /// [`PeerAction::Connect`] actions.
918    ///
919    /// New connections are only initiated, if slots are available and appropriate peers are
920    /// available.
921    fn fill_outbound_slots(&mut self) {
922        self.tick();
923
924        if !self.net_connection_state.is_active() {
925            // nothing to fill
926            return
927        }
928
929        // as long as there are slots available fill them with the best peers
930        while self.connection_info.has_out_capacity() {
931            let action = {
932                let (peer_id, peer) = match self.best_unconnected() {
933                    Some(peer) => peer,
934                    _ => break,
935                };
936
937                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
938
939                peer.state = PeerConnectionState::PendingOut;
940                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
941            };
942
943            self.connection_info.inc_pending_out();
944
945            self.queued_actions.push_back(action);
946        }
947    }
948
949    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
950        if let Some(peer) = self.peers.get_mut(&peer_id) {
951            let new_addr = PeerAddr::new_with_ports(
952                new_record.address,
953                new_record.tcp_port,
954                Some(new_record.udp_port),
955            );
956
957            if peer.addr != new_addr {
958                peer.addr = new_addr;
959                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
960            }
961        }
962    }
963
964    /// Keeps track of network state changes.
965    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
966        self.net_connection_state = state;
967    }
968
969    /// Returns the current network connection state.
970    pub const fn connection_state(&self) -> &NetworkConnectionState {
971        &self.net_connection_state
972    }
973
974    /// Sets `net_connection_state` to `ShuttingDown`.
975    pub const fn on_shutdown(&mut self) {
976        self.net_connection_state = NetworkConnectionState::ShuttingDown;
977    }
978
979    /// Advances the state.
980    ///
981    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
982    /// [`PeersManager`] is polled.
983    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
984        loop {
985            // drain buffered actions
986            if let Some(action) = self.queued_actions.pop_front() {
987                return Poll::Ready(action)
988            }
989
990            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
991                match cmd {
992                    PeerCommand::Add(peer_id, addr) => {
993                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
994                    }
995                    PeerCommand::Remove(peer) => self.remove_peer(peer),
996                    PeerCommand::ReputationChange(peer_id, rep) => {
997                        self.apply_reputation_change(&peer_id, rep)
998                    }
999                    PeerCommand::GetPeer(peer, tx) => {
1000                        let _ = tx.send(self.peers.get(&peer).cloned());
1001                    }
1002                    PeerCommand::GetPeers(tx) => {
1003                        let _ = tx.send(self.iter_peers().collect());
1004                    }
1005                }
1006            }
1007
1008            if self.release_interval.poll_tick(cx).is_ready() {
1009                let now = std::time::Instant::now();
1010                let (_, unbanned_peers) = self.ban_list.evict(now);
1011
1012                for peer_id in unbanned_peers {
1013                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1014                        peer.unban();
1015                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1016                    }
1017                }
1018
1019                // clear the backoff list of expired backoffs, and mark the relevant peers as
1020                // ready to be dialed
1021                self.backed_off_peers.retain(|peer_id, until| {
1022                    if now > *until {
1023                        if let Some(peer) = self.peers.get_mut(peer_id) {
1024                            peer.backed_off = false;
1025                        }
1026                        return false
1027                    }
1028                    true
1029                })
1030            }
1031
1032            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1033                self.fill_outbound_slots();
1034            }
1035
1036            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1037                self.on_resolved_peer(peer_id, new_record);
1038            }
1039
1040            if self.queued_actions.is_empty() {
1041                return Poll::Pending
1042            }
1043        }
1044    }
1045}
1046
1047impl Default for PeersManager {
1048    fn default() -> Self {
1049        Self::new(Default::default())
1050    }
1051}
1052
1053/// Tracks stats about connected nodes
1054#[derive(Debug, Clone, PartialEq, Eq, Default)]
1055pub struct ConnectionInfo {
1056    /// Counter for currently occupied slots for active outbound connections.
1057    num_outbound: usize,
1058    /// Counter for pending outbound connections.
1059    num_pending_out: usize,
1060    /// Counter for currently occupied slots for active inbound connections.
1061    num_inbound: usize,
1062    /// Counter for pending inbound connections.
1063    num_pending_in: usize,
1064    /// Restrictions on number of connections.
1065    config: ConnectionsConfig,
1066}
1067
1068// === impl ConnectionInfo ===
1069
1070impl ConnectionInfo {
1071    /// Returns a new [`ConnectionInfo`] with the given config.
1072    const fn new(config: ConnectionsConfig) -> Self {
1073        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1074    }
1075
1076    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1077    const fn has_out_capacity(&self) -> bool {
1078        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1079            self.num_outbound < self.config.max_outbound
1080    }
1081
1082    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1083    const fn has_in_capacity(&self) -> bool {
1084        self.num_inbound < self.config.max_inbound
1085    }
1086
1087    /// Returns `true` if we can handle an additional incoming pending connection.
1088    const fn has_in_pending_capacity(&self) -> bool {
1089        self.num_pending_in < self.config.max_inbound
1090    }
1091
1092    const fn decr_state(&mut self, state: PeerConnectionState) {
1093        match state {
1094            PeerConnectionState::Idle => {}
1095            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1096            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1097            PeerConnectionState::PendingOut => self.decr_pending_out(),
1098        }
1099    }
1100
1101    const fn decr_out(&mut self) {
1102        self.num_outbound -= 1;
1103    }
1104
1105    const fn inc_out(&mut self) {
1106        self.num_outbound += 1;
1107    }
1108
1109    const fn inc_pending_out(&mut self) {
1110        self.num_pending_out += 1;
1111    }
1112
1113    const fn inc_in(&mut self) {
1114        self.num_inbound += 1;
1115    }
1116
1117    const fn inc_pending_in(&mut self) {
1118        self.num_pending_in += 1;
1119    }
1120
1121    const fn decr_in(&mut self) {
1122        self.num_inbound -= 1;
1123    }
1124
1125    const fn decr_pending_out(&mut self) {
1126        self.num_pending_out -= 1;
1127    }
1128
1129    const fn decr_pending_in(&mut self) {
1130        self.num_pending_in -= 1;
1131    }
1132}
1133
1134/// Actions the peer manager can trigger.
1135#[derive(Debug)]
1136pub enum PeerAction {
1137    /// Start a new connection to a peer.
1138    Connect {
1139        /// The peer to connect to.
1140        peer_id: PeerId,
1141        /// Where to reach the node
1142        remote_addr: SocketAddr,
1143    },
1144    /// Disconnect an existing connection.
1145    Disconnect {
1146        /// The peer ID of the established connection.
1147        peer_id: PeerId,
1148        /// An optional reason for the disconnect.
1149        reason: Option<DisconnectReason>,
1150    },
1151    /// Disconnect an existing incoming connection, because the peers reputation is below the
1152    /// banned threshold or is on the [`BanList`]
1153    DisconnectBannedIncoming {
1154        /// The peer ID of the established connection.
1155        peer_id: PeerId,
1156    },
1157    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1158    DisconnectUntrustedIncoming {
1159        /// The peer ID.
1160        peer_id: PeerId,
1161    },
1162    /// Ban the peer in discovery.
1163    DiscoveryBanPeerId {
1164        /// The peer ID.
1165        peer_id: PeerId,
1166        /// The IP address.
1167        ip_addr: IpAddr,
1168    },
1169    /// Ban the IP in discovery.
1170    DiscoveryBanIp {
1171        /// The IP address.
1172        ip_addr: IpAddr,
1173    },
1174    /// Ban the peer temporarily
1175    BanPeer {
1176        /// The peer ID.
1177        peer_id: PeerId,
1178    },
1179    /// Unban the peer temporarily
1180    UnBanPeer {
1181        /// The peer ID.
1182        peer_id: PeerId,
1183    },
1184    /// Emit peerAdded event
1185    PeerAdded(PeerId),
1186    /// Emit peerRemoved event
1187    PeerRemoved(PeerId),
1188}
1189
1190/// Error thrown when a incoming connection is rejected right away
1191#[derive(Debug, Error, PartialEq, Eq)]
1192pub enum InboundConnectionError {
1193    /// The remote's ip address is banned
1194    IpBanned,
1195    /// No capacity for new inbound connections
1196    ExceedsCapacity,
1197}
1198
1199impl Display for InboundConnectionError {
1200    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1201        write!(f, "{self:?}")
1202    }
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use alloy_primitives::B512;
1208    use reth_eth_wire::{
1209        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1210        DisconnectReason,
1211    };
1212    use reth_net_banlist::BanList;
1213    use reth_network_api::Direction;
1214    use reth_network_peers::{PeerId, TrustedPeer};
1215    use reth_network_types::{
1216        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1217    };
1218    use std::{
1219        future::{poll_fn, Future},
1220        io,
1221        net::{IpAddr, Ipv4Addr, SocketAddr},
1222        pin::Pin,
1223        task::{Context, Poll},
1224        time::Duration,
1225    };
1226    use url::Host;
1227
1228    use super::PeersManager;
1229    use crate::{
1230        error::SessionError,
1231        peers::{
1232            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1233            PeerConnectionState,
1234        },
1235        session::PendingSessionHandshakeError,
1236        PeersConfig,
1237    };
1238
1239    struct PeerActionFuture<'a> {
1240        peers: &'a mut PeersManager,
1241    }
1242
1243    impl Future for PeerActionFuture<'_> {
1244        type Output = PeerAction;
1245
1246        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1247            self.get_mut().peers.poll(cx)
1248        }
1249    }
1250
1251    macro_rules! event {
1252        ($peers:expr) => {
1253            PeerActionFuture { peers: &mut $peers }.await
1254        };
1255    }
1256
1257    #[tokio::test]
1258    async fn test_insert() {
1259        let peer = PeerId::random();
1260        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1261        let mut peers = PeersManager::default();
1262        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1263
1264        match event!(peers) {
1265            PeerAction::PeerAdded(peer_id) => {
1266                assert_eq!(peer_id, peer);
1267            }
1268            _ => unreachable!(),
1269        }
1270        match event!(peers) {
1271            PeerAction::Connect { peer_id, remote_addr } => {
1272                assert_eq!(peer_id, peer);
1273                assert_eq!(remote_addr, socket_addr);
1274            }
1275            _ => unreachable!(),
1276        }
1277
1278        let (record, _) = peers.peer_by_id(peer).unwrap();
1279        assert_eq!(record.tcp_addr(), socket_addr);
1280        assert_eq!(record.udp_addr(), socket_addr);
1281    }
1282
1283    #[tokio::test]
1284    async fn test_insert_udp() {
1285        let peer = PeerId::random();
1286        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1287        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1288        let mut peers = PeersManager::default();
1289        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1290
1291        match event!(peers) {
1292            PeerAction::PeerAdded(peer_id) => {
1293                assert_eq!(peer_id, peer);
1294            }
1295            _ => unreachable!(),
1296        }
1297        match event!(peers) {
1298            PeerAction::Connect { peer_id, remote_addr } => {
1299                assert_eq!(peer_id, peer);
1300                assert_eq!(remote_addr, tcp_addr);
1301            }
1302            _ => unreachable!(),
1303        }
1304
1305        let (record, _) = peers.peer_by_id(peer).unwrap();
1306        assert_eq!(record.tcp_addr(), tcp_addr);
1307        assert_eq!(record.udp_addr(), udp_addr);
1308    }
1309
1310    #[tokio::test]
1311    async fn test_ban() {
1312        let peer = PeerId::random();
1313        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1314        let mut peers = PeersManager::default();
1315        peers.ban_peer(peer);
1316        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1317
1318        match event!(peers) {
1319            PeerAction::BanPeer { peer_id } => {
1320                assert_eq!(peer_id, peer);
1321            }
1322            _ => unreachable!(),
1323        }
1324
1325        poll_fn(|cx| {
1326            assert!(peers.poll(cx).is_pending());
1327            Poll::Ready(())
1328        })
1329        .await;
1330    }
1331
1332    #[tokio::test]
1333    async fn test_unban() {
1334        let peer = PeerId::random();
1335        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1336        let mut peers = PeersManager::default();
1337        peers.ban_peer(peer);
1338        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1339
1340        match event!(peers) {
1341            PeerAction::BanPeer { peer_id } => {
1342                assert_eq!(peer_id, peer);
1343            }
1344            _ => unreachable!(),
1345        }
1346
1347        poll_fn(|cx| {
1348            assert!(peers.poll(cx).is_pending());
1349            Poll::Ready(())
1350        })
1351        .await;
1352
1353        peers.unban_peer(peer);
1354
1355        match event!(peers) {
1356            PeerAction::UnBanPeer { peer_id } => {
1357                assert_eq!(peer_id, peer);
1358            }
1359            _ => unreachable!(),
1360        }
1361
1362        poll_fn(|cx| {
1363            assert!(peers.poll(cx).is_pending());
1364            Poll::Ready(())
1365        })
1366        .await;
1367    }
1368
1369    #[tokio::test]
1370    async fn test_backoff_on_busy() {
1371        let peer = PeerId::random();
1372        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1373
1374        let mut peers = PeersManager::new(PeersConfig::test());
1375        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1376
1377        match event!(peers) {
1378            PeerAction::PeerAdded(peer_id) => {
1379                assert_eq!(peer_id, peer);
1380            }
1381            _ => unreachable!(),
1382        }
1383        match event!(peers) {
1384            PeerAction::Connect { peer_id, .. } => {
1385                assert_eq!(peer_id, peer);
1386            }
1387            _ => unreachable!(),
1388        }
1389
1390        poll_fn(|cx| {
1391            assert!(peers.poll(cx).is_pending());
1392            Poll::Ready(())
1393        })
1394        .await;
1395
1396        peers.on_active_session_dropped(
1397            &socket_addr,
1398            &peer,
1399            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1400                DisconnectReason::TooManyPeers,
1401            )),
1402        );
1403
1404        poll_fn(|cx| {
1405            assert!(peers.poll(cx).is_pending());
1406            Poll::Ready(())
1407        })
1408        .await;
1409
1410        assert!(peers.backed_off_peers.contains_key(&peer));
1411        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1412
1413        tokio::time::sleep(peers.backoff_durations.low).await;
1414
1415        match event!(peers) {
1416            PeerAction::Connect { peer_id, .. } => {
1417                assert_eq!(peer_id, peer);
1418            }
1419            _ => unreachable!(),
1420        }
1421
1422        assert!(!peers.backed_off_peers.contains_key(&peer));
1423        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1424    }
1425
1426    #[tokio::test]
1427    async fn test_backoff_on_no_response() {
1428        let peer = PeerId::random();
1429        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1430
1431        let backoff_durations = PeerBackoffDurations::test();
1432        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1433        let mut peers = PeersManager::new(config);
1434        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1435
1436        match event!(peers) {
1437            PeerAction::PeerAdded(peer_id) => {
1438                assert_eq!(peer_id, peer);
1439            }
1440            _ => unreachable!(),
1441        }
1442        match event!(peers) {
1443            PeerAction::Connect { peer_id, .. } => {
1444                assert_eq!(peer_id, peer);
1445            }
1446            _ => unreachable!(),
1447        }
1448
1449        poll_fn(|cx| {
1450            assert!(peers.poll(cx).is_pending());
1451            Poll::Ready(())
1452        })
1453        .await;
1454
1455        peers.on_outgoing_pending_session_dropped(
1456            &socket_addr,
1457            &peer,
1458            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1459                EthHandshakeError::NoResponse,
1460            )),
1461        );
1462
1463        poll_fn(|cx| {
1464            assert!(peers.poll(cx).is_pending());
1465            Poll::Ready(())
1466        })
1467        .await;
1468
1469        assert!(peers.backed_off_peers.contains_key(&peer));
1470        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1471
1472        tokio::time::sleep(backoff_durations.high).await;
1473
1474        match event!(peers) {
1475            PeerAction::Connect { peer_id, .. } => {
1476                assert_eq!(peer_id, peer);
1477            }
1478            _ => unreachable!(),
1479        }
1480
1481        assert!(!peers.backed_off_peers.contains_key(&peer));
1482        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1483    }
1484
1485    #[tokio::test]
1486    async fn test_low_backoff() {
1487        let peer = PeerId::random();
1488        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1489        let config = PeersConfig::test();
1490        let mut peers = PeersManager::new(config);
1491        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1492        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1493
1494        let backoff_timestamp = peers
1495            .backoff_durations
1496            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1497
1498        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1499        assert!(backoff_timestamp <= expected);
1500    }
1501
1502    #[tokio::test]
1503    async fn test_multiple_backoff_calculations() {
1504        let peer = PeerId::random();
1505        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1506        let config = PeersConfig::default();
1507        let mut peers = PeersManager::new(config);
1508        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1509        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1510
1511        // Simulate a peer that was already backed off once
1512        peer_struct.severe_backoff_counter = 1;
1513
1514        let now = std::time::Instant::now();
1515
1516        // Simulate the increment that happens in on_connection_failure
1517        peer_struct.severe_backoff_counter += 1;
1518        // Get official backoff time
1519        let backoff_time = peers
1520            .backoff_durations
1521            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1522
1523        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1524        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1525
1526        // We can't use assert_eq! since there is a very small diff in the nano secs
1527        // Usually it is 1800s != 1799.9999996s
1528        assert!(backoff_time.duration_since(now) > backoff_duration);
1529    }
1530
1531    #[tokio::test]
1532    async fn test_ban_on_active_drop() {
1533        let peer = PeerId::random();
1534        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1535        let mut peers = PeersManager::default();
1536        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1537
1538        match event!(peers) {
1539            PeerAction::PeerAdded(peer_id) => {
1540                assert_eq!(peer_id, peer);
1541            }
1542            _ => unreachable!(),
1543        }
1544        match event!(peers) {
1545            PeerAction::Connect { peer_id, .. } => {
1546                assert_eq!(peer_id, peer);
1547            }
1548            _ => unreachable!(),
1549        }
1550
1551        poll_fn(|cx| {
1552            assert!(peers.poll(cx).is_pending());
1553            Poll::Ready(())
1554        })
1555        .await;
1556
1557        peers.on_active_session_dropped(
1558            &socket_addr,
1559            &peer,
1560            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1561                DisconnectReason::UselessPeer,
1562            )),
1563        );
1564
1565        match event!(peers) {
1566            PeerAction::PeerRemoved(peer_id) => {
1567                assert_eq!(peer_id, peer);
1568            }
1569            _ => unreachable!(),
1570        }
1571        match event!(peers) {
1572            PeerAction::BanPeer { peer_id } => {
1573                assert_eq!(peer_id, peer);
1574            }
1575            _ => unreachable!(),
1576        }
1577
1578        poll_fn(|cx| {
1579            assert!(peers.poll(cx).is_pending());
1580            Poll::Ready(())
1581        })
1582        .await;
1583
1584        assert!(!peers.peers.contains_key(&peer));
1585    }
1586
1587    #[tokio::test]
1588    async fn test_remove_on_max_backoff_count() {
1589        let peer = PeerId::random();
1590        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1591        let config = PeersConfig::test();
1592        let mut peers = PeersManager::new(config.clone());
1593        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1594        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1595
1596        // Simulate a peer that was already backed off once
1597        peer_struct.severe_backoff_counter = config.max_backoff_count;
1598
1599        match event!(peers) {
1600            PeerAction::PeerAdded(peer_id) => {
1601                assert_eq!(peer_id, peer);
1602            }
1603            _ => unreachable!(),
1604        }
1605        match event!(peers) {
1606            PeerAction::Connect { peer_id, .. } => {
1607                assert_eq!(peer_id, peer);
1608            }
1609            _ => unreachable!(),
1610        }
1611
1612        poll_fn(|cx| {
1613            assert!(peers.poll(cx).is_pending());
1614            Poll::Ready(())
1615        })
1616        .await;
1617
1618        peers.on_outgoing_pending_session_dropped(
1619            &socket_addr,
1620            &peer,
1621            &PendingSessionHandshakeError::Eth(
1622                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1623            ),
1624        );
1625
1626        match event!(peers) {
1627            PeerAction::PeerRemoved(peer_id) => {
1628                assert_eq!(peer_id, peer);
1629            }
1630            _ => unreachable!(),
1631        }
1632
1633        poll_fn(|cx| {
1634            assert!(peers.poll(cx).is_pending());
1635            Poll::Ready(())
1636        })
1637        .await;
1638
1639        assert!(!peers.peers.contains_key(&peer));
1640    }
1641
1642    #[tokio::test]
1643    async fn test_ban_on_pending_drop() {
1644        let peer = PeerId::random();
1645        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1646        let mut peers = PeersManager::default();
1647        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1648
1649        match event!(peers) {
1650            PeerAction::PeerAdded(peer_id) => {
1651                assert_eq!(peer_id, peer);
1652            }
1653            _ => unreachable!(),
1654        }
1655        match event!(peers) {
1656            PeerAction::Connect { peer_id, .. } => {
1657                assert_eq!(peer_id, peer);
1658            }
1659            _ => unreachable!(),
1660        }
1661
1662        poll_fn(|cx| {
1663            assert!(peers.poll(cx).is_pending());
1664            Poll::Ready(())
1665        })
1666        .await;
1667
1668        peers.on_outgoing_pending_session_dropped(
1669            &socket_addr,
1670            &peer,
1671            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1672                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1673            )),
1674        );
1675
1676        match event!(peers) {
1677            PeerAction::PeerRemoved(peer_id) => {
1678                assert_eq!(peer_id, peer);
1679            }
1680            _ => unreachable!(),
1681        }
1682        match event!(peers) {
1683            PeerAction::BanPeer { peer_id } => {
1684                assert_eq!(peer_id, peer);
1685            }
1686            _ => unreachable!(),
1687        }
1688
1689        poll_fn(|cx| {
1690            assert!(peers.poll(cx).is_pending());
1691            Poll::Ready(())
1692        })
1693        .await;
1694
1695        assert!(!peers.peers.contains_key(&peer));
1696    }
1697
1698    #[tokio::test]
1699    async fn test_internally_closed_incoming() {
1700        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1701        let mut peers = PeersManager::default();
1702
1703        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1704        assert_eq!(peers.connection_info.num_pending_in, 1);
1705        peers.on_incoming_pending_session_rejected_internally();
1706        assert_eq!(peers.connection_info.num_pending_in, 0);
1707    }
1708
1709    #[tokio::test]
1710    async fn test_reject_incoming_at_pending_capacity() {
1711        let mut peers = PeersManager::default();
1712
1713        for count in 1..=peers.connection_info.config.max_inbound {
1714            let socket_addr =
1715                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1716            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1717            assert_eq!(peers.connection_info.num_pending_in, count);
1718        }
1719        assert!(peers.connection_info.has_in_capacity());
1720        assert!(!peers.connection_info.has_in_pending_capacity());
1721
1722        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1723        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1724    }
1725
1726    #[tokio::test]
1727    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1728        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1729        let trusted = PeerId::random();
1730        peers.add_trusted_peer_id(trusted);
1731
1732        // connect the trusted peer
1733        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1734        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1735        peers.on_incoming_session_established(trusted, addr);
1736
1737        match event!(peers) {
1738            PeerAction::PeerAdded(id) => {
1739                assert_eq!(id, trusted);
1740            }
1741            _ => unreachable!(),
1742        }
1743
1744        // saturate the remaining inbound slots with untrusted peers
1745        let mut connected_untrusted_peer_ids = Vec::new();
1746        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1747            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1748            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1749            let peer_id = PeerId::random();
1750            peers.on_incoming_session_established(peer_id, addr);
1751            connected_untrusted_peer_ids.push(peer_id);
1752
1753            match event!(peers) {
1754                PeerAction::PeerAdded(id) => {
1755                    assert_eq!(id, peer_id);
1756                }
1757                _ => unreachable!(),
1758            }
1759        }
1760
1761        let mut pending_addrs = Vec::new();
1762
1763        // saturate available slots
1764        for i in 0..2 {
1765            let socket_addr =
1766                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1767            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1768
1769            pending_addrs.push(socket_addr);
1770        }
1771
1772        assert_eq!(peers.connection_info.num_pending_in, 2);
1773
1774        // try to handle additional incoming connections at capacity
1775        for i in 0..2 {
1776            let socket_addr =
1777                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1778            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1779        }
1780
1781        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1782            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1783                DisconnectReason::UselessPeer,
1784            )),
1785        ));
1786
1787        // Remove all pending peers
1788        for pending_addr in pending_addrs {
1789            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1790        }
1791
1792        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1793
1794        println!(
1795            "num_inbound: {}, has_in_capacity: {}",
1796            peers.connection_info.num_inbound,
1797            peers.connection_info.has_in_capacity()
1798        );
1799
1800        // disconnect a connected peer
1801        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1802
1803        println!(
1804            "num_inbound: {}, has_in_capacity: {}",
1805            peers.connection_info.num_inbound,
1806            peers.connection_info.has_in_capacity()
1807        );
1808
1809        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1810        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1811    }
1812
1813    #[tokio::test]
1814    async fn test_closed_incoming() {
1815        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1816        let mut peers = PeersManager::default();
1817
1818        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1819        assert_eq!(peers.connection_info.num_pending_in, 1);
1820        peers.on_incoming_pending_session_gracefully_closed();
1821        assert_eq!(peers.connection_info.num_pending_in, 0);
1822    }
1823
1824    #[tokio::test]
1825    async fn test_dropped_incoming() {
1826        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1827        let ban_duration = Duration::from_millis(500);
1828        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1829        let mut peers = PeersManager::new(config);
1830
1831        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1832        assert_eq!(peers.connection_info.num_pending_in, 1);
1833        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1834            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1835                DisconnectReason::UselessPeer,
1836            )),
1837        ));
1838
1839        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1840        assert_eq!(peers.connection_info.num_pending_in, 0);
1841        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1842
1843        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1844
1845        // unbanned after timeout
1846        tokio::time::sleep(ban_duration).await;
1847
1848        poll_fn(|cx| {
1849            let _ = peers.poll(cx);
1850            Poll::Ready(())
1851        })
1852        .await;
1853
1854        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1855        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1856    }
1857
1858    #[tokio::test]
1859    async fn test_reputation_change_connected() {
1860        let peer = PeerId::random();
1861        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1862        let mut peers = PeersManager::default();
1863        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1864
1865        match event!(peers) {
1866            PeerAction::PeerAdded(peer_id) => {
1867                assert_eq!(peer_id, peer);
1868            }
1869            _ => unreachable!(),
1870        }
1871        match event!(peers) {
1872            PeerAction::Connect { peer_id, remote_addr } => {
1873                assert_eq!(peer_id, peer);
1874                assert_eq!(remote_addr, socket_addr);
1875            }
1876            _ => unreachable!(),
1877        }
1878
1879        let p = peers.peers.get_mut(&peer).unwrap();
1880        assert_eq!(p.state, PeerConnectionState::PendingOut);
1881
1882        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1883
1884        let p = peers.peers.get(&peer).unwrap();
1885        assert_eq!(p.state, PeerConnectionState::PendingOut);
1886        assert!(p.is_banned());
1887
1888        peers.on_active_session_gracefully_closed(peer);
1889
1890        let p = peers.peers.get(&peer).unwrap();
1891        assert_eq!(p.state, PeerConnectionState::Idle);
1892        assert!(p.is_banned());
1893
1894        match event!(peers) {
1895            PeerAction::Disconnect { peer_id, .. } => {
1896                assert_eq!(peer_id, peer);
1897            }
1898            _ => unreachable!(),
1899        }
1900    }
1901
1902    #[tokio::test]
1903    async fn accept_incoming_trusted_unknown_peer_address() {
1904        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1905        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1906        // try to connect trusted peer
1907        let trusted = PeerId::random();
1908        peers.add_trusted_peer_id(trusted);
1909
1910        // saturate the inbound slots
1911        for i in 0..peers.connection_info.config.max_inbound {
1912            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1913            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1914            let peer_id = PeerId::random();
1915            peers.on_incoming_session_established(peer_id, addr);
1916
1917            match event!(peers) {
1918                PeerAction::PeerAdded(id) => {
1919                    assert_eq!(id, peer_id);
1920                }
1921                _ => unreachable!(),
1922            }
1923        }
1924
1925        // try to connect untrusted peer
1926        let untrusted = PeerId::random();
1927        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1928        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1929        peers.on_incoming_session_established(untrusted, socket_addr);
1930
1931        match event!(peers) {
1932            PeerAction::PeerAdded(id) => {
1933                assert_eq!(id, untrusted);
1934            }
1935            _ => unreachable!(),
1936        }
1937
1938        match event!(peers) {
1939            PeerAction::Disconnect { peer_id, reason } => {
1940                assert_eq!(peer_id, untrusted);
1941                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1942            }
1943            _ => unreachable!(),
1944        }
1945
1946        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1947        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1948        peers.on_incoming_session_established(trusted, socket_addr);
1949
1950        match event!(peers) {
1951            PeerAction::PeerAdded(id) => {
1952                assert_eq!(id, trusted);
1953            }
1954            _ => unreachable!(),
1955        }
1956
1957        poll_fn(|cx| {
1958            assert!(peers.poll(cx).is_pending());
1959            Poll::Ready(())
1960        })
1961        .await;
1962
1963        let peer = peers.peers.get(&trusted).unwrap();
1964        assert_eq!(peer.state, PeerConnectionState::In);
1965    }
1966
1967    #[tokio::test]
1968    async fn test_already_connected() {
1969        let peer = PeerId::random();
1970        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1971        let mut peers = PeersManager::default();
1972
1973        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
1974        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1975        assert_eq!(peers.connection_info.num_pending_in, 1);
1976
1977        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
1978        // to increase by 1
1979        peers.on_incoming_session_established(peer, socket_addr);
1980        let p = peers.peers.get_mut(&peer).expect("peer not found");
1981        assert_eq!(p.addr.tcp(), socket_addr);
1982        assert_eq!(peers.connection_info.num_pending_in, 0);
1983        assert_eq!(peers.connection_info.num_inbound, 1);
1984
1985        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
1986        // by 1
1987        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1988        assert_eq!(peers.connection_info.num_pending_in, 1);
1989
1990        // Simulate a rejection due to an already established connection, expecting the
1991        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
1992        // should not be changed.
1993        peers.on_already_connected(Direction::Incoming);
1994
1995        let p = peers.peers.get_mut(&peer).expect("peer not found");
1996        assert_eq!(p.addr.tcp(), socket_addr);
1997        assert_eq!(peers.connection_info.num_pending_in, 0);
1998        assert_eq!(peers.connection_info.num_inbound, 1);
1999    }
2000
2001    #[tokio::test]
2002    async fn test_reputation_change_trusted_peer() {
2003        let peer = PeerId::random();
2004        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2005        let mut peers = PeersManager::default();
2006        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2007
2008        match event!(peers) {
2009            PeerAction::PeerAdded(peer_id) => {
2010                assert_eq!(peer_id, peer);
2011            }
2012            _ => unreachable!(),
2013        }
2014        match event!(peers) {
2015            PeerAction::Connect { peer_id, remote_addr } => {
2016                assert_eq!(peer_id, peer);
2017                assert_eq!(remote_addr, socket_addr);
2018            }
2019            _ => unreachable!(),
2020        }
2021
2022        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2023        peers.on_active_outgoing_established(peer);
2024        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2025
2026        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2027
2028        {
2029            let p = peers.peers.get(&peer).unwrap();
2030            assert_eq!(p.state, PeerConnectionState::Out);
2031            // not banned yet
2032            assert!(!p.is_banned());
2033        }
2034
2035        // ensure peer is banned eventually
2036        loop {
2037            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2038
2039            let p = peers.peers.get(&peer).unwrap();
2040            if p.is_banned() {
2041                break
2042            }
2043        }
2044
2045        match event!(peers) {
2046            PeerAction::Disconnect { peer_id, .. } => {
2047                assert_eq!(peer_id, peer);
2048            }
2049            _ => unreachable!(),
2050        }
2051    }
2052
2053    #[tokio::test]
2054    async fn test_reputation_management() {
2055        let peer = PeerId::random();
2056        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2057        let mut peers = PeersManager::default();
2058        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2059        assert_eq!(peers.get_reputation(&peer), Some(0));
2060
2061        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2062        assert_eq!(peers.get_reputation(&peer), Some(1024));
2063
2064        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2065        assert_eq!(peers.get_reputation(&peer), Some(0));
2066    }
2067
2068    #[tokio::test]
2069    async fn test_remove_discovered_active() {
2070        let peer = PeerId::random();
2071        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2072        let mut peers = PeersManager::default();
2073        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2074
2075        match event!(peers) {
2076            PeerAction::PeerAdded(peer_id) => {
2077                assert_eq!(peer_id, peer);
2078            }
2079            _ => unreachable!(),
2080        }
2081        match event!(peers) {
2082            PeerAction::Connect { peer_id, remote_addr } => {
2083                assert_eq!(peer_id, peer);
2084                assert_eq!(remote_addr, socket_addr);
2085            }
2086            _ => unreachable!(),
2087        }
2088
2089        let p = peers.peers.get(&peer).unwrap();
2090        assert_eq!(p.state, PeerConnectionState::PendingOut);
2091
2092        peers.remove_peer(peer);
2093
2094        match event!(peers) {
2095            PeerAction::PeerRemoved(peer_id) => {
2096                assert_eq!(peer_id, peer);
2097            }
2098            _ => unreachable!(),
2099        }
2100        match event!(peers) {
2101            PeerAction::Disconnect { peer_id, .. } => {
2102                assert_eq!(peer_id, peer);
2103            }
2104            _ => unreachable!(),
2105        }
2106
2107        let p = peers.peers.get(&peer).unwrap();
2108        assert_eq!(p.state, PeerConnectionState::PendingOut);
2109
2110        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2111        let p = peers.peers.get(&peer).unwrap();
2112        assert_eq!(p.state, PeerConnectionState::PendingOut);
2113
2114        peers.on_active_session_gracefully_closed(peer);
2115        assert!(!peers.peers.contains_key(&peer));
2116    }
2117
2118    #[tokio::test]
2119    async fn test_fatal_outgoing_connection_error_trusted() {
2120        let peer = PeerId::random();
2121        let config = PeersConfig::test()
2122            .with_trusted_nodes(vec![TrustedPeer {
2123                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2124                tcp_port: 8008,
2125                udp_port: 8008,
2126                id: peer,
2127            }])
2128            .with_trusted_nodes_only(true);
2129        let mut peers = PeersManager::new(config);
2130        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2131
2132        match event!(peers) {
2133            PeerAction::Connect { peer_id, remote_addr } => {
2134                assert_eq!(peer_id, peer);
2135                assert_eq!(remote_addr, socket_addr);
2136            }
2137            _ => unreachable!(),
2138        }
2139
2140        let p = peers.peers.get(&peer).unwrap();
2141        assert_eq!(p.state, PeerConnectionState::PendingOut);
2142
2143        assert_eq!(peers.num_outbound_connections(), 0);
2144
2145        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2146            EthHandshakeError::NonStatusMessageInHandshake,
2147        ));
2148        assert!(err.is_fatal_protocol_error());
2149
2150        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2151        assert_eq!(peers.num_outbound_connections(), 0);
2152
2153        // try tmp ban peer
2154        match event!(peers) {
2155            PeerAction::BanPeer { peer_id } => {
2156                assert_eq!(peer_id, peer);
2157            }
2158            err => unreachable!("{err:?}"),
2159        }
2160
2161        // ensure we still have trusted peer
2162        assert!(peers.peers.contains_key(&peer));
2163
2164        // await for the ban to expire
2165        tokio::time::sleep(peers.backoff_durations.medium).await;
2166
2167        match event!(peers) {
2168            PeerAction::Connect { peer_id, remote_addr } => {
2169                assert_eq!(peer_id, peer);
2170                assert_eq!(remote_addr, socket_addr);
2171            }
2172            err => unreachable!("{err:?}"),
2173        }
2174    }
2175
2176    #[tokio::test]
2177    async fn test_outgoing_connection_error() {
2178        let peer = PeerId::random();
2179        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2180        let mut peers = PeersManager::default();
2181        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2182
2183        match event!(peers) {
2184            PeerAction::PeerAdded(peer_id) => {
2185                assert_eq!(peer_id, peer);
2186            }
2187            _ => unreachable!(),
2188        }
2189        match event!(peers) {
2190            PeerAction::Connect { peer_id, remote_addr } => {
2191                assert_eq!(peer_id, peer);
2192                assert_eq!(remote_addr, socket_addr);
2193            }
2194            _ => unreachable!(),
2195        }
2196
2197        let p = peers.peers.get(&peer).unwrap();
2198        assert_eq!(p.state, PeerConnectionState::PendingOut);
2199
2200        assert_eq!(peers.num_outbound_connections(), 0);
2201
2202        peers.on_outgoing_connection_failure(
2203            &socket_addr,
2204            &peer,
2205            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2206        );
2207
2208        assert_eq!(peers.num_outbound_connections(), 0);
2209    }
2210
2211    #[tokio::test]
2212    async fn test_outgoing_connection_gracefully_closed() {
2213        let peer = PeerId::random();
2214        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2215        let mut peers = PeersManager::default();
2216        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2217
2218        match event!(peers) {
2219            PeerAction::PeerAdded(peer_id) => {
2220                assert_eq!(peer_id, peer);
2221            }
2222            _ => unreachable!(),
2223        }
2224        match event!(peers) {
2225            PeerAction::Connect { peer_id, remote_addr } => {
2226                assert_eq!(peer_id, peer);
2227                assert_eq!(remote_addr, socket_addr);
2228            }
2229            _ => unreachable!(),
2230        }
2231
2232        let p = peers.peers.get(&peer).unwrap();
2233        assert_eq!(p.state, PeerConnectionState::PendingOut);
2234
2235        assert_eq!(peers.num_outbound_connections(), 0);
2236
2237        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2238
2239        assert_eq!(peers.num_outbound_connections(), 0);
2240        assert_eq!(peers.connection_info.num_pending_out, 0);
2241    }
2242
2243    #[tokio::test]
2244    async fn test_discovery_ban_list() {
2245        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2246        let socket_addr = SocketAddr::new(ip, 8008);
2247        let ban_list = BanList::new(vec![], vec![ip]);
2248        let config = PeersConfig::default().with_ban_list(ban_list);
2249        let mut peer_manager = PeersManager::new(config);
2250        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2251
2252        assert!(peer_manager.peers.is_empty());
2253    }
2254
2255    #[tokio::test]
2256    async fn test_on_pending_ban_list() {
2257        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2258        let socket_addr = SocketAddr::new(ip, 8008);
2259        let ban_list = BanList::new(vec![], vec![ip]);
2260        let config = PeersConfig::test().with_ban_list(ban_list);
2261        let mut peer_manager = PeersManager::new(config);
2262        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2263        // because we have no active peers this should be fine for testings
2264        match a {
2265            Ok(_) => panic!(),
2266            Err(err) => match err {
2267                InboundConnectionError::IpBanned => {
2268                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2269                }
2270                _ => unreachable!(),
2271            },
2272        }
2273    }
2274
2275    #[tokio::test]
2276    async fn test_on_active_inbound_ban_list() {
2277        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2278        let socket_addr = SocketAddr::new(ip, 8008);
2279        let given_peer_id = PeerId::random();
2280        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2281        let config = PeersConfig::test().with_ban_list(ban_list);
2282        let mut peer_manager = PeersManager::new(config);
2283        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2284        // non-trusted nodes should also increase pending_in
2285        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2286        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2287        // after the connection is established, the peer should be removed, the num_pending_in
2288        // should be decreased, and the num_inbound should not be increased
2289        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2290        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2291
2292        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2293            peer_manager.queued_actions.pop_front()
2294        else {
2295            panic!()
2296        };
2297
2298        assert_eq!(peer_id, given_peer_id)
2299    }
2300
2301    #[test]
2302    fn test_connection_limits() {
2303        let mut info = ConnectionInfo::default();
2304        info.inc_in();
2305        assert_eq!(info.num_inbound, 1);
2306        assert_eq!(info.num_outbound, 0);
2307        assert!(info.has_in_capacity());
2308
2309        info.decr_in();
2310        assert_eq!(info.num_inbound, 0);
2311        assert_eq!(info.num_outbound, 0);
2312
2313        info.inc_out();
2314        assert_eq!(info.num_inbound, 0);
2315        assert_eq!(info.num_outbound, 1);
2316        assert!(info.has_out_capacity());
2317
2318        info.decr_out();
2319        assert_eq!(info.num_inbound, 0);
2320        assert_eq!(info.num_outbound, 0);
2321    }
2322
2323    #[test]
2324    fn test_connection_peer_state() {
2325        let mut info = ConnectionInfo::default();
2326        info.inc_in();
2327
2328        info.decr_state(PeerConnectionState::In);
2329        assert_eq!(info.num_inbound, 0);
2330        assert_eq!(info.num_outbound, 0);
2331
2332        info.inc_out();
2333
2334        info.decr_state(PeerConnectionState::Out);
2335        assert_eq!(info.num_inbound, 0);
2336        assert_eq!(info.num_outbound, 0);
2337    }
2338
2339    #[tokio::test]
2340    async fn test_trusted_peers_are_prioritized() {
2341        let trusted_peer = PeerId::random();
2342        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2343        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2344            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2345            tcp_port: 8008,
2346            udp_port: 8008,
2347            id: trusted_peer,
2348        }]);
2349        let mut peers = PeersManager::new(config);
2350
2351        let basic_peer = PeerId::random();
2352        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2353        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2354
2355        match event!(peers) {
2356            PeerAction::PeerAdded(peer_id) => {
2357                assert_eq!(peer_id, basic_peer);
2358            }
2359            _ => unreachable!(),
2360        }
2361        match event!(peers) {
2362            PeerAction::Connect { peer_id, remote_addr } => {
2363                assert_eq!(peer_id, trusted_peer);
2364                assert_eq!(remote_addr, trusted_sock);
2365            }
2366            _ => unreachable!(),
2367        }
2368        match event!(peers) {
2369            PeerAction::Connect { peer_id, remote_addr } => {
2370                assert_eq!(peer_id, basic_peer);
2371                assert_eq!(remote_addr, basic_sock);
2372            }
2373            _ => unreachable!(),
2374        }
2375    }
2376
2377    #[tokio::test]
2378    async fn test_connect_trusted_nodes_only() {
2379        let trusted_peer = PeerId::random();
2380        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2381        let config = PeersConfig::test()
2382            .with_trusted_nodes(vec![TrustedPeer {
2383                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2384                tcp_port: 8008,
2385                udp_port: 8008,
2386                id: trusted_peer,
2387            }])
2388            .with_trusted_nodes_only(true);
2389        let mut peers = PeersManager::new(config);
2390
2391        let basic_peer = PeerId::random();
2392        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2393        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2394
2395        match event!(peers) {
2396            PeerAction::PeerAdded(peer_id) => {
2397                assert_eq!(peer_id, basic_peer);
2398            }
2399            _ => unreachable!(),
2400        }
2401        match event!(peers) {
2402            PeerAction::Connect { peer_id, remote_addr } => {
2403                assert_eq!(peer_id, trusted_peer);
2404                assert_eq!(remote_addr, trusted_sock);
2405            }
2406            _ => unreachable!(),
2407        }
2408        poll_fn(|cx| {
2409            assert!(peers.poll(cx).is_pending());
2410            Poll::Ready(())
2411        })
2412        .await;
2413    }
2414
2415    #[tokio::test]
2416    async fn test_incoming_with_trusted_nodes_only() {
2417        let trusted_peer = PeerId::random();
2418        let config = PeersConfig::test()
2419            .with_trusted_nodes(vec![TrustedPeer {
2420                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2421                tcp_port: 8008,
2422                udp_port: 8008,
2423                id: trusted_peer,
2424            }])
2425            .with_trusted_nodes_only(true);
2426        let mut peers = PeersManager::new(config);
2427
2428        let basic_peer = PeerId::random();
2429        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2430        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2431        // non-trusted nodes should also increase pending_in
2432        assert_eq!(peers.connection_info.num_pending_in, 1);
2433        peers.on_incoming_session_established(basic_peer, basic_sock);
2434        // after the connection is established, the peer should be removed, the num_pending_in
2435        // should be decreased, and the num_inbound mut not be increased
2436        assert_eq!(peers.connection_info.num_pending_in, 0);
2437        assert_eq!(peers.connection_info.num_inbound, 0);
2438
2439        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2440            peers.queued_actions.pop_front()
2441        else {
2442            panic!()
2443        };
2444        assert_eq!(basic_peer, peer_id);
2445        assert!(!peers.peers.contains_key(&basic_peer));
2446    }
2447
2448    #[tokio::test]
2449    async fn test_incoming_without_trusted_nodes_only() {
2450        let trusted_peer = PeerId::random();
2451        let config = PeersConfig::test()
2452            .with_trusted_nodes(vec![TrustedPeer {
2453                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2454                tcp_port: 8008,
2455                udp_port: 8008,
2456                id: trusted_peer,
2457            }])
2458            .with_trusted_nodes_only(false);
2459        let mut peers = PeersManager::new(config);
2460
2461        let basic_peer = PeerId::random();
2462        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2463        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2464
2465        // non-trusted nodes should also increase pending_in
2466        assert_eq!(peers.connection_info.num_pending_in, 1);
2467        peers.on_incoming_session_established(basic_peer, basic_sock);
2468        // after the connection is established, the peer should be removed, the num_pending_in
2469        // should be decreased, and the num_inbound must be increased
2470        assert_eq!(peers.connection_info.num_pending_in, 0);
2471        assert_eq!(peers.connection_info.num_inbound, 1);
2472        assert!(peers.peers.contains_key(&basic_peer));
2473    }
2474
2475    #[tokio::test]
2476    async fn test_incoming_at_capacity() {
2477        let mut config = PeersConfig::test();
2478        config.connection_info.max_inbound = 1;
2479        let mut peers = PeersManager::new(config);
2480
2481        let peer = PeerId::random();
2482        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2483        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2484
2485        peers.on_incoming_session_established(peer, addr);
2486
2487        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2488        assert_eq!(
2489            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2490            InboundConnectionError::ExceedsCapacity
2491        );
2492    }
2493
2494    #[tokio::test]
2495    async fn test_incoming_rate_limit() {
2496        let config = PeersConfig {
2497            incoming_ip_throttle_duration: Duration::from_millis(100),
2498            ..PeersConfig::test()
2499        };
2500        let mut peers = PeersManager::new(config);
2501
2502        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2503        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2504        assert_eq!(
2505            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2506            InboundConnectionError::IpBanned
2507        );
2508
2509        peers.release_interval.reset_immediately();
2510        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2511
2512        // await unban
2513        poll_fn(|cx| loop {
2514            if peers.poll(cx).is_pending() {
2515                return Poll::Ready(());
2516            }
2517        })
2518        .await;
2519
2520        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2521        assert_eq!(
2522            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2523            InboundConnectionError::IpBanned
2524        );
2525    }
2526
2527    #[tokio::test]
2528    async fn test_tick() {
2529        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2530        let socket_addr = SocketAddr::new(ip, 8008);
2531        let config = PeersConfig::test();
2532        let mut peer_manager = PeersManager::new(config);
2533        let peer_id = PeerId::random();
2534        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2535
2536        tokio::time::sleep(Duration::from_secs(1)).await;
2537        peer_manager.tick();
2538
2539        // still unconnected
2540        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2541
2542        // mark as connected
2543        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2544
2545        tokio::time::sleep(Duration::from_secs(1)).await;
2546        peer_manager.tick();
2547
2548        // still at default reputation
2549        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2550
2551        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2552
2553        tokio::time::sleep(Duration::from_secs(1)).await;
2554        peer_manager.tick();
2555
2556        // tick applied
2557        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2558    }
2559
2560    #[tokio::test]
2561    async fn test_remove_incoming_after_disconnect() {
2562        let peer_id = PeerId::random();
2563        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2564        let mut peers = PeersManager::default();
2565
2566        peers.on_incoming_pending_session(addr.ip()).unwrap();
2567        peers.on_incoming_session_established(peer_id, addr);
2568        let peer = peers.peers.get(&peer_id).unwrap();
2569        assert_eq!(peer.state, PeerConnectionState::In);
2570        assert!(peer.remove_after_disconnect);
2571
2572        peers.on_active_session_gracefully_closed(peer_id);
2573        assert!(!peers.peers.contains_key(&peer_id))
2574    }
2575
2576    #[tokio::test]
2577    async fn test_keep_incoming_after_disconnect_if_discovered() {
2578        let peer_id = PeerId::random();
2579        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2580        let mut peers = PeersManager::default();
2581
2582        peers.on_incoming_pending_session(addr.ip()).unwrap();
2583        peers.on_incoming_session_established(peer_id, addr);
2584        let peer = peers.peers.get(&peer_id).unwrap();
2585        assert_eq!(peer.state, PeerConnectionState::In);
2586        assert!(peer.remove_after_disconnect);
2587
2588        // trigger discovery manually while the peer is still connected
2589        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2590
2591        peers.on_active_session_gracefully_closed(peer_id);
2592
2593        let peer = peers.peers.get(&peer_id).unwrap();
2594        assert_eq!(peer.state, PeerConnectionState::Idle);
2595        assert!(!peer.remove_after_disconnect);
2596    }
2597
2598    #[tokio::test]
2599    async fn test_incoming_outgoing_already_connected() {
2600        let peer_id = PeerId::random();
2601        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2602        let mut peers = PeersManager::default();
2603
2604        peers.on_incoming_pending_session(addr.ip()).unwrap();
2605        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2606
2607        match event!(peers) {
2608            PeerAction::PeerAdded(_) => {}
2609            _ => unreachable!(),
2610        }
2611
2612        match event!(peers) {
2613            PeerAction::Connect { .. } => {}
2614            _ => unreachable!(),
2615        }
2616
2617        peers.on_incoming_session_established(peer_id, addr);
2618        peers.on_already_connected(Direction::Outgoing(peer_id));
2619        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2620        assert_eq!(peers.connection_info.num_inbound, 1);
2621        assert_eq!(peers.connection_info.num_pending_out, 0);
2622        assert_eq!(peers.connection_info.num_pending_in, 0);
2623        assert_eq!(peers.connection_info.num_outbound, 0);
2624    }
2625
2626    #[tokio::test]
2627    async fn test_already_connected_incoming_outgoing_connection_error() {
2628        let peer_id = PeerId::random();
2629        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2630        let mut peers = PeersManager::default();
2631
2632        peers.on_incoming_pending_session(addr.ip()).unwrap();
2633        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2634
2635        match event!(peers) {
2636            PeerAction::PeerAdded(_) => {}
2637            _ => unreachable!(),
2638        }
2639
2640        match event!(peers) {
2641            PeerAction::Connect { .. } => {}
2642            _ => unreachable!(),
2643        }
2644
2645        peers.on_incoming_session_established(peer_id, addr);
2646
2647        peers.on_outgoing_connection_failure(
2648            &addr,
2649            &peer_id,
2650            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2651        );
2652        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2653        assert_eq!(peers.connection_info.num_inbound, 1);
2654        assert_eq!(peers.connection_info.num_pending_out, 0);
2655        assert_eq!(peers.connection_info.num_pending_in, 0);
2656        assert_eq!(peers.connection_info.num_outbound, 0);
2657    }
2658
2659    #[tokio::test]
2660    async fn test_max_concurrent_dials() {
2661        let config = PeersConfig::default();
2662        let mut peer_manager = PeersManager::new(config);
2663        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2664        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2665        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2666            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2667        }
2668
2669        peer_manager.fill_outbound_slots();
2670        let dials = peer_manager
2671            .queued_actions
2672            .iter()
2673            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2674            .count();
2675        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2676    }
2677
2678    #[tokio::test]
2679    async fn test_max_num_of_pending_dials() {
2680        let config = PeersConfig::default();
2681        let mut peer_manager = PeersManager::new(config);
2682        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2683        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2684
2685        // add more peers than allowed
2686        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2687            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2688        }
2689
2690        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2691            match event!(peer_manager) {
2692                PeerAction::PeerAdded(_) => {}
2693                _ => unreachable!(),
2694            }
2695        }
2696
2697        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2698            match event!(peer_manager) {
2699                PeerAction::Connect { .. } => {}
2700                _ => unreachable!(),
2701            }
2702        }
2703
2704        // generate 'Connect' actions
2705        peer_manager.fill_outbound_slots();
2706
2707        // all dialed connections should be in 'PendingOut' state
2708        let dials = peer_manager.connection_info.num_pending_out;
2709        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2710
2711        let num_pendingout_states = peer_manager
2712            .peers
2713            .iter()
2714            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2715            .map(|(peer_id, _)| *peer_id)
2716            .collect::<Vec<PeerId>>();
2717        assert_eq!(
2718            num_pendingout_states.len(),
2719            peer_manager.connection_info.config.max_concurrent_outbound_dials
2720        );
2721
2722        // establish dialed connections
2723        for peer_id in &num_pendingout_states {
2724            peer_manager.on_active_outgoing_established(*peer_id);
2725        }
2726
2727        // all dialed connections should now be in 'Out' state
2728        for peer_id in &num_pendingout_states {
2729            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2730        }
2731
2732        // no more pending outbound connections
2733        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2734    }
2735
2736    #[tokio::test]
2737    async fn test_connect() {
2738        let peer = PeerId::random();
2739        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2740        let mut peers = PeersManager::default();
2741        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2742        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2743
2744        match event!(peers) {
2745            PeerAction::Connect { peer_id, remote_addr } => {
2746                assert_eq!(peer_id, peer);
2747                assert_eq!(remote_addr, socket_addr);
2748            }
2749            _ => unreachable!(),
2750        }
2751
2752        let (record, _) = peers.peer_by_id(peer).unwrap();
2753        assert_eq!(record.tcp_addr(), socket_addr);
2754        assert_eq!(record.udp_addr(), socket_addr);
2755
2756        // connect again
2757        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2758
2759        let (record, _) = peers.peer_by_id(peer).unwrap();
2760        assert_eq!(record.tcp_addr(), socket_addr);
2761        assert_eq!(record.udp_addr(), socket_addr);
2762    }
2763
2764    #[tokio::test]
2765    async fn test_incoming_connection_from_banned() {
2766        let peer = PeerId::random();
2767        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2768        let config = PeersConfig::test().with_max_inbound(3);
2769        let mut peers = PeersManager::new(config);
2770        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2771
2772        match event!(peers) {
2773            PeerAction::PeerAdded(peer_id) => {
2774                assert_eq!(peer_id, peer);
2775            }
2776            _ => unreachable!(),
2777        }
2778        match event!(peers) {
2779            PeerAction::Connect { peer_id, .. } => {
2780                assert_eq!(peer_id, peer);
2781            }
2782            _ => unreachable!(),
2783        }
2784
2785        poll_fn(|cx| {
2786            assert!(peers.poll(cx).is_pending());
2787            Poll::Ready(())
2788        })
2789        .await;
2790
2791        // simulate new connection drops with error
2792        loop {
2793            peers.on_active_session_dropped(
2794                &socket_addr,
2795                &peer,
2796                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2797                    reth_eth_wire::EthVersion::Eth68,
2798                    reth_eth_wire::EthMessageID::Status,
2799                )),
2800            );
2801
2802            if peers.peers.get(&peer).unwrap().is_banned() {
2803                break;
2804            }
2805
2806            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2807            peers.on_incoming_session_established(peer, socket_addr);
2808
2809            match event!(peers) {
2810                PeerAction::Connect { peer_id, .. } => {
2811                    assert_eq!(peer_id, peer);
2812                }
2813                _ => unreachable!(),
2814            }
2815        }
2816
2817        assert!(peers.peers.get(&peer).unwrap().is_banned());
2818
2819        // fill all incoming slots
2820        for _ in 0..peers.connection_info.config.max_inbound {
2821            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2822            peers.on_incoming_session_established(peer, socket_addr);
2823
2824            match event!(peers) {
2825                PeerAction::DisconnectBannedIncoming { peer_id } => {
2826                    assert_eq!(peer_id, peer);
2827                }
2828                _ => unreachable!(),
2829            }
2830        }
2831
2832        poll_fn(|cx| {
2833            assert!(peers.poll(cx).is_pending());
2834            Poll::Ready(())
2835        })
2836        .await;
2837
2838        assert_eq!(peers.connection_info.num_inbound, 0);
2839
2840        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2841
2842        // Assert we can still accept new connections
2843        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2844        assert_eq!(peers.connection_info.num_pending_in, 1);
2845
2846        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
2847        // connection info is updated via the peer's state which would be a noop here since the
2848        // banned peer's state is idle
2849        peers.on_active_session_gracefully_closed(peer);
2850        assert_eq!(peers.connection_info.num_inbound, 0);
2851    }
2852
2853    #[tokio::test]
2854    async fn test_add_pending_connect() {
2855        let peer = PeerId::random();
2856        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2857        let mut peers = PeersManager::default();
2858        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2859        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2860        assert_eq!(peers.connection_info.num_pending_out, 1);
2861    }
2862
2863    #[tokio::test]
2864    async fn test_dns_updates_peer_address() {
2865        let peer_id = PeerId::random();
2866        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2867        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2868
2869        let trusted = TrustedPeer {
2870            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2871            tcp_port: 8008,
2872            udp_port: 8008,
2873            id: peer_id,
2874        };
2875
2876        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2877        let mut manager = PeersManager::new(config);
2878        manager
2879            .trusted_peers_resolver
2880            .set_interval(tokio::time::interval(Duration::from_millis(1)));
2881
2882        manager.peers.insert(
2883            peer_id,
2884            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2885        );
2886
2887        for _ in 0..100 {
2888            let _ = event!(manager);
2889            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2890                break;
2891            }
2892            tokio::time::sleep(Duration::from_millis(10)).await;
2893        }
2894
2895        let updated_peer = manager.peers.get(&peer_id).unwrap();
2896        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2897    }
2898}