1use crate::{
2 config::NetworkMode, message::PeerMessage, protocol::RlpxSubProtocol,
3 swarm::NetworkConnectionState, transactions::TransactionsHandle, FetchClient,
4};
5use alloy_primitives::B256;
6use enr::Enr;
7use futures::StreamExt;
8use parking_lot::Mutex;
9use reth_discv4::{Discv4, NatResolver};
10use reth_discv5::Discv5;
11use reth_eth_wire::{
12 BlockRangeUpdate, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
13 NewPooledTransactionHashes, SharedTransactions,
14};
15use reth_ethereum_forks::Head;
16use reth_network_api::{
17 events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
18 test_utils::{PeersHandle, PeersHandleProvider},
19 BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
20 NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
21 PeersInfo,
22};
23use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
24use reth_network_peers::{NodeRecord, PeerId};
25use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind};
26use reth_tokio_util::{EventSender, EventStream};
27use secp256k1::SecretKey;
28use std::{
29 net::SocketAddr,
30 sync::{
31 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
32 Arc,
33 },
34};
35use tokio::sync::{
36 mpsc::{self, UnboundedSender},
37 oneshot,
38};
39use tokio_stream::wrappers::UnboundedReceiverStream;
40
41#[derive(Clone, Debug)]
45pub struct NetworkHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
46 inner: Arc<NetworkInner<N>>,
48}
49
50impl<N: NetworkPrimitives> NetworkHandle<N> {
53 #[expect(clippy::too_many_arguments)]
55 pub(crate) fn new(
56 num_active_peers: Arc<AtomicUsize>,
57 listener_address: Arc<Mutex<SocketAddr>>,
58 to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
59 secret_key: SecretKey,
60 local_peer_id: PeerId,
61 peers: PeersHandle,
62 network_mode: NetworkMode,
63 chain_id: Arc<AtomicU64>,
64 tx_gossip_disabled: bool,
65 discv4: Option<Discv4>,
66 discv5: Option<Discv5>,
67 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
68 nat: Option<NatResolver>,
69 ) -> Self {
70 let inner = NetworkInner {
71 num_active_peers,
72 to_manager_tx,
73 listener_address,
74 secret_key,
75 local_peer_id,
76 peers,
77 network_mode,
78 is_syncing: Arc::new(AtomicBool::new(false)),
79 initial_sync_done: Arc::new(AtomicBool::new(false)),
80 chain_id,
81 tx_gossip_disabled,
82 discv4,
83 discv5,
84 event_sender,
85 nat,
86 };
87 Self { inner: Arc::new(inner) }
88 }
89
90 pub fn peer_id(&self) -> &PeerId {
92 &self.inner.local_peer_id
93 }
94
95 fn manager(&self) -> &UnboundedSender<NetworkHandleMessage<N>> {
96 &self.inner.to_manager_tx
97 }
98
99 pub fn mode(&self) -> &NetworkMode {
101 &self.inner.network_mode
102 }
103
104 pub(crate) fn send_message(&self, msg: NetworkHandleMessage<N>) {
106 let _ = self.inner.to_manager_tx.send(msg);
107 }
108
109 pub fn update_status(&self, head: Head) {
111 self.send_message(NetworkHandleMessage::StatusUpdate { head });
112 }
113
114 pub fn announce_block(&self, block: N::NewBlockPayload, hash: B256) {
120 self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
121 }
122
123 pub fn send_request(&self, peer_id: PeerId, request: PeerRequest<N>) {
125 self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
126 }
127
128 pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
130 self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
131 }
132
133 pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<N::BroadcastedTransaction>>) {
135 self.send_message(NetworkHandleMessage::SendTransaction {
136 peer_id,
137 msg: SharedTransactions(msg),
138 })
139 }
140
141 pub fn send_eth_message(&self, peer_id: PeerId, message: PeerMessage<N>) {
143 self.send_message(NetworkHandleMessage::EthMessage { peer_id, message })
144 }
145
146 pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
150 let (tx, rx) = oneshot::channel();
151 let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
152 rx.await.unwrap()
153 }
154
155 pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
160 let (tx, rx) = oneshot::channel();
161 self.send_message(NetworkHandleMessage::Shutdown(tx));
162 rx.await
163 }
164
165 pub fn set_network_active(&self) {
169 self.set_network_conn(NetworkConnectionState::Active);
170 }
171
172 pub fn set_network_hibernate(&self) {
176 self.set_network_conn(NetworkConnectionState::Hibernate);
177 }
178
179 fn set_network_conn(&self, network_conn: NetworkConnectionState) {
181 self.send_message(NetworkHandleMessage::SetNetworkState(network_conn));
182 }
183
184 pub fn tx_gossip_disabled(&self) -> bool {
186 self.inner.tx_gossip_disabled
187 }
188
189 pub fn secret_key(&self) -> &SecretKey {
191 &self.inner.secret_key
192 }
193
194 pub fn discv4(&self) -> Option<&Discv4> {
196 self.inner.discv4.as_ref()
197 }
198
199 pub fn discv5(&self) -> Option<&Discv5> {
201 self.inner.discv5.as_ref()
202 }
203}
204
205impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
208 fn peer_events(&self) -> PeerEventStream {
210 let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
211 NetworkEvent::Peer(peer_event) => peer_event,
212 NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
213 });
214 PeerEventStream::new(peer_events)
215 }
216}
217
218impl<N: NetworkPrimitives> NetworkEventListenerProvider for NetworkHandle<N> {
219 type Primitives = N;
220
221 fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>> {
222 self.inner.event_sender.new_listener()
223 }
224
225 fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
226 let (tx, rx) = mpsc::unbounded_channel();
227 let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
228 UnboundedReceiverStream::new(rx)
229 }
230}
231
232impl<N: NetworkPrimitives> NetworkProtocols for NetworkHandle<N> {
233 fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) {
234 self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol))
235 }
236}
237
238impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
239 fn num_connected_peers(&self) -> usize {
240 self.inner.num_active_peers.load(Ordering::Relaxed)
241 }
242
243 fn local_node_record(&self) -> NodeRecord {
244 if let Some(discv4) = &self.inner.discv4 {
245 discv4.node_record()
248 } else if let Some(discv5) = self.inner.discv5.as_ref() {
249 if let Some(external) =
251 self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port()))
252 {
253 NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id())
254 } else {
255 self.inner.discv5.as_ref().and_then(|d| d.node_record()).unwrap_or_else(|| {
257 NodeRecord::new(
258 (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), discv5.local_port())
259 .into(),
260 *self.peer_id(),
261 )
262 })
263 }
264 .with_tcp_port(self.inner.listener_address.lock().port())
266 } else {
267 let mut socket_addr = *self.inner.listener_address.lock();
268
269 let external_ip =
270 self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port()));
271
272 if let Some(ip) = external_ip {
273 socket_addr.set_ip(ip)
275 } else if socket_addr.ip().is_unspecified() {
276 if socket_addr.ip().is_ipv4() {
278 socket_addr.set_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
279 } else {
280 socket_addr.set_ip(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST));
281 }
282 }
283
284 NodeRecord::new(socket_addr, *self.peer_id())
285 }
286 }
287
288 fn local_enr(&self) -> Enr<SecretKey> {
289 let local_node_record = self.local_node_record();
290 let mut builder = Enr::builder();
291 builder.ip(local_node_record.address);
292 if local_node_record.address.is_ipv4() {
293 builder.udp4(local_node_record.udp_port);
294 builder.tcp4(local_node_record.tcp_port);
295
296 if let Some(discv5) = self.inner.discv5.as_ref() {
298 let discv5_enr = discv5.local_enr();
299 if let Some(ip6) = discv5_enr.ip6() {
300 builder.ip6(ip6);
301 }
302 if let Some(udp6) = discv5_enr.udp6() {
303 builder.udp6(udp6);
304 }
305 if let Some(tcp6) = discv5_enr.tcp6() {
306 builder.tcp6(tcp6);
307 }
308 }
309 } else {
310 builder.udp6(local_node_record.udp_port);
311 builder.tcp6(local_node_record.tcp_port);
312 }
313
314 builder.build(&self.inner.secret_key).expect("valid enr")
315 }
316}
317
318impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
319 fn add_trusted_peer_id(&self, peer: PeerId) {
320 self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer));
321 }
322
323 fn add_peer_kind(
326 &self,
327 peer: PeerId,
328 kind: Option<PeerKind>,
329 tcp_addr: SocketAddr,
330 udp_addr: Option<SocketAddr>,
331 ) {
332 let addr = PeerAddr::new(tcp_addr, udp_addr);
333 self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
334 }
335
336 async fn get_peers_by_kind(&self, kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
337 let (tx, rx) = oneshot::channel();
338 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx));
339 Ok(rx.await?)
340 }
341
342 async fn get_all_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
343 let (tx, rx) = oneshot::channel();
344 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfos(tx));
345 Ok(rx.await?)
346 }
347
348 async fn get_peer_by_id(&self, peer_id: PeerId) -> Result<Option<PeerInfo>, NetworkError> {
349 let (tx, rx) = oneshot::channel();
350 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx));
351 Ok(rx.await?)
352 }
353
354 async fn get_peers_by_id(&self, peer_ids: Vec<PeerId>) -> Result<Vec<PeerInfo>, NetworkError> {
355 let (tx, rx) = oneshot::channel();
356 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx));
357 Ok(rx.await?)
358 }
359
360 fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
363 self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
364 }
365
366 fn disconnect_peer(&self, peer: PeerId) {
369 self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
370 }
371
372 fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
375 self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
376 }
377
378 fn connect_peer_kind(
384 &self,
385 peer_id: PeerId,
386 kind: PeerKind,
387 tcp_addr: SocketAddr,
388 udp_addr: Option<SocketAddr>,
389 ) {
390 self.send_message(NetworkHandleMessage::ConnectPeer(
391 peer_id,
392 kind,
393 PeerAddr::new(tcp_addr, udp_addr),
394 ))
395 }
396
397 fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
399 self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
400 }
401
402 async fn reputation_by_id(&self, peer_id: PeerId) -> Result<Option<Reputation>, NetworkError> {
403 let (tx, rx) = oneshot::channel();
404 let _ = self.manager().send(NetworkHandleMessage::GetReputationById(peer_id, tx));
405 Ok(rx.await?)
406 }
407}
408
409impl<N: NetworkPrimitives> PeersHandleProvider for NetworkHandle<N> {
410 fn peers_handle(&self) -> &PeersHandle {
411 &self.inner.peers
412 }
413}
414
415impl<N: NetworkPrimitives> NetworkInfo for NetworkHandle<N> {
416 fn local_addr(&self) -> SocketAddr {
417 *self.inner.listener_address.lock()
418 }
419
420 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
421 let (tx, rx) = oneshot::channel();
422 let _ = self.manager().send(NetworkHandleMessage::GetStatus(tx));
423 rx.await.map_err(Into::into)
424 }
425
426 fn chain_id(&self) -> u64 {
427 self.inner.chain_id.load(Ordering::Relaxed)
428 }
429
430 fn is_syncing(&self) -> bool {
431 SyncStateProvider::is_syncing(self)
432 }
433
434 fn is_initially_syncing(&self) -> bool {
435 SyncStateProvider::is_initially_syncing(self)
436 }
437}
438
439impl<N: NetworkPrimitives> SyncStateProvider for NetworkHandle<N> {
440 fn is_syncing(&self) -> bool {
441 self.inner.is_syncing.load(Ordering::Relaxed)
442 }
443 fn is_initially_syncing(&self) -> bool {
445 if self.inner.initial_sync_done.load(Ordering::Relaxed) {
446 return false
447 }
448 self.inner.is_syncing.load(Ordering::Relaxed)
449 }
450}
451
452impl<N: NetworkPrimitives> NetworkSyncUpdater for NetworkHandle<N> {
453 fn update_sync_state(&self, state: SyncState) {
454 let future_state = state.is_syncing();
455 let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
456 let syncing_to_idle_state_transition = prev_state && !future_state;
457 if syncing_to_idle_state_transition {
458 self.inner.initial_sync_done.store(true, Ordering::Relaxed);
459 }
460 }
461
462 fn update_status(&self, head: Head) {
464 self.send_message(NetworkHandleMessage::StatusUpdate { head });
465 }
466
467 fn update_block_range(&self, update: reth_eth_wire::BlockRangeUpdate) {
469 self.send_message(NetworkHandleMessage::InternalBlockRangeUpdate(update));
470 }
471}
472
473impl<N: NetworkPrimitives> BlockDownloaderProvider for NetworkHandle<N> {
474 type Client = FetchClient<N>;
475
476 async fn fetch_client(&self) -> Result<Self::Client, oneshot::error::RecvError> {
477 let (tx, rx) = oneshot::channel();
478 let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
479 rx.await
480 }
481}
482
483#[derive(Debug)]
484struct NetworkInner<N: NetworkPrimitives = EthNetworkPrimitives> {
485 num_active_peers: Arc<AtomicUsize>,
487 to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
489 listener_address: Arc<Mutex<SocketAddr>>,
491 secret_key: SecretKey,
493 local_peer_id: PeerId,
495 peers: PeersHandle,
497 network_mode: NetworkMode,
499 is_syncing: Arc<AtomicBool>,
501 initial_sync_done: Arc<AtomicBool>,
503 chain_id: Arc<AtomicU64>,
505 tx_gossip_disabled: bool,
507 discv4: Option<Discv4>,
509 discv5: Option<Discv5>,
511 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
513 nat: Option<NatResolver>,
515}
516
517pub trait NetworkProtocols: Send + Sync {
519 fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol);
521}
522
523#[derive(Debug)]
525pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
526 AddTrustedPeerId(PeerId),
528 AddPeerAddress(PeerId, Option<PeerKind>, PeerAddr),
530 RemovePeer(PeerId, PeerKind),
532 DisconnectPeer(PeerId, Option<DisconnectReason>),
534 AnnounceBlock(N::NewBlockPayload, B256),
536 SendTransaction {
538 peer_id: PeerId,
540 msg: SharedTransactions<N::BroadcastedTransaction>,
542 },
543 SendPooledTransactionHashes {
545 peer_id: PeerId,
547 msg: NewPooledTransactionHashes,
549 },
550 EthRequest {
552 peer_id: PeerId,
554 request: PeerRequest<N>,
556 },
557 EthMessage {
559 peer_id: PeerId,
561 message: PeerMessage<N>,
563 },
564 ReputationChange(PeerId, ReputationChangeKind),
566 FetchClient(oneshot::Sender<FetchClient<N>>),
568 StatusUpdate {
570 head: Head,
572 },
573 GetStatus(oneshot::Sender<NetworkStatus>),
575 GetPeerInfosByIds(Vec<PeerId>, oneshot::Sender<Vec<PeerInfo>>),
577 GetPeerInfos(oneshot::Sender<Vec<PeerInfo>>),
579 GetPeerInfoById(PeerId, oneshot::Sender<Option<PeerInfo>>),
581 GetPeerInfosByPeerKind(PeerKind, oneshot::Sender<Vec<PeerInfo>>),
583 GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
585 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
587 Shutdown(oneshot::Sender<()>),
589 SetNetworkState(NetworkConnectionState),
591 DiscoveryListener(UnboundedSender<DiscoveryEvent>),
593 AddRlpxSubProtocol(RlpxSubProtocol),
595 ConnectPeer(PeerId, PeerKind, PeerAddr),
597 InternalBlockRangeUpdate(BlockRangeUpdate),
599}