use crate::{
error::SessionError,
session::{Direction, PendingSessionHandshakeError},
swarm::NetworkConnectionState,
};
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_ethereum_forks::ForkId;
use reth_net_banlist::BanList;
use reth_network_api::test_utils::{PeerCommand, PeersHandle};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{
peers::{
config::PeerBackoffDurations,
reputation::{DEFAULT_REPUTATION, MAX_TRUSTED_PEER_REPUTATION_CHANGE},
},
ConnectionsConfig, Peer, PeerAddr, PeerConnectionState, PeerKind, PeersConfig,
ReputationChangeKind, ReputationChangeOutcome, ReputationChangeWeights,
};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
fmt::Display,
io::{self},
net::{IpAddr, SocketAddr},
task::{Context, Poll},
time::Duration,
};
use thiserror::Error;
use tokio::{
sync::mpsc,
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{trace, warn};
#[derive(Debug)]
pub struct PeersManager {
peers: HashMap<PeerId, Peer>,
trusted_peer_ids: HashSet<PeerId>,
manager_tx: mpsc::UnboundedSender<PeerCommand>,
handle_rx: UnboundedReceiverStream<PeerCommand>,
queued_actions: VecDeque<PeerAction>,
refill_slots_interval: Interval,
reputation_weights: ReputationChangeWeights,
connection_info: ConnectionInfo,
ban_list: BanList,
backed_off_peers: HashMap<PeerId, std::time::Instant>,
release_interval: Interval,
ban_duration: Duration,
backoff_durations: PeerBackoffDurations,
trusted_nodes_only: bool,
last_tick: Instant,
max_backoff_count: u8,
net_connection_state: NetworkConnectionState,
incoming_ip_throttle_duration: Duration,
}
impl PeersManager {
pub fn new(config: PeersConfig) -> Self {
let PeersConfig {
refill_slots_interval,
connection_info,
reputation_weights,
ban_list,
ban_duration,
backoff_durations,
trusted_nodes,
trusted_nodes_only,
basic_nodes,
max_backoff_count,
incoming_ip_throttle_duration,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
let unban_interval = ban_duration.min(backoff_durations.low) / 2;
let mut peers = HashMap::with_capacity(trusted_nodes.len() + basic_nodes.len());
let mut trusted_peer_ids = HashSet::with_capacity(trusted_nodes.len());
for trusted_peer in trusted_nodes {
match trusted_peer.resolve_blocking() {
Ok(NodeRecord { address, tcp_port, udp_port, id }) => {
trusted_peer_ids.insert(id);
peers.entry(id).or_insert_with(|| {
Peer::trusted(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
});
}
Err(err) => {
warn!(target: "net::peers", ?err, "Failed to resolve trusted peer");
}
}
}
for NodeRecord { address, tcp_port, udp_port, id } in basic_nodes {
peers.entry(id).or_insert_with(|| {
Peer::new(PeerAddr::new_with_ports(address, tcp_port, Some(udp_port)))
});
}
Self {
peers,
trusted_peer_ids,
manager_tx,
handle_rx: UnboundedReceiverStream::new(handle_rx),
queued_actions: Default::default(),
reputation_weights,
refill_slots_interval: tokio::time::interval(refill_slots_interval),
release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
connection_info: ConnectionInfo::new(connection_info),
ban_list,
backed_off_peers: Default::default(),
ban_duration,
backoff_durations,
trusted_nodes_only,
last_tick: Instant::now(),
max_backoff_count,
net_connection_state: NetworkConnectionState::default(),
incoming_ip_throttle_duration,
}
}
pub(crate) fn handle(&self) -> PeersHandle {
PeersHandle::new(self.manager_tx.clone())
}
#[inline]
pub(crate) fn num_known_peers(&self) -> usize {
self.peers.len()
}
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| {
NodeRecord::new_with_ports(
v.addr.tcp().ip(),
v.addr.tcp().port(),
v.addr.udp().map(|addr| addr.port()),
*peer_id,
)
})
}
pub(crate) fn peer_by_id(&self, peer_id: PeerId) -> Option<(NodeRecord, PeerKind)> {
self.peers.get(&peer_id).map(|v| {
(
NodeRecord::new_with_ports(
v.addr.tcp().ip(),
v.addr.tcp().port(),
v.addr.udp().map(|addr| addr.port()),
peer_id,
),
v.kind,
)
})
}
pub(crate) fn peers_by_kind(&self, kind: PeerKind) -> impl Iterator<Item = PeerId> + '_ {
self.peers.iter().filter_map(move |(peer_id, peer)| (peer.kind == kind).then_some(*peer_id))
}
#[inline]
pub(crate) const fn num_inbound_connections(&self) -> usize {
self.connection_info.num_inbound
}
#[inline]
pub(crate) const fn num_outbound_connections(&self) -> usize {
self.connection_info.num_outbound
}
#[inline]
pub(crate) const fn num_pending_outbound_connections(&self) -> usize {
self.connection_info.num_pending_out
}
#[inline]
pub(crate) fn num_backed_off_peers(&self) -> usize {
self.backed_off_peers.len()
}
fn num_idle_trusted_peers(&self) -> usize {
self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count()
}
pub(crate) fn on_incoming_pending_session(
&mut self,
addr: IpAddr,
) -> Result<(), InboundConnectionError> {
if self.ban_list.is_banned_ip(&addr) {
return Err(InboundConnectionError::IpBanned)
}
if !self.connection_info.has_in_capacity() {
if self.trusted_peer_ids.is_empty() {
return Err(InboundConnectionError::ExceedsCapacity)
}
let num_idle_trusted_peers = self.num_idle_trusted_peers();
if num_idle_trusted_peers <= self.trusted_peer_ids.len() {
let max_inbound =
self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
if self.connection_info.num_pending_in <= max_inbound {
self.connection_info.inc_pending_in();
}
return Ok(())
}
return Err(InboundConnectionError::ExceedsCapacity)
}
if !self.connection_info.has_in_pending_capacity() {
return Err(InboundConnectionError::ExceedsCapacity)
}
self.throttle_incoming_ip(addr);
self.connection_info.inc_pending_in();
Ok(())
}
pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) {
self.connection_info.decr_pending_in();
}
pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) {
self.connection_info.decr_pending_in()
}
pub(crate) fn on_incoming_pending_session_dropped(
&mut self,
remote_addr: SocketAddr,
err: &PendingSessionHandshakeError,
) {
if err.is_fatal_protocol_error() {
self.ban_ip(remote_addr.ip());
if err.merits_discovery_ban() {
self.queued_actions
.push_back(PeerAction::DiscoveryBanIp { ip_addr: remote_addr.ip() })
}
}
self.connection_info.decr_pending_in();
}
pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.connection_info.decr_pending_in();
if self.ban_list.is_banned_peer(&peer_id) {
self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
return
}
let mut is_trusted = self.trusted_peer_ids.contains(&peer_id);
if self.trusted_nodes_only && !is_trusted {
self.queued_actions.push_back(PeerAction::DisconnectUntrustedIncoming { peer_id });
return
}
self.tick();
let has_in_capacity = self.connection_info.has_in_capacity();
self.connection_info.inc_in();
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let peer = entry.get_mut();
if peer.is_banned() {
self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
return
}
if peer.state.is_pending_out() {
self.connection_info.decr_state(peer.state);
}
peer.state = PeerConnectionState::In;
is_trusted = is_trusted || peer.is_trusted();
}
Entry::Vacant(entry) => {
let mut peer = Peer::with_state(PeerAddr::from_tcp(addr), PeerConnectionState::In);
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
if !is_trusted && !has_in_capacity {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
});
}
}
fn ban_peer(&mut self, peer_id: PeerId) {
let mut ban_duration = self.ban_duration;
if let Some(peer) = self.peers.get(&peer_id) {
if peer.is_trusted() || peer.is_static() {
ban_duration = self.backoff_durations.low / 2;
}
}
self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + ban_duration);
self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
}
fn ban_ip(&mut self, ip: IpAddr) {
self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
}
fn throttle_incoming_ip(&mut self, ip: IpAddr) {
self.ban_list
.ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
}
fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
trace!(target: "net::peers", ?peer_id, "backing off");
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.backed_off = true;
self.backed_off_peers.insert(peer_id, until);
}
}
fn unban_peer(&mut self, peer_id: PeerId) {
self.ban_list.unban_peer(&peer_id);
self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
fn tick(&mut self) {
let now = Instant::now();
let secs_since_last_tick =
if self.last_tick > now { 0 } else { (now - self.last_tick).as_secs() as i32 };
self.last_tick = now;
for peer in self.peers.iter_mut().filter(|(_, peer)| peer.state.is_connected()) {
if peer.1.reputation < DEFAULT_REPUTATION {
peer.1.reputation += secs_since_last_tick;
}
}
}
pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
self.peers.get(peer_id).map(|peer| peer.reputation)
}
pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
if rep.is_reset() {
peer.reset_reputation()
} else {
let mut reputation_change = self.reputation_weights.change(rep).as_i32();
if peer.is_trusted() || peer.is_static() {
if matches!(
rep,
ReputationChangeKind::Dropped |
ReputationChangeKind::BadAnnouncement |
ReputationChangeKind::Timeout |
ReputationChangeKind::AlreadySeenTransaction
) {
return
}
if reputation_change < MAX_TRUSTED_PEER_REPUTATION_CHANGE {
reputation_change = MAX_TRUSTED_PEER_REPUTATION_CHANGE;
}
}
peer.apply_reputation(reputation_change)
}
} else {
return
};
match outcome {
ReputationChangeOutcome::None => {}
ReputationChangeOutcome::Ban => {
self.ban_peer(*peer_id);
}
ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
ReputationChangeOutcome::DisconnectAndBan => {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id: *peer_id,
reason: Some(DisconnectReason::DisconnectRequested),
});
self.ban_peer(*peer_id);
}
}
}
pub(crate) fn on_outgoing_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
}
}
pub(crate) fn on_outgoing_pending_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &PendingSessionHandshakeError,
) {
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
pub(crate) fn on_active_session_gracefully_closed(&mut self, peer_id: PeerId) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
self.connection_info.decr_state(entry.get().state);
if entry.get().remove_after_disconnect && !entry.get().is_trusted() {
entry.remove();
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
} else {
entry.get_mut().severe_backoff_counter = 0;
entry.get_mut().state = PeerConnectionState::Idle;
return
}
}
Entry::Vacant(_) => return,
}
self.fill_outbound_slots();
}
pub(crate) fn on_active_outgoing_established(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
self.connection_info.decr_state(peer.state);
self.connection_info.inc_out();
peer.state = PeerConnectionState::Out;
}
}
pub(crate) fn on_active_session_dropped(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &EthStreamError,
) {
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
}
pub(crate) fn on_outgoing_connection_failure(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: &io::Error,
) {
if let Some(peer) = self.peers.get(peer_id) {
if peer.state.is_incoming() {
return
}
}
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
fn on_connection_failure(
&mut self,
remote_addr: &SocketAddr,
peer_id: &PeerId,
err: impl SessionError,
reputation_change: ReputationChangeKind,
) {
trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "handling failed connection");
if err.is_fatal_protocol_error() {
trace!(target: "net::peers", ?remote_addr, ?peer_id, %err, "fatal connection error");
if let Entry::Occupied(mut entry) = self.peers.entry(*peer_id) {
self.connection_info.decr_state(entry.get().state);
if entry.get().is_trusted() {
entry.get_mut().state = PeerConnectionState::Idle;
} else {
entry.remove();
self.queued_actions.push_back(PeerAction::PeerRemoved(*peer_id));
if err.merits_discovery_ban() {
self.queued_actions.push_back(PeerAction::DiscoveryBanPeerId {
peer_id: *peer_id,
ip_addr: remote_addr.ip(),
})
}
}
}
self.ban_peer(*peer_id);
} else {
let mut backoff_until = None;
let mut remove_peer = false;
if let Some(peer) = self.peers.get_mut(peer_id) {
if let Some(kind) = err.should_backoff() {
if kind.is_severe() {
peer.severe_backoff_counter = peer.severe_backoff_counter.saturating_add(1);
}
let backoff_time =
self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
backoff_until = Some(backoff_time);
} else {
let reputation_change = self.reputation_weights.change(reputation_change);
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
};
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
if peer.severe_backoff_counter > self.max_backoff_count && !peer.is_trusted() {
remove_peer = true;
}
}
if remove_peer {
let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
} else if let Some(backoff_until) = backoff_until {
self.backoff_peer_until(*peer_id, backoff_until);
}
}
self.fill_outbound_slots();
}
pub(crate) fn on_already_connected(&mut self, direction: Direction) {
match direction {
Direction::Incoming => {
self.connection_info.decr_pending_in();
}
Direction::Outgoing(_) => {
}
}
}
pub(crate) fn set_discovered_fork_id(&mut self, peer_id: PeerId, fork_id: ForkId) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
trace!(target: "net::peers", ?peer_id, ?fork_id, "set discovered fork id");
peer.fork_id = Some(fork_id);
}
}
pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId>) {
self.add_peer_kind(peer_id, PeerKind::Basic, addr, fork_id)
}
pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
self.trusted_peer_ids.insert(peer_id);
}
#[allow(dead_code)]
pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: PeerAddr) {
self.add_peer_kind(peer_id, PeerKind::Trusted, addr, None)
}
pub(crate) fn add_peer_kind(
&mut self,
peer_id: PeerId,
kind: PeerKind,
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
return
}
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let peer = entry.get_mut();
peer.kind = kind;
peer.fork_id = fork_id;
peer.addr = addr;
if peer.state.is_incoming() {
peer.remove_after_disconnect = false;
}
}
Entry::Vacant(entry) => {
trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "discovered new node");
let mut peer = Peer::with_kind(addr, kind);
peer.fork_id = fork_id;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
if kind.is_trusted() {
self.trusted_peer_ids.insert(peer_id);
}
}
pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
if entry.get().is_trusted() {
return
}
let mut peer = entry.remove();
trace!(target: "net::peers", ?peer_id, "remove discovered node");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
if peer.state.is_connected() {
trace!(target: "net::peers", ?peer_id, "disconnecting on remove from discovery");
peer.remove_after_disconnect = true;
peer.state.disconnect();
self.peers.insert(peer_id, peer);
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::DisconnectRequested),
})
}
}
#[allow(dead_code)]
pub(crate) fn add_and_connect(
&mut self,
peer_id: PeerId,
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
self.add_and_connect_kind(peer_id, PeerKind::Basic, addr, fork_id)
}
pub(crate) fn add_and_connect_kind(
&mut self,
peer_id: PeerId,
kind: PeerKind,
addr: PeerAddr,
fork_id: Option<ForkId>,
) {
if self.ban_list.is_banned(&peer_id, &addr.tcp().ip()) {
return
}
match self.peers.entry(peer_id) {
Entry::Vacant(entry) => {
trace!(target: "net::peers", ?peer_id, addr=?addr.tcp(), "connects new node");
let mut peer = Peer::with_kind(addr, kind);
peer.state = PeerConnectionState::PendingOut;
peer.fork_id = fork_id;
entry.insert(peer);
self.queued_actions
.push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() });
}
_ => return,
}
if kind.is_trusted() {
self.trusted_peer_ids.insert(peer_id);
}
}
pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
if !entry.get().is_trusted() {
return
}
let peer = entry.get_mut();
peer.kind = PeerKind::Basic;
self.trusted_peer_ids.remove(&peer_id);
}
fn best_unconnected(&mut self) -> Option<(PeerId, &mut Peer)> {
let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
!peer.is_backed_off() &&
!peer.is_banned() &&
peer.state.is_unconnected() &&
(!self.trusted_nodes_only || peer.is_trusted())
});
let mut best_peer = unconnected.next()?;
if best_peer.1.is_trusted() || best_peer.1.is_static() {
return Some((*best_peer.0, best_peer.1))
}
for maybe_better in unconnected {
if maybe_better.1.is_trusted() || maybe_better.1.is_static() {
return Some((*maybe_better.0, maybe_better.1))
}
if maybe_better.1.reputation > best_peer.1.reputation {
best_peer = maybe_better;
}
}
Some((*best_peer.0, best_peer.1))
}
fn fill_outbound_slots(&mut self) {
self.tick();
if !self.net_connection_state.is_active() {
return
}
while self.connection_info.has_out_capacity() {
let action = {
let (peer_id, peer) = match self.best_unconnected() {
Some(peer) => peer,
_ => break,
};
trace!(target: "net::peers", ?peer_id, addr=?peer.addr, "schedule outbound connection");
peer.state = PeerConnectionState::PendingOut;
PeerAction::Connect { peer_id, remote_addr: peer.addr.tcp() }
};
self.connection_info.inc_pending_out();
self.queued_actions.push_back(action);
}
}
pub fn on_network_state_change(&mut self, state: NetworkConnectionState) {
self.net_connection_state = state;
}
pub const fn connection_state(&self) -> &NetworkConnectionState {
&self.net_connection_state
}
pub fn on_shutdown(&mut self) {
self.net_connection_state = NetworkConnectionState::ShuttingDown;
}
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
loop {
if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action)
}
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd {
PeerCommand::Add(peer_id, addr) => {
self.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
}
PeerCommand::Remove(peer) => self.remove_peer(peer),
PeerCommand::ReputationChange(peer_id, rep) => {
self.apply_reputation_change(&peer_id, rep)
}
PeerCommand::GetPeer(peer, tx) => {
let _ = tx.send(self.peers.get(&peer).cloned());
}
PeerCommand::GetPeers(tx) => {
let _ = tx.send(self.iter_peers().collect());
}
}
}
if self.release_interval.poll_tick(cx).is_ready() {
let now = std::time::Instant::now();
let (_, unbanned_peers) = self.ban_list.evict(now);
for peer_id in unbanned_peers {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.unban();
self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
}
self.backed_off_peers.retain(|peer_id, until| {
if now > *until {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.backed_off = false;
}
return false
}
true
})
}
while self.refill_slots_interval.poll_tick(cx).is_ready() {
self.fill_outbound_slots();
}
if self.queued_actions.is_empty() {
return Poll::Pending
}
}
}
}
impl Default for PeersManager {
fn default() -> Self {
Self::new(Default::default())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ConnectionInfo {
num_outbound: usize,
num_pending_out: usize,
num_inbound: usize,
num_pending_in: usize,
config: ConnectionsConfig,
}
impl ConnectionInfo {
const fn new(config: ConnectionsConfig) -> Self {
Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 }
}
const fn has_out_capacity(&self) -> bool {
self.num_pending_out < self.config.max_concurrent_outbound_dials &&
self.num_outbound < self.config.max_outbound
}
const fn has_in_capacity(&self) -> bool {
self.num_inbound < self.config.max_inbound
}
const fn has_in_pending_capacity(&self) -> bool {
self.num_pending_in < self.config.max_inbound
}
fn decr_state(&mut self, state: PeerConnectionState) {
match state {
PeerConnectionState::Idle => {}
PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
PeerConnectionState::PendingOut => self.decr_pending_out(),
}
}
fn decr_out(&mut self) {
self.num_outbound -= 1;
}
fn inc_out(&mut self) {
self.num_outbound += 1;
}
fn inc_pending_out(&mut self) {
self.num_pending_out += 1;
}
fn inc_in(&mut self) {
self.num_inbound += 1;
}
fn inc_pending_in(&mut self) {
self.num_pending_in += 1;
}
fn decr_in(&mut self) {
self.num_inbound -= 1;
}
fn decr_pending_out(&mut self) {
self.num_pending_out -= 1;
}
fn decr_pending_in(&mut self) {
self.num_pending_in -= 1;
}
}
#[derive(Debug)]
pub enum PeerAction {
Connect {
peer_id: PeerId,
remote_addr: SocketAddr,
},
Disconnect {
peer_id: PeerId,
reason: Option<DisconnectReason>,
},
DisconnectBannedIncoming {
peer_id: PeerId,
},
DisconnectUntrustedIncoming {
peer_id: PeerId,
},
DiscoveryBanPeerId {
peer_id: PeerId,
ip_addr: IpAddr,
},
DiscoveryBanIp {
ip_addr: IpAddr,
},
BanPeer {
peer_id: PeerId,
},
UnBanPeer {
peer_id: PeerId,
},
PeerAdded(PeerId),
PeerRemoved(PeerId),
}
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InboundConnectionError {
IpBanned,
ExceedsCapacity,
}
impl Display for InboundConnectionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
#[cfg(test)]
mod tests {
use alloy_primitives::B512;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
DisconnectReason,
};
use reth_net_banlist::BanList;
use reth_network_api::Direction;
use reth_network_peers::{PeerId, TrustedPeer};
use reth_network_types::{
peers::reputation::DEFAULT_REPUTATION, BackoffKind, ReputationChangeKind,
};
use std::{
future::{poll_fn, Future},
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use url::Host;
use super::PeersManager;
use crate::{
error::SessionError,
peers::{
ConnectionInfo, InboundConnectionError, PeerAction, PeerAddr, PeerBackoffDurations,
PeerConnectionState,
},
session::PendingSessionHandshakeError,
PeersConfig,
};
struct PeerActionFuture<'a> {
peers: &'a mut PeersManager,
}
impl Future for PeerActionFuture<'_> {
type Output = PeerAction;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().peers.poll(cx)
}
}
macro_rules! event {
($peers:expr) => {
PeerActionFuture { peers: &mut $peers }.await
};
}
#[tokio::test]
async fn test_insert() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
}
#[tokio::test]
async fn test_insert_udp() {
let peer = PeerId::random();
let tcp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let udp_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::new(tcp_addr, Some(udp_addr)), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, tcp_addr);
}
_ => unreachable!(),
}
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), tcp_addr);
assert_eq!(record.udp_addr(), udp_addr);
}
#[tokio::test]
async fn test_ban() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn test_unban() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.unban_peer(peer);
match event!(peers) {
PeerAction::UnBanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn test_backoff_on_busy() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::new(PeersConfig::test());
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_active_session_dropped(
&socket_addr,
&peer,
&EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
DisconnectReason::TooManyPeers,
)),
);
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(peers.backed_off_peers.contains_key(&peer));
assert!(peers.peers.get(&peer).unwrap().is_backed_off());
tokio::time::sleep(peers.backoff_durations.low).await;
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
assert!(!peers.backed_off_peers.contains_key(&peer));
assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
}
#[tokio::test]
async fn test_backoff_on_no_response() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let backoff_durations = PeerBackoffDurations::test();
let config = PeersConfig { backoff_durations, ..PeersConfig::test() };
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
EthHandshakeError::NoResponse,
)),
);
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(peers.backed_off_peers.contains_key(&peer));
assert!(peers.peers.get(&peer).unwrap().is_backed_off());
tokio::time::sleep(backoff_durations.high).await;
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
assert!(!peers.backed_off_peers.contains_key(&peer));
assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
}
#[tokio::test]
async fn test_low_backoff() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test();
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
let backoff_timestamp = peers
.backoff_durations
.backoff_until(BackoffKind::Low, peer_struct.severe_backoff_counter);
let expected = std::time::Instant::now() + peers.backoff_durations.low;
assert!(backoff_timestamp <= expected);
}
#[tokio::test]
async fn test_multiple_backoff_calculations() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::default();
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
peer_struct.severe_backoff_counter = 1;
let now = std::time::Instant::now();
peer_struct.severe_backoff_counter += 1;
let backoff_time = peers
.backoff_durations
.backoff_until(BackoffKind::High, peer_struct.severe_backoff_counter);
let backoff_duration = std::time::Duration::new(30 * 60, 0);
assert!(backoff_time.duration_since(now) > backoff_duration);
}
#[tokio::test]
async fn test_ban_on_active_drop() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_active_session_dropped(
&socket_addr,
&peer,
&EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
DisconnectReason::UselessPeer,
)),
);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(!peers.peers.contains_key(&peer));
}
#[tokio::test]
async fn test_remove_on_max_backoff_count() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test();
let mut peers = PeersManager::new(config.clone());
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let peer_struct = peers.peers.get_mut(&peer).unwrap();
peer_struct.severe_backoff_counter = config.max_backoff_count;
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(
io::Error::new(io::ErrorKind::ConnectionRefused, "peer unreachable").into(),
),
);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(!peers.peers.contains_key(&peer));
}
#[tokio::test]
async fn test_ban_on_pending_drop() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_outgoing_pending_session_dropped(
&socket_addr,
&peer,
&PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::Disconnected(DisconnectReason::UselessPeer),
)),
);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(!peers.peers.contains_key(&peer));
}
#[tokio::test]
async fn test_internally_closed_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_pending_session_rejected_internally();
assert_eq!(peers.connection_info.num_pending_in, 0);
}
#[tokio::test]
async fn test_reject_incoming_at_pending_capacity() {
let mut peers = PeersManager::default();
for count in 1..=peers.connection_info.config.max_inbound {
let socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, count);
}
assert!(peers.connection_info.has_in_capacity());
assert!(!peers.connection_info.has_in_pending_capacity());
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
}
#[tokio::test]
async fn test_closed_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_pending_session_gracefully_closed();
assert_eq!(peers.connection_info.num_pending_in, 0);
}
#[tokio::test]
async fn test_dropped_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 0, 1, 2)), 8008);
let ban_duration = Duration::from_millis(500);
let config = PeersConfig { ban_duration, ..PeersConfig::test() };
let mut peers = PeersManager::new(config);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer,
)),
));
peers.on_incoming_pending_session_dropped(socket_addr, &err);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
tokio::time::sleep(ban_duration).await;
poll_fn(|cx| {
let _ = peers.poll(cx);
Poll::Ready(())
})
.await;
assert!(!peers.ban_list.is_banned_ip(&socket_addr.ip()));
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
}
#[tokio::test]
async fn test_reputation_change_connected() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get_mut(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
assert!(p.is_banned());
peers.on_active_session_gracefully_closed(peer);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Idle);
assert!(p.is_banned());
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn accept_incoming_trusted_unknown_peer_address() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
let trusted = PeerId::random();
peers.add_trusted_peer_id(trusted);
for i in 0..peers.connection_info.config.max_inbound {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, i as u8)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
let peer_id = PeerId::random();
peers.on_incoming_session_established(peer_id, addr);
match event!(peers) {
PeerAction::PeerAdded(id) => {
assert_eq!(id, peer_id);
}
_ => unreachable!(),
}
}
let untrusted = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(untrusted, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(id) => {
assert_eq!(id, untrusted);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Disconnect { peer_id, reason } => {
assert_eq!(peer_id, untrusted);
assert_eq!(reason, Some(DisconnectReason::TooManyPeers));
}
_ => unreachable!(),
}
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(trusted, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(id) => {
assert_eq!(id, trusted);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
let peer = peers.peers.get(&trusted).unwrap();
assert_eq!(peer.state, PeerConnectionState::In);
}
#[tokio::test]
async fn test_already_connected() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_session_established(peer, socket_addr);
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr.tcp(), socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_already_connected(Direction::Incoming);
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr.tcp(), socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
}
#[tokio::test]
async fn test_reputation_change_trusted_peer() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_trusted_peer(peer, PeerAddr::from_tcp(socket_addr));
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::PendingOut);
peers.on_active_outgoing_established(peer);
assert_eq!(peers.peers.get_mut(&peer).unwrap().state, PeerConnectionState::Out);
peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
{
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
assert!(!p.is_banned());
}
loop {
peers.apply_reputation_change(&peer, ReputationChangeKind::BadMessage);
let p = peers.peers.get(&peer).unwrap();
if p.is_banned() {
break
}
}
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_reputation_management() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
assert_eq!(peers.get_reputation(&peer), Some(0));
peers.apply_reputation_change(&peer, ReputationChangeKind::Other(1024));
assert_eq!(peers.get_reputation(&peer), Some(1024));
peers.apply_reputation_change(&peer, ReputationChangeKind::Reset);
assert_eq!(peers.get_reputation(&peer), Some(0));
}
#[tokio::test]
async fn test_remove_discovered_active() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.remove_peer(peer);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
peers.on_active_session_gracefully_closed(peer);
assert!(!peers.peers.contains_key(&peer));
}
#[tokio::test]
async fn test_fatal_outgoing_connection_error_trusted() {
let peer = PeerId::random();
let config = PeersConfig::test()
.with_trusted_nodes(vec![TrustedPeer {
host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
tcp_port: 8008,
udp_port: 8008,
id: peer,
}])
.with_trusted_nodes_only(true);
let mut peers = PeersManager::new(config);
let socket_addr = peers.peers.get(&peer).unwrap().addr.tcp();
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
assert_eq!(peers.num_outbound_connections(), 0);
let err = PendingSessionHandshakeError::Eth(EthStreamError::EthHandshakeError(
EthHandshakeError::NonStatusMessageInHandshake,
));
assert!(err.is_fatal_protocol_error());
peers.on_outgoing_pending_session_dropped(&socket_addr, &peer, &err);
assert_eq!(peers.num_outbound_connections(), 0);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
err => unreachable!("{err:?}"),
}
assert!(peers.peers.contains_key(&peer));
tokio::time::sleep(peers.backoff_durations.medium).await;
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
err => unreachable!("{err:?}"),
}
}
#[tokio::test]
async fn test_outgoing_connection_error() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
assert_eq!(peers.num_outbound_connections(), 0);
peers.on_outgoing_connection_failure(
&socket_addr,
&peer,
&io::Error::new(io::ErrorKind::ConnectionRefused, ""),
);
assert_eq!(peers.num_outbound_connections(), 0);
}
#[tokio::test]
async fn test_outgoing_connection_gracefully_closed() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::PendingOut);
assert_eq!(peers.num_outbound_connections(), 0);
peers.on_outgoing_pending_session_gracefully_closed(&peer);
assert_eq!(peers.num_outbound_connections(), 0);
assert_eq!(peers.connection_info.num_pending_out, 0);
}
#[tokio::test]
async fn test_discovery_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
let ban_list = BanList::new(vec![], vec![ip]);
let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
peer_manager.add_peer(B512::default(), PeerAddr::from_tcp(socket_addr), None);
assert!(peer_manager.peers.is_empty());
}
#[tokio::test]
async fn test_on_pending_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
let ban_list = BanList::new(vec![], vec![ip]);
let config = PeersConfig::test().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
let a = peer_manager.on_incoming_pending_session(socket_addr.ip());
match a {
Ok(_) => panic!(),
Err(err) => match err {
InboundConnectionError::IpBanned {} => {
assert_eq!(peer_manager.connection_info.num_pending_in, 0)
}
_ => unreachable!(),
},
}
}
#[tokio::test]
async fn test_on_active_inbound_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
let given_peer_id = PeerId::random();
let ban_list = BanList::new(vec![given_peer_id], vec![]);
let config = PeersConfig::test().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
assert!(peer_manager.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peer_manager.connection_info.num_pending_in, 1);
peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
assert_eq!(peer_manager.connection_info.num_pending_in, 0);
assert_eq!(peer_manager.connection_info.num_inbound, 0);
let Some(PeerAction::DisconnectBannedIncoming { peer_id }) =
peer_manager.queued_actions.pop_front()
else {
panic!()
};
assert_eq!(peer_id, given_peer_id)
}
#[test]
fn test_connection_limits() {
let mut info = ConnectionInfo::default();
info.inc_in();
assert_eq!(info.num_inbound, 1);
assert_eq!(info.num_outbound, 0);
assert!(info.has_in_capacity());
info.decr_in();
assert_eq!(info.num_inbound, 0);
assert_eq!(info.num_outbound, 0);
info.inc_out();
assert_eq!(info.num_inbound, 0);
assert_eq!(info.num_outbound, 1);
assert!(info.has_out_capacity());
info.decr_out();
assert_eq!(info.num_inbound, 0);
assert_eq!(info.num_outbound, 0);
}
#[test]
fn test_connection_peer_state() {
let mut info = ConnectionInfo::default();
info.inc_in();
info.decr_state(PeerConnectionState::In);
assert_eq!(info.num_inbound, 0);
assert_eq!(info.num_outbound, 0);
info.inc_out();
info.decr_state(PeerConnectionState::Out);
assert_eq!(info.num_inbound, 0);
assert_eq!(info.num_outbound, 0);
}
#[tokio::test]
async fn test_trusted_peers_are_prioritized() {
let trusted_peer = PeerId::random();
let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test().with_trusted_nodes(vec![TrustedPeer {
host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
tcp_port: 8008,
udp_port: 8008,
id: trusted_peer,
}]);
let mut peers = PeersManager::new(config);
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, basic_peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, trusted_peer);
assert_eq!(remote_addr, trusted_sock);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, basic_peer);
assert_eq!(remote_addr, basic_sock);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_connect_trusted_nodes_only() {
let trusted_peer = PeerId::random();
let trusted_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test()
.with_trusted_nodes(vec![TrustedPeer {
host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
tcp_port: 8008,
udp_port: 8008,
id: trusted_peer,
}])
.with_trusted_nodes_only(true);
let mut peers = PeersManager::new(config);
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_peer(basic_peer, PeerAddr::from_tcp(basic_sock), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, basic_peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, trusted_peer);
assert_eq!(remote_addr, trusted_sock);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn test_incoming_with_trusted_nodes_only() {
let trusted_peer = PeerId::random();
let config = PeersConfig::test()
.with_trusted_nodes(vec![TrustedPeer {
host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
tcp_port: 8008,
udp_port: 8008,
id: trusted_peer,
}])
.with_trusted_nodes_only(true);
let mut peers = PeersManager::new(config);
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_session_established(basic_peer, basic_sock);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 0);
let Some(PeerAction::DisconnectUntrustedIncoming { peer_id }) =
peers.queued_actions.pop_front()
else {
panic!()
};
assert_eq!(basic_peer, peer_id);
assert!(!peers.peers.contains_key(&basic_peer));
}
#[tokio::test]
async fn test_incoming_without_trusted_nodes_only() {
let trusted_peer = PeerId::random();
let config = PeersConfig::test()
.with_trusted_nodes(vec![TrustedPeer {
host: Host::Ipv4(Ipv4Addr::new(127, 0, 1, 2)),
tcp_port: 8008,
udp_port: 8008,
id: trusted_peer,
}])
.with_trusted_nodes_only(false);
let mut peers = PeersManager::new(config);
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
assert!(peers.on_incoming_pending_session(basic_sock.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_session_established(basic_peer, basic_sock);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
assert!(peers.peers.contains_key(&basic_peer));
}
#[tokio::test]
async fn test_incoming_at_capacity() {
let mut config = PeersConfig::test();
config.connection_info.max_inbound = 1;
let mut peers = PeersManager::new(config);
let peer = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
peers.on_incoming_session_established(peer, addr);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
assert_eq!(
peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
InboundConnectionError::ExceedsCapacity
);
}
#[tokio::test]
async fn test_incoming_rate_limit() {
let config = PeersConfig {
incoming_ip_throttle_duration: Duration::from_millis(100),
..PeersConfig::test()
};
let mut peers = PeersManager::new(config);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
assert_eq!(
peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
InboundConnectionError::IpBanned
);
peers.release_interval.reset_immediately();
tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
poll_fn(|cx| loop {
if peers.poll(cx).is_pending() {
return Poll::Ready(());
}
})
.await;
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
assert_eq!(
peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
InboundConnectionError::IpBanned
);
}
#[tokio::test]
async fn test_tick() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
let config = PeersConfig::test();
let mut peer_manager = PeersManager::new(config);
let peer_id = PeerId::random();
peer_manager.add_peer(peer_id, PeerAddr::from_tcp(socket_addr), None);
tokio::time::sleep(Duration::from_secs(1)).await;
peer_manager.tick();
assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
peer_manager.peers.get_mut(&peer_id).unwrap().state = PeerConnectionState::Out;
tokio::time::sleep(Duration::from_secs(1)).await;
peer_manager.tick();
assert_eq!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation, DEFAULT_REPUTATION);
peer_manager.peers.get_mut(&peer_id).unwrap().reputation -= 1;
tokio::time::sleep(Duration::from_secs(1)).await;
peer_manager.tick();
assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
}
#[tokio::test]
async fn test_remove_incoming_after_disconnect() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.on_incoming_session_established(peer_id, addr);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::In);
assert!(peer.remove_after_disconnect);
peers.on_active_session_gracefully_closed(peer_id);
assert!(!peers.peers.contains_key(&peer_id))
}
#[tokio::test]
async fn test_keep_incoming_after_disconnect_if_discovered() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.on_incoming_session_established(peer_id, addr);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::In);
assert!(peer.remove_after_disconnect);
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
peers.on_active_session_gracefully_closed(peer_id);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::Idle);
assert!(!peer.remove_after_disconnect);
}
#[tokio::test]
async fn test_incoming_outgoing_already_connected() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
match event!(peers) {
PeerAction::PeerAdded(_) => {}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { .. } => {}
_ => unreachable!(),
}
peers.on_incoming_session_established(peer_id, addr);
peers.on_already_connected(Direction::Outgoing(peer_id));
assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_out, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_outbound, 0);
}
#[tokio::test]
async fn test_already_connected_incoming_outgoing_connection_error() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, PeerAddr::from_tcp(addr), None);
match event!(peers) {
PeerAction::PeerAdded(_) => {}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { .. } => {}
_ => unreachable!(),
}
peers.on_incoming_session_established(peer_id, addr);
peers.on_outgoing_connection_failure(
&addr,
&peer_id,
&io::Error::new(io::ErrorKind::ConnectionRefused, ""),
);
assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_out, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_outbound, 0);
}
#[tokio::test]
async fn test_max_concurrent_dials() {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), peer_addr, None);
}
peer_manager.fill_outbound_slots();
let dials = peer_manager
.queued_actions
.iter()
.filter(|ev| matches!(ev, PeerAction::Connect { .. }))
.count();
assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
}
#[tokio::test]
async fn test_max_num_of_pending_dials() {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let peer_addr = PeerAddr::from_tcp(SocketAddr::new(ip, 8008));
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), peer_addr, None);
}
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials * 2 {
match event!(peer_manager) {
PeerAction::PeerAdded(_) => {}
_ => unreachable!(),
}
}
for _ in 0..peer_manager.connection_info.config.max_concurrent_outbound_dials {
match event!(peer_manager) {
PeerAction::Connect { .. } => {}
_ => unreachable!(),
}
}
peer_manager.fill_outbound_slots();
let dials = peer_manager.connection_info.num_pending_out;
assert_eq!(dials, peer_manager.connection_info.config.max_concurrent_outbound_dials);
let num_pendingout_states = peer_manager
.peers
.iter()
.filter(|(_, peer)| peer.state == PeerConnectionState::PendingOut)
.map(|(peer_id, _)| *peer_id)
.collect::<Vec<PeerId>>();
assert_eq!(
num_pendingout_states.len(),
peer_manager.connection_info.config.max_concurrent_outbound_dials
);
for peer_id in &num_pendingout_states {
peer_manager.on_active_outgoing_established(*peer_id);
}
for peer_id in &num_pendingout_states {
assert_eq!(peer_manager.peers.get(peer_id).unwrap().state, PeerConnectionState::Out);
}
assert_eq!(peer_manager.connection_info.num_pending_out, 0);
}
#[tokio::test]
async fn test_connect() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None);
let (record, _) = peers.peer_by_id(peer).unwrap();
assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr);
}
}