reth_network/
peers.rs

1//! Peer related implementations
2
3use crate::{
4    error::SessionError,
5    session::{Direction, PendingSessionHandshakeError},
6    swarm::NetworkConnectionState,
7    trusted_peers_resolver::TrustedPeersResolver,
8};
9use futures::StreamExt;
10
11use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
12use reth_ethereum_forks::ForkId;
13use reth_net_banlist::BanList;
14use reth_network_api::test_utils::{PeerCommand, PeersHandle};
15use reth_network_peers::{NodeRecord, PeerId};
16use reth_network_types::{
17    peers::{
18        config::PeerBackoffDurations,
19        reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
20    },
21    ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
22    ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
23};
24use std::{
25    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
26    fmt::Display,
27    io::{self},
28    net::{IpAddr, SocketAddr},
29    task::{Context, Poll},
30    time::Duration,
31};
32use thiserror::Error;
33use tokio::{
34    sync::mpsc,
35    time::{Instant, Interval},
36};
37use tokio_stream::wrappers::UnboundedReceiverStream;
38use tracing::{trace, warn};
39
40/// Maintains the state of _all_ the peers known to the network.
41///
42/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
43/// From this type, connections to peers are established or disconnected, see [`PeerAction`].
44///
45/// The [`PeersManager`] will be notified on peer related changes
46#[derive(Debug)]
47pub struct PeersManager {
48    /// All peers known to the network
49    peers: HashMap<PeerId, Peer>,
50    /// The set of trusted peer ids.
51    ///
52    /// This tracks peer ids that are considered trusted, but for which we don't necessarily have
53    /// an address: [`Self::add_trusted_peer_id`]
54    trusted_peer_ids: HashSet<PeerId>,
55    /// A resolver used to periodically resolve DNS names for trusted peers. This updates the
56    /// peer's address when the DNS records change.
57    trusted_peers_resolver: TrustedPeersResolver,
58    /// Copy of the sender half, so new [`PeersHandle`] can be created on demand.
59    manager_tx: mpsc::UnboundedSender<PeerCommand>,
60    /// Receiver half of the command channel.
61    handle_rx: UnboundedReceiverStream<PeerCommand>,
62    /// Buffered actions until the manager is polled.
63    queued_actions: VecDeque<PeerAction>,
64    /// Interval for triggering connections if there are free slots.
65    refill_slots_interval: Interval,
66    /// How to weigh reputation changes
67    reputation_weights: ReputationChangeWeights,
68    /// Tracks current slot stats.
69    connection_info: ConnectionInfo,
70    /// Tracks unwanted ips/peer ids.
71    ban_list: BanList,
72    /// Tracks currently backed off peers.
73    backed_off_peers: HashMap<PeerId, std::time::Instant>,
74    /// Interval at which to check for peers to unban and release from the backoff map.
75    release_interval: Interval,
76    /// How long to ban bad peers.
77    ban_duration: Duration,
78    /// How long peers to which we could not connect for non-fatal reasons, e.g.
79    /// [`DisconnectReason::TooManyPeers`], are put in time out.
80    backoff_durations: PeerBackoffDurations,
81    /// If non-trusted peers should be connected to, or the connection from non-trusted
82    /// incoming peers should be accepted.
83    trusted_nodes_only: bool,
84    /// Timestamp of the last time [`Self::tick`] was called.
85    last_tick: Instant,
86    /// Maximum number of backoff attempts before we give up on a peer and dropping.
87    max_backoff_count: u8,
88    /// Tracks the connection state of the node
89    net_connection_state: NetworkConnectionState,
90    /// How long to temporarily ban ip on an incoming connection attempt.
91    incoming_ip_throttle_duration: Duration,
92}
93
94impl PeersManager {
95    /// Create a new instance with the given config
96    pub fn new(config: PeersConfig) -> Self {
97        let PeersConfig {
98            refill_slots_interval,
99            connection_info,
100            reputation_weights,
101            ban_list,
102            ban_duration,
103            backoff_durations,
104            trusted_nodes,
105            trusted_nodes_only,
106            basic_nodes,
107            max_backoff_count,
108            incoming_ip_throttle_duration,
109        } = config;
110        let (manager_tx, handle_rx) = mpsc::unbounded_channel();
111        let now = Instant::now();
112
113        // We use half of the interval to decrease the max duration to `150%` in worst case
114        let unban_interval = ban_duration.min(backoff_durations.low) / 2;
115
116        let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
117        let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
118
119        for trusted_peer in &trusted_nodes {
120            match trusted_peer.resolve_blocking() {
121                Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
122                    trusted_peer_ids.insert(id);
123                    peers.entry(id).or_insert_with(|| {
124                        Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
125                    });
126                }
127                Err(err) => {
128                    warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
129                }
130            }
131        }
132
133        for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
134            peers.entry(id).or_insert_with(|| {
135                Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
136            });
137        }
138
139        Self {
140            peers,
141            trusted_peer_ids,
142            trusted_peers_resolver: TrustedPeersResolver::new(
143                trusted_nodes,
144                tokio::time::interval(Duration::from_secs(60 * 60)), // 1 hour
145            ),
146            manager_tx,
147            handle_rx: UnboundedReceiverStream::new(handle_rx),
148            queued_actions: Default::default(),
149            reputation_weights,
150            refill_slots_interval: tokio::time::interval(refill_slots_interval),
151            release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
152            connection_info: ConnectionInfo::new(connection_info),
153            ban_list,
154            backed_off_peers: Default::default(),
155            ban_duration,
156            backoff_durations,
157            trusted_nodes_only,
158            last_tick: Instant::now(),
159            max_backoff_count,
160            net_connection_state: NetworkConnectionState::default(),
161            incoming_ip_throttle_duration,
162        }
163    }
164
165    /// Returns a new [`PeersHandle`] that can send commands to this type.
166    pub(crate) fn handle(&self) -> PeersHandle {
167        PeersHandle::new(self.manager_tx.clone())
168    }
169
170    /// Returns the number of peers in the peer set
171    #[inline]
172    pub(crate) fn num_known_peers(&self) -> usize {
173        self.peers.len()
174    }
175
176    /// Returns an iterator over all peers
177    pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
178        self.peers.iter().map(|(peer_id, v)| {
179            NodeRecord::new_with_ports(
180                v.addr.tcp().ip(),
181                v.addr.tcp().port(),
182                v.addr.udp().map(|addr| addr.port()),
183                *peer_id,
184            )
185        })
186    }
187
188    /// Returns the `NodeRecord` and `PeerKind` for the given peer id
189    pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
190        self.peers.get(&peer_id).map(|v| {
191            (
192                NodeRecord::new_with_ports(
193                    v.addr.tcp().ip(),
194                    v.addr.tcp().port(),
195                    v.addr.udp().map(|addr| addr.port()),
196                    peer_id,
197                ),
198                v.kind,
199            )
200        })
201    }
202
203    /// Returns an iterator over all peer ids for peers with the given kind
204    pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
205        self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
206    }
207
208    /// Returns the number of currently active inbound connections.
209    #[inline]
210    pub(crate) const fn num_inbound_connections(&self) -> usize {
211        self.connection_info.num_inbound
212    }
213
214    /// Returns the number of currently __active__ outbound connections.
215    #[inline]
216    pub(crate) const fn num_outbound_connections(&self) -> usize {
217        self.connection_info.num_outbound
218    }
219
220    /// Returns the number of currently pending outbound connections.
221    #[inline]
222    pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
223        self.connection_info.num_pending_out
224    }
225
226    /// Returns the number of currently backed off peers.
227    #[inline]
228    pub(crate) fn num_backed_off_peers(&self) -> usize {
229        self.backed_off_peers.len()
230    }
231
232    /// Returns the number of idle trusted peers.
233    fn num_idle_trusted_peers(&self) -> usize {
234        self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
235    }
236
237    /// Invoked when a new _incoming_ tcp connection is accepted.
238    ///
239    /// returns an error if the inbound ip address is on the ban list
240    pub(crate) fn on_incoming_pending_session(
241        &mut self,
242        addr: IpAddr,
243    ) -> Result<(), InboundConnectionError> {
244        if self.ban_list.is_banned_ip(&addr) {
245            return Err(InboundConnectionError::IpBanned)
246        }
247
248        // check if we even have slots for a new incoming connection
249        if !self.connection_info.has_in_capacity() {
250            if self.trusted_peer_ids.is_empty() {
251                // if we don't have any incoming slots and no trusted peers, we don't accept any new
252                // connections
253                return Err(InboundConnectionError::ExceedsCapacity)
254            }
255
256            // there's an edge case here where no incoming connections besides from trusted peers
257            // are allowed (max_inbound == 0), in which case we still need to allow new pending
258            // incoming connections until all trusted peers are connected.
259            let num_idle_trusted_peers = self.num_idle_trusted_peers();
260            if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
261                // we still want to limit concurrent pending connections
262                let max_inbound =
263                    self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
264                if self.connection_info.num_pending_in < max_inbound {
265                    self.connection_info.inc_pending_in();
266                    return Ok(())
267                }
268            }
269
270            // all trusted peers are either connected or connecting
271            return Err(InboundConnectionError::ExceedsCapacity)
272        }
273
274        // also cap the incoming connections we can process at once
275        if !self.connection_info.has_in_pending_capacity() {
276            return Err(InboundConnectionError::ExceedsCapacity)
277        }
278
279        // apply the rate limit
280        self.throttle_incoming_ip(addr);
281
282        self.connection_info.inc_pending_in();
283        Ok(())
284    }
285
286    /// Invoked when a previous call to [`Self::on_incoming_pending_session`] succeeded but it was
287    /// rejected.
288    pub(crate) const fn on_incoming_pending_session_rejected_internally(&mut self) {
289        self.connection_info.decr_pending_in();
290    }
291
292    /// Invoked when a pending session was closed.
293    pub(crate) const fn on_incoming_pending_session_gracefully_closed(&mut self) {
294        self.connection_info.decr_pending_in()
295    }
296
297    /// Invoked when a pending session was closed.
298    pub(crate) fn on_incoming_pending_session_dropped(
299        &mut self,
300        remote_addr: SocketAddr,
301        err: &PendingSessionHandshakeError,
302    ) {
303        if err.is_fatal_protocol_error() {
304            self.ban_ip(remote_addr.ip());
305
306            if err.merits_discovery_ban() {
307                self.queued_actions
308                    .push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
309            }
310        }
311
312        self.connection_info.decr_pending_in();
313    }
314
315    /// Called when a new _incoming_ active session was established to the given peer.
316    ///
317    /// This will update the state of the peer if not yet tracked.
318    ///
319    /// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
320    /// be scheduled.
321    pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
322        self.connection_info.decr_pending_in();
323
324        // we only need to check the peer id here as the ip address will have been checked at
325        // on_incoming_pending_session. We also check if the peer is in the backoff list here.
326        if self.ban_list.is_banned_peer(&peer_id) {
327            self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
328            return
329        }
330
331        // check if the peer is trustable or not
332        let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
333        if self.trusted_nodes_only && !is_trusted {
334            self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
335            return
336        }
337
338        // start a new tick, so the peer is not immediately rewarded for the time since last tick
339        self.tick();
340
341        match self.peers.entry(peer_id) {
342            Entry::Occupied(mut entry) => {
343                let peer = entry.get_mut();
344                if peer.is_banned() {
345                    self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
346                    return
347                }
348                // it might be the case that we're also trying to connect to this peer at the same
349                // time, so we need to adjust the state here
350                if peer.state.is_pending_out() {
351                    self.connection_info.decr_state(peer.state);
352                }
353
354                peer.state = PeerConnectionState::In;
355
356                is_trusted = is_trusted || peer.is_trusted();
357            }
358            Entry::Vacant(entry) => {
359                // peer is missing in the table, we add it but mark it as to be removed after
360                // disconnect, because we only know the outgoing port
361                let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
362                peer.remove_after_disconnect = true;
363                entry.insert(peer);
364                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
365            }
366        }
367
368        let has_in_capacity = self.connection_info.has_in_capacity();
369        // increment new incoming connection
370        self.connection_info.inc_in();
371
372        // disconnect the peer if we don't have capacity for more inbound connections
373        if !is_trusted && !has_in_capacity {
374            self.queued_actions.push_back(PeerAction::Disconnect {
375                peer_id,
376                reason: Some(DisconnectReason::TooManyPeers),
377            });
378        }
379    }
380
381    /// Bans the peer temporarily with the configured ban timeout
382    fn ban_peer(&mut self, peer_id: PeerId) {
383        let mut ban_duration = self.ban_duration;
384        if let Some(peer) = self.peers.get(&peer_id) {
385            if peer.is_trusted() || peer.is_static() {
386                // For misbehaving trusted or static peers, we provide a bit more leeway when
387                // penalizing them.
388                ban_duration = self.backoff_durations.low / 2;
389            }
390        }
391
392        self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
393        self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
394    }
395
396    /// Bans the IP temporarily with the configured ban timeout
397    fn ban_ip(&mut self, ip: IpAddr) {
398        self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
399    }
400
401    /// Bans the IP temporarily to rate limit inbound connection attempts per IP.
402    fn throttle_incoming_ip(&mut self, ip: IpAddr) {
403        self.ban_list
404            .ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
405    }
406
407    /// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
408    fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
409        trace!(target: "net::peers", ?peer_id, "backing off");
410
411        if let Some(peer) = self.peers.get_mut(&peer_id) {
412            peer.backed_off = true;
413            self.backed_off_peers.insert(peer_id, until);
414        }
415    }
416
417    /// Unbans the peer
418    fn unban_peer(&mut self, peer_id: PeerId) {
419        self.ban_list.unban_peer(&peer_id);
420        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
421    }
422
423    /// Tick function to update reputation of all connected peers.
424    /// Peers are rewarded with reputation increases for the time they are connected since the last
425    /// tick. This is to prevent peers from being disconnected eventually due to slashed
426    /// reputation because of some bad messages (most likely transaction related)
427    fn tick(&mut self) {
428        let now = Instant::now();
429        // Determine the number of seconds since the last tick.
430        // Ensuring that now is always greater than last_tick to account for issues with system
431        // time.
432        let secs_since_last_tick =
433            if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
434        self.last_tick = now;
435
436        // update reputation via seconds connected
437        for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
438            // update reputation via seconds connected, but keep the target _around_ the default
439            // reputation.
440            if peer.1.reputation < DEFAULT_REPUTATION {
441                peer.1.reputation += secs_since_last_tick;
442            }
443        }
444    }
445
446    /// Returns the tracked reputation for a peer.
447    pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
448        self.peers.get(peer_id).map(|peer| peer.reputation)
449    }
450
451    /// Apply the corresponding reputation change to the given peer.
452    ///
453    /// If the peer is a trusted peer, it will be exempt from reputation slashing for certain
454    /// reputation changes that can be attributed to network conditions. If the peer is a
455    /// trusted peer, it will also be less strict with the reputation slashing.
456    pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
457        let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
458            // First check if we should reset the reputation
459            if rep.is_reset() {
460                peer.reset_reputation()
461            } else {
462                let mut reputation_change = self.reputation_weights.change(rep).as_i32();
463                if peer.is_trusted() || peer.is_static() {
464                    // exempt trusted and static peers from reputation slashing for
465                    if matches!(
466                        rep,
467                        ReputationChangeKind::Dropped |
468                            ReputationChangeKind::BadAnnouncement |
469                            ReputationChangeKind::Timeout |
470                            ReputationChangeKind::AlreadySeenTransaction
471                    ) {
472                        return
473                    }
474
475                    // also be less strict with the reputation slashing for trusted peers
476                    if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
477                        // this caps the reputation change to the maximum allowed for trusted peers
478                        reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
479                    }
480                }
481                peer.apply_reputation(reputation_change)
482            }
483        } else {
484            return
485        };
486
487        match outcome {
488            ReputationChangeOutcome::None => {}
489            ReputationChangeOutcome::Ban => {
490                self.ban_peer(*peer_id);
491            }
492            ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
493            ReputationChangeOutcome::DisconnectAndBan => {
494                self.queued_actions.push_back(PeerAction::Disconnect {
495                    peer_id: *peer_id,
496                    reason: Some(DisconnectReason::DisconnectRequested),
497                });
498                self.ban_peer(*peer_id);
499            }
500        }
501    }
502
503    /// Gracefully disconnected a pending _outgoing_ session
504    pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
505        if let Some(peer) = self.peers.get_mut(peer_id) {
506            self.connection_info.decr_state(peer.state);
507            peer.state = PeerConnectionState::Idle;
508        }
509    }
510
511    /// Invoked when an _outgoing_ pending session was closed during authentication or the
512    /// handshake.
513    pub(crate) fn on_outgoing_pending_session_dropped(
514        &mut self,
515        remote_addr: &SocketAddr,
516        peer_id: &PeerId,
517        err: &PendingSessionHandshakeError,
518    ) {
519        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
520    }
521
522    /// Gracefully disconnected an active session
523    pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
524        match self.peers.entry(peer_id) {
525            Entry::Occupied(mut entry) => {
526                self.connection_info.decr_state(entry.get().state);
527
528                if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
529                    // this peer should be removed from the set
530                    entry.remove();
531                    self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
532                } else {
533                    // reset the peer's state
534                    // we reset the backoff counter since we're able to establish a successful
535                    // session to that peer
536                    entry.get_mut().severe_backoff_counter = 0;
537                    entry.get_mut().state = PeerConnectionState::Idle;
538                    return
539                }
540            }
541            Entry::Vacant(_) => return,
542        }
543
544        self.fill_outbound_slots();
545    }
546
547    /// Called when a _pending_ outbound connection is successful.
548    pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
549        if let Some(peer) = self.peers.get_mut(&peer_id) {
550            self.connection_info.decr_state(peer.state);
551            self.connection_info.inc_out();
552            peer.state = PeerConnectionState::Out;
553        }
554    }
555
556    /// Called when an _active_ session to a peer was forcefully dropped due to an error.
557    ///
558    /// Depending on whether the error is fatal, the peer will be removed from the peer set
559    /// otherwise its reputation is slashed.
560    pub(crate) fn on_active_session_dropped(
561        &mut self,
562        remote_addr: &SocketAddr,
563        peer_id: &PeerId,
564        err: &EthStreamError,
565    ) {
566        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
567    }
568
569    /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
570    /// connection.
571    pub(crate) fn on_outgoing_connection_failure(
572        &mut self,
573        remote_addr: &SocketAddr,
574        peer_id: &PeerId,
575        err: &io::Error,
576    ) {
577        // there's a race condition where we accepted an incoming connection while we were trying to
578        // connect to the same peer at the same time. if the outgoing connection failed
579        // after the incoming connection was accepted, we can ignore this error
580        if let Some(peer) = self.peers.get(peer_id) {
581            if peer.state.is_incoming() {
582                // we already have an active connection to the peer, so we can ignore this error
583                return
584            }
585        }
586
587        self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
588    }
589
590    fn on_connection_failure(
591        &mut self,
592        remote_addr: &SocketAddr,
593        peer_id: &PeerId,
594        err: impl SessionError,
595        reputation_change: ReputationChangeKind,
596    ) {
597        trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
598
599        if err.is_fatal_protocol_error() {
600            trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
601            // remove the peer to which we can't establish a connection due to protocol related
602            // issues.
603            if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
604                self.connection_info.decr_state(entry.get().state);
605                // only remove if the peer is not trusted
606                if entry.get().is_trusted() {
607                    entry.get_mut().state = PeerConnectionState::Idle;
608                } else {
609                    entry.remove();
610                    self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
611                    // If the error is caused by a peer that should be banned from discovery
612                    if err.merits_discovery_ban() {
613                        self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
614                            peer_id: *peer_id,
615                            ip_addr: remote_addr.ip(),
616                        })
617                    }
618                }
619            }
620
621            // ban the peer
622            self.ban_peer(*peer_id);
623        } else {
624            let mut backoff_until = None;
625            let mut remove_peer = false;
626
627            if let Some(peer) = self.peers.get_mut(peer_id) {
628                if let Some(kind) = err.should_backoff() {
629                    if peer.is_trusted() || peer.is_static() {
630                        // provide a bit more leeway for trusted peers and use a lower backoff so
631                        // that we keep re-trying them after backing off shortly
632                        let backoff = self.backoff_durations.low / 2;
633                        backoff_until = Some(std::time::Instant::now() + backoff);
634                    } else {
635                        // Increment peer.backoff_counter
636                        if kind.is_severe() {
637                            peer.severe_backoff_counter =
638                                peer.severe_backoff_counter.saturating_add(1);
639                        }
640
641                        let backoff_time =
642                            self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
643
644                        // The peer has signaled that it is currently unable to process any more
645                        // connections, so we will hold off on attempting any new connections for a
646                        // while
647                        backoff_until = Some(backoff_time);
648                    }
649                } else {
650                    // If the error was not a backoff error, we reduce the peer's reputation
651                    let reputation_change = self.reputation_weights.change(reputation_change);
652                    peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
653                };
654
655                self.connection_info.decr_state(peer.state);
656                peer.state = PeerConnectionState::Idle;
657
658                if peer.severe_backoff_counter > self.max_backoff_count &&
659                    !peer.is_trusted() &&
660                    !peer.is_static()
661                {
662                    // mark peer for removal if it has been backoff too many times and is _not_
663                    // trusted or static
664                    remove_peer = true;
665                }
666            }
667
668            // remove peer if it has been marked for removal
669            if remove_peer {
670                let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
671                self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
672            } else if let Some(backoff_until) = backoff_until {
673                // otherwise, backoff the peer if marked as such
674                self.backoff_peer_until(*peer_id, backoff_until);
675            }
676        }
677
678        self.fill_outbound_slots();
679    }
680
681    /// Invoked if a pending session was disconnected because there's already a connection to the
682    /// peer.
683    ///
684    /// If the session was an outgoing connection, this means that the peer initiated a connection
685    /// to us at the same time and this connection is already established.
686    pub(crate) const fn on_already_connected(&mut self, direction: Direction) {
687        match direction {
688            Direction::Incoming => {
689                // need to decrement the ingoing counter
690                self.connection_info.decr_pending_in();
691            }
692            Direction::Outgoing(_) => {
693                // cleanup is handled when the incoming active session is activated in
694                // `on_incoming_session_established`
695            }
696        }
697    }
698
699    /// Called as follow-up for a discovered peer.
700    ///
701    /// The [`ForkId`] is retrieved from an ENR record that the peer announces over the discovery
702    /// protocol
703    pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
704        if let Some(peer) = self.peers.get_mut(&peer_id) {
705            trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
706            peer.fork_id = Some(fork_id);
707        }
708    }
709
710    /// Called for a newly discovered peer.
711    ///
712    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
713    pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
714        self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
715    }
716
717    /// Marks the given peer as trusted.
718    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
719        self.trusted_peer_ids.insert(peer_id);
720    }
721
722    /// Called for a newly discovered trusted peer.
723    ///
724    /// If the peer already exists, then the address and kind will be updated.
725    #[cfg_attr(not(test), expect(dead_code))]
726    pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
727        self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
728    }
729
730    /// Called for a newly discovered peer.
731    ///
732    /// If the peer already exists, then the address, kind and `fork_id` will be updated.
733    pub(crate) fn add_peer_kind(
734        &mut self,
735        peer_id: PeerId,
736        kind: PeerKind,
737        addr: PeerAddr,
738        fork_id: Option<ForkId>,
739    ) {
740        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
741            return
742        }
743
744        match self.peers.entry(peer_id) {
745            Entry::Occupied(mut entry) => {
746                let peer = entry.get_mut();
747                peer.kind = kind;
748                peer.fork_id = fork_id;
749                peer.addr = addr;
750
751                if peer.state.is_incoming() {
752                    // now that we have an actual discovered address, for that peer and not just the
753                    // ip of the incoming connection, we don't need to remove the peer after
754                    // disconnecting, See `on_incoming_session_established`
755                    peer.remove_after_disconnect = false;
756                }
757            }
758            Entry::Vacant(entry) => {
759                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
760                let mut peer = Peer::with_kind(addr, kind);
761                peer.fork_id = fork_id;
762                entry.insert(peer);
763                self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
764            }
765        }
766
767        if kind.is_trusted() {
768            self.trusted_peer_ids.insert(peer_id);
769        }
770    }
771
772    /// Removes the tracked node from the set.
773    pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
774        let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
775        if entry.get().is_trusted() {
776            return
777        }
778        let mut peer = entry.remove();
779
780        trace!(target: "net::peers", ?peer_id, "remove discovered node");
781        self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
782
783        if peer.state.is_connected() {
784            trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
785            // we terminate the active session here, but only remove the peer after the session
786            // was disconnected, this prevents the case where the session is scheduled for
787            // disconnect but the node is immediately rediscovered, See also
788            // [`Self::on_disconnected()`]
789            peer.remove_after_disconnect = true;
790            peer.state.disconnect();
791            self.peers.insert(peer_id, peer);
792            self.queued_actions.push_back(PeerAction::Disconnect {
793                peer_id,
794                reason: Some(DisconnectReason::DisconnectRequested),
795            })
796        }
797    }
798
799    /// Connect to the given peer. NOTE: if the maximum number out outbound sessions is reached,
800    /// this won't do anything. See `reth_network::SessionManager::dial_outbound`.
801    #[cfg_attr(not(test), expect(dead_code))]
802    pub(crate) fn add_and_connect(
803        &mut self,
804        peer_id: PeerId,
805        addr: PeerAddr,
806        fork_id: Option<ForkId>,
807    ) {
808        self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
809    }
810
811    /// Connects a peer and its address with the given kind.
812    ///
813    /// Note: This is invoked on demand via an external command received by the manager
814    pub(crate) fn add_and_connect_kind(
815        &mut self,
816        peer_id: PeerId,
817        kind: PeerKind,
818        addr: PeerAddr,
819        fork_id: Option<ForkId>,
820    ) {
821        if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
822            return
823        }
824
825        match self.peers.entry(peer_id) {
826            Entry::Occupied(mut entry) => {
827                let peer = entry.get_mut();
828                peer.kind = kind;
829                peer.fork_id = fork_id;
830                peer.addr = addr;
831
832                if peer.state == PeerConnectionState::Idle {
833                    // Try connecting again.
834                    peer.state = PeerConnectionState::PendingOut;
835                    self.connection_info.inc_pending_out();
836                    self.queued_actions
837                        .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
838                }
839            }
840            Entry::Vacant(entry) => {
841                trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
842                let mut peer = Peer::with_kind(addr, kind);
843                peer.state = PeerConnectionState::PendingOut;
844                peer.fork_id = fork_id;
845                entry.insert(peer);
846                self.connection_info.inc_pending_out();
847                self.queued_actions
848                    .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
849            }
850        }
851
852        if kind.is_trusted() {
853            self.trusted_peer_ids.insert(peer_id);
854        }
855    }
856
857    /// Removes the tracked node from the trusted set.
858    pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
859        let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
860        if !entry.get().is_trusted() {
861            return
862        }
863
864        let peer = entry.get_mut();
865        peer.kind = PeerKind::Basic;
866
867        self.trusted_peer_ids.remove(&peer_id);
868    }
869
870    /// Returns the idle peer with the highest reputation.
871    ///
872    /// Peers that are `trusted` or `static`, see [`PeerKind`], are prioritized as long as they're
873    /// not currently marked as banned or backed off.
874    ///
875    /// If `trusted_nodes_only` is enabled, see [`PeersConfig`], then this will only consider
876    /// `trusted` peers.
877    ///
878    /// Returns `None` if no peer is available.
879    fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
880        let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
881            !peer.is_backed_off() &&
882                !peer.is_banned() &&
883                peer.state.is_unconnected() &&
884                (!self.trusted_nodes_only || peer.is_trusted())
885        });
886
887        // keep track of the best peer, if there's one
888        let mut best_peer = unconnected.next()?;
889
890        if best_peer.1.is_trusted() || best_peer.1.is_static() {
891            return Some((*best_peer.0, best_peer.1))
892        }
893
894        for maybe_better in unconnected {
895            // if the peer is trusted or static, return it immediately
896            if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
897                return Some((*maybe_better.0, maybe_better.1))
898            }
899
900            // otherwise we keep track of the best peer using the reputation
901            if maybe_better.1.reputation > best_peer.1.reputation {
902                best_peer = maybe_better;
903            }
904        }
905        Some((*best_peer.0, best_peer.1))
906    }
907
908    /// If there's capacity for new outbound connections, this will queue new
909    /// [`PeerAction::Connect`] actions.
910    ///
911    /// New connections are only initiated, if slots are available and appropriate peers are
912    /// available.
913    fn fill_outbound_slots(&mut self) {
914        self.tick();
915
916        if !self.net_connection_state.is_active() {
917            // nothing to fill
918            return
919        }
920
921        // as long as there are slots available fill them with the best peers
922        while self.connection_info.has_out_capacity() {
923            let action = {
924                let (peer_id, peer) = match self.best_unconnected() {
925                    Some(peer) => peer,
926                    _ => break,
927                };
928
929                trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
930
931                peer.state = PeerConnectionState::PendingOut;
932                PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
933            };
934
935            self.connection_info.inc_pending_out();
936
937            self.queued_actions.push_back(action);
938        }
939    }
940
941    fn on_resolved_peer(&mut self, peer_id: PeerId, new_record: NodeRecord) {
942        if let Some(peer) = self.peers.get_mut(&peer_id) {
943            let new_addr = PeerAddr::new_with_ports(
944                new_record.address,
945                new_record.tcp_port,
946                Some(new_record.udp_port),
947            );
948
949            if peer.addr != new_addr {
950                peer.addr = new_addr;
951                trace!(target: "net::peers", ?peer_id, addre=?peer.addr, "Updated resolved trusted peer address");
952            }
953        }
954    }
955
956    /// Keeps track of network state changes.
957    pub const fn on_network_state_change(&mut self, state: NetworkConnectionState) {
958        self.net_connection_state = state;
959    }
960
961    /// Returns the current network connection state.
962    pub const fn connection_state(&self) -> &NetworkConnectionState {
963        &self.net_connection_state
964    }
965
966    /// Sets `net_connection_state` to `ShuttingDown`.
967    pub const fn on_shutdown(&mut self) {
968        self.net_connection_state = NetworkConnectionState::ShuttingDown;
969    }
970
971    /// Advances the state.
972    ///
973    /// Event hooks invoked externally may trigger a new [`PeerAction`] that are buffered until
974    /// [`PeersManager`] is polled.
975    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
976        loop {
977            // drain buffered actions
978            if let Some(action) = self.queued_actions.pop_front() {
979                return Poll::Ready(action)
980            }
981
982            while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
983                match cmd {
984                    PeerCommand::Add(peer_id, addr) => {
985                        self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
986                    }
987                    PeerCommand::Remove(peer) => self.remove_peer(peer),
988                    PeerCommand::ReputationChange(peer_id, rep) => {
989                        self.apply_reputation_change(&peer_id, rep)
990                    }
991                    PeerCommand::GetPeer(peer, tx) => {
992                        let _ = tx.send(self.peers.get(&peer).cloned());
993                    }
994                    PeerCommand::GetPeers(tx) => {
995                        let _ = tx.send(self.iter_peers().collect());
996                    }
997                }
998            }
999
1000            if self.release_interval.poll_tick(cx).is_ready() {
1001                let now = std::time::Instant::now();
1002                let (_, unbanned_peers) = self.ban_list.evict(now);
1003
1004                for peer_id in unbanned_peers {
1005                    if let Some(peer) = self.peers.get_mut(&peer_id) {
1006                        peer.unban();
1007                        self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
1008                    }
1009                }
1010
1011                // clear the backoff list of expired backoffs, and mark the relevant peers as
1012                // ready to be dialed
1013                self.backed_off_peers.retain(|peer_id, until| {
1014                    if now > *until {
1015                        if let Some(peer) = self.peers.get_mut(peer_id) {
1016                            peer.backed_off = false;
1017                        }
1018                        return false
1019                    }
1020                    true
1021                })
1022            }
1023
1024            while self.refill_slots_interval.poll_tick(cx).is_ready() {
1025                self.fill_outbound_slots();
1026            }
1027
1028            if let Poll::Ready((peer_id, new_record)) = self.trusted_peers_resolver.poll(cx) {
1029                self.on_resolved_peer(peer_id, new_record);
1030            }
1031
1032            if self.queued_actions.is_empty() {
1033                return Poll::Pending
1034            }
1035        }
1036    }
1037}
1038
1039impl Default for PeersManager {
1040    fn default() -> Self {
1041        Self::new(Default::default())
1042    }
1043}
1044
1045/// Tracks stats about connected nodes
1046#[derive(Debug, Clone, PartialEq, Eq, Default)]
1047pub struct ConnectionInfo {
1048    /// Counter for currently occupied slots for active outbound connections.
1049    num_outbound: usize,
1050    /// Counter for pending outbound connections.
1051    num_pending_out: usize,
1052    /// Counter for currently occupied slots for active inbound connections.
1053    num_inbound: usize,
1054    /// Counter for pending inbound connections.
1055    num_pending_in: usize,
1056    /// Restrictions on number of connections.
1057    config: ConnectionsConfig,
1058}
1059
1060// === impl ConnectionInfo ===
1061
1062impl ConnectionInfo {
1063    /// Returns a new [`ConnectionInfo`] with the given config.
1064    const fn new(config: ConnectionsConfig) -> Self {
1065        Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
1066    }
1067
1068    ///  Returns `true` if there's still capacity to perform an outgoing connection.
1069    const fn has_out_capacity(&self) -> bool {
1070        self.num_pending_out < self.config.max_concurrent_outbound_dials &&
1071            self.num_outbound < self.config.max_outbound
1072    }
1073
1074    ///  Returns `true` if there's still capacity to accept a new incoming connection.
1075    const fn has_in_capacity(&self) -> bool {
1076        self.num_inbound < self.config.max_inbound
1077    }
1078
1079    /// Returns `true` if we can handle an additional incoming pending connection.
1080    const fn has_in_pending_capacity(&self) -> bool {
1081        self.num_pending_in < self.config.max_inbound
1082    }
1083
1084    const fn decr_state(&mut self, state: PeerConnectionState) {
1085        match state {
1086            PeerConnectionState::Idle => {}
1087            PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
1088            PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
1089            PeerConnectionState::PendingOut => self.decr_pending_out(),
1090        }
1091    }
1092
1093    const fn decr_out(&mut self) {
1094        self.num_outbound -= 1;
1095    }
1096
1097    const fn inc_out(&mut self) {
1098        self.num_outbound += 1;
1099    }
1100
1101    const fn inc_pending_out(&mut self) {
1102        self.num_pending_out += 1;
1103    }
1104
1105    const fn inc_in(&mut self) {
1106        self.num_inbound += 1;
1107    }
1108
1109    const fn inc_pending_in(&mut self) {
1110        self.num_pending_in += 1;
1111    }
1112
1113    const fn decr_in(&mut self) {
1114        self.num_inbound -= 1;
1115    }
1116
1117    const fn decr_pending_out(&mut self) {
1118        self.num_pending_out -= 1;
1119    }
1120
1121    const fn decr_pending_in(&mut self) {
1122        self.num_pending_in -= 1;
1123    }
1124}
1125
1126/// Actions the peer manager can trigger.
1127#[derive(Debug)]
1128pub enum PeerAction {
1129    /// Start a new connection to a peer.
1130    Connect {
1131        /// The peer to connect to.
1132        peer_id: PeerId,
1133        /// Where to reach the node
1134        remote_addr: SocketAddr,
1135    },
1136    /// Disconnect an existing connection.
1137    Disconnect {
1138        /// The peer ID of the established connection.
1139        peer_id: PeerId,
1140        /// An optional reason for the disconnect.
1141        reason: Option<DisconnectReason>,
1142    },
1143    /// Disconnect an existing incoming connection, because the peers reputation is below the
1144    /// banned threshold or is on the [`BanList`]
1145    DisconnectBannedIncoming {
1146        /// The peer ID of the established connection.
1147        peer_id: PeerId,
1148    },
1149    /// Disconnect an untrusted incoming connection when trust-node-only is enabled.
1150    DisconnectUntrustedIncoming {
1151        /// The peer ID.
1152        peer_id: PeerId,
1153    },
1154    /// Ban the peer in discovery.
1155    DiscoveryBanPeerId {
1156        /// The peer ID.
1157        peer_id: PeerId,
1158        /// The IP address.
1159        ip_addr: IpAddr,
1160    },
1161    /// Ban the IP in discovery.
1162    DiscoveryBanIp {
1163        /// The IP address.
1164        ip_addr: IpAddr,
1165    },
1166    /// Ban the peer temporarily
1167    BanPeer {
1168        /// The peer ID.
1169        peer_id: PeerId,
1170    },
1171    /// Unban the peer temporarily
1172    UnBanPeer {
1173        /// The peer ID.
1174        peer_id: PeerId,
1175    },
1176    /// Emit peerAdded event
1177    PeerAdded(PeerId),
1178    /// Emit peerRemoved event
1179    PeerRemoved(PeerId),
1180}
1181
1182/// Error thrown when a incoming connection is rejected right away
1183#[derive(Debug, Error, PartialEq, Eq)]
1184pub enum InboundConnectionError {
1185    /// The remote's ip address is banned
1186    IpBanned,
1187    /// No capacity for new inbound connections
1188    ExceedsCapacity,
1189}
1190
1191impl Display for InboundConnectionError {
1192    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1193        write!(f, "{self:?}")
1194    }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use alloy_primitives::B512;
1200    use reth_eth_wire::{
1201        errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
1202        DisconnectReason,
1203    };
1204    use reth_net_banlist::BanList;
1205    use reth_network_api::Direction;
1206    use reth_network_peers::{PeerId, TrustedPeer};
1207    use reth_network_types::{
1208        peers::reputation::DEFAULT_REPUTATION, BackoffKind, Peer, ReputationChangeKind,
1209    };
1210    use std::{
1211        future::{poll_fn, Future},
1212        io,
1213        net::{IpAddr, Ipv4Addr, SocketAddr},
1214        pin::Pin,
1215        task::{Context, Poll},
1216        time::Duration,
1217    };
1218    use url::Host;
1219
1220    use super::PeersManager;
1221    use crate::{
1222        error::SessionError,
1223        peers::{
1224            ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
1225            PeerConnectionState,
1226        },
1227        session::PendingSessionHandshakeError,
1228        PeersConfig,
1229    };
1230
1231    struct PeerActionFuture<'a> {
1232        peers: &'a mut PeersManager,
1233    }
1234
1235    impl Future for PeerActionFuture<'_> {
1236        type Output = PeerAction;
1237
1238        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1239            self.get_mut().peers.poll(cx)
1240        }
1241    }
1242
1243    macro_rules! event {
1244        ($peers:expr) => {
1245            PeerActionFuture { peers: &mut $peers }.await
1246        };
1247    }
1248
1249    #[tokio::test]
1250    async fn test_insert() {
1251        let peer = PeerId::random();
1252        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1253        let mut peers = PeersManager::default();
1254        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1255
1256        match event!(peers) {
1257            PeerAction::PeerAdded(peer_id) => {
1258                assert_eq!(peer_id, peer);
1259            }
1260            _ => unreachable!(),
1261        }
1262        match event!(peers) {
1263            PeerAction::Connect { peer_id, remote_addr } => {
1264                assert_eq!(peer_id, peer);
1265                assert_eq!(remote_addr, socket_addr);
1266            }
1267            _ => unreachable!(),
1268        }
1269
1270        let (record, _) = peers.peer_by_id(peer).unwrap();
1271        assert_eq!(record.tcp_addr(), socket_addr);
1272        assert_eq!(record.udp_addr(), socket_addr);
1273    }
1274
1275    #[tokio::test]
1276    async fn test_insert_udp() {
1277        let peer = PeerId::random();
1278        let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1279        let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
1280        let mut peers = PeersManager::default();
1281        peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
1282
1283        match event!(peers) {
1284            PeerAction::PeerAdded(peer_id) => {
1285                assert_eq!(peer_id, peer);
1286            }
1287            _ => unreachable!(),
1288        }
1289        match event!(peers) {
1290            PeerAction::Connect { peer_id, remote_addr } => {
1291                assert_eq!(peer_id, peer);
1292                assert_eq!(remote_addr, tcp_addr);
1293            }
1294            _ => unreachable!(),
1295        }
1296
1297        let (record, _) = peers.peer_by_id(peer).unwrap();
1298        assert_eq!(record.tcp_addr(), tcp_addr);
1299        assert_eq!(record.udp_addr(), udp_addr);
1300    }
1301
1302    #[tokio::test]
1303    async fn test_ban() {
1304        let peer = PeerId::random();
1305        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1306        let mut peers = PeersManager::default();
1307        peers.ban_peer(peer);
1308        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1309
1310        match event!(peers) {
1311            PeerAction::BanPeer { peer_id } => {
1312                assert_eq!(peer_id, peer);
1313            }
1314            _ => unreachable!(),
1315        }
1316
1317        poll_fn(|cx| {
1318            assert!(peers.poll(cx).is_pending());
1319            Poll::Ready(())
1320        })
1321        .await;
1322    }
1323
1324    #[tokio::test]
1325    async fn test_unban() {
1326        let peer = PeerId::random();
1327        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1328        let mut peers = PeersManager::default();
1329        peers.ban_peer(peer);
1330        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1331
1332        match event!(peers) {
1333            PeerAction::BanPeer { peer_id } => {
1334                assert_eq!(peer_id, peer);
1335            }
1336            _ => unreachable!(),
1337        }
1338
1339        poll_fn(|cx| {
1340            assert!(peers.poll(cx).is_pending());
1341            Poll::Ready(())
1342        })
1343        .await;
1344
1345        peers.unban_peer(peer);
1346
1347        match event!(peers) {
1348            PeerAction::UnBanPeer { peer_id } => {
1349                assert_eq!(peer_id, peer);
1350            }
1351            _ => unreachable!(),
1352        }
1353
1354        poll_fn(|cx| {
1355            assert!(peers.poll(cx).is_pending());
1356            Poll::Ready(())
1357        })
1358        .await;
1359    }
1360
1361    #[tokio::test]
1362    async fn test_backoff_on_busy() {
1363        let peer = PeerId::random();
1364        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1365
1366        let mut peers = PeersManager::new(PeersConfig::test());
1367        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1368
1369        match event!(peers) {
1370            PeerAction::PeerAdded(peer_id) => {
1371                assert_eq!(peer_id, peer);
1372            }
1373            _ => unreachable!(),
1374        }
1375        match event!(peers) {
1376            PeerAction::Connect { peer_id, .. } => {
1377                assert_eq!(peer_id, peer);
1378            }
1379            _ => unreachable!(),
1380        }
1381
1382        poll_fn(|cx| {
1383            assert!(peers.poll(cx).is_pending());
1384            Poll::Ready(())
1385        })
1386        .await;
1387
1388        peers.on_active_session_dropped(
1389            &socket_addr,
1390            &peer,
1391            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1392                DisconnectReason::TooManyPeers,
1393            )),
1394        );
1395
1396        poll_fn(|cx| {
1397            assert!(peers.poll(cx).is_pending());
1398            Poll::Ready(())
1399        })
1400        .await;
1401
1402        assert!(peers.backed_off_peers.contains_key(&peer));
1403        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1404
1405        tokio::time::sleep(peers.backoff_durations.low).await;
1406
1407        match event!(peers) {
1408            PeerAction::Connect { peer_id, .. } => {
1409                assert_eq!(peer_id, peer);
1410            }
1411            _ => unreachable!(),
1412        }
1413
1414        assert!(!peers.backed_off_peers.contains_key(&peer));
1415        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1416    }
1417
1418    #[tokio::test]
1419    async fn test_backoff_on_no_response() {
1420        let peer = PeerId::random();
1421        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1422
1423        let backoff_durations = PeerBackoffDurations::test();
1424        let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
1425        let mut peers = PeersManager::new(config);
1426        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1427
1428        match event!(peers) {
1429            PeerAction::PeerAdded(peer_id) => {
1430                assert_eq!(peer_id, peer);
1431            }
1432            _ => unreachable!(),
1433        }
1434        match event!(peers) {
1435            PeerAction::Connect { peer_id, .. } => {
1436                assert_eq!(peer_id, peer);
1437            }
1438            _ => unreachable!(),
1439        }
1440
1441        poll_fn(|cx| {
1442            assert!(peers.poll(cx).is_pending());
1443            Poll::Ready(())
1444        })
1445        .await;
1446
1447        peers.on_outgoing_pending_session_dropped(
1448            &socket_addr,
1449            &peer,
1450            &PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
1451                EthHandshakeError::NoResponse,
1452            )),
1453        );
1454
1455        poll_fn(|cx| {
1456            assert!(peers.poll(cx).is_pending());
1457            Poll::Ready(())
1458        })
1459        .await;
1460
1461        assert!(peers.backed_off_peers.contains_key(&peer));
1462        assert!(peers.peers.get(&peer).unwrap().is_backed_off());
1463
1464        tokio::time::sleep(backoff_durations.high).await;
1465
1466        match event!(peers) {
1467            PeerAction::Connect { peer_id, .. } => {
1468                assert_eq!(peer_id, peer);
1469            }
1470            _ => unreachable!(),
1471        }
1472
1473        assert!(!peers.backed_off_peers.contains_key(&peer));
1474        assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
1475    }
1476
1477    #[tokio::test]
1478    async fn test_low_backoff() {
1479        let peer = PeerId::random();
1480        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1481        let config = PeersConfig::test();
1482        let mut peers = PeersManager::new(config);
1483        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1484        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1485
1486        let backoff_timestamp = peers
1487            .backoff_durations
1488            .backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
1489
1490        let expected = std::time::Instant::now() + peers.backoff_durations.low;
1491        assert!(backoff_timestamp <= expected);
1492    }
1493
1494    #[tokio::test]
1495    async fn test_multiple_backoff_calculations() {
1496        let peer = PeerId::random();
1497        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1498        let config = PeersConfig::default();
1499        let mut peers = PeersManager::new(config);
1500        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1501        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1502
1503        // Simulate a peer that was already backed off once
1504        peer_struct.severe_backoff_counter = 1;
1505
1506        let now = std::time::Instant::now();
1507
1508        // Simulate the increment that happens in on_connection_failure
1509        peer_struct.severe_backoff_counter += 1;
1510        // Get official backoff time
1511        let backoff_time = peers
1512            .backoff_durations
1513            .backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
1514
1515        // Duration of the backoff should be 2 * 15 minutes = 30 minutes
1516        let backoff_duration = std::time::Duration::new(30 * 60, 0);
1517
1518        // We can't use assert_eq! since there is a very small diff in the nano secs
1519        // Usually it is 1800s != 1799.9999996s
1520        assert!(backoff_time.duration_since(now) > backoff_duration);
1521    }
1522
1523    #[tokio::test]
1524    async fn test_ban_on_active_drop() {
1525        let peer = PeerId::random();
1526        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1527        let mut peers = PeersManager::default();
1528        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1529
1530        match event!(peers) {
1531            PeerAction::PeerAdded(peer_id) => {
1532                assert_eq!(peer_id, peer);
1533            }
1534            _ => unreachable!(),
1535        }
1536        match event!(peers) {
1537            PeerAction::Connect { peer_id, .. } => {
1538                assert_eq!(peer_id, peer);
1539            }
1540            _ => unreachable!(),
1541        }
1542
1543        poll_fn(|cx| {
1544            assert!(peers.poll(cx).is_pending());
1545            Poll::Ready(())
1546        })
1547        .await;
1548
1549        peers.on_active_session_dropped(
1550            &socket_addr,
1551            &peer,
1552            &EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
1553                DisconnectReason::UselessPeer,
1554            )),
1555        );
1556
1557        match event!(peers) {
1558            PeerAction::PeerRemoved(peer_id) => {
1559                assert_eq!(peer_id, peer);
1560            }
1561            _ => unreachable!(),
1562        }
1563        match event!(peers) {
1564            PeerAction::BanPeer { peer_id } => {
1565                assert_eq!(peer_id, peer);
1566            }
1567            _ => unreachable!(),
1568        }
1569
1570        poll_fn(|cx| {
1571            assert!(peers.poll(cx).is_pending());
1572            Poll::Ready(())
1573        })
1574        .await;
1575
1576        assert!(!peers.peers.contains_key(&peer));
1577    }
1578
1579    #[tokio::test]
1580    async fn test_remove_on_max_backoff_count() {
1581        let peer = PeerId::random();
1582        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1583        let config = PeersConfig::test();
1584        let mut peers = PeersManager::new(config.clone());
1585        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1586        let peer_struct = peers.peers.get_mut(&peer).unwrap();
1587
1588        // Simulate a peer that was already backed off once
1589        peer_struct.severe_backoff_counter = config.max_backoff_count;
1590
1591        match event!(peers) {
1592            PeerAction::PeerAdded(peer_id) => {
1593                assert_eq!(peer_id, peer);
1594            }
1595            _ => unreachable!(),
1596        }
1597        match event!(peers) {
1598            PeerAction::Connect { peer_id, .. } => {
1599                assert_eq!(peer_id, peer);
1600            }
1601            _ => unreachable!(),
1602        }
1603
1604        poll_fn(|cx| {
1605            assert!(peers.poll(cx).is_pending());
1606            Poll::Ready(())
1607        })
1608        .await;
1609
1610        peers.on_outgoing_pending_session_dropped(
1611            &socket_addr,
1612            &peer,
1613            &PendingSessionHandshakeError::Eth(
1614                io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
1615            ),
1616        );
1617
1618        match event!(peers) {
1619            PeerAction::PeerRemoved(peer_id) => {
1620                assert_eq!(peer_id, peer);
1621            }
1622            _ => unreachable!(),
1623        }
1624
1625        poll_fn(|cx| {
1626            assert!(peers.poll(cx).is_pending());
1627            Poll::Ready(())
1628        })
1629        .await;
1630
1631        assert!(!peers.peers.contains_key(&peer));
1632    }
1633
1634    #[tokio::test]
1635    async fn test_ban_on_pending_drop() {
1636        let peer = PeerId::random();
1637        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1638        let mut peers = PeersManager::default();
1639        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1640
1641        match event!(peers) {
1642            PeerAction::PeerAdded(peer_id) => {
1643                assert_eq!(peer_id, peer);
1644            }
1645            _ => unreachable!(),
1646        }
1647        match event!(peers) {
1648            PeerAction::Connect { peer_id, .. } => {
1649                assert_eq!(peer_id, peer);
1650            }
1651            _ => unreachable!(),
1652        }
1653
1654        poll_fn(|cx| {
1655            assert!(peers.poll(cx).is_pending());
1656            Poll::Ready(())
1657        })
1658        .await;
1659
1660        peers.on_outgoing_pending_session_dropped(
1661            &socket_addr,
1662            &peer,
1663            &PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1664                P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
1665            )),
1666        );
1667
1668        match event!(peers) {
1669            PeerAction::PeerRemoved(peer_id) => {
1670                assert_eq!(peer_id, peer);
1671            }
1672            _ => unreachable!(),
1673        }
1674        match event!(peers) {
1675            PeerAction::BanPeer { peer_id } => {
1676                assert_eq!(peer_id, peer);
1677            }
1678            _ => unreachable!(),
1679        }
1680
1681        poll_fn(|cx| {
1682            assert!(peers.poll(cx).is_pending());
1683            Poll::Ready(())
1684        })
1685        .await;
1686
1687        assert!(!peers.peers.contains_key(&peer));
1688    }
1689
1690    #[tokio::test]
1691    async fn test_internally_closed_incoming() {
1692        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1693        let mut peers = PeersManager::default();
1694
1695        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1696        assert_eq!(peers.connection_info.num_pending_in, 1);
1697        peers.on_incoming_pending_session_rejected_internally();
1698        assert_eq!(peers.connection_info.num_pending_in, 0);
1699    }
1700
1701    #[tokio::test]
1702    async fn test_reject_incoming_at_pending_capacity() {
1703        let mut peers = PeersManager::default();
1704
1705        for count in 1..=peers.connection_info.config.max_inbound {
1706            let socket_addr =
1707                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
1708            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1709            assert_eq!(peers.connection_info.num_pending_in, count);
1710        }
1711        assert!(peers.connection_info.has_in_capacity());
1712        assert!(!peers.connection_info.has_in_pending_capacity());
1713
1714        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1715        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1716    }
1717
1718    #[tokio::test]
1719    async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
1720        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1721        let trusted = PeerId::random();
1722        peers.add_trusted_peer_id(trusted);
1723
1724        // connect the trusted peer
1725        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
1726        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1727        peers.on_incoming_session_established(trusted, addr);
1728
1729        match event!(peers) {
1730            PeerAction::PeerAdded(id) => {
1731                assert_eq!(id, trusted);
1732            }
1733            _ => unreachable!(),
1734        }
1735
1736        // saturate the remaining inbound slots with untrusted peers
1737        let mut connected_untrusted_peer_ids = Vec::new();
1738        for i in 0..(peers.connection_info.config.max_inbound - 1) {
1739            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
1740            assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
1741            let peer_id = PeerId::random();
1742            peers.on_incoming_session_established(peer_id, addr);
1743            connected_untrusted_peer_ids.push(peer_id);
1744
1745            match event!(peers) {
1746                PeerAction::PeerAdded(id) => {
1747                    assert_eq!(id, peer_id);
1748                }
1749                _ => unreachable!(),
1750            }
1751        }
1752
1753        let mut pending_addrs = Vec::new();
1754
1755        // saturate available slots
1756        for i in 0..2 {
1757            let socket_addr =
1758                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
1759            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1760
1761            pending_addrs.push(socket_addr);
1762        }
1763
1764        assert_eq!(peers.connection_info.num_pending_in, 2);
1765
1766        // try to handle additional incoming connections at capacity
1767        for i in 0..2 {
1768            let socket_addr =
1769                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
1770            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1771        }
1772
1773        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1774            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1775                DisconnectReason::UselessPeer,
1776            )),
1777        ));
1778
1779        // Remove all pending peers
1780        for pending_addr in pending_addrs {
1781            peers.on_incoming_pending_session_dropped(pending_addr, &err);
1782        }
1783
1784        println!("num_pending_in: {}", peers.connection_info.num_pending_in);
1785
1786        println!(
1787            "num_inbound: {}, has_in_capacity: {}",
1788            peers.connection_info.num_inbound,
1789            peers.connection_info.has_in_capacity()
1790        );
1791
1792        // disconnect a connected peer
1793        peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);
1794
1795        println!(
1796            "num_inbound: {}, has_in_capacity: {}",
1797            peers.connection_info.num_inbound,
1798            peers.connection_info.has_in_capacity()
1799        );
1800
1801        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1802        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1803    }
1804
1805    #[tokio::test]
1806    async fn test_closed_incoming() {
1807        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1808        let mut peers = PeersManager::default();
1809
1810        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1811        assert_eq!(peers.connection_info.num_pending_in, 1);
1812        peers.on_incoming_pending_session_gracefully_closed();
1813        assert_eq!(peers.connection_info.num_pending_in, 0);
1814    }
1815
1816    #[tokio::test]
1817    async fn test_dropped_incoming() {
1818        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
1819        let ban_duration = Duration::from_millis(500);
1820        let config = PeersConfig { ban_duration, ..PeersConfig::test() };
1821        let mut peers = PeersManager::new(config);
1822
1823        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1824        assert_eq!(peers.connection_info.num_pending_in, 1);
1825        let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
1826            P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
1827                DisconnectReason::UselessPeer,
1828            )),
1829        ));
1830
1831        peers.on_incoming_pending_session_dropped(socket_addr, &err);
1832        assert_eq!(peers.connection_info.num_pending_in, 0);
1833        assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
1834
1835        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
1836
1837        // unbanned after timeout
1838        tokio::time::sleep(ban_duration).await;
1839
1840        poll_fn(|cx| {
1841            let _ = peers.poll(cx);
1842            Poll::Ready(())
1843        })
1844        .await;
1845
1846        assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
1847        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1848    }
1849
1850    #[tokio::test]
1851    async fn test_reputation_change_connected() {
1852        let peer = PeerId::random();
1853        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1854        let mut peers = PeersManager::default();
1855        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
1856
1857        match event!(peers) {
1858            PeerAction::PeerAdded(peer_id) => {
1859                assert_eq!(peer_id, peer);
1860            }
1861            _ => unreachable!(),
1862        }
1863        match event!(peers) {
1864            PeerAction::Connect { peer_id, remote_addr } => {
1865                assert_eq!(peer_id, peer);
1866                assert_eq!(remote_addr, socket_addr);
1867            }
1868            _ => unreachable!(),
1869        }
1870
1871        let p = peers.peers.get_mut(&peer).unwrap();
1872        assert_eq!(p.state, PeerConnectionState::PendingOut);
1873
1874        peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
1875
1876        let p = peers.peers.get(&peer).unwrap();
1877        assert_eq!(p.state, PeerConnectionState::PendingOut);
1878        assert!(p.is_banned());
1879
1880        peers.on_active_session_gracefully_closed(peer);
1881
1882        let p = peers.peers.get(&peer).unwrap();
1883        assert_eq!(p.state, PeerConnectionState::Idle);
1884        assert!(p.is_banned());
1885
1886        match event!(peers) {
1887            PeerAction::Disconnect { peer_id, .. } => {
1888                assert_eq!(peer_id, peer);
1889            }
1890            _ => unreachable!(),
1891        }
1892    }
1893
1894    #[tokio::test]
1895    async fn accept_incoming_trusted_unknown_peer_address() {
1896        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1897        let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
1898        // try to connect trusted peer
1899        let trusted = PeerId::random();
1900        peers.add_trusted_peer_id(trusted);
1901
1902        // saturate the inbound slots
1903        for i in 0..peers.connection_info.config.max_inbound {
1904            let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
1905            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1906            let peer_id = PeerId::random();
1907            peers.on_incoming_session_established(peer_id, addr);
1908
1909            match event!(peers) {
1910                PeerAction::PeerAdded(id) => {
1911                    assert_eq!(id, peer_id);
1912                }
1913                _ => unreachable!(),
1914            }
1915        }
1916
1917        // try to connect untrusted peer
1918        let untrusted = PeerId::random();
1919        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
1920        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1921        peers.on_incoming_session_established(untrusted, socket_addr);
1922
1923        match event!(peers) {
1924            PeerAction::PeerAdded(id) => {
1925                assert_eq!(id, untrusted);
1926            }
1927            _ => unreachable!(),
1928        }
1929
1930        match event!(peers) {
1931            PeerAction::Disconnect { peer_id, reason } => {
1932                assert_eq!(peer_id, untrusted);
1933                assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
1934            }
1935            _ => unreachable!(),
1936        }
1937
1938        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
1939        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1940        peers.on_incoming_session_established(trusted, socket_addr);
1941
1942        match event!(peers) {
1943            PeerAction::PeerAdded(id) => {
1944                assert_eq!(id, trusted);
1945            }
1946            _ => unreachable!(),
1947        }
1948
1949        poll_fn(|cx| {
1950            assert!(peers.poll(cx).is_pending());
1951            Poll::Ready(())
1952        })
1953        .await;
1954
1955        let peer = peers.peers.get(&trusted).unwrap();
1956        assert_eq!(peer.state, PeerConnectionState::In);
1957    }
1958
1959    #[tokio::test]
1960    async fn test_already_connected() {
1961        let peer = PeerId::random();
1962        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1963        let mut peers = PeersManager::default();
1964
1965        // Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
1966        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1967        assert_eq!(peers.connection_info.num_pending_in, 1);
1968
1969        // Establish a session with the peer, expecting the peer to be added and the `num_inbound`
1970        // to increase by 1
1971        peers.on_incoming_session_established(peer, socket_addr);
1972        let p = peers.peers.get_mut(&peer).expect("peer not found");
1973        assert_eq!(p.addr.tcp(), socket_addr);
1974        assert_eq!(peers.connection_info.num_pending_in, 0);
1975        assert_eq!(peers.connection_info.num_inbound, 1);
1976
1977        // Attempt to establish another incoming session, expecting the `num_pending_in` to increase
1978        // by 1
1979        assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
1980        assert_eq!(peers.connection_info.num_pending_in, 1);
1981
1982        // Simulate a rejection due to an already established connection, expecting the
1983        // `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
1984        // should not be changed.
1985        peers.on_already_connected(Direction::Incoming);
1986
1987        let p = peers.peers.get_mut(&peer).expect("peer not found");
1988        assert_eq!(p.addr.tcp(), socket_addr);
1989        assert_eq!(peers.connection_info.num_pending_in, 0);
1990        assert_eq!(peers.connection_info.num_inbound, 1);
1991    }
1992
1993    #[tokio::test]
1994    async fn test_reputation_change_trusted_peer() {
1995        let peer = PeerId::random();
1996        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
1997        let mut peers = PeersManager::default();
1998        peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
1999
2000        match event!(peers) {
2001            PeerAction::PeerAdded(peer_id) => {
2002                assert_eq!(peer_id, peer);
2003            }
2004            _ => unreachable!(),
2005        }
2006        match event!(peers) {
2007            PeerAction::Connect { peer_id, remote_addr } => {
2008                assert_eq!(peer_id, peer);
2009                assert_eq!(remote_addr, socket_addr);
2010            }
2011            _ => unreachable!(),
2012        }
2013
2014        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
2015        peers.on_active_outgoing_established(peer);
2016        assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
2017
2018        peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2019
2020        {
2021            let p = peers.peers.get(&peer).unwrap();
2022            assert_eq!(p.state, PeerConnectionState::Out);
2023            // not banned yet
2024            assert!(!p.is_banned());
2025        }
2026
2027        // ensure peer is banned eventually
2028        loop {
2029            peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
2030
2031            let p = peers.peers.get(&peer).unwrap();
2032            if p.is_banned() {
2033                break
2034            }
2035        }
2036
2037        match event!(peers) {
2038            PeerAction::Disconnect { peer_id, .. } => {
2039                assert_eq!(peer_id, peer);
2040            }
2041            _ => unreachable!(),
2042        }
2043    }
2044
2045    #[tokio::test]
2046    async fn test_reputation_management() {
2047        let peer = PeerId::random();
2048        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2049        let mut peers = PeersManager::default();
2050        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2051        assert_eq!(peers.get_reputation(&peer), Some(0));
2052
2053        peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
2054        assert_eq!(peers.get_reputation(&peer), Some(1024));
2055
2056        peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
2057        assert_eq!(peers.get_reputation(&peer), Some(0));
2058    }
2059
2060    #[tokio::test]
2061    async fn test_remove_discovered_active() {
2062        let peer = PeerId::random();
2063        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2064        let mut peers = PeersManager::default();
2065        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2066
2067        match event!(peers) {
2068            PeerAction::PeerAdded(peer_id) => {
2069                assert_eq!(peer_id, peer);
2070            }
2071            _ => unreachable!(),
2072        }
2073        match event!(peers) {
2074            PeerAction::Connect { peer_id, remote_addr } => {
2075                assert_eq!(peer_id, peer);
2076                assert_eq!(remote_addr, socket_addr);
2077            }
2078            _ => unreachable!(),
2079        }
2080
2081        let p = peers.peers.get(&peer).unwrap();
2082        assert_eq!(p.state, PeerConnectionState::PendingOut);
2083
2084        peers.remove_peer(peer);
2085
2086        match event!(peers) {
2087            PeerAction::PeerRemoved(peer_id) => {
2088                assert_eq!(peer_id, peer);
2089            }
2090            _ => unreachable!(),
2091        }
2092        match event!(peers) {
2093            PeerAction::Disconnect { peer_id, .. } => {
2094                assert_eq!(peer_id, peer);
2095            }
2096            _ => unreachable!(),
2097        }
2098
2099        let p = peers.peers.get(&peer).unwrap();
2100        assert_eq!(p.state, PeerConnectionState::PendingOut);
2101
2102        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2103        let p = peers.peers.get(&peer).unwrap();
2104        assert_eq!(p.state, PeerConnectionState::PendingOut);
2105
2106        peers.on_active_session_gracefully_closed(peer);
2107        assert!(!peers.peers.contains_key(&peer));
2108    }
2109
2110    #[tokio::test]
2111    async fn test_fatal_outgoing_connection_error_trusted() {
2112        let peer = PeerId::random();
2113        let config = PeersConfig::test()
2114            .with_trusted_nodes(vec![TrustedPeer {
2115                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2116                tcp_port: 8008,
2117                udp_port: 8008,
2118                id: peer,
2119            }])
2120            .with_trusted_nodes_only(true);
2121        let mut peers = PeersManager::new(config);
2122        let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
2123
2124        match event!(peers) {
2125            PeerAction::Connect { peer_id, remote_addr } => {
2126                assert_eq!(peer_id, peer);
2127                assert_eq!(remote_addr, socket_addr);
2128            }
2129            _ => unreachable!(),
2130        }
2131
2132        let p = peers.peers.get(&peer).unwrap();
2133        assert_eq!(p.state, PeerConnectionState::PendingOut);
2134
2135        assert_eq!(peers.num_outbound_connections(), 0);
2136
2137        let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
2138            EthHandshakeError::NonStatusMessageInHandshake,
2139        ));
2140        assert!(err.is_fatal_protocol_error());
2141
2142        peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
2143        assert_eq!(peers.num_outbound_connections(), 0);
2144
2145        // try tmp ban peer
2146        match event!(peers) {
2147            PeerAction::BanPeer { peer_id } => {
2148                assert_eq!(peer_id, peer);
2149            }
2150            err => unreachable!("{err:?}"),
2151        }
2152
2153        // ensure we still have trusted peer
2154        assert!(peers.peers.contains_key(&peer));
2155
2156        // await for the ban to expire
2157        tokio::time::sleep(peers.backoff_durations.medium).await;
2158
2159        match event!(peers) {
2160            PeerAction::Connect { peer_id, remote_addr } => {
2161                assert_eq!(peer_id, peer);
2162                assert_eq!(remote_addr, socket_addr);
2163            }
2164            err => unreachable!("{err:?}"),
2165        }
2166    }
2167
2168    #[tokio::test]
2169    async fn test_outgoing_connection_error() {
2170        let peer = PeerId::random();
2171        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2172        let mut peers = PeersManager::default();
2173        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2174
2175        match event!(peers) {
2176            PeerAction::PeerAdded(peer_id) => {
2177                assert_eq!(peer_id, peer);
2178            }
2179            _ => unreachable!(),
2180        }
2181        match event!(peers) {
2182            PeerAction::Connect { peer_id, remote_addr } => {
2183                assert_eq!(peer_id, peer);
2184                assert_eq!(remote_addr, socket_addr);
2185            }
2186            _ => unreachable!(),
2187        }
2188
2189        let p = peers.peers.get(&peer).unwrap();
2190        assert_eq!(p.state, PeerConnectionState::PendingOut);
2191
2192        assert_eq!(peers.num_outbound_connections(), 0);
2193
2194        peers.on_outgoing_connection_failure(
2195            &socket_addr,
2196            &peer,
2197            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2198        );
2199
2200        assert_eq!(peers.num_outbound_connections(), 0);
2201    }
2202
2203    #[tokio::test]
2204    async fn test_outgoing_connection_gracefully_closed() {
2205        let peer = PeerId::random();
2206        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2207        let mut peers = PeersManager::default();
2208        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2209
2210        match event!(peers) {
2211            PeerAction::PeerAdded(peer_id) => {
2212                assert_eq!(peer_id, peer);
2213            }
2214            _ => unreachable!(),
2215        }
2216        match event!(peers) {
2217            PeerAction::Connect { peer_id, remote_addr } => {
2218                assert_eq!(peer_id, peer);
2219                assert_eq!(remote_addr, socket_addr);
2220            }
2221            _ => unreachable!(),
2222        }
2223
2224        let p = peers.peers.get(&peer).unwrap();
2225        assert_eq!(p.state, PeerConnectionState::PendingOut);
2226
2227        assert_eq!(peers.num_outbound_connections(), 0);
2228
2229        peers.on_outgoing_pending_session_gracefully_closed(&peer);
2230
2231        assert_eq!(peers.num_outbound_connections(), 0);
2232        assert_eq!(peers.connection_info.num_pending_out, 0);
2233    }
2234
2235    #[tokio::test]
2236    async fn test_discovery_ban_list() {
2237        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2238        let socket_addr = SocketAddr::new(ip, 8008);
2239        let ban_list = BanList::new(vec![], vec![ip]);
2240        let config = PeersConfig::default().with_ban_list(ban_list);
2241        let mut peer_manager = PeersManager::new(config);
2242        peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
2243
2244        assert!(peer_manager.peers.is_empty());
2245    }
2246
2247    #[tokio::test]
2248    async fn test_on_pending_ban_list() {
2249        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2250        let socket_addr = SocketAddr::new(ip, 8008);
2251        let ban_list = BanList::new(vec![], vec![ip]);
2252        let config = PeersConfig::test().with_ban_list(ban_list);
2253        let mut peer_manager = PeersManager::new(config);
2254        let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
2255        // because we have no active peers this should be fine for testings
2256        match a {
2257            Ok(_) => panic!(),
2258            Err(err) => match err {
2259                InboundConnectionError::IpBanned => {
2260                    assert_eq!(peer_manager.connection_info.num_pending_in, 0)
2261                }
2262                _ => unreachable!(),
2263            },
2264        }
2265    }
2266
2267    #[tokio::test]
2268    async fn test_on_active_inbound_ban_list() {
2269        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2270        let socket_addr = SocketAddr::new(ip, 8008);
2271        let given_peer_id = PeerId::random();
2272        let ban_list = BanList::new(vec![given_peer_id], vec![]);
2273        let config = PeersConfig::test().with_ban_list(ban_list);
2274        let mut peer_manager = PeersManager::new(config);
2275        assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
2276        // non-trusted nodes should also increase pending_in
2277        assert_eq!(peer_manager.connection_info.num_pending_in, 1);
2278        peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
2279        // after the connection is established, the peer should be removed, the num_pending_in
2280        // should be decreased, and the num_inbound should not be increased
2281        assert_eq!(peer_manager.connection_info.num_pending_in, 0);
2282        assert_eq!(peer_manager.connection_info.num_inbound, 0);
2283
2284        let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
2285            peer_manager.queued_actions.pop_front()
2286        else {
2287            panic!()
2288        };
2289
2290        assert_eq!(peer_id, given_peer_id)
2291    }
2292
2293    #[test]
2294    fn test_connection_limits() {
2295        let mut info = ConnectionInfo::default();
2296        info.inc_in();
2297        assert_eq!(info.num_inbound, 1);
2298        assert_eq!(info.num_outbound, 0);
2299        assert!(info.has_in_capacity());
2300
2301        info.decr_in();
2302        assert_eq!(info.num_inbound, 0);
2303        assert_eq!(info.num_outbound, 0);
2304
2305        info.inc_out();
2306        assert_eq!(info.num_inbound, 0);
2307        assert_eq!(info.num_outbound, 1);
2308        assert!(info.has_out_capacity());
2309
2310        info.decr_out();
2311        assert_eq!(info.num_inbound, 0);
2312        assert_eq!(info.num_outbound, 0);
2313    }
2314
2315    #[test]
2316    fn test_connection_peer_state() {
2317        let mut info = ConnectionInfo::default();
2318        info.inc_in();
2319
2320        info.decr_state(PeerConnectionState::In);
2321        assert_eq!(info.num_inbound, 0);
2322        assert_eq!(info.num_outbound, 0);
2323
2324        info.inc_out();
2325
2326        info.decr_state(PeerConnectionState::Out);
2327        assert_eq!(info.num_inbound, 0);
2328        assert_eq!(info.num_outbound, 0);
2329    }
2330
2331    #[tokio::test]
2332    async fn test_trusted_peers_are_prioritized() {
2333        let trusted_peer = PeerId::random();
2334        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2335        let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
2336            host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2337            tcp_port: 8008,
2338            udp_port: 8008,
2339            id: trusted_peer,
2340        }]);
2341        let mut peers = PeersManager::new(config);
2342
2343        let basic_peer = PeerId::random();
2344        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2345        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2346
2347        match event!(peers) {
2348            PeerAction::PeerAdded(peer_id) => {
2349                assert_eq!(peer_id, basic_peer);
2350            }
2351            _ => unreachable!(),
2352        }
2353        match event!(peers) {
2354            PeerAction::Connect { peer_id, remote_addr } => {
2355                assert_eq!(peer_id, trusted_peer);
2356                assert_eq!(remote_addr, trusted_sock);
2357            }
2358            _ => unreachable!(),
2359        }
2360        match event!(peers) {
2361            PeerAction::Connect { peer_id, remote_addr } => {
2362                assert_eq!(peer_id, basic_peer);
2363                assert_eq!(remote_addr, basic_sock);
2364            }
2365            _ => unreachable!(),
2366        }
2367    }
2368
2369    #[tokio::test]
2370    async fn test_connect_trusted_nodes_only() {
2371        let trusted_peer = PeerId::random();
2372        let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2373        let config = PeersConfig::test()
2374            .with_trusted_nodes(vec![TrustedPeer {
2375                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2376                tcp_port: 8008,
2377                udp_port: 8008,
2378                id: trusted_peer,
2379            }])
2380            .with_trusted_nodes_only(true);
2381        let mut peers = PeersManager::new(config);
2382
2383        let basic_peer = PeerId::random();
2384        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2385        peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
2386
2387        match event!(peers) {
2388            PeerAction::PeerAdded(peer_id) => {
2389                assert_eq!(peer_id, basic_peer);
2390            }
2391            _ => unreachable!(),
2392        }
2393        match event!(peers) {
2394            PeerAction::Connect { peer_id, remote_addr } => {
2395                assert_eq!(peer_id, trusted_peer);
2396                assert_eq!(remote_addr, trusted_sock);
2397            }
2398            _ => unreachable!(),
2399        }
2400        poll_fn(|cx| {
2401            assert!(peers.poll(cx).is_pending());
2402            Poll::Ready(())
2403        })
2404        .await;
2405    }
2406
2407    #[tokio::test]
2408    async fn test_incoming_with_trusted_nodes_only() {
2409        let trusted_peer = PeerId::random();
2410        let config = PeersConfig::test()
2411            .with_trusted_nodes(vec![TrustedPeer {
2412                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2413                tcp_port: 8008,
2414                udp_port: 8008,
2415                id: trusted_peer,
2416            }])
2417            .with_trusted_nodes_only(true);
2418        let mut peers = PeersManager::new(config);
2419
2420        let basic_peer = PeerId::random();
2421        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2422        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2423        // non-trusted nodes should also increase pending_in
2424        assert_eq!(peers.connection_info.num_pending_in, 1);
2425        peers.on_incoming_session_established(basic_peer, basic_sock);
2426        // after the connection is established, the peer should be removed, the num_pending_in
2427        // should be decreased, and the num_inbound mut not be increased
2428        assert_eq!(peers.connection_info.num_pending_in, 0);
2429        assert_eq!(peers.connection_info.num_inbound, 0);
2430
2431        let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
2432            peers.queued_actions.pop_front()
2433        else {
2434            panic!()
2435        };
2436        assert_eq!(basic_peer, peer_id);
2437        assert!(!peers.peers.contains_key(&basic_peer));
2438    }
2439
2440    #[tokio::test]
2441    async fn test_incoming_without_trusted_nodes_only() {
2442        let trusted_peer = PeerId::random();
2443        let config = PeersConfig::test()
2444            .with_trusted_nodes(vec![TrustedPeer {
2445                host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
2446                tcp_port: 8008,
2447                udp_port: 8008,
2448                id: trusted_peer,
2449            }])
2450            .with_trusted_nodes_only(false);
2451        let mut peers = PeersManager::new(config);
2452
2453        let basic_peer = PeerId::random();
2454        let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2455        assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
2456
2457        // non-trusted nodes should also increase pending_in
2458        assert_eq!(peers.connection_info.num_pending_in, 1);
2459        peers.on_incoming_session_established(basic_peer, basic_sock);
2460        // after the connection is established, the peer should be removed, the num_pending_in
2461        // should be decreased, and the num_inbound must be increased
2462        assert_eq!(peers.connection_info.num_pending_in, 0);
2463        assert_eq!(peers.connection_info.num_inbound, 1);
2464        assert!(peers.peers.contains_key(&basic_peer));
2465    }
2466
2467    #[tokio::test]
2468    async fn test_incoming_at_capacity() {
2469        let mut config = PeersConfig::test();
2470        config.connection_info.max_inbound = 1;
2471        let mut peers = PeersManager::new(config);
2472
2473        let peer = PeerId::random();
2474        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2475        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2476
2477        peers.on_incoming_session_established(peer, addr);
2478
2479        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2480        assert_eq!(
2481            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2482            InboundConnectionError::ExceedsCapacity
2483        );
2484    }
2485
2486    #[tokio::test]
2487    async fn test_incoming_rate_limit() {
2488        let config = PeersConfig {
2489            incoming_ip_throttle_duration: Duration::from_millis(100),
2490            ..PeersConfig::test()
2491        };
2492        let mut peers = PeersManager::new(config);
2493
2494        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
2495        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2496        assert_eq!(
2497            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2498            InboundConnectionError::IpBanned
2499        );
2500
2501        peers.release_interval.reset_immediately();
2502        tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
2503
2504        // await unban
2505        poll_fn(|cx| loop {
2506            if peers.poll(cx).is_pending() {
2507                return Poll::Ready(());
2508            }
2509        })
2510        .await;
2511
2512        assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
2513        assert_eq!(
2514            peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
2515            InboundConnectionError::IpBanned
2516        );
2517    }
2518
2519    #[tokio::test]
2520    async fn test_tick() {
2521        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2522        let socket_addr = SocketAddr::new(ip, 8008);
2523        let config = PeersConfig::test();
2524        let mut peer_manager = PeersManager::new(config);
2525        let peer_id = PeerId::random();
2526        peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
2527
2528        tokio::time::sleep(Duration::from_secs(1)).await;
2529        peer_manager.tick();
2530
2531        // still unconnected
2532        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2533
2534        // mark as connected
2535        peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
2536
2537        tokio::time::sleep(Duration::from_secs(1)).await;
2538        peer_manager.tick();
2539
2540        // still at default reputation
2541        assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
2542
2543        peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
2544
2545        tokio::time::sleep(Duration::from_secs(1)).await;
2546        peer_manager.tick();
2547
2548        // tick applied
2549        assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
2550    }
2551
2552    #[tokio::test]
2553    async fn test_remove_incoming_after_disconnect() {
2554        let peer_id = PeerId::random();
2555        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2556        let mut peers = PeersManager::default();
2557
2558        peers.on_incoming_pending_session(addr.ip()).unwrap();
2559        peers.on_incoming_session_established(peer_id, addr);
2560        let peer = peers.peers.get(&peer_id).unwrap();
2561        assert_eq!(peer.state, PeerConnectionState::In);
2562        assert!(peer.remove_after_disconnect);
2563
2564        peers.on_active_session_gracefully_closed(peer_id);
2565        assert!(!peers.peers.contains_key(&peer_id))
2566    }
2567
2568    #[tokio::test]
2569    async fn test_keep_incoming_after_disconnect_if_discovered() {
2570        let peer_id = PeerId::random();
2571        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2572        let mut peers = PeersManager::default();
2573
2574        peers.on_incoming_pending_session(addr.ip()).unwrap();
2575        peers.on_incoming_session_established(peer_id, addr);
2576        let peer = peers.peers.get(&peer_id).unwrap();
2577        assert_eq!(peer.state, PeerConnectionState::In);
2578        assert!(peer.remove_after_disconnect);
2579
2580        // trigger discovery manually while the peer is still connected
2581        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2582
2583        peers.on_active_session_gracefully_closed(peer_id);
2584
2585        let peer = peers.peers.get(&peer_id).unwrap();
2586        assert_eq!(peer.state, PeerConnectionState::Idle);
2587        assert!(!peer.remove_after_disconnect);
2588    }
2589
2590    #[tokio::test]
2591    async fn test_incoming_outgoing_already_connected() {
2592        let peer_id = PeerId::random();
2593        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2594        let mut peers = PeersManager::default();
2595
2596        peers.on_incoming_pending_session(addr.ip()).unwrap();
2597        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2598
2599        match event!(peers) {
2600            PeerAction::PeerAdded(_) => {}
2601            _ => unreachable!(),
2602        }
2603
2604        match event!(peers) {
2605            PeerAction::Connect { .. } => {}
2606            _ => unreachable!(),
2607        }
2608
2609        peers.on_incoming_session_established(peer_id, addr);
2610        peers.on_already_connected(Direction::Outgoing(peer_id));
2611        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2612        assert_eq!(peers.connection_info.num_inbound, 1);
2613        assert_eq!(peers.connection_info.num_pending_out, 0);
2614        assert_eq!(peers.connection_info.num_pending_in, 0);
2615        assert_eq!(peers.connection_info.num_outbound, 0);
2616    }
2617
2618    #[tokio::test]
2619    async fn test_already_connected_incoming_outgoing_connection_error() {
2620        let peer_id = PeerId::random();
2621        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
2622        let mut peers = PeersManager::default();
2623
2624        peers.on_incoming_pending_session(addr.ip()).unwrap();
2625        peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
2626
2627        match event!(peers) {
2628            PeerAction::PeerAdded(_) => {}
2629            _ => unreachable!(),
2630        }
2631
2632        match event!(peers) {
2633            PeerAction::Connect { .. } => {}
2634            _ => unreachable!(),
2635        }
2636
2637        peers.on_incoming_session_established(peer_id, addr);
2638
2639        peers.on_outgoing_connection_failure(
2640            &addr,
2641            &peer_id,
2642            &io::Error::new(io::ErrorKind::ConnectionRefused, ""),
2643        );
2644        assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
2645        assert_eq!(peers.connection_info.num_inbound, 1);
2646        assert_eq!(peers.connection_info.num_pending_out, 0);
2647        assert_eq!(peers.connection_info.num_pending_in, 0);
2648        assert_eq!(peers.connection_info.num_outbound, 0);
2649    }
2650
2651    #[tokio::test]
2652    async fn test_max_concurrent_dials() {
2653        let config = PeersConfig::default();
2654        let mut peer_manager = PeersManager::new(config);
2655        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2656        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2657        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2658            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2659        }
2660
2661        peer_manager.fill_outbound_slots();
2662        let dials = peer_manager
2663            .queued_actions
2664            .iter()
2665            .filter(|ev| matches!(ev, PeerAction::Connect { .. }))
2666            .count();
2667        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2668    }
2669
2670    #[tokio::test]
2671    async fn test_max_num_of_pending_dials() {
2672        let config = PeersConfig::default();
2673        let mut peer_manager = PeersManager::new(config);
2674        let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
2675        let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
2676
2677        // add more peers than allowed
2678        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2679            peer_manager.add_peer(PeerId::random(), peer_addr, None);
2680        }
2681
2682        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
2683            match event!(peer_manager) {
2684                PeerAction::PeerAdded(_) => {}
2685                _ => unreachable!(),
2686            }
2687        }
2688
2689        for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
2690            match event!(peer_manager) {
2691                PeerAction::Connect { .. } => {}
2692                _ => unreachable!(),
2693            }
2694        }
2695
2696        // generate 'Connect' actions
2697        peer_manager.fill_outbound_slots();
2698
2699        // all dialed connections should be in 'PendingOut' state
2700        let dials = peer_manager.connection_info.num_pending_out;
2701        assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
2702
2703        let num_pendingout_states = peer_manager
2704            .peers
2705            .iter()
2706            .filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
2707            .map(|(peer_id, _)| *peer_id)
2708            .collect::<Vec<PeerId>>();
2709        assert_eq!(
2710            num_pendingout_states.len(),
2711            peer_manager.connection_info.config.max_concurrent_outbound_dials
2712        );
2713
2714        // establish dialed connections
2715        for peer_id in &num_pendingout_states {
2716            peer_manager.on_active_outgoing_established(*peer_id);
2717        }
2718
2719        // all dialed connections should now be in 'Out' state
2720        for peer_id in &num_pendingout_states {
2721            assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
2722        }
2723
2724        // no more pending outbound connections
2725        assert_eq!(peer_manager.connection_info.num_pending_out, 0);
2726    }
2727
2728    #[tokio::test]
2729    async fn test_connect() {
2730        let peer = PeerId::random();
2731        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2732        let mut peers = PeersManager::default();
2733        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2734        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2735
2736        match event!(peers) {
2737            PeerAction::Connect { peer_id, remote_addr } => {
2738                assert_eq!(peer_id, peer);
2739                assert_eq!(remote_addr, socket_addr);
2740            }
2741            _ => unreachable!(),
2742        }
2743
2744        let (record, _) = peers.peer_by_id(peer).unwrap();
2745        assert_eq!(record.tcp_addr(), socket_addr);
2746        assert_eq!(record.udp_addr(), socket_addr);
2747
2748        // connect again
2749        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2750
2751        let (record, _) = peers.peer_by_id(peer).unwrap();
2752        assert_eq!(record.tcp_addr(), socket_addr);
2753        assert_eq!(record.udp_addr(), socket_addr);
2754    }
2755
2756    #[tokio::test]
2757    async fn test_incoming_connection_from_banned() {
2758        let peer = PeerId::random();
2759        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2760        let config = PeersConfig::test().with_max_inbound(3);
2761        let mut peers = PeersManager::new(config);
2762        peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
2763
2764        match event!(peers) {
2765            PeerAction::PeerAdded(peer_id) => {
2766                assert_eq!(peer_id, peer);
2767            }
2768            _ => unreachable!(),
2769        }
2770        match event!(peers) {
2771            PeerAction::Connect { peer_id, .. } => {
2772                assert_eq!(peer_id, peer);
2773            }
2774            _ => unreachable!(),
2775        }
2776
2777        poll_fn(|cx| {
2778            assert!(peers.poll(cx).is_pending());
2779            Poll::Ready(())
2780        })
2781        .await;
2782
2783        // simulate new connection drops with error
2784        loop {
2785            peers.on_active_session_dropped(
2786                &socket_addr,
2787                &peer,
2788                &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
2789                    reth_eth_wire::EthVersion::Eth68,
2790                    reth_eth_wire::EthMessageID::Status,
2791                )),
2792            );
2793
2794            if peers.peers.get(&peer).unwrap().is_banned() {
2795                break;
2796            }
2797
2798            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2799            peers.on_incoming_session_established(peer, socket_addr);
2800
2801            match event!(peers) {
2802                PeerAction::Connect { peer_id, .. } => {
2803                    assert_eq!(peer_id, peer);
2804                }
2805                _ => unreachable!(),
2806            }
2807        }
2808
2809        assert!(peers.peers.get(&peer).unwrap().is_banned());
2810
2811        // fill all incoming slots
2812        for _ in 0..peers.connection_info.config.max_inbound {
2813            assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
2814            peers.on_incoming_session_established(peer, socket_addr);
2815
2816            match event!(peers) {
2817                PeerAction::DisconnectBannedIncoming { peer_id } => {
2818                    assert_eq!(peer_id, peer);
2819                }
2820                _ => unreachable!(),
2821            }
2822        }
2823
2824        poll_fn(|cx| {
2825            assert!(peers.poll(cx).is_pending());
2826            Poll::Ready(())
2827        })
2828        .await;
2829
2830        assert_eq!(peers.connection_info.num_inbound, 0);
2831
2832        let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
2833
2834        // Assert we can still accept new connections
2835        assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
2836        assert_eq!(peers.connection_info.num_pending_in, 1);
2837
2838        // the triggered DisconnectBannedIncoming will result in dropped connections, assert that
2839        // connection info is updated via the peer's state which would be a noop here since the
2840        // banned peer's state is idle
2841        peers.on_active_session_gracefully_closed(peer);
2842        assert_eq!(peers.connection_info.num_inbound, 0);
2843    }
2844
2845    #[tokio::test]
2846    async fn test_add_pending_connect() {
2847        let peer = PeerId::random();
2848        let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
2849        let mut peers = PeersManager::default();
2850        peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
2851        assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
2852        assert_eq!(peers.connection_info.num_pending_out, 1);
2853    }
2854
2855    #[tokio::test]
2856    async fn test_dns_updates_peer_address() {
2857        let peer_id = PeerId::random();
2858        let initial_socket = SocketAddr::new("1.1.1.1".parse::<IpAddr>().unwrap(), 8008);
2859        let updated_ip = "2.2.2.2".parse::<IpAddr>().unwrap();
2860
2861        let trusted = TrustedPeer {
2862            host: url::Host::Ipv4("2.2.2.2".parse().unwrap()),
2863            tcp_port: 8008,
2864            udp_port: 8008,
2865            id: peer_id,
2866        };
2867
2868        let config = PeersConfig::test().with_trusted_nodes(vec![trusted.clone()]);
2869        let mut manager = PeersManager::new(config);
2870        manager
2871            .trusted_peers_resolver
2872            .set_interval(tokio::time::interval(Duration::from_millis(1)));
2873
2874        manager.peers.insert(
2875            peer_id,
2876            Peer::trusted(PeerAddr::new_with_ports(initial_socket.ip(), 8008, Some(8008))),
2877        );
2878
2879        for _ in 0..100 {
2880            let _ = event!(manager);
2881            if manager.peers.get(&peer_id).unwrap().addr.tcp().ip() == updated_ip {
2882                break;
2883            }
2884            tokio::time::sleep(Duration::from_millis(10)).await;
2885        }
2886
2887        let updated_peer = manager.peers.get(&peer_id).unwrap();
2888        assert_eq!(updated_peer.addr.tcp().ip(), updated_ip);
2889    }
2890}