reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    is_connection_failed_reputation,
18    peers::{
19        config::PeerBackoffDurations,
20        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
21    },
22    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
23    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
24};
25use std::{
26    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
27    fmt::Display,
28    io::{self},
29    net::{IpAddr, SocketAddr},
30    task::{Context, Poll},
31    time::Duration,
32};
33use thiserror::Error;
34use tokio::{
35    sync::mpsc,
36    time::{Instant, Interval},
37};
38use tokio_stream::wrappers::UnboundedReceiverStream;
39use tracing::{trace, warn};
40
41/// Maintains the state of _all_ the peers known to the network.
42///
43/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
44/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
45///
46/// The [`PeersManager`] will be notified on peer related changes
47#[derive(Debug)]
48pub struct PeersManager {
49    /// All peers known to the network
50    peers: HashMap<PeerId, Peer>,
51    /// The set of trusted peer ids.
52    ///
53    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
54    /// an address: [`Self::add_trusted_peer_id`]
55    trusted_peer_ids: HashSet<PeerId>,
56    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
57    /// peer's address when the DNS records change.
58    trusted_peers_resolver: TrustedPeersResolver,
59    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
60    manager_tx: mpsc::UnboundedSender<PeerCommand>,
61    /// Receiver half of the command channel.
62    handle_rx: UnboundedReceiverStream<PeerCommand>,
63    /// Buffered actions until the manager is polled.
64    queued_actions: VecDeque<PeerAction>,
65    /// Interval for triggering connections if there are free slots.
66    refill_slots_interval: Interval,
67    /// How to weigh reputation changes
68    reputation_weights: ReputationChangeWeights,
69    /// Tracks current slot stats.
70    connection_info: ConnectionInfo,
71    /// Tracks unwanted ips/peer ids.
72    ban_list: BanList,
73    /// Tracks currently backed off peers.
74    backed_off_peers: HashMap<PeerId, std::time::Instant>,
75    /// Interval at which to check for peers to unban and release from the backoff map.
76    release_interval: Interval,
77    /// How long to ban bad peers.
78    ban_duration: Duration,
79    /// How long peers to which we could not connect for non-fatal reasons, e.g.
80    /// [`DisconnectReason::TooManyPeers`], are put in time out.
81    backoff_durations: PeerBackoffDurations,
82    /// If non-trusted peers should be connected to, or the connection from non-trusted
83    /// incoming peers should be accepted.
84    trusted_nodes_only: bool,
85    /// Timestamp of the last time [`Self::tick`] was called.
86    last_tick: Instant,
87    /// Maximum number of backoff attempts before we give up on a peer and dropping.
88    max_backoff_count: u8,
89    /// Tracks the connection state of the node
90    net_connection_state: NetworkConnectionState,
91    /// How long to temporarily ban ip on an incoming connection attempt.
92    incoming_ip_throttle_duration: Duration,
93}
94
95impl PeersManager {
96    /// Create a new instance with the given config
97    pub fn new(config: PeersConfig) -> Self {
98        let PeersConfig {
99            refill_slots_interval,
100            connection_info,
101            reputation_weights,
102            ban_list,
103            ban_duration,
104            backoff_durations,
105            trusted_nodes,
106            trusted_nodes_only,
107            trusted_nodes_resolution_interval,
108            basic_nodes,
109            max_backoff_count,
110            incoming_ip_throttle_duration,
111        } = config;
112        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
113        let now = Instant::now();
114
115        // We use half of the interval to decrease the max duration to `150%` in worst case
116        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
117
118        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
119        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
120
121        for trusted_peer in &trusted_nodes {
122            match trusted_peer.resolve_blocking() {
123                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
124                    trusted_peer_ids.insert(id);
125                    peers.entry(id).or_insert_with(|| {
126                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
127                    });
128                }
129                Err(err) => {
130                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
131                }
132            }
133        }
134
135        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
136            peers.entry(id).or_insert_with(|| {
137                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
138            });
139        }
140
141        Self {
142            peers,
143            trusted_peer_ids,
144            trusted_peers_resolver: TrustedPeersResolver::new(
145                trusted_nodes,
146                tokio::time::interval(trusted_nodes_resolution_interval), // 1 hour
147            ),
148            manager_tx,
149            handle_rx: UnboundedReceiverStream::new(handle_rx),
150            queued_actions: Default::default(),
151            reputation_weights,
152            refill_slots_interval: tokio::time::interval(refill_slots_interval),
153            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
154            connection_info: ConnectionInfo::new(connection_info),
155            ban_list,
156            backed_off_peers: Default::default(),
157            ban_duration,
158            backoff_durations,
159            trusted_nodes_only,
160            last_tick: Instant::now(),
161            max_backoff_count,
162            net_connection_state: NetworkConnectionState::default(),
163            incoming_ip_throttle_duration,
164        }
165    }
166
167    /// Returns a new [`PeersHandle`] that can send commands to this type.
168    pub(crate) fn handle(&self) -> PeersHandle {
169        PeersHandle::new(self.manager_tx.clone())
170    }
171
172    /// Returns the number of peers in the peer set
173    #[inline]
174    pub(crate) fn num_known_peers(&self) -> usize {
175        self.peers.len()
176    }
177
178    /// Returns an iterator over all peers
179    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
180        self.peers.iter().map(|(peer_id, v)| {
181            NodeRecord::new_with_ports(
182                v.addr.tcp().ip(),
183                v.addr.tcp().port(),
184                v.addr.udp().map(|addr| addr.port()),
185                *peer_id,
186            )
187        })
188    }
189
190    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
191    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
192        self.peers.get(&peer_id).map(|v| {
193            (
194                NodeRecord::new_with_ports(
195                    v.addr.tcp().ip(),
196                    v.addr.tcp().port(),
197                    v.addr.udp().map(|addr| addr.port()),
198                    peer_id,
199                ),
200                v.kind,
201            )
202        })
203    }
204
205    /// Returns an iterator over all peer ids for peers with the given kind
206    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
207        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
208    }
209
210    /// Returns the number of currently active inbound connections.
211    #[inline]
212    pub(crate) const fn num_inbound_connections(&self) -> usize {
213        self.connection_info.num_inbound
214    }
215
216    /// Returns the number of currently __active__ outbound connections.
217    #[inline]
218    pub(crate) const fn num_outbound_connections(&self) -> usize {
219        self.connection_info.num_outbound
220    }
221
222    /// Returns the number of currently pending outbound connections.
223    #[inline]
224    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
225        self.connection_info.num_pending_out
226    }
227
228    /// Returns the number of currently backed off peers.
229    #[inline]
230    pub(crate) fn num_backed_off_peers(&self) -> usize {
231        self.backed_off_peers.len()
232    }
233
234    /// Returns the number of idle trusted peers.
235    fn num_idle_trusted_peers(&self) -> usize {
236        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
237    }
238
239    /// Invoked when a new _incoming_ tcp connection is accepted.
240    ///
241    /// returns an error if the inbound ip address is on the ban list
242    pub(crate) fn on_incoming_pending_session(
243        &mut self,
244        addr: IpAddr,
245    ) -> Result<(), InboundConnectionError> {
246        if self.ban_list.is_banned_ip(&addr) {
247            return Err(InboundConnectionError::IpBanned)
248        }
249
250        // check if we even have slots for a new incoming connection
251        if !self.connection_info.has_in_capacity() {
252            if self.trusted_peer_ids.is_empty() {
253                // if we don't have any incoming slots and no trusted peers, we don't accept any new
254                // connections
255                return Err(InboundConnectionError::ExceedsCapacity)
256            }
257
258            // there's an edge case here where no incoming connections besides from trusted peers
259            // are allowed (max_inbound == 0), in which case we still need to allow new pending
260            // incoming connections until all trusted peers are connected.
261            let num_idle_trusted_peers = self.num_idle_trusted_peers();
262            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
263                // we still want to limit concurrent pending connections
264                let max_inbound =
265                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
266                if self.connection_info.num_pending_in < max_inbound {
267                    self.connection_info.inc_pending_in();
268                    return Ok(())
269                }
270            }
271
272            // all trusted peers are either connected or connecting
273            return Err(InboundConnectionError::ExceedsCapacity)
274        }
275
276        // also cap the incoming connections we can process at once
277        if !self.connection_info.has_in_pending_capacity() {
278            return Err(InboundConnectionError::ExceedsCapacity)
279        }
280
281        // apply the rate limit
282        self.throttle_incoming_ip(addr);
283
284        self.connection_info.inc_pending_in();
285        Ok(())
286    }
287
288    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
289    /// rejected.
290    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
291        self.connection_info.decr_pending_in();
292    }
293
294    /// Invoked when a pending session was closed.
295    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
296        self.connection_info.decr_pending_in()
297    }
298
299    /// Invoked when a pending session was closed.
300    pub(crate) fn on_incoming_pending_session_dropped(
301        &mut self,
302        remote_addr: SocketAddr,
303        err: &PendingSessionHandshakeError,
304    ) {
305        if err.is_fatal_protocol_error() {
306            self.ban_ip(remote_addr.ip());
307
308            if err.merits_discovery_ban() {
309                self.queued_actions
310                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
311            }
312        }
313
314        self.connection_info.decr_pending_in();
315    }
316
317    /// Called when a new _incoming_ active session was established to the given peer.
318    ///
319    /// This will update the state of the peer if not yet tracked.
320    ///
321    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
322    /// be scheduled.
323    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
324        self.connection_info.decr_pending_in();
325
326        // we only need to check the peer id here as the ip address will have been checked at
327        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
328        if self.ban_list.is_banned_peer(&peer_id) {
329            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
330            return
331        }
332
333        // check if the peer is trustable or not
334        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
335        if self.trusted_nodes_only && !is_trusted {
336            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
337            return
338        }
339
340        // start a new tick, so the peer is not immediately rewarded for the time since last tick
341        self.tick();
342
343        match self.peers.entry(peer_id) {
344            Entry::Occupied(mut entry) => {
345                let peer = entry.get_mut();
346                if peer.is_banned() {
347                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
348                    return
349                }
350                // it might be the case that we're also trying to connect to this peer at the same
351                // time, so we need to adjust the state here
352                if peer.state.is_pending_out() {
353                    self.connection_info.decr_state(peer.state);
354                }
355
356                peer.state = PeerConnectionState::In;
357
358                is_trusted = is_trusted || peer.is_trusted();
359            }
360            Entry::Vacant(entry) => {
361                // peer is missing in the table, we add it but mark it as to be removed after
362                // disconnect, because we only know the outgoing port
363                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
364                peer.remove_after_disconnect = true;
365                entry.insert(peer);
366                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
367            }
368        }
369
370        let has_in_capacity = self.connection_info.has_in_capacity();
371        // increment new incoming connection
372        self.connection_info.inc_in();
373
374        // disconnect the peer if we don't have capacity for more inbound connections
375        if !is_trusted && !has_in_capacity {
376            self.queued_actions.push_back(PeerAction::Disconnect {
377                peer_id,
378                reason: Some(DisconnectReason::TooManyPeers),
379            });
380        }
381    }
382
383    /// Bans the peer temporarily with the configured ban timeout
384    fn ban_peer(&mut self, peer_id: PeerId) {
385        let ban_duration = if let Some(peer) = self.peers.get(&peer_id) &&
386            (peer.is_trusted() || peer.is_static())
387        {
388            // For misbehaving trusted or static peers, we provide a bit more leeway when
389            // penalizing them.
390            self.backoff_durations.low / 2
391        } else {
392            self.ban_duration
393        };
394
395        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
396        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
397    }
398
399    /// Bans the IP temporarily with the configured ban timeout
400    fn ban_ip(&mut self, ip: IpAddr) {
401        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
402    }
403
404    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
405    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
406        self.ban_list
407            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
408    }
409
410    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
411    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
412        trace!(target: "net::peers", ?peer_id, "backing off");
413
414        if let Some(peer) = self.peers.get_mut(&peer_id) {
415            peer.backed_off = true;
416            self.backed_off_peers.insert(peer_id, until);
417        }
418    }
419
420    /// Unbans the peer
421    fn unban_peer(&mut self, peer_id: PeerId) {
422        self.ban_list.unban_peer(&peer_id);
423        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
424    }
425
426    /// Tick function to update reputation of all connected peers.
427    /// Peers are rewarded with reputation increases for the time they are connected since the last
428    /// tick. This is to prevent peers from being disconnected eventually due to slashed
429    /// reputation because of some bad messages (most likely transaction related)
430    fn tick(&mut self) {
431        let now = Instant::now();
432        // Determine the number of seconds since the last tick.
433        // Ensuring that now is always greater than last_tick to account for issues with system
434        // time.
435        let secs_since_last_tick =
436            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
437        self.last_tick = now;
438
439        // update reputation via seconds connected
440        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
441            // update reputation via seconds connected, but keep the target _around_ the default
442            // reputation.
443            if peer.1.reputation < DEFAULT_REPUTATION {
444                peer.1.reputation += secs_since_last_tick;
445            }
446        }
447    }
448
449    /// Returns the tracked reputation for a peer.
450    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
451        self.peers.get(peer_id).map(|peer| peer.reputation)
452    }
453
454    /// Apply the corresponding reputation change to the given peer.
455    ///
456    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
457    /// reputation changes that can be attributed to network conditions. If the peer is a
458    /// trusted peer, it will also be less strict with the reputation slashing.
459    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
460        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
461            // First check if we should reset the reputation
462            if rep.is_reset() {
463                peer.reset_reputation()
464            } else {
465                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
466                if peer.is_trusted() || peer.is_static() {
467                    // exempt trusted and static peers from reputation slashing for
468                    if matches!(
469                        rep,
470                        ReputationChangeKind::Dropped |
471                            ReputationChangeKind::BadAnnouncement |
472                            ReputationChangeKind::Timeout |
473                            ReputationChangeKind::AlreadySeenTransaction
474                    ) {
475                        return
476                    }
477
478                    // also be less strict with the reputation slashing for trusted peers
479                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
480                        // this caps the reputation change to the maximum allowed for trusted peers
481                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
482                    }
483                }
484                peer.apply_reputation(reputation_change, rep)
485            }
486        } else {
487            return
488        };
489
490        match outcome {
491            ReputationChangeOutcome::None => {}
492            ReputationChangeOutcome::Ban => {
493                self.ban_peer(*peer_id);
494            }
495            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
496            ReputationChangeOutcome::DisconnectAndBan => {
497                self.queued_actions.push_back(PeerAction::Disconnect {
498                    peer_id: *peer_id,
499                    reason: Some(DisconnectReason::DisconnectRequested),
500                });
501                self.ban_peer(*peer_id);
502            }
503        }
504    }
505
506    /// Gracefully disconnected a pending _outgoing_ session
507    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
508        if let Some(peer) = self.peers.get_mut(peer_id) {
509            self.connection_info.decr_state(peer.state);
510            peer.state = PeerConnectionState::Idle;
511        }
512    }
513
514    /// Invoked when an _outgoing_ pending session was closed during authentication or the
515    /// handshake.
516    pub(crate) fn on_outgoing_pending_session_dropped(
517        &mut self,
518        remote_addr: &SocketAddr,
519        peer_id: &PeerId,
520        err: &PendingSessionHandshakeError,
521    ) {
522        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
523    }
524
525    /// Gracefully disconnected an active session
526    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
527        match self.peers.entry(peer_id) {
528            Entry::Occupied(mut entry) => {
529                self.connection_info.decr_state(entry.get().state);
530
531                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
532                    // this peer should be removed from the set
533                    entry.remove();
534                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
535                } else {
536                    // reset the peer's state
537                    // we reset the backoff counter since we're able to establish a successful
538                    // session to that peer
539                    entry.get_mut().severe_backoff_counter = 0;
540                    entry.get_mut().state = PeerConnectionState::Idle;
541                    return
542                }
543            }
544            Entry::Vacant(_) => return,
545        }
546
547        self.fill_outbound_slots();
548    }
549
550    /// Called when a _pending_ outbound connection is successful.
551    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
552        if let Some(peer) = self.peers.get_mut(&peer_id) {
553            self.connection_info.decr_state(peer.state);
554            self.connection_info.inc_out();
555            peer.state = PeerConnectionState::Out;
556        }
557    }
558
559    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
560    ///
561    /// Depending on whether the error is fatal, the peer will be removed from the peer set
562    /// otherwise its reputation is slashed.
563    pub(crate) fn on_active_session_dropped(
564        &mut self,
565        remote_addr: &SocketAddr,
566        peer_id: &PeerId,
567        err: &EthStreamError,
568    ) {
569        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
570    }
571
572    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
573    /// connection.
574    pub(crate) fn on_outgoing_connection_failure(
575        &mut self,
576        remote_addr: &SocketAddr,
577        peer_id: &PeerId,
578        err: &io::Error,
579    ) {
580        // there's a race condition where we accepted an incoming connection while we were trying to
581        // connect to the same peer at the same time. if the outgoing connection failed
582        // after the incoming connection was accepted, we can ignore this error
583        if let Some(peer) = self.peers.get(peer_id) {
584            if peer.state.is_incoming() {
585                // we already have an active connection to the peer, so we can ignore this error
586                return
587            }
588
589            if peer.is_trusted() && is_connection_failed_reputation(peer.reputation) {
590                // trigger resolution task for trusted peer since multiple connection failures
591                // occurred
592                self.trusted_peers_resolver.interval.reset_immediately();
593            }
594        }
595
596        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
597    }
598
599    fn on_connection_failure(
600        &mut self,
601        remote_addr: &SocketAddr,
602        peer_id: &PeerId,
603        err: impl SessionError,
604        reputation_change: ReputationChangeKind,
605    ) {
606        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
607
608        if err.is_fatal_protocol_error() {
609            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
610            // remove the peer to which we can't establish a connection due to protocol related
611            // issues.
612            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
613                self.connection_info.decr_state(entry.get().state);
614                // only remove if the peer is not trusted
615                if entry.get().is_trusted() {
616                    entry.get_mut().state = PeerConnectionState::Idle;
617                } else {
618                    entry.remove();
619                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
620                    // If the error is caused by a peer that should be banned from discovery
621                    if err.merits_discovery_ban() {
622                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
623                            peer_id: *peer_id,
624                            ip_addr: remote_addr.ip(),
625                        })
626                    }
627                }
628            }
629
630            // ban the peer
631            self.ban_peer(*peer_id);
632        } else {
633            let mut backoff_until = None;
634            let mut remove_peer = false;
635
636            if let Some(peer) = self.peers.get_mut(peer_id) {
637                if let Some(kind) = err.should_backoff() {
638                    if peer.is_trusted() || peer.is_static() {
639                        // provide a bit more leeway for trusted peers and use a lower backoff so
640                        // that we keep re-trying them after backing off shortly, but we should at
641                        // least backoff for the low duration to not violate the ip based inbound
642                        // connection throttle that peer has in place, because this peer might not
643                        // have us registered as a trusted peer.
644                        let backoff = self.backoff_durations.low;
645                        backoff_until = Some(std::time::Instant::now() + backoff);
646                    } else {
647                        // Increment peer.backoff_counter
648                        if kind.is_severe() {
649                            peer.severe_backoff_counter =
650                                peer.severe_backoff_counter.saturating_add(1);
651                        }
652
653                        let backoff_time =
654                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
655
656                        // The peer has signaled that it is currently unable to process any more
657                        // connections, so we will hold off on attempting any new connections for a
658                        // while
659                        backoff_until = Some(backoff_time);
660                    }
661                } else {
662                    // If the error was not a backoff error, we reduce the peer's reputation
663                    let reputation_change = self.reputation_weights.change(reputation_change);
664                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
665                };
666
667                self.connection_info.decr_state(peer.state);
668                peer.state = PeerConnectionState::Idle;
669
670                if peer.severe_backoff_counter > self.max_backoff_count &&
671                    !peer.is_trusted() &&
672                    !peer.is_static()
673                {
674                    // mark peer for removal if it has been backoff too many times and is _not_
675                    // trusted or static
676                    remove_peer = true;
677                }
678            }
679
680            // remove peer if it has been marked for removal
681            if remove_peer {
682                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
683                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
684            } else if let Some(backoff_until) = backoff_until {
685                // otherwise, backoff the peer if marked as such
686                self.backoff_peer_until(*peer_id, backoff_until);
687            }
688        }
689
690        self.fill_outbound_slots();
691    }
692
693    /// Invoked if a pending session was disconnected because there's already a connection to the
694    /// peer.
695    ///
696    /// If the session was an outgoing connection, this means that the peer initiated a connection
697    /// to us at the same time and this connection is already established.
698    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
699        match direction {
700            Direction::Incoming => {
701                // need to decrement the ingoing counter
702                self.connection_info.decr_pending_in();
703            }
704            Direction::Outgoing(_) => {
705                // cleanup is handled when the incoming active session is activated in
706                // `on_incoming_session_established`
707            }
708        }
709    }
710
711    /// Called as follow-up for a discovered peer.
712    ///
713    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
714    /// protocol
715    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
716        if let Some(peer) = self.peers.get_mut(&peer_id) {
717            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
718            peer.fork_id = Some(fork_id);
719        }
720    }
721
722    /// Called for a newly discovered peer.
723    ///
724    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
725    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
726        self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
727    }
728
729    /// Marks the given peer as trusted.
730    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
731        self.trusted_peer_ids.insert(peer_id);
732    }
733
734    /// Called for a newly discovered trusted peer.
735    ///
736    /// If the peer already exists, then the address and kind will be updated.
737    #[cfg_attr(not(test), expect(dead_code))]
738    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
739        self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
740    }
741
742    /// Called for a newly discovered peer.
743    ///
744    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
745    pub(crate) fn add_peer_kind(
746        &mut self,
747        peer_id: PeerId,
748        kind: PeerKind,
749        addr: PeerAddr,
750        fork_id: Option<ForkId>,
751    ) {
752        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
753            return
754        }
755
756        match self.peers.entry(peer_id) {
757            Entry::Occupied(mut entry) => {
758                let peer = entry.get_mut();
759                peer.kind = kind;
760                peer.fork_id = fork_id;
761                peer.addr = addr;
762
763                if peer.state.is_incoming() {
764                    // now that we have an actual discovered address, for that peer and not just the
765                    // ip of the incoming connection, we don't need to remove the peer after
766                    // disconnecting, See `on_incoming_session_established`
767                    peer.remove_after_disconnect = false;
768                }
769            }
770            Entry::Vacant(entry) => {
771                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
772                let mut peer = Peer::with_kind(addr, kind);
773                peer.fork_id = fork_id;
774                entry.insert(peer);
775                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
776            }
777        }
778
779        if kind.is_trusted() {
780            self.trusted_peer_ids.insert(peer_id);
781        }
782    }
783
784    /// Removes the tracked node from the set.
785    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
786        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
787        if entry.get().is_trusted() {
788            return
789        }
790        let mut peer = entry.remove();
791
792        trace!(target: "net::peers", ?peer_id, "remove discovered node");
793        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
794
795        if peer.state.is_connected() {
796            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
797            // we terminate the active session here, but only remove the peer after the session
798            // was disconnected, this prevents the case where the session is scheduled for
799            // disconnect but the node is immediately rediscovered, See also
800            // [`Self::on_disconnected()`]
801            peer.remove_after_disconnect = true;
802            peer.state.disconnect();
803            self.peers.insert(peer_id, peer);
804            self.queued_actions.push_back(PeerAction::Disconnect {
805                peer_id,
806                reason: Some(DisconnectReason::DisconnectRequested),
807            })
808        }
809    }
810
811    /// Connect to the given peer. NOTE: if the maximum number of outbound sessions is reached,
812    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
813    #[cfg_attr(not(test), expect(dead_code))]
814    pub(crate) fn add_and_connect(
815        &mut self,
816        peer_id: PeerId,
817        addr: PeerAddr,
818        fork_id: Option<ForkId>,
819    ) {
820        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
821    }
822
823    /// Connects a peer and its address with the given kind.
824    ///
825    /// Note: This is invoked on demand via an external command received by the manager
826    pub(crate) fn add_and_connect_kind(
827        &mut self,
828        peer_id: PeerId,
829        kind: PeerKind,
830        addr: PeerAddr,
831        fork_id: Option<ForkId>,
832    ) {
833        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
834            return
835        }
836
837        match self.peers.entry(peer_id) {
838            Entry::Occupied(mut entry) => {
839                let peer = entry.get_mut();
840                peer.kind = kind;
841                peer.fork_id = fork_id;
842                peer.addr = addr;
843
844                if peer.state == PeerConnectionState::Idle {
845                    // Try connecting again.
846                    peer.state = PeerConnectionState::PendingOut;
847                    self.connection_info.inc_pending_out();
848                    self.queued_actions
849                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
850                }
851            }
852            Entry::Vacant(entry) => {
853                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
854                let mut peer = Peer::with_kind(addr, kind);
855                peer.state = PeerConnectionState::PendingOut;
856                peer.fork_id = fork_id;
857                entry.insert(peer);
858                self.connection_info.inc_pending_out();
859                self.queued_actions
860                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
861            }
862        }
863
864        if kind.is_trusted() {
865            self.trusted_peer_ids.insert(peer_id);
866        }
867    }
868
869    /// Removes the tracked node from the trusted set.
870    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
871        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
872        if !entry.get().is_trusted() {
873            return
874        }
875
876        let peer = entry.get_mut();
877        peer.kind = PeerKind::Basic;
878
879        self.trusted_peer_ids.remove(&peer_id);
880    }
881
882    /// Returns the idle peer with the highest reputation.
883    ///
884    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
885    /// not currently marked as banned or backed off.
886    ///
887    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
888    /// `trusted` peers.
889    ///
890    /// Returns `None` if no peer is available.
891    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
892        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
893            !peer.is_backed_off() &&
894                !peer.is_banned() &&
895                peer.state.is_unconnected() &&
896                (!self.trusted_nodes_only || peer.is_trusted())
897        });
898
899        // keep track of the best peer, if there's one
900        let mut best_peer = unconnected.next()?;
901
902        if best_peer.1.is_trusted() || best_peer.1.is_static() {
903            return Some((*best_peer.0, best_peer.1))
904        }
905
906        for maybe_better in unconnected {
907            // if the peer is trusted or static, return it immediately
908            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
909                return Some((*maybe_better.0, maybe_better.1))
910            }
911
912            // otherwise we keep track of the best peer using the reputation
913            if maybe_better.1.reputation > best_peer.1.reputation {
914                best_peer = maybe_better;
915            }
916        }
917        Some((*best_peer.0, best_peer.1))
918    }
919
920    /// If there's capacity for new outbound connections, this will queue new
921    /// [`PeerAction::Connect`] actions.
922    ///
923    /// New connections are only initiated, if slots are available and appropriate peers are
924    /// available.
925    fn fill_outbound_slots(&mut self) {
926        self.tick();
927
928        if !self.net_connection_state.is_active() {
929            // nothing to fill
930            return
931        }
932
933        // as long as there are slots available fill them with the best peers
934        while self.connection_info.has_out_capacity() {
935            let action = {
936                let (peer_id, peer) = match self.best_unconnected() {
937                    Some(peer) => peer,
938                    _ => break,
939                };
940
941                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
942
943                peer.state = PeerConnectionState::PendingOut;
944                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
945            };
946
947            self.connection_info.inc_pending_out();
948
949            self.queued_actions.push_back(action);
950        }
951    }
952
953    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
954        if let Some(peer) = self.peers.get_mut(&peer_id) {
955            let new_addr = PeerAddr::new_with_ports(
956                new_record.address,
957                new_record.tcp_port,
958                Some(new_record.udp_port),
959            );
960
961            if peer.addr != new_addr {
962                peer.addr = new_addr;
963                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "Updated resolved trusted peer address");
964            }
965        }
966    }
967
968    /// Keeps track of network state changes.
969    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
970        self.net_connection_state = state;
971    }
972
973    /// Returns the current network connection state.
974    pub const fn connection_state(&self) -> &NetworkConnectionState {
975        &self.net_connection_state
976    }
977
978    /// Sets `net_connection_state` to `ShuttingDown`.
979    pub const fn on_shutdown(&mut self) {
980        self.net_connection_state = NetworkConnectionState::ShuttingDown;
981    }
982
983    /// Advances the state.
984    ///
985    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
986    /// [`PeersManager`] is polled.
987    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
988        loop {
989            // drain buffered actions
990            if let Some(action) = self.queued_actions.pop_front() {
991                return Poll::Ready(action)
992            }
993
994            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
995                match cmd {
996                    PeerCommand::Add(peer_id, addr) => {
997                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
998                    }
999                    PeerCommand::Remove(peer) => self.remove_peer(peer),
1000                    PeerCommand::ReputationChange(peer_id, rep) => {
1001                        self.apply_reputation_change(&peer_id, rep)
1002                    }
1003                    PeerCommand::GetPeer(peer, tx) => {
1004                        let _ = tx.send(self.peers.get(&peer).cloned());
1005                    }
1006                    PeerCommand::GetPeers(tx) => {
1007                        let _ = tx.send(self.iter_peers().collect());
1008                    }
1009                }
1010            }
1011
1012            if self.release_interval.poll_tick(cx).is_ready() {
1013                let now = std::time::Instant::now();
1014                let (_, unbanned_peers) = self.ban_list.evict(now);
1015
1016                for peer_id in unbanned_peers {
1017                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1018                        peer.unban();
1019                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1020                    }
1021                }
1022
1023                // clear the backoff list of expired backoffs, and mark the relevant peers as
1024                // ready to be dialed
1025                self.backed_off_peers.retain(|peer_id, until| {
1026                    if now > *until {
1027                        if let Some(peer) = self.peers.get_mut(peer_id) {
1028                            peer.backed_off = false;
1029                        }
1030                        return false
1031                    }
1032                    true
1033                })
1034            }
1035
1036            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1037                self.fill_outbound_slots();
1038            }
1039
1040            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1041                self.on_resolved_peer(peer_id, new_record);
1042            }
1043
1044            if self.queued_actions.is_empty() {
1045                return Poll::Pending
1046            }
1047        }
1048    }
1049}
1050
1051impl Default for PeersManager {
1052    fn default() -> Self {
1053        Self::new(Default::default())
1054    }
1055}
1056
1057/// Tracks stats about connected nodes
1058#[derive(Debug, Clone, PartialEq, Eq, Default)]
1059pub struct ConnectionInfo {
1060    /// Counter for currently occupied slots for active outbound connections.
1061    num_outbound: usize,
1062    /// Counter for pending outbound connections.
1063    num_pending_out: usize,
1064    /// Counter for currently occupied slots for active inbound connections.
1065    num_inbound: usize,
1066    /// Counter for pending inbound connections.
1067    num_pending_in: usize,
1068    /// Restrictions on number of connections.
1069    config: ConnectionsConfig,
1070}
1071
1072// === impl ConnectionInfo ===
1073
1074impl ConnectionInfo {
1075    /// Returns a new [`ConnectionInfo`] with the given config.
1076    const fn new(config: ConnectionsConfig) -> Self {
1077        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1078    }
1079
1080    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1081    const fn has_out_capacity(&self) -> bool {
1082        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1083            self.num_outbound < self.config.max_outbound
1084    }
1085
1086    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1087    const fn has_in_capacity(&self) -> bool {
1088        self.num_inbound < self.config.max_inbound
1089    }
1090
1091    /// Returns `true` if we can handle an additional incoming pending connection.
1092    const fn has_in_pending_capacity(&self) -> bool {
1093        self.num_pending_in < self.config.max_inbound
1094    }
1095
1096    const fn decr_state(&mut self, state: PeerConnectionState) {
1097        match state {
1098            PeerConnectionState::Idle => {}
1099            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1100            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1101            PeerConnectionState::PendingOut => self.decr_pending_out(),
1102        }
1103    }
1104
1105    const fn decr_out(&mut self) {
1106        self.num_outbound -= 1;
1107    }
1108
1109    const fn inc_out(&mut self) {
1110        self.num_outbound += 1;
1111    }
1112
1113    const fn inc_pending_out(&mut self) {
1114        self.num_pending_out += 1;
1115    }
1116
1117    const fn inc_in(&mut self) {
1118        self.num_inbound += 1;
1119    }
1120
1121    const fn inc_pending_in(&mut self) {
1122        self.num_pending_in += 1;
1123    }
1124
1125    const fn decr_in(&mut self) {
1126        self.num_inbound -= 1;
1127    }
1128
1129    const fn decr_pending_out(&mut self) {
1130        self.num_pending_out -= 1;
1131    }
1132
1133    const fn decr_pending_in(&mut self) {
1134        self.num_pending_in -= 1;
1135    }
1136}
1137
1138/// Actions the peer manager can trigger.
1139#[derive(Debug)]
1140pub enum PeerAction {
1141    /// Start a new connection to a peer.
1142    Connect {
1143        /// The peer to connect to.
1144        peer_id: PeerId,
1145        /// Where to reach the node
1146        remote_addr: SocketAddr,
1147    },
1148    /// Disconnect an existing connection.
1149    Disconnect {
1150        /// The peer ID of the established connection.
1151        peer_id: PeerId,
1152        /// An optional reason for the disconnect.
1153        reason: Option<DisconnectReason>,
1154    },
1155    /// Disconnect an existing incoming connection, because the peers reputation is below the
1156    /// banned threshold or is on the [`BanList`]
1157    DisconnectBannedIncoming {
1158        /// The peer ID of the established connection.
1159        peer_id: PeerId,
1160    },
1161    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1162    DisconnectUntrustedIncoming {
1163        /// The peer ID.
1164        peer_id: PeerId,
1165    },
1166    /// Ban the peer in discovery.
1167    DiscoveryBanPeerId {
1168        /// The peer ID.
1169        peer_id: PeerId,
1170        /// The IP address.
1171        ip_addr: IpAddr,
1172    },
1173    /// Ban the IP in discovery.
1174    DiscoveryBanIp {
1175        /// The IP address.
1176        ip_addr: IpAddr,
1177    },
1178    /// Ban the peer temporarily
1179    BanPeer {
1180        /// The peer ID.
1181        peer_id: PeerId,
1182    },
1183    /// Unban the peer temporarily
1184    UnBanPeer {
1185        /// The peer ID.
1186        peer_id: PeerId,
1187    },
1188    /// Emit peerAdded event
1189    PeerAdded(PeerId),
1190    /// Emit peerRemoved event
1191    PeerRemoved(PeerId),
1192}
1193
1194/// Error thrown when a incoming connection is rejected right away
1195#[derive(Debug, Error, PartialEq, Eq)]
1196pub enum InboundConnectionError {
1197    /// The remote's ip address is banned
1198    IpBanned,
1199    /// No capacity for new inbound connections
1200    ExceedsCapacity,
1201}
1202
1203impl Display for InboundConnectionError {
1204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1205        write!(f, "{self:?}")
1206    }
1207}
1208
1209#[cfg(test)]
1210mod tests {
1211    use alloy_primitives::B512;
1212    use reth_eth_wire::{
1213        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1214        DisconnectReason,
1215    };
1216    use reth_net_banlist::BanList;
1217    use reth_network_api::Direction;
1218    use reth_network_peers::{PeerId, TrustedPeer};
1219    use reth_network_types::{
1220        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1221    };
1222    use std::{
1223        future::{poll_fn, Future},
1224        io,
1225        net::{IpAddr, Ipv4Addr, SocketAddr},
1226        pin::Pin,
1227        task::{Context, Poll},
1228        time::Duration,
1229    };
1230    use url::Host;
1231
1232    use super::PeersManager;
1233    use crate::{
1234        error::SessionError,
1235        peers::{
1236            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1237            PeerConnectionState,
1238        },
1239        session::PendingSessionHandshakeError,
1240        PeersConfig,
1241    };
1242
1243    struct PeerActionFuture<'a> {
1244        peers: &'a mut PeersManager,
1245    }
1246
1247    impl Future for PeerActionFuture<'_> {
1248        type Output = PeerAction;
1249
1250        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1251            self.get_mut().peers.poll(cx)
1252        }
1253    }
1254
1255    macro_rules! event {
1256        ($peers:expr) => {
1257            PeerActionFuture { peers: &mut $peers }.await
1258        };
1259    }
1260
1261    #[tokio::test]
1262    async fn test_insert() {
1263        let peer = PeerId::random();
1264        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1265        let mut peers = PeersManager::default();
1266        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1267
1268        match event!(peers) {
1269            PeerAction::PeerAdded(peer_id) => {
1270                assert_eq!(peer_id, peer);
1271            }
1272            _ => unreachable!(),
1273        }
1274        match event!(peers) {
1275            PeerAction::Connect { peer_id, remote_addr } => {
1276                assert_eq!(peer_id, peer);
1277                assert_eq!(remote_addr, socket_addr);
1278            }
1279            _ => unreachable!(),
1280        }
1281
1282        let (record, _) = peers.peer_by_id(peer).unwrap();
1283        assert_eq!(record.tcp_addr(), socket_addr);
1284        assert_eq!(record.udp_addr(), socket_addr);
1285    }
1286
1287    #[tokio::test]
1288    async fn test_insert_udp() {
1289        let peer = PeerId::random();
1290        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1291        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1292        let mut peers = PeersManager::default();
1293        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1294
1295        match event!(peers) {
1296            PeerAction::PeerAdded(peer_id) => {
1297                assert_eq!(peer_id, peer);
1298            }
1299            _ => unreachable!(),
1300        }
1301        match event!(peers) {
1302            PeerAction::Connect { peer_id, remote_addr } => {
1303                assert_eq!(peer_id, peer);
1304                assert_eq!(remote_addr, tcp_addr);
1305            }
1306            _ => unreachable!(),
1307        }
1308
1309        let (record, _) = peers.peer_by_id(peer).unwrap();
1310        assert_eq!(record.tcp_addr(), tcp_addr);
1311        assert_eq!(record.udp_addr(), udp_addr);
1312    }
1313
1314    #[tokio::test]
1315    async fn test_ban() {
1316        let peer = PeerId::random();
1317        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1318        let mut peers = PeersManager::default();
1319        peers.ban_peer(peer);
1320        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1321
1322        match event!(peers) {
1323            PeerAction::BanPeer { peer_id } => {
1324                assert_eq!(peer_id, peer);
1325            }
1326            _ => unreachable!(),
1327        }
1328
1329        poll_fn(|cx| {
1330            assert!(peers.poll(cx).is_pending());
1331            Poll::Ready(())
1332        })
1333        .await;
1334    }
1335
1336    #[tokio::test]
1337    async fn test_unban() {
1338        let peer = PeerId::random();
1339        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1340        let mut peers = PeersManager::default();
1341        peers.ban_peer(peer);
1342        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1343
1344        match event!(peers) {
1345            PeerAction::BanPeer { peer_id } => {
1346                assert_eq!(peer_id, peer);
1347            }
1348            _ => unreachable!(),
1349        }
1350
1351        poll_fn(|cx| {
1352            assert!(peers.poll(cx).is_pending());
1353            Poll::Ready(())
1354        })
1355        .await;
1356
1357        peers.unban_peer(peer);
1358
1359        match event!(peers) {
1360            PeerAction::UnBanPeer { peer_id } => {
1361                assert_eq!(peer_id, peer);
1362            }
1363            _ => unreachable!(),
1364        }
1365
1366        poll_fn(|cx| {
1367            assert!(peers.poll(cx).is_pending());
1368            Poll::Ready(())
1369        })
1370        .await;
1371    }
1372
1373    #[tokio::test]
1374    async fn test_backoff_on_busy() {
1375        let peer = PeerId::random();
1376        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1377
1378        let mut peers = PeersManager::new(PeersConfig::test());
1379        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1380
1381        match event!(peers) {
1382            PeerAction::PeerAdded(peer_id) => {
1383                assert_eq!(peer_id, peer);
1384            }
1385            _ => unreachable!(),
1386        }
1387        match event!(peers) {
1388            PeerAction::Connect { peer_id, .. } => {
1389                assert_eq!(peer_id, peer);
1390            }
1391            _ => unreachable!(),
1392        }
1393
1394        poll_fn(|cx| {
1395            assert!(peers.poll(cx).is_pending());
1396            Poll::Ready(())
1397        })
1398        .await;
1399
1400        peers.on_active_session_dropped(
1401            &socket_addr,
1402            &peer,
1403            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1404                DisconnectReason::TooManyPeers,
1405            )),
1406        );
1407
1408        poll_fn(|cx| {
1409            assert!(peers.poll(cx).is_pending());
1410            Poll::Ready(())
1411        })
1412        .await;
1413
1414        assert!(peers.backed_off_peers.contains_key(&peer));
1415        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1416
1417        tokio::time::sleep(peers.backoff_durations.low).await;
1418
1419        match event!(peers) {
1420            PeerAction::Connect { peer_id, .. } => {
1421                assert_eq!(peer_id, peer);
1422            }
1423            _ => unreachable!(),
1424        }
1425
1426        assert!(!peers.backed_off_peers.contains_key(&peer));
1427        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1428    }
1429
1430    #[tokio::test]
1431    async fn test_backoff_on_no_response() {
1432        let peer = PeerId::random();
1433        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1434
1435        let backoff_durations = PeerBackoffDurations::test();
1436        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1437        let mut peers = PeersManager::new(config);
1438        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1439
1440        match event!(peers) {
1441            PeerAction::PeerAdded(peer_id) => {
1442                assert_eq!(peer_id, peer);
1443            }
1444            _ => unreachable!(),
1445        }
1446        match event!(peers) {
1447            PeerAction::Connect { peer_id, .. } => {
1448                assert_eq!(peer_id, peer);
1449            }
1450            _ => unreachable!(),
1451        }
1452
1453        poll_fn(|cx| {
1454            assert!(peers.poll(cx).is_pending());
1455            Poll::Ready(())
1456        })
1457        .await;
1458
1459        peers.on_outgoing_pending_session_dropped(
1460            &socket_addr,
1461            &peer,
1462            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1463                EthHandshakeError::NoResponse,
1464            )),
1465        );
1466
1467        poll_fn(|cx| {
1468            assert!(peers.poll(cx).is_pending());
1469            Poll::Ready(())
1470        })
1471        .await;
1472
1473        assert!(peers.backed_off_peers.contains_key(&peer));
1474        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1475
1476        tokio::time::sleep(backoff_durations.high).await;
1477
1478        match event!(peers) {
1479            PeerAction::Connect { peer_id, .. } => {
1480                assert_eq!(peer_id, peer);
1481            }
1482            _ => unreachable!(),
1483        }
1484
1485        assert!(!peers.backed_off_peers.contains_key(&peer));
1486        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1487    }
1488
1489    #[tokio::test]
1490    async fn test_low_backoff() {
1491        let peer = PeerId::random();
1492        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1493        let config = PeersConfig::test();
1494        let mut peers = PeersManager::new(config);
1495        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1496        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1497
1498        let backoff_timestamp = peers
1499            .backoff_durations
1500            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1501
1502        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1503        assert!(backoff_timestamp <= expected);
1504    }
1505
1506    #[tokio::test]
1507    async fn test_multiple_backoff_calculations() {
1508        let peer = PeerId::random();
1509        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1510        let config = PeersConfig::default();
1511        let mut peers = PeersManager::new(config);
1512        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1513        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1514
1515        // Simulate a peer that was already backed off once
1516        peer_struct.severe_backoff_counter = 1;
1517
1518        let now = std::time::Instant::now();
1519
1520        // Simulate the increment that happens in on_connection_failure
1521        peer_struct.severe_backoff_counter += 1;
1522        // Get official backoff time
1523        let backoff_time = peers
1524            .backoff_durations
1525            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1526
1527        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1528        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1529
1530        // We can't use assert_eq! since there is a very small diff in the nano secs
1531        // Usually it is 1800s != 1799.9999996s
1532        assert!(backoff_time.duration_since(now) > backoff_duration);
1533    }
1534
1535    #[tokio::test]
1536    async fn test_ban_on_active_drop() {
1537        let peer = PeerId::random();
1538        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1539        let mut peers = PeersManager::default();
1540        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1541
1542        match event!(peers) {
1543            PeerAction::PeerAdded(peer_id) => {
1544                assert_eq!(peer_id, peer);
1545            }
1546            _ => unreachable!(),
1547        }
1548        match event!(peers) {
1549            PeerAction::Connect { peer_id, .. } => {
1550                assert_eq!(peer_id, peer);
1551            }
1552            _ => unreachable!(),
1553        }
1554
1555        poll_fn(|cx| {
1556            assert!(peers.poll(cx).is_pending());
1557            Poll::Ready(())
1558        })
1559        .await;
1560
1561        peers.on_active_session_dropped(
1562            &socket_addr,
1563            &peer,
1564            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1565                DisconnectReason::UselessPeer,
1566            )),
1567        );
1568
1569        match event!(peers) {
1570            PeerAction::PeerRemoved(peer_id) => {
1571                assert_eq!(peer_id, peer);
1572            }
1573            _ => unreachable!(),
1574        }
1575        match event!(peers) {
1576            PeerAction::BanPeer { peer_id } => {
1577                assert_eq!(peer_id, peer);
1578            }
1579            _ => unreachable!(),
1580        }
1581
1582        poll_fn(|cx| {
1583            assert!(peers.poll(cx).is_pending());
1584            Poll::Ready(())
1585        })
1586        .await;
1587
1588        assert!(!peers.peers.contains_key(&peer));
1589    }
1590
1591    #[tokio::test]
1592    async fn test_remove_on_max_backoff_count() {
1593        let peer = PeerId::random();
1594        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1595        let config = PeersConfig::test();
1596        let mut peers = PeersManager::new(config.clone());
1597        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1598        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1599
1600        // Simulate a peer that was already backed off once
1601        peer_struct.severe_backoff_counter = config.max_backoff_count;
1602
1603        match event!(peers) {
1604            PeerAction::PeerAdded(peer_id) => {
1605                assert_eq!(peer_id, peer);
1606            }
1607            _ => unreachable!(),
1608        }
1609        match event!(peers) {
1610            PeerAction::Connect { peer_id, .. } => {
1611                assert_eq!(peer_id, peer);
1612            }
1613            _ => unreachable!(),
1614        }
1615
1616        poll_fn(|cx| {
1617            assert!(peers.poll(cx).is_pending());
1618            Poll::Ready(())
1619        })
1620        .await;
1621
1622        peers.on_outgoing_pending_session_dropped(
1623            &socket_addr,
1624            &peer,
1625            &PendingSessionHandshakeError::Eth(
1626                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1627            ),
1628        );
1629
1630        match event!(peers) {
1631            PeerAction::PeerRemoved(peer_id) => {
1632                assert_eq!(peer_id, peer);
1633            }
1634            _ => unreachable!(),
1635        }
1636
1637        poll_fn(|cx| {
1638            assert!(peers.poll(cx).is_pending());
1639            Poll::Ready(())
1640        })
1641        .await;
1642
1643        assert!(!peers.peers.contains_key(&peer));
1644    }
1645
1646    #[tokio::test]
1647    async fn test_ban_on_pending_drop() {
1648        let peer = PeerId::random();
1649        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1650        let mut peers = PeersManager::default();
1651        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1652
1653        match event!(peers) {
1654            PeerAction::PeerAdded(peer_id) => {
1655                assert_eq!(peer_id, peer);
1656            }
1657            _ => unreachable!(),
1658        }
1659        match event!(peers) {
1660            PeerAction::Connect { peer_id, .. } => {
1661                assert_eq!(peer_id, peer);
1662            }
1663            _ => unreachable!(),
1664        }
1665
1666        poll_fn(|cx| {
1667            assert!(peers.poll(cx).is_pending());
1668            Poll::Ready(())
1669        })
1670        .await;
1671
1672        peers.on_outgoing_pending_session_dropped(
1673            &socket_addr,
1674            &peer,
1675            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1676                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1677            )),
1678        );
1679
1680        match event!(peers) {
1681            PeerAction::PeerRemoved(peer_id) => {
1682                assert_eq!(peer_id, peer);
1683            }
1684            _ => unreachable!(),
1685        }
1686        match event!(peers) {
1687            PeerAction::BanPeer { peer_id } => {
1688                assert_eq!(peer_id, peer);
1689            }
1690            _ => unreachable!(),
1691        }
1692
1693        poll_fn(|cx| {
1694            assert!(peers.poll(cx).is_pending());
1695            Poll::Ready(())
1696        })
1697        .await;
1698
1699        assert!(!peers.peers.contains_key(&peer));
1700    }
1701
1702    #[tokio::test]
1703    async fn test_internally_closed_incoming() {
1704        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1705        let mut peers = PeersManager::default();
1706
1707        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1708        assert_eq!(peers.connection_info.num_pending_in, 1);
1709        peers.on_incoming_pending_session_rejected_internally();
1710        assert_eq!(peers.connection_info.num_pending_in, 0);
1711    }
1712
1713    #[tokio::test]
1714    async fn test_reject_incoming_at_pending_capacity() {
1715        let mut peers = PeersManager::default();
1716
1717        for count in 1..=peers.connection_info.config.max_inbound {
1718            let socket_addr =
1719                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1720            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1721            assert_eq!(peers.connection_info.num_pending_in, count);
1722        }
1723        assert!(peers.connection_info.has_in_capacity());
1724        assert!(!peers.connection_info.has_in_pending_capacity());
1725
1726        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1727        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1728    }
1729
1730    #[tokio::test]
1731    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1732        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1733        let trusted = PeerId::random();
1734        peers.add_trusted_peer_id(trusted);
1735
1736        // connect the trusted peer
1737        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1738        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1739        peers.on_incoming_session_established(trusted, addr);
1740
1741        match event!(peers) {
1742            PeerAction::PeerAdded(id) => {
1743                assert_eq!(id, trusted);
1744            }
1745            _ => unreachable!(),
1746        }
1747
1748        // saturate the remaining inbound slots with untrusted peers
1749        let mut connected_untrusted_peer_ids = Vec::new();
1750        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1751            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1752            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1753            let peer_id = PeerId::random();
1754            peers.on_incoming_session_established(peer_id, addr);
1755            connected_untrusted_peer_ids.push(peer_id);
1756
1757            match event!(peers) {
1758                PeerAction::PeerAdded(id) => {
1759                    assert_eq!(id, peer_id);
1760                }
1761                _ => unreachable!(),
1762            }
1763        }
1764
1765        let mut pending_addrs = Vec::new();
1766
1767        // saturate available slots
1768        for i in 0..2 {
1769            let socket_addr =
1770                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1771            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1772
1773            pending_addrs.push(socket_addr);
1774        }
1775
1776        assert_eq!(peers.connection_info.num_pending_in, 2);
1777
1778        // try to handle additional incoming connections at capacity
1779        for i in 0..2 {
1780            let socket_addr =
1781                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1782            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1783        }
1784
1785        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1786            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1787                DisconnectReason::UselessPeer,
1788            )),
1789        ));
1790
1791        // Remove all pending peers
1792        for pending_addr in pending_addrs {
1793            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1794        }
1795
1796        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1797
1798        println!(
1799            "num_inbound: {}, has_in_capacity: {}",
1800            peers.connection_info.num_inbound,
1801            peers.connection_info.has_in_capacity()
1802        );
1803
1804        // disconnect a connected peer
1805        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1806
1807        println!(
1808            "num_inbound: {}, has_in_capacity: {}",
1809            peers.connection_info.num_inbound,
1810            peers.connection_info.has_in_capacity()
1811        );
1812
1813        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1814        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1815    }
1816
1817    #[tokio::test]
1818    async fn test_closed_incoming() {
1819        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1820        let mut peers = PeersManager::default();
1821
1822        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1823        assert_eq!(peers.connection_info.num_pending_in, 1);
1824        peers.on_incoming_pending_session_gracefully_closed();
1825        assert_eq!(peers.connection_info.num_pending_in, 0);
1826    }
1827
1828    #[tokio::test]
1829    async fn test_dropped_incoming() {
1830        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1831        let ban_duration = Duration::from_millis(500);
1832        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1833        let mut peers = PeersManager::new(config);
1834
1835        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1836        assert_eq!(peers.connection_info.num_pending_in, 1);
1837        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1838            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1839                DisconnectReason::UselessPeer,
1840            )),
1841        ));
1842
1843        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1844        assert_eq!(peers.connection_info.num_pending_in, 0);
1845        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1846
1847        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1848
1849        // unbanned after timeout
1850        tokio::time::sleep(ban_duration).await;
1851
1852        poll_fn(|cx| {
1853            let _ = peers.poll(cx);
1854            Poll::Ready(())
1855        })
1856        .await;
1857
1858        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1859        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1860    }
1861
1862    #[tokio::test]
1863    async fn test_reputation_change_connected() {
1864        let peer = PeerId::random();
1865        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1866        let mut peers = PeersManager::default();
1867        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1868
1869        match event!(peers) {
1870            PeerAction::PeerAdded(peer_id) => {
1871                assert_eq!(peer_id, peer);
1872            }
1873            _ => unreachable!(),
1874        }
1875        match event!(peers) {
1876            PeerAction::Connect { peer_id, remote_addr } => {
1877                assert_eq!(peer_id, peer);
1878                assert_eq!(remote_addr, socket_addr);
1879            }
1880            _ => unreachable!(),
1881        }
1882
1883        let p = peers.peers.get_mut(&peer).unwrap();
1884        assert_eq!(p.state, PeerConnectionState::PendingOut);
1885
1886        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1887
1888        let p = peers.peers.get(&peer).unwrap();
1889        assert_eq!(p.state, PeerConnectionState::PendingOut);
1890        assert!(p.is_banned());
1891
1892        peers.on_active_session_gracefully_closed(peer);
1893
1894        let p = peers.peers.get(&peer).unwrap();
1895        assert_eq!(p.state, PeerConnectionState::Idle);
1896        assert!(p.is_banned());
1897
1898        match event!(peers) {
1899            PeerAction::Disconnect { peer_id, .. } => {
1900                assert_eq!(peer_id, peer);
1901            }
1902            _ => unreachable!(),
1903        }
1904    }
1905
1906    #[tokio::test]
1907    async fn accept_incoming_trusted_unknown_peer_address() {
1908        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1909        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1910        // try to connect trusted peer
1911        let trusted = PeerId::random();
1912        peers.add_trusted_peer_id(trusted);
1913
1914        // saturate the inbound slots
1915        for i in 0..peers.connection_info.config.max_inbound {
1916            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1917            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1918            let peer_id = PeerId::random();
1919            peers.on_incoming_session_established(peer_id, addr);
1920
1921            match event!(peers) {
1922                PeerAction::PeerAdded(id) => {
1923                    assert_eq!(id, peer_id);
1924                }
1925                _ => unreachable!(),
1926            }
1927        }
1928
1929        // try to connect untrusted peer
1930        let untrusted = PeerId::random();
1931        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1932        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1933        peers.on_incoming_session_established(untrusted, socket_addr);
1934
1935        match event!(peers) {
1936            PeerAction::PeerAdded(id) => {
1937                assert_eq!(id, untrusted);
1938            }
1939            _ => unreachable!(),
1940        }
1941
1942        match event!(peers) {
1943            PeerAction::Disconnect { peer_id, reason } => {
1944                assert_eq!(peer_id, untrusted);
1945                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1946            }
1947            _ => unreachable!(),
1948        }
1949
1950        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1951        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1952        peers.on_incoming_session_established(trusted, socket_addr);
1953
1954        match event!(peers) {
1955            PeerAction::PeerAdded(id) => {
1956                assert_eq!(id, trusted);
1957            }
1958            _ => unreachable!(),
1959        }
1960
1961        poll_fn(|cx| {
1962            assert!(peers.poll(cx).is_pending());
1963            Poll::Ready(())
1964        })
1965        .await;
1966
1967        let peer = peers.peers.get(&trusted).unwrap();
1968        assert_eq!(peer.state, PeerConnectionState::In);
1969    }
1970
1971    #[tokio::test]
1972    async fn test_already_connected() {
1973        let peer = PeerId::random();
1974        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1975        let mut peers = PeersManager::default();
1976
1977        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
1978        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1979        assert_eq!(peers.connection_info.num_pending_in, 1);
1980
1981        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
1982        // to increase by 1
1983        peers.on_incoming_session_established(peer, socket_addr);
1984        let p = peers.peers.get_mut(&peer).expect("peer not found");
1985        assert_eq!(p.addr.tcp(), socket_addr);
1986        assert_eq!(peers.connection_info.num_pending_in, 0);
1987        assert_eq!(peers.connection_info.num_inbound, 1);
1988
1989        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
1990        // by 1
1991        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1992        assert_eq!(peers.connection_info.num_pending_in, 1);
1993
1994        // Simulate a rejection due to an already established connection, expecting the
1995        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
1996        // should not be changed.
1997        peers.on_already_connected(Direction::Incoming);
1998
1999        let p = peers.peers.get_mut(&peer).expect("peer not found");
2000        assert_eq!(p.addr.tcp(), socket_addr);
2001        assert_eq!(peers.connection_info.num_pending_in, 0);
2002        assert_eq!(peers.connection_info.num_inbound, 1);
2003    }
2004
2005    #[tokio::test]
2006    async fn test_reputation_change_trusted_peer() {
2007        let peer = PeerId::random();
2008        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2009        let mut peers = PeersManager::default();
2010        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
2011
2012        match event!(peers) {
2013            PeerAction::PeerAdded(peer_id) => {
2014                assert_eq!(peer_id, peer);
2015            }
2016            _ => unreachable!(),
2017        }
2018        match event!(peers) {
2019            PeerAction::Connect { peer_id, remote_addr } => {
2020                assert_eq!(peer_id, peer);
2021                assert_eq!(remote_addr, socket_addr);
2022            }
2023            _ => unreachable!(),
2024        }
2025
2026        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2027        peers.on_active_outgoing_established(peer);
2028        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2029
2030        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2031
2032        {
2033            let p = peers.peers.get(&peer).unwrap();
2034            assert_eq!(p.state, PeerConnectionState::Out);
2035            // not banned yet
2036            assert!(!p.is_banned());
2037        }
2038
2039        // ensure peer is banned eventually
2040        loop {
2041            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2042
2043            let p = peers.peers.get(&peer).unwrap();
2044            if p.is_banned() {
2045                break
2046            }
2047        }
2048
2049        match event!(peers) {
2050            PeerAction::Disconnect { peer_id, .. } => {
2051                assert_eq!(peer_id, peer);
2052            }
2053            _ => unreachable!(),
2054        }
2055    }
2056
2057    #[tokio::test]
2058    async fn test_reputation_management() {
2059        let peer = PeerId::random();
2060        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2061        let mut peers = PeersManager::default();
2062        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2063        assert_eq!(peers.get_reputation(&peer), Some(0));
2064
2065        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2066        assert_eq!(peers.get_reputation(&peer), Some(1024));
2067
2068        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2069        assert_eq!(peers.get_reputation(&peer), Some(0));
2070    }
2071
2072    #[tokio::test]
2073    async fn test_remove_discovered_active() {
2074        let peer = PeerId::random();
2075        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2076        let mut peers = PeersManager::default();
2077        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2078
2079        match event!(peers) {
2080            PeerAction::PeerAdded(peer_id) => {
2081                assert_eq!(peer_id, peer);
2082            }
2083            _ => unreachable!(),
2084        }
2085        match event!(peers) {
2086            PeerAction::Connect { peer_id, remote_addr } => {
2087                assert_eq!(peer_id, peer);
2088                assert_eq!(remote_addr, socket_addr);
2089            }
2090            _ => unreachable!(),
2091        }
2092
2093        let p = peers.peers.get(&peer).unwrap();
2094        assert_eq!(p.state, PeerConnectionState::PendingOut);
2095
2096        peers.remove_peer(peer);
2097
2098        match event!(peers) {
2099            PeerAction::PeerRemoved(peer_id) => {
2100                assert_eq!(peer_id, peer);
2101            }
2102            _ => unreachable!(),
2103        }
2104        match event!(peers) {
2105            PeerAction::Disconnect { peer_id, .. } => {
2106                assert_eq!(peer_id, peer);
2107            }
2108            _ => unreachable!(),
2109        }
2110
2111        let p = peers.peers.get(&peer).unwrap();
2112        assert_eq!(p.state, PeerConnectionState::PendingOut);
2113
2114        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2115        let p = peers.peers.get(&peer).unwrap();
2116        assert_eq!(p.state, PeerConnectionState::PendingOut);
2117
2118        peers.on_active_session_gracefully_closed(peer);
2119        assert!(!peers.peers.contains_key(&peer));
2120    }
2121
2122    #[tokio::test]
2123    async fn test_fatal_outgoing_connection_error_trusted() {
2124        let peer = PeerId::random();
2125        let config = PeersConfig::test()
2126            .with_trusted_nodes(vec![TrustedPeer {
2127                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2128                tcp_port: 8008,
2129                udp_port: 8008,
2130                id: peer,
2131            }])
2132            .with_trusted_nodes_only(true);
2133        let mut peers = PeersManager::new(config);
2134        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2135
2136        match event!(peers) {
2137            PeerAction::Connect { peer_id, remote_addr } => {
2138                assert_eq!(peer_id, peer);
2139                assert_eq!(remote_addr, socket_addr);
2140            }
2141            _ => unreachable!(),
2142        }
2143
2144        let p = peers.peers.get(&peer).unwrap();
2145        assert_eq!(p.state, PeerConnectionState::PendingOut);
2146
2147        assert_eq!(peers.num_outbound_connections(), 0);
2148
2149        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2150            EthHandshakeError::NonStatusMessageInHandshake,
2151        ));
2152        assert!(err.is_fatal_protocol_error());
2153
2154        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2155        assert_eq!(peers.num_outbound_connections(), 0);
2156
2157        // try tmp ban peer
2158        match event!(peers) {
2159            PeerAction::BanPeer { peer_id } => {
2160                assert_eq!(peer_id, peer);
2161            }
2162            err => unreachable!("{err:?}"),
2163        }
2164
2165        // ensure we still have trusted peer
2166        assert!(peers.peers.contains_key(&peer));
2167
2168        // await for the ban to expire
2169        tokio::time::sleep(peers.backoff_durations.medium).await;
2170
2171        match event!(peers) {
2172            PeerAction::Connect { peer_id, remote_addr } => {
2173                assert_eq!(peer_id, peer);
2174                assert_eq!(remote_addr, socket_addr);
2175            }
2176            err => unreachable!("{err:?}"),
2177        }
2178    }
2179
2180    #[tokio::test]
2181    async fn test_outgoing_connection_error() {
2182        let peer = PeerId::random();
2183        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2184        let mut peers = PeersManager::default();
2185        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2186
2187        match event!(peers) {
2188            PeerAction::PeerAdded(peer_id) => {
2189                assert_eq!(peer_id, peer);
2190            }
2191            _ => unreachable!(),
2192        }
2193        match event!(peers) {
2194            PeerAction::Connect { peer_id, remote_addr } => {
2195                assert_eq!(peer_id, peer);
2196                assert_eq!(remote_addr, socket_addr);
2197            }
2198            _ => unreachable!(),
2199        }
2200
2201        let p = peers.peers.get(&peer).unwrap();
2202        assert_eq!(p.state, PeerConnectionState::PendingOut);
2203
2204        assert_eq!(peers.num_outbound_connections(), 0);
2205
2206        peers.on_outgoing_connection_failure(
2207            &socket_addr,
2208            &peer,
2209            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2210        );
2211
2212        assert_eq!(peers.num_outbound_connections(), 0);
2213    }
2214
2215    #[tokio::test]
2216    async fn test_outgoing_connection_gracefully_closed() {
2217        let peer = PeerId::random();
2218        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2219        let mut peers = PeersManager::default();
2220        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2221
2222        match event!(peers) {
2223            PeerAction::PeerAdded(peer_id) => {
2224                assert_eq!(peer_id, peer);
2225            }
2226            _ => unreachable!(),
2227        }
2228        match event!(peers) {
2229            PeerAction::Connect { peer_id, remote_addr } => {
2230                assert_eq!(peer_id, peer);
2231                assert_eq!(remote_addr, socket_addr);
2232            }
2233            _ => unreachable!(),
2234        }
2235
2236        let p = peers.peers.get(&peer).unwrap();
2237        assert_eq!(p.state, PeerConnectionState::PendingOut);
2238
2239        assert_eq!(peers.num_outbound_connections(), 0);
2240
2241        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2242
2243        assert_eq!(peers.num_outbound_connections(), 0);
2244        assert_eq!(peers.connection_info.num_pending_out, 0);
2245    }
2246
2247    #[tokio::test]
2248    async fn test_discovery_ban_list() {
2249        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2250        let socket_addr = SocketAddr::new(ip, 8008);
2251        let ban_list = BanList::new(vec![], vec![ip]);
2252        let config = PeersConfig::default().with_ban_list(ban_list);
2253        let mut peer_manager = PeersManager::new(config);
2254        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2255
2256        assert!(peer_manager.peers.is_empty());
2257    }
2258
2259    #[tokio::test]
2260    async fn test_on_pending_ban_list() {
2261        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2262        let socket_addr = SocketAddr::new(ip, 8008);
2263        let ban_list = BanList::new(vec![], vec![ip]);
2264        let config = PeersConfig::test().with_ban_list(ban_list);
2265        let mut peer_manager = PeersManager::new(config);
2266        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2267        // because we have no active peers this should be fine for testings
2268        match a {
2269            Ok(_) => panic!(),
2270            Err(err) => match err {
2271                InboundConnectionError::IpBanned => {
2272                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2273                }
2274                _ => unreachable!(),
2275            },
2276        }
2277    }
2278
2279    #[tokio::test]
2280    async fn test_on_active_inbound_ban_list() {
2281        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2282        let socket_addr = SocketAddr::new(ip, 8008);
2283        let given_peer_id = PeerId::random();
2284        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2285        let config = PeersConfig::test().with_ban_list(ban_list);
2286        let mut peer_manager = PeersManager::new(config);
2287        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2288        // non-trusted nodes should also increase pending_in
2289        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2290        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2291        // after the connection is established, the peer should be removed, the num_pending_in
2292        // should be decreased, and the num_inbound should not be increased
2293        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2294        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2295
2296        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2297            peer_manager.queued_actions.pop_front()
2298        else {
2299            panic!()
2300        };
2301
2302        assert_eq!(peer_id, given_peer_id)
2303    }
2304
2305    #[test]
2306    fn test_connection_limits() {
2307        let mut info = ConnectionInfo::default();
2308        info.inc_in();
2309        assert_eq!(info.num_inbound, 1);
2310        assert_eq!(info.num_outbound, 0);
2311        assert!(info.has_in_capacity());
2312
2313        info.decr_in();
2314        assert_eq!(info.num_inbound, 0);
2315        assert_eq!(info.num_outbound, 0);
2316
2317        info.inc_out();
2318        assert_eq!(info.num_inbound, 0);
2319        assert_eq!(info.num_outbound, 1);
2320        assert!(info.has_out_capacity());
2321
2322        info.decr_out();
2323        assert_eq!(info.num_inbound, 0);
2324        assert_eq!(info.num_outbound, 0);
2325    }
2326
2327    #[test]
2328    fn test_connection_peer_state() {
2329        let mut info = ConnectionInfo::default();
2330        info.inc_in();
2331
2332        info.decr_state(PeerConnectionState::In);
2333        assert_eq!(info.num_inbound, 0);
2334        assert_eq!(info.num_outbound, 0);
2335
2336        info.inc_out();
2337
2338        info.decr_state(PeerConnectionState::Out);
2339        assert_eq!(info.num_inbound, 0);
2340        assert_eq!(info.num_outbound, 0);
2341    }
2342
2343    #[tokio::test]
2344    async fn test_trusted_peers_are_prioritized() {
2345        let trusted_peer = PeerId::random();
2346        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2347        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2348            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2349            tcp_port: 8008,
2350            udp_port: 8008,
2351            id: trusted_peer,
2352        }]);
2353        let mut peers = PeersManager::new(config);
2354
2355        let basic_peer = PeerId::random();
2356        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2357        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2358
2359        match event!(peers) {
2360            PeerAction::PeerAdded(peer_id) => {
2361                assert_eq!(peer_id, basic_peer);
2362            }
2363            _ => unreachable!(),
2364        }
2365        match event!(peers) {
2366            PeerAction::Connect { peer_id, remote_addr } => {
2367                assert_eq!(peer_id, trusted_peer);
2368                assert_eq!(remote_addr, trusted_sock);
2369            }
2370            _ => unreachable!(),
2371        }
2372        match event!(peers) {
2373            PeerAction::Connect { peer_id, remote_addr } => {
2374                assert_eq!(peer_id, basic_peer);
2375                assert_eq!(remote_addr, basic_sock);
2376            }
2377            _ => unreachable!(),
2378        }
2379    }
2380
2381    #[tokio::test]
2382    async fn test_connect_trusted_nodes_only() {
2383        let trusted_peer = PeerId::random();
2384        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2385        let config = PeersConfig::test()
2386            .with_trusted_nodes(vec![TrustedPeer {
2387                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2388                tcp_port: 8008,
2389                udp_port: 8008,
2390                id: trusted_peer,
2391            }])
2392            .with_trusted_nodes_only(true);
2393        let mut peers = PeersManager::new(config);
2394
2395        let basic_peer = PeerId::random();
2396        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2397        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2398
2399        match event!(peers) {
2400            PeerAction::PeerAdded(peer_id) => {
2401                assert_eq!(peer_id, basic_peer);
2402            }
2403            _ => unreachable!(),
2404        }
2405        match event!(peers) {
2406            PeerAction::Connect { peer_id, remote_addr } => {
2407                assert_eq!(peer_id, trusted_peer);
2408                assert_eq!(remote_addr, trusted_sock);
2409            }
2410            _ => unreachable!(),
2411        }
2412        poll_fn(|cx| {
2413            assert!(peers.poll(cx).is_pending());
2414            Poll::Ready(())
2415        })
2416        .await;
2417    }
2418
2419    #[tokio::test]
2420    async fn test_incoming_with_trusted_nodes_only() {
2421        let trusted_peer = PeerId::random();
2422        let config = PeersConfig::test()
2423            .with_trusted_nodes(vec![TrustedPeer {
2424                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2425                tcp_port: 8008,
2426                udp_port: 8008,
2427                id: trusted_peer,
2428            }])
2429            .with_trusted_nodes_only(true);
2430        let mut peers = PeersManager::new(config);
2431
2432        let basic_peer = PeerId::random();
2433        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2434        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2435        // non-trusted nodes should also increase pending_in
2436        assert_eq!(peers.connection_info.num_pending_in, 1);
2437        peers.on_incoming_session_established(basic_peer, basic_sock);
2438        // after the connection is established, the peer should be removed, the num_pending_in
2439        // should be decreased, and the num_inbound mut not be increased
2440        assert_eq!(peers.connection_info.num_pending_in, 0);
2441        assert_eq!(peers.connection_info.num_inbound, 0);
2442
2443        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2444            peers.queued_actions.pop_front()
2445        else {
2446            panic!()
2447        };
2448        assert_eq!(basic_peer, peer_id);
2449        assert!(!peers.peers.contains_key(&basic_peer));
2450    }
2451
2452    #[tokio::test]
2453    async fn test_incoming_without_trusted_nodes_only() {
2454        let trusted_peer = PeerId::random();
2455        let config = PeersConfig::test()
2456            .with_trusted_nodes(vec![TrustedPeer {
2457                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2458                tcp_port: 8008,
2459                udp_port: 8008,
2460                id: trusted_peer,
2461            }])
2462            .with_trusted_nodes_only(false);
2463        let mut peers = PeersManager::new(config);
2464
2465        let basic_peer = PeerId::random();
2466        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2467        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2468
2469        // non-trusted nodes should also increase pending_in
2470        assert_eq!(peers.connection_info.num_pending_in, 1);
2471        peers.on_incoming_session_established(basic_peer, basic_sock);
2472        // after the connection is established, the peer should be removed, the num_pending_in
2473        // should be decreased, and the num_inbound must be increased
2474        assert_eq!(peers.connection_info.num_pending_in, 0);
2475        assert_eq!(peers.connection_info.num_inbound, 1);
2476        assert!(peers.peers.contains_key(&basic_peer));
2477    }
2478
2479    #[tokio::test]
2480    async fn test_incoming_at_capacity() {
2481        let mut config = PeersConfig::test();
2482        config.connection_info.max_inbound = 1;
2483        let mut peers = PeersManager::new(config);
2484
2485        let peer = PeerId::random();
2486        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2487        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2488
2489        peers.on_incoming_session_established(peer, addr);
2490
2491        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2492        assert_eq!(
2493            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2494            InboundConnectionError::ExceedsCapacity
2495        );
2496    }
2497
2498    #[tokio::test]
2499    async fn test_incoming_rate_limit() {
2500        let config = PeersConfig {
2501            incoming_ip_throttle_duration: Duration::from_millis(100),
2502            ..PeersConfig::test()
2503        };
2504        let mut peers = PeersManager::new(config);
2505
2506        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2507        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2508        assert_eq!(
2509            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2510            InboundConnectionError::IpBanned
2511        );
2512
2513        peers.release_interval.reset_immediately();
2514        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2515
2516        // await unban
2517        poll_fn(|cx| loop {
2518            if peers.poll(cx).is_pending() {
2519                return Poll::Ready(());
2520            }
2521        })
2522        .await;
2523
2524        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2525        assert_eq!(
2526            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2527            InboundConnectionError::IpBanned
2528        );
2529    }
2530
2531    #[tokio::test]
2532    async fn test_tick() {
2533        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2534        let socket_addr = SocketAddr::new(ip, 8008);
2535        let config = PeersConfig::test();
2536        let mut peer_manager = PeersManager::new(config);
2537        let peer_id = PeerId::random();
2538        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2539
2540        tokio::time::sleep(Duration::from_secs(1)).await;
2541        peer_manager.tick();
2542
2543        // still unconnected
2544        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2545
2546        // mark as connected
2547        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2548
2549        tokio::time::sleep(Duration::from_secs(1)).await;
2550        peer_manager.tick();
2551
2552        // still at default reputation
2553        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2554
2555        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2556
2557        tokio::time::sleep(Duration::from_secs(1)).await;
2558        peer_manager.tick();
2559
2560        // tick applied
2561        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2562    }
2563
2564    #[tokio::test]
2565    async fn test_remove_incoming_after_disconnect() {
2566        let peer_id = PeerId::random();
2567        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2568        let mut peers = PeersManager::default();
2569
2570        peers.on_incoming_pending_session(addr.ip()).unwrap();
2571        peers.on_incoming_session_established(peer_id, addr);
2572        let peer = peers.peers.get(&peer_id).unwrap();
2573        assert_eq!(peer.state, PeerConnectionState::In);
2574        assert!(peer.remove_after_disconnect);
2575
2576        peers.on_active_session_gracefully_closed(peer_id);
2577        assert!(!peers.peers.contains_key(&peer_id))
2578    }
2579
2580    #[tokio::test]
2581    async fn test_keep_incoming_after_disconnect_if_discovered() {
2582        let peer_id = PeerId::random();
2583        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2584        let mut peers = PeersManager::default();
2585
2586        peers.on_incoming_pending_session(addr.ip()).unwrap();
2587        peers.on_incoming_session_established(peer_id, addr);
2588        let peer = peers.peers.get(&peer_id).unwrap();
2589        assert_eq!(peer.state, PeerConnectionState::In);
2590        assert!(peer.remove_after_disconnect);
2591
2592        // trigger discovery manually while the peer is still connected
2593        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2594
2595        peers.on_active_session_gracefully_closed(peer_id);
2596
2597        let peer = peers.peers.get(&peer_id).unwrap();
2598        assert_eq!(peer.state, PeerConnectionState::Idle);
2599        assert!(!peer.remove_after_disconnect);
2600    }
2601
2602    #[tokio::test]
2603    async fn test_incoming_outgoing_already_connected() {
2604        let peer_id = PeerId::random();
2605        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2606        let mut peers = PeersManager::default();
2607
2608        peers.on_incoming_pending_session(addr.ip()).unwrap();
2609        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2610
2611        match event!(peers) {
2612            PeerAction::PeerAdded(_) => {}
2613            _ => unreachable!(),
2614        }
2615
2616        match event!(peers) {
2617            PeerAction::Connect { .. } => {}
2618            _ => unreachable!(),
2619        }
2620
2621        peers.on_incoming_session_established(peer_id, addr);
2622        peers.on_already_connected(Direction::Outgoing(peer_id));
2623        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2624        assert_eq!(peers.connection_info.num_inbound, 1);
2625        assert_eq!(peers.connection_info.num_pending_out, 0);
2626        assert_eq!(peers.connection_info.num_pending_in, 0);
2627        assert_eq!(peers.connection_info.num_outbound, 0);
2628    }
2629
2630    #[tokio::test]
2631    async fn test_already_connected_incoming_outgoing_connection_error() {
2632        let peer_id = PeerId::random();
2633        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2634        let mut peers = PeersManager::default();
2635
2636        peers.on_incoming_pending_session(addr.ip()).unwrap();
2637        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2638
2639        match event!(peers) {
2640            PeerAction::PeerAdded(_) => {}
2641            _ => unreachable!(),
2642        }
2643
2644        match event!(peers) {
2645            PeerAction::Connect { .. } => {}
2646            _ => unreachable!(),
2647        }
2648
2649        peers.on_incoming_session_established(peer_id, addr);
2650
2651        peers.on_outgoing_connection_failure(
2652            &addr,
2653            &peer_id,
2654            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2655        );
2656        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2657        assert_eq!(peers.connection_info.num_inbound, 1);
2658        assert_eq!(peers.connection_info.num_pending_out, 0);
2659        assert_eq!(peers.connection_info.num_pending_in, 0);
2660        assert_eq!(peers.connection_info.num_outbound, 0);
2661    }
2662
2663    #[tokio::test]
2664    async fn test_max_concurrent_dials() {
2665        let config = PeersConfig::default();
2666        let mut peer_manager = PeersManager::new(config);
2667        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2668        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2669        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2670            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2671        }
2672
2673        peer_manager.fill_outbound_slots();
2674        let dials = peer_manager
2675            .queued_actions
2676            .iter()
2677            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2678            .count();
2679        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2680    }
2681
2682    #[tokio::test]
2683    async fn test_max_num_of_pending_dials() {
2684        let config = PeersConfig::default();
2685        let mut peer_manager = PeersManager::new(config);
2686        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2687        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2688
2689        // add more peers than allowed
2690        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2691            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2692        }
2693
2694        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2695            match event!(peer_manager) {
2696                PeerAction::PeerAdded(_) => {}
2697                _ => unreachable!(),
2698            }
2699        }
2700
2701        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2702            match event!(peer_manager) {
2703                PeerAction::Connect { .. } => {}
2704                _ => unreachable!(),
2705            }
2706        }
2707
2708        // generate 'Connect' actions
2709        peer_manager.fill_outbound_slots();
2710
2711        // all dialed connections should be in 'PendingOut' state
2712        let dials = peer_manager.connection_info.num_pending_out;
2713        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2714
2715        let num_pendingout_states = peer_manager
2716            .peers
2717            .iter()
2718            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2719            .map(|(peer_id, _)| *peer_id)
2720            .collect::<Vec<PeerId>>();
2721        assert_eq!(
2722            num_pendingout_states.len(),
2723            peer_manager.connection_info.config.max_concurrent_outbound_dials
2724        );
2725
2726        // establish dialed connections
2727        for peer_id in &num_pendingout_states {
2728            peer_manager.on_active_outgoing_established(*peer_id);
2729        }
2730
2731        // all dialed connections should now be in 'Out' state
2732        for peer_id in &num_pendingout_states {
2733            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2734        }
2735
2736        // no more pending outbound connections
2737        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2738    }
2739
2740    #[tokio::test]
2741    async fn test_connect() {
2742        let peer = PeerId::random();
2743        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2744        let mut peers = PeersManager::default();
2745        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2746        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2747
2748        match event!(peers) {
2749            PeerAction::Connect { peer_id, remote_addr } => {
2750                assert_eq!(peer_id, peer);
2751                assert_eq!(remote_addr, socket_addr);
2752            }
2753            _ => unreachable!(),
2754        }
2755
2756        let (record, _) = peers.peer_by_id(peer).unwrap();
2757        assert_eq!(record.tcp_addr(), socket_addr);
2758        assert_eq!(record.udp_addr(), socket_addr);
2759
2760        // connect again
2761        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2762
2763        let (record, _) = peers.peer_by_id(peer).unwrap();
2764        assert_eq!(record.tcp_addr(), socket_addr);
2765        assert_eq!(record.udp_addr(), socket_addr);
2766    }
2767
2768    #[tokio::test]
2769    async fn test_incoming_connection_from_banned() {
2770        let peer = PeerId::random();
2771        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2772        let config = PeersConfig::test().with_max_inbound(3);
2773        let mut peers = PeersManager::new(config);
2774        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2775
2776        match event!(peers) {
2777            PeerAction::PeerAdded(peer_id) => {
2778                assert_eq!(peer_id, peer);
2779            }
2780            _ => unreachable!(),
2781        }
2782        match event!(peers) {
2783            PeerAction::Connect { peer_id, .. } => {
2784                assert_eq!(peer_id, peer);
2785            }
2786            _ => unreachable!(),
2787        }
2788
2789        poll_fn(|cx| {
2790            assert!(peers.poll(cx).is_pending());
2791            Poll::Ready(())
2792        })
2793        .await;
2794
2795        // simulate new connection drops with error
2796        loop {
2797            peers.on_active_session_dropped(
2798                &socket_addr,
2799                &peer,
2800                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2801                    reth_eth_wire::EthVersion::Eth68,
2802                    reth_eth_wire::EthMessageID::Status,
2803                )),
2804            );
2805
2806            if peers.peers.get(&peer).unwrap().is_banned() {
2807                break;
2808            }
2809
2810            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2811            peers.on_incoming_session_established(peer, socket_addr);
2812
2813            match event!(peers) {
2814                PeerAction::Connect { peer_id, .. } => {
2815                    assert_eq!(peer_id, peer);
2816                }
2817                _ => unreachable!(),
2818            }
2819        }
2820
2821        assert!(peers.peers.get(&peer).unwrap().is_banned());
2822
2823        // fill all incoming slots
2824        for _ in 0..peers.connection_info.config.max_inbound {
2825            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2826            peers.on_incoming_session_established(peer, socket_addr);
2827
2828            match event!(peers) {
2829                PeerAction::DisconnectBannedIncoming { peer_id } => {
2830                    assert_eq!(peer_id, peer);
2831                }
2832                _ => unreachable!(),
2833            }
2834        }
2835
2836        poll_fn(|cx| {
2837            assert!(peers.poll(cx).is_pending());
2838            Poll::Ready(())
2839        })
2840        .await;
2841
2842        assert_eq!(peers.connection_info.num_inbound, 0);
2843
2844        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2845
2846        // Assert we can still accept new connections
2847        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2848        assert_eq!(peers.connection_info.num_pending_in, 1);
2849
2850        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
2851        // connection info is updated via the peer's state which would be a noop here since the
2852        // banned peer's state is idle
2853        peers.on_active_session_gracefully_closed(peer);
2854        assert_eq!(peers.connection_info.num_inbound, 0);
2855    }
2856
2857    #[tokio::test]
2858    async fn test_add_pending_connect() {
2859        let peer = PeerId::random();
2860        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2861        let mut peers = PeersManager::default();
2862        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2863        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2864        assert_eq!(peers.connection_info.num_pending_out, 1);
2865    }
2866
2867    #[tokio::test]
2868    async fn test_dns_updates_peer_address() {
2869        let peer_id = PeerId::random();
2870        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2871        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2872
2873        let trusted = TrustedPeer {
2874            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2875            tcp_port: 8008,
2876            udp_port: 8008,
2877            id: peer_id,
2878        };
2879
2880        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2881        let mut manager = PeersManager::new(config);
2882        manager
2883            .trusted_peers_resolver
2884            .set_interval(tokio::time::interval(Duration::from_millis(1)));
2885
2886        manager.peers.insert(
2887            peer_id,
2888            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2889        );
2890
2891        for _ in 0..100 {
2892            let _ = event!(manager);
2893            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2894                break;
2895            }
2896            tokio::time::sleep(Duration::from_millis(10)).await;
2897        }
2898
2899        let updated_peer = manager.peers.get(&peer_id).unwrap();
2900        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2901    }
2902}