use crate::{
config::NetworkMode, message::PeerMessage, protocol::RlpxSubProtocol,
swarm::NetworkConnectionState, transactions::TransactionsHandle, FetchClient,
};
use alloy_primitives::B256;
use enr::Enr;
use futures::StreamExt;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
use reth_eth_wire::{
DisconnectReason, EthNetworkPrimitives, NetworkPrimitives, NewBlock,
NewPooledTransactionHashes, SharedTransactions,
};
use reth_ethereum_forks::Head;
use reth_network_api::{
events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
PeersInfo,
};
use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind};
use reth_tokio_util::{EventSender, EventStream};
use secp256k1::SecretKey;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
};
use tokio::sync::{
mpsc::{self, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Clone, Debug)]
pub struct NetworkHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
inner: Arc<NetworkInner<N>>,
}
impl<N: NetworkPrimitives> NetworkHandle<N> {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
num_active_peers: Arc<AtomicUsize>,
listener_address: Arc<Mutex<SocketAddr>>,
to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
secret_key: SecretKey,
local_peer_id: PeerId,
peers: PeersHandle,
network_mode: NetworkMode,
chain_id: Arc<AtomicU64>,
tx_gossip_disabled: bool,
discv4: Option<Discv4>,
discv5: Option<Discv5>,
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
nat: Option<NatResolver>,
) -> Self {
let inner = NetworkInner {
num_active_peers,
to_manager_tx,
listener_address,
secret_key,
local_peer_id,
peers,
network_mode,
is_syncing: Arc::new(AtomicBool::new(false)),
initial_sync_done: Arc::new(AtomicBool::new(false)),
chain_id,
tx_gossip_disabled,
discv4,
discv5,
event_sender,
nat,
};
Self { inner: Arc::new(inner) }
}
pub fn peer_id(&self) -> &PeerId {
&self.inner.local_peer_id
}
fn manager(&self) -> &UnboundedSender<NetworkHandleMessage<N>> {
&self.inner.to_manager_tx
}
pub fn mode(&self) -> &NetworkMode {
&self.inner.network_mode
}
pub(crate) fn send_message(&self, msg: NetworkHandleMessage<N>) {
let _ = self.inner.to_manager_tx.send(msg);
}
pub fn update_status(&self, head: Head) {
self.send_message(NetworkHandleMessage::StatusUpdate { head });
}
pub fn announce_block(&self, block: NewBlock<N::Block>, hash: B256) {
self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
}
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest<N>) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}
pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
}
pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<N::BroadcastedTransaction>>) {
self.send_message(NetworkHandleMessage::SendTransaction {
peer_id,
msg: SharedTransactions(msg),
})
}
pub fn send_eth_message(&self, peer_id: PeerId, message: PeerMessage<N>) {
self.send_message(NetworkHandleMessage::EthMessage { peer_id, message })
}
pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
rx.await.unwrap()
}
pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
self.send_message(NetworkHandleMessage::Shutdown(tx));
rx.await
}
pub fn set_network_active(&self) {
self.set_network_conn(NetworkConnectionState::Active);
}
pub fn set_network_hibernate(&self) {
self.set_network_conn(NetworkConnectionState::Hibernate);
}
fn set_network_conn(&self, network_conn: NetworkConnectionState) {
self.send_message(NetworkHandleMessage::SetNetworkState(network_conn));
}
pub fn tx_gossip_disabled(&self) -> bool {
self.inner.tx_gossip_disabled
}
pub fn secret_key(&self) -> &SecretKey {
&self.inner.secret_key
}
}
impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
fn peer_events(&self) -> PeerEventStream {
let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
});
PeerEventStream::new(peer_events)
}
}
impl<N: NetworkPrimitives> NetworkEventListenerProvider for NetworkHandle<N> {
type Primitives = N;
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>> {
self.inner.event_sender.new_listener()
}
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
UnboundedReceiverStream::new(rx)
}
}
impl<N: NetworkPrimitives> NetworkProtocols for NetworkHandle<N> {
fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) {
self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol))
}
}
impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
fn num_connected_peers(&self) -> usize {
self.inner.num_active_peers.load(Ordering::Relaxed)
}
fn local_node_record(&self) -> NodeRecord {
if let Some(discv4) = &self.inner.discv4 {
discv4.node_record()
} else if let Some(record) = self.inner.discv5.as_ref().and_then(|d| d.node_record()) {
record
} else {
let external_ip = self.inner.nat.and_then(|nat| nat.as_external_ip());
let mut socket_addr = *self.inner.listener_address.lock();
if let Some(ip) = external_ip {
socket_addr.set_ip(ip)
} else if socket_addr.ip().is_unspecified() {
if socket_addr.ip().is_ipv4() {
socket_addr.set_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
} else {
socket_addr.set_ip(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST));
}
}
NodeRecord::new(socket_addr, *self.peer_id())
}
}
fn local_enr(&self) -> Enr<SecretKey> {
let local_node_record = self.local_node_record();
let mut builder = Enr::builder();
builder.ip(local_node_record.address);
if local_node_record.address.is_ipv4() {
builder.udp4(local_node_record.udp_port);
builder.tcp4(local_node_record.tcp_port);
} else {
builder.udp6(local_node_record.udp_port);
builder.tcp6(local_node_record.tcp_port);
}
builder.build(&self.inner.secret_key).expect("valid enr")
}
}
impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
fn add_trusted_peer_id(&self, peer: PeerId) {
self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer));
}
fn add_peer_kind(
&self,
peer: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
let addr = PeerAddr::new(tcp_addr, udp_addr);
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}
async fn get_peers_by_kind(&self, kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx));
Ok(rx.await?)
}
async fn get_all_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfos(tx));
Ok(rx.await?)
}
async fn get_peer_by_id(&self, peer_id: PeerId) -> Result<Option<PeerInfo>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx));
Ok(rx.await?)
}
async fn get_peers_by_id(&self, peer_ids: Vec<PeerId>) -> Result<Vec<PeerInfo>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx));
Ok(rx.await?)
}
fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
}
fn disconnect_peer(&self, peer: PeerId) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
}
fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
}
fn connect_peer_kind(
&self,
peer_id: PeerId,
kind: PeerKind,
tcp_addr: SocketAddr,
udp_addr: Option<SocketAddr>,
) {
self.send_message(NetworkHandleMessage::ConnectPeer(
peer_id,
kind,
PeerAddr::new(tcp_addr, udp_addr),
))
}
fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
}
async fn reputation_by_id(&self, peer_id: PeerId) -> Result<Option<Reputation>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetReputationById(peer_id, tx));
Ok(rx.await?)
}
}
impl<N: NetworkPrimitives> PeersHandleProvider for NetworkHandle<N> {
fn peers_handle(&self) -> &PeersHandle {
&self.inner.peers
}
}
impl<N: NetworkPrimitives> NetworkInfo for NetworkHandle<N> {
fn local_addr(&self) -> SocketAddr {
*self.inner.listener_address.lock()
}
async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetStatus(tx));
rx.await.map_err(Into::into)
}
fn chain_id(&self) -> u64 {
self.inner.chain_id.load(Ordering::Relaxed)
}
fn is_syncing(&self) -> bool {
SyncStateProvider::is_syncing(self)
}
fn is_initially_syncing(&self) -> bool {
SyncStateProvider::is_initially_syncing(self)
}
}
impl<N: NetworkPrimitives> SyncStateProvider for NetworkHandle<N> {
fn is_syncing(&self) -> bool {
self.inner.is_syncing.load(Ordering::Relaxed)
}
fn is_initially_syncing(&self) -> bool {
if self.inner.initial_sync_done.load(Ordering::Relaxed) {
return false
}
self.inner.is_syncing.load(Ordering::Relaxed)
}
}
impl<N: NetworkPrimitives> NetworkSyncUpdater for NetworkHandle<N> {
fn update_sync_state(&self, state: SyncState) {
let future_state = state.is_syncing();
let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
let syncing_to_idle_state_transition = prev_state && !future_state;
if syncing_to_idle_state_transition {
self.inner.initial_sync_done.store(true, Ordering::Relaxed);
}
}
fn update_status(&self, head: Head) {
self.send_message(NetworkHandleMessage::StatusUpdate { head });
}
}
impl<N: NetworkPrimitives> BlockDownloaderProvider for NetworkHandle<N> {
type Client = FetchClient<N>;
async fn fetch_client(&self) -> Result<Self::Client, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
rx.await
}
}
#[derive(Debug)]
struct NetworkInner<N: NetworkPrimitives = EthNetworkPrimitives> {
num_active_peers: Arc<AtomicUsize>,
to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
listener_address: Arc<Mutex<SocketAddr>>,
secret_key: SecretKey,
local_peer_id: PeerId,
peers: PeersHandle,
network_mode: NetworkMode,
is_syncing: Arc<AtomicBool>,
initial_sync_done: Arc<AtomicBool>,
chain_id: Arc<AtomicU64>,
tx_gossip_disabled: bool,
discv4: Option<Discv4>,
discv5: Option<Discv5>,
event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
nat: Option<NatResolver>,
}
pub trait NetworkProtocols: Send + Sync {
fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol);
}
#[derive(Debug)]
pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
AddTrustedPeerId(PeerId),
AddPeerAddress(PeerId, PeerKind, PeerAddr),
RemovePeer(PeerId, PeerKind),
DisconnectPeer(PeerId, Option<DisconnectReason>),
AnnounceBlock(NewBlock<N::Block>, B256),
SendTransaction {
peer_id: PeerId,
msg: SharedTransactions<N::BroadcastedTransaction>,
},
SendPooledTransactionHashes {
peer_id: PeerId,
msg: NewPooledTransactionHashes,
},
EthRequest {
peer_id: PeerId,
request: PeerRequest<N>,
},
EthMessage {
peer_id: PeerId,
message: PeerMessage<N>,
},
ReputationChange(PeerId, ReputationChangeKind),
FetchClient(oneshot::Sender<FetchClient<N>>),
StatusUpdate {
head: Head,
},
GetStatus(oneshot::Sender<NetworkStatus>),
GetPeerInfosByIds(Vec<PeerId>, oneshot::Sender<Vec<PeerInfo>>),
GetPeerInfos(oneshot::Sender<Vec<PeerInfo>>),
GetPeerInfoById(PeerId, oneshot::Sender<Option<PeerInfo>>),
GetPeerInfosByPeerKind(PeerKind, oneshot::Sender<Vec<PeerInfo>>),
GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
Shutdown(oneshot::Sender<()>),
SetNetworkState(NetworkConnectionState),
DiscoveryListener(UnboundedSender<DiscoveryEvent>),
AddRlpxSubProtocol(RlpxSubProtocol),
ConnectPeer(PeerId, PeerKind, PeerAddr),
}