1use crate::{
2 config::NetworkMode, message::PeerMessage, protocol::RlpxSubProtocol,
3 swarm::NetworkConnectionState, transactions::TransactionsHandle, FetchClient,
4};
5use alloy_primitives::B256;
6use enr::Enr;
7use futures::StreamExt;
8use parking_lot::Mutex;
9use reth_discv4::{Discv4, NatResolver};
10use reth_discv5::Discv5;
11use reth_eth_wire::{
12 BlockRangeUpdate, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
13 NewPooledTransactionHashes, SharedTransactions,
14};
15use reth_ethereum_forks::Head;
16use reth_network_api::{
17 events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
18 test_utils::{PeersHandle, PeersHandleProvider},
19 BlockDownloaderProvider, CellCustody, DiscoveryEvent, NetworkError, NetworkEvent,
20 NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
21 PeersInfo,
22};
23use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
24use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
25use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind};
26use reth_tokio_util::{EventSender, EventStream};
27use secp256k1::SecretKey;
28use std::{
29 net::SocketAddr,
30 sync::{
31 atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
32 Arc,
33 },
34};
35use tokio::sync::{
36 mpsc::{self, UnboundedSender},
37 oneshot,
38};
39use tokio_stream::wrappers::UnboundedReceiverStream;
40
41#[derive(Clone, Debug)]
45pub struct NetworkHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
46 inner: Arc<NetworkInner<N>>,
48}
49
50impl<N: NetworkPrimitives> NetworkHandle<N> {
53 #[expect(clippy::too_many_arguments)]
55 pub(crate) fn new(
56 num_active_peers: Arc<AtomicUsize>,
57 listener_address: Arc<Mutex<SocketAddr>>,
58 to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
59 secret_key: SecretKey,
60 local_peer_id: PeerId,
61 peers: PeersHandle,
62 network_mode: NetworkMode,
63 chain_id: Arc<AtomicU64>,
64 tx_gossip_disabled: bool,
65 discv4: Option<Discv4>,
66 discv5: Option<Discv5>,
67 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
68 nat: Option<NatResolver>,
69 ) -> Self {
70 let inner = NetworkInner {
71 num_active_peers,
72 to_manager_tx,
73 listener_address,
74 secret_key,
75 local_peer_id,
76 peers,
77 network_mode,
78 is_syncing: Arc::new(AtomicBool::new(false)),
79 initial_sync_done: Arc::new(AtomicBool::new(false)),
80 chain_id,
81 cell_custody: CellCustody::default(),
82 tx_gossip_disabled,
83 discv4,
84 discv5,
85 event_sender,
86 nat,
87 };
88 Self { inner: Arc::new(inner) }
89 }
90
91 pub fn peer_id(&self) -> &PeerId {
93 &self.inner.local_peer_id
94 }
95
96 fn manager(&self) -> &UnboundedSender<NetworkHandleMessage<N>> {
97 &self.inner.to_manager_tx
98 }
99
100 pub fn mode(&self) -> &NetworkMode {
102 &self.inner.network_mode
103 }
104
105 pub(crate) fn send_message(&self, msg: NetworkHandleMessage<N>) {
107 let _ = self.inner.to_manager_tx.send(msg);
108 }
109
110 pub fn update_status(&self, head: Head) {
112 self.send_message(NetworkHandleMessage::StatusUpdate { head });
113 }
114
115 pub fn announce_block(&self, block: N::NewBlockPayload, hash: B256) {
121 self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
122 }
123
124 pub fn send_request(&self, peer_id: PeerId, request: PeerRequest<N>) {
126 self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
127 }
128
129 pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
131 self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
132 }
133
134 pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<N::BroadcastedTransaction>>) {
136 self.send_message(NetworkHandleMessage::SendTransaction {
137 peer_id,
138 msg: SharedTransactions(msg),
139 })
140 }
141
142 pub fn send_eth_message(&self, peer_id: PeerId, message: PeerMessage<N>) {
144 self.send_message(NetworkHandleMessage::EthMessage { peer_id, message })
145 }
146
147 pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
151 let (tx, rx) = oneshot::channel();
152 let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
153 rx.await.unwrap()
154 }
155
156 pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
161 let (tx, rx) = oneshot::channel();
162 self.send_message(NetworkHandleMessage::Shutdown(tx));
163 rx.await
164 }
165
166 pub fn set_network_active(&self) {
170 self.set_network_conn(NetworkConnectionState::Active);
171 }
172
173 pub fn set_network_hibernate(&self) {
177 self.set_network_conn(NetworkConnectionState::Hibernate);
178 }
179
180 fn set_network_conn(&self, network_conn: NetworkConnectionState) {
182 self.send_message(NetworkHandleMessage::SetNetworkState(network_conn));
183 }
184
185 pub fn tx_gossip_disabled(&self) -> bool {
187 self.inner.tx_gossip_disabled
188 }
189
190 pub fn secret_key(&self) -> &SecretKey {
192 &self.inner.secret_key
193 }
194
195 pub fn discv4(&self) -> Option<&Discv4> {
197 self.inner.discv4.as_ref()
198 }
199
200 pub fn discv5(&self) -> Option<&Discv5> {
202 self.inner.discv5.as_ref()
203 }
204}
205
206impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
209 fn peer_events(&self) -> PeerEventStream {
211 let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
212 NetworkEvent::Peer(peer_event) => peer_event,
213 NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
214 });
215 PeerEventStream::new(peer_events)
216 }
217}
218
219impl<N: NetworkPrimitives> NetworkEventListenerProvider for NetworkHandle<N> {
220 type Primitives = N;
221
222 fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>> {
223 self.inner.event_sender.new_listener()
224 }
225
226 fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
227 let (tx, rx) = mpsc::unbounded_channel();
228 let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
229 UnboundedReceiverStream::new(rx)
230 }
231}
232
233impl<N: NetworkPrimitives> NetworkProtocols for NetworkHandle<N> {
234 fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) {
235 self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol))
236 }
237}
238
239impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
240 fn num_connected_peers(&self) -> usize {
241 self.inner.num_active_peers.load(Ordering::Relaxed)
242 }
243
244 fn local_node_record(&self) -> NodeRecord {
245 if let Some(discv4) = &self.inner.discv4 {
246 discv4.node_record()
249 } else if let Some(discv5) = self.inner.discv5.as_ref() {
250 if let Some(external) =
252 self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port()))
253 {
254 NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id())
255 } else {
256 self.inner.discv5.as_ref().and_then(|d| d.node_record()).unwrap_or_else(|| {
258 NodeRecord::new(
259 (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), discv5.local_port())
260 .into(),
261 *self.peer_id(),
262 )
263 })
264 }
265 .with_tcp_port(self.inner.listener_address.lock().port())
267 } else {
268 let mut socket_addr = *self.inner.listener_address.lock();
269
270 let external_ip =
271 self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port()));
272
273 if let Some(ip) = external_ip {
274 socket_addr.set_ip(ip)
276 } else if socket_addr.ip().is_unspecified() {
277 if socket_addr.ip().is_ipv4() {
279 socket_addr.set_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
280 } else {
281 socket_addr.set_ip(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST));
282 }
283 }
284
285 NodeRecord::new(socket_addr, *self.peer_id())
286 }
287 }
288
289 fn local_enr(&self) -> Enr<SecretKey> {
290 let local_node_record = self.local_node_record();
291 let mut builder = Enr::builder();
292 builder.ip(local_node_record.address);
293 if local_node_record.address.is_ipv4() {
294 builder.udp4(local_node_record.udp_port);
295 builder.tcp4(local_node_record.tcp_port);
296
297 if let Some(discv5) = self.inner.discv5.as_ref() {
299 let discv5_enr = discv5.local_enr();
300 if let Some(ip6) = discv5_enr.ip6() {
301 builder.ip6(ip6);
302 }
303 if let Some(udp6) = discv5_enr.udp6() {
304 builder.udp6(udp6);
305 }
306 if let Some(tcp6) = discv5_enr.tcp6() {
307 builder.tcp6(tcp6);
308 }
309 }
310 } else {
311 builder.udp6(local_node_record.udp_port);
312 builder.tcp6(local_node_record.tcp_port);
313 }
314
315 builder.build(&self.inner.secret_key).expect("valid enr")
316 }
317}
318
319impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
320 fn add_trusted_peer_id(&self, peer: PeerId) {
321 self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer));
322 }
323
324 fn add_trusted_peer_node(&self, peer: TrustedPeer) {
325 self.send_message(NetworkHandleMessage::AddTrustedPeerNode(peer));
326 }
327
328 fn add_peer_kind(
331 &self,
332 peer: PeerId,
333 kind: Option<PeerKind>,
334 tcp_addr: SocketAddr,
335 udp_addr: Option<SocketAddr>,
336 ) {
337 let addr = PeerAddr::new(tcp_addr, udp_addr);
338 self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
339 }
340
341 async fn get_peers_by_kind(&self, kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
342 let (tx, rx) = oneshot::channel();
343 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx));
344 Ok(rx.await?)
345 }
346
347 async fn get_all_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
348 let (tx, rx) = oneshot::channel();
349 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfos(tx));
350 Ok(rx.await?)
351 }
352
353 async fn get_peer_by_id(&self, peer_id: PeerId) -> Result<Option<PeerInfo>, NetworkError> {
354 let (tx, rx) = oneshot::channel();
355 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx));
356 Ok(rx.await?)
357 }
358
359 async fn get_peers_by_id(&self, peer_ids: Vec<PeerId>) -> Result<Vec<PeerInfo>, NetworkError> {
360 let (tx, rx) = oneshot::channel();
361 let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx));
362 Ok(rx.await?)
363 }
364
365 fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
368 self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
369 }
370
371 fn disconnect_peer(&self, peer: PeerId) {
374 self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
375 }
376
377 fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
380 self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
381 }
382
383 fn ban_peer(&self, peer: PeerId) {
386 self.send_message(NetworkHandleMessage::BanPeer(peer))
387 }
388
389 fn unban_peer(&self, peer: PeerId) {
391 self.send_message(NetworkHandleMessage::UnbanPeer(peer))
392 }
393
394 fn connect_peer_kind(
400 &self,
401 peer_id: PeerId,
402 kind: PeerKind,
403 tcp_addr: SocketAddr,
404 udp_addr: Option<SocketAddr>,
405 ) {
406 self.send_message(NetworkHandleMessage::ConnectPeer(
407 peer_id,
408 kind,
409 PeerAddr::new(tcp_addr, udp_addr),
410 ))
411 }
412
413 fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
415 self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
416 }
417
418 async fn reputation_by_id(&self, peer_id: PeerId) -> Result<Option<Reputation>, NetworkError> {
419 let (tx, rx) = oneshot::channel();
420 let _ = self.manager().send(NetworkHandleMessage::GetReputationById(peer_id, tx));
421 Ok(rx.await?)
422 }
423}
424
425impl<N: NetworkPrimitives> PeersHandleProvider for NetworkHandle<N> {
426 fn peers_handle(&self) -> &PeersHandle {
427 &self.inner.peers
428 }
429}
430
431impl<N: NetworkPrimitives> NetworkInfo for NetworkHandle<N> {
432 fn local_addr(&self) -> SocketAddr {
433 *self.inner.listener_address.lock()
434 }
435
436 async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
437 let (tx, rx) = oneshot::channel();
438 let _ = self.manager().send(NetworkHandleMessage::GetStatus(tx));
439 rx.await.map_err(Into::into)
440 }
441
442 fn chain_id(&self) -> u64 {
443 self.inner.chain_id.load(Ordering::Relaxed)
444 }
445
446 fn cell_custody(&self) -> &CellCustody {
447 &self.inner.cell_custody
448 }
449
450 fn is_syncing(&self) -> bool {
451 SyncStateProvider::is_syncing(self)
452 }
453
454 fn is_initially_syncing(&self) -> bool {
455 SyncStateProvider::is_initially_syncing(self)
456 }
457}
458
459impl<N: NetworkPrimitives> SyncStateProvider for NetworkHandle<N> {
460 fn is_syncing(&self) -> bool {
461 self.inner.is_syncing.load(Ordering::Relaxed)
462 }
463 fn is_initially_syncing(&self) -> bool {
465 if self.inner.initial_sync_done.load(Ordering::Relaxed) {
466 return false
467 }
468 self.inner.is_syncing.load(Ordering::Relaxed)
469 }
470}
471
472impl<N: NetworkPrimitives> NetworkSyncUpdater for NetworkHandle<N> {
473 fn update_sync_state(&self, state: SyncState) {
474 let future_state = state.is_syncing();
475 let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
476 let syncing_to_idle_state_transition = prev_state && !future_state;
477 if syncing_to_idle_state_transition {
478 self.inner.initial_sync_done.store(true, Ordering::Relaxed);
479 }
480 }
481
482 fn update_status(&self, head: Head) {
484 self.send_message(NetworkHandleMessage::StatusUpdate { head });
485 }
486
487 fn update_block_range(&self, update: reth_eth_wire::BlockRangeUpdate) {
489 self.send_message(NetworkHandleMessage::InternalBlockRangeUpdate(update));
490 }
491}
492
493impl<N: NetworkPrimitives> BlockDownloaderProvider for NetworkHandle<N> {
494 type Client = FetchClient<N>;
495
496 async fn fetch_client(&self) -> Result<Self::Client, oneshot::error::RecvError> {
497 let (tx, rx) = oneshot::channel();
498 let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
499 rx.await
500 }
501}
502
503#[derive(Debug)]
504struct NetworkInner<N: NetworkPrimitives = EthNetworkPrimitives> {
505 num_active_peers: Arc<AtomicUsize>,
507 to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
509 listener_address: Arc<Mutex<SocketAddr>>,
511 secret_key: SecretKey,
513 local_peer_id: PeerId,
515 peers: PeersHandle,
517 network_mode: NetworkMode,
519 is_syncing: Arc<AtomicBool>,
521 initial_sync_done: Arc<AtomicBool>,
523 chain_id: Arc<AtomicU64>,
525 cell_custody: CellCustody,
527 tx_gossip_disabled: bool,
529 discv4: Option<Discv4>,
531 discv5: Option<Discv5>,
533 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
535 nat: Option<NatResolver>,
537}
538
539pub trait NetworkProtocols: Send + Sync {
541 fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol);
543}
544
545#[derive(Debug)]
547pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
548 AddTrustedPeerId(PeerId),
550 AddTrustedPeerNode(TrustedPeer),
552 AddPeerAddress(PeerId, Option<PeerKind>, PeerAddr),
554 RemovePeer(PeerId, PeerKind),
556 DisconnectPeer(PeerId, Option<DisconnectReason>),
558 BanPeer(PeerId),
560 UnbanPeer(PeerId),
562 AnnounceBlock(N::NewBlockPayload, B256),
564 SendTransaction {
566 peer_id: PeerId,
568 msg: SharedTransactions<N::BroadcastedTransaction>,
570 },
571 SendPooledTransactionHashes {
573 peer_id: PeerId,
575 msg: NewPooledTransactionHashes,
577 },
578 EthRequest {
580 peer_id: PeerId,
582 request: PeerRequest<N>,
584 },
585 EthMessage {
587 peer_id: PeerId,
589 message: PeerMessage<N>,
591 },
592 ReputationChange(PeerId, ReputationChangeKind),
594 FetchClient(oneshot::Sender<FetchClient<N>>),
596 StatusUpdate {
598 head: Head,
600 },
601 GetStatus(oneshot::Sender<NetworkStatus>),
603 GetPeerInfosByIds(Vec<PeerId>, oneshot::Sender<Vec<PeerInfo>>),
605 GetPeerInfos(oneshot::Sender<Vec<PeerInfo>>),
607 GetPeerInfoById(PeerId, oneshot::Sender<Option<PeerInfo>>),
609 GetPeerInfosByPeerKind(PeerKind, oneshot::Sender<Vec<PeerInfo>>),
611 GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
613 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
615 Shutdown(oneshot::Sender<()>),
617 SetNetworkState(NetworkConnectionState),
619 DiscoveryListener(UnboundedSender<DiscoveryEvent>),
621 AddRlpxSubProtocol(RlpxSubProtocol),
623 ConnectPeer(PeerId, PeerKind, PeerAddr),
625 InternalBlockRangeUpdate(BlockRangeUpdate),
627}