Skip to main content

reth_network/
network.rs

1use crate::{
2    config::NetworkMode, message::PeerMessage, protocol::RlpxSubProtocol,
3    swarm::NetworkConnectionState, transactions::TransactionsHandle, FetchClient,
4};
5use alloy_primitives::B256;
6use enr::Enr;
7use futures::StreamExt;
8use parking_lot::Mutex;
9use reth_discv4::{Discv4, NatResolver};
10use reth_discv5::Discv5;
11use reth_eth_wire::{
12    BlockRangeUpdate, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
13    NewPooledTransactionHashes, SharedTransactions,
14};
15use reth_ethereum_forks::Head;
16use reth_network_api::{
17    events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
18    test_utils::{PeersHandle, PeersHandleProvider},
19    BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
20    NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
21    PeersInfo,
22};
23use reth_network_p2p::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
24use reth_network_peers::{NodeRecord, PeerId};
25use reth_network_types::{PeerAddr, PeerKind, Reputation, ReputationChangeKind};
26use reth_tokio_util::{EventSender, EventStream};
27use secp256k1::SecretKey;
28use std::{
29    net::SocketAddr,
30    sync::{
31        atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
32        Arc,
33    },
34};
35use tokio::sync::{
36    mpsc::{self, UnboundedSender},
37    oneshot,
38};
39use tokio_stream::wrappers::UnboundedReceiverStream;
40
41/// A _shareable_ network frontend. Used to interact with the network.
42///
43/// See also [`NetworkManager`](crate::NetworkManager).
44#[derive(Clone, Debug)]
45pub struct NetworkHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
46    /// The Arc'ed delegate that contains the state.
47    inner: Arc<NetworkInner<N>>,
48}
49
50// === impl NetworkHandle ===
51
52impl<N: NetworkPrimitives> NetworkHandle<N> {
53    /// Creates a single new instance.
54    #[expect(clippy::too_many_arguments)]
55    pub(crate) fn new(
56        num_active_peers: Arc<AtomicUsize>,
57        listener_address: Arc<Mutex<SocketAddr>>,
58        to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
59        secret_key: SecretKey,
60        local_peer_id: PeerId,
61        peers: PeersHandle,
62        network_mode: NetworkMode,
63        chain_id: Arc<AtomicU64>,
64        tx_gossip_disabled: bool,
65        discv4: Option<Discv4>,
66        discv5: Option<Discv5>,
67        event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
68        nat: Option<NatResolver>,
69    ) -> Self {
70        let inner = NetworkInner {
71            num_active_peers,
72            to_manager_tx,
73            listener_address,
74            secret_key,
75            local_peer_id,
76            peers,
77            network_mode,
78            is_syncing: Arc::new(AtomicBool::new(false)),
79            initial_sync_done: Arc::new(AtomicBool::new(false)),
80            chain_id,
81            tx_gossip_disabled,
82            discv4,
83            discv5,
84            event_sender,
85            nat,
86        };
87        Self { inner: Arc::new(inner) }
88    }
89
90    /// Returns the [`PeerId`] used in the network.
91    pub fn peer_id(&self) -> &PeerId {
92        &self.inner.local_peer_id
93    }
94
95    fn manager(&self) -> &UnboundedSender<NetworkHandleMessage<N>> {
96        &self.inner.to_manager_tx
97    }
98
99    /// Returns the mode of the network, either pow, or pos
100    pub fn mode(&self) -> &NetworkMode {
101        &self.inner.network_mode
102    }
103
104    /// Sends a [`NetworkHandleMessage`] to the manager
105    pub(crate) fn send_message(&self, msg: NetworkHandleMessage<N>) {
106        let _ = self.inner.to_manager_tx.send(msg);
107    }
108
109    /// Update the status of the node.
110    pub fn update_status(&self, head: Head) {
111        self.send_message(NetworkHandleMessage::StatusUpdate { head });
112    }
113
114    /// Announce a block over devp2p
115    ///
116    /// Caution: in `PoS` this is a noop because new blocks are no longer announced over devp2p.
117    /// Instead they are sent to the node by CL and can be requested over devp2p.
118    /// Broadcasting new blocks is considered a protocol violation.
119    pub fn announce_block(&self, block: N::NewBlockPayload, hash: B256) {
120        self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
121    }
122
123    /// Sends a [`PeerRequest`] to the given peer's session.
124    pub fn send_request(&self, peer_id: PeerId, request: PeerRequest<N>) {
125        self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
126    }
127
128    /// Send transactions hashes to the peer.
129    pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
130        self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
131    }
132
133    /// Send full transactions to the peer
134    pub fn send_transactions(&self, peer_id: PeerId, msg: Vec<Arc<N::BroadcastedTransaction>>) {
135        self.send_message(NetworkHandleMessage::SendTransaction {
136            peer_id,
137            msg: SharedTransactions(msg),
138        })
139    }
140
141    /// Send eth message to the peer.
142    pub fn send_eth_message(&self, peer_id: PeerId, message: PeerMessage<N>) {
143        self.send_message(NetworkHandleMessage::EthMessage { peer_id, message })
144    }
145
146    /// Send message to get the [`TransactionsHandle`].
147    ///
148    /// Returns `None` if no transaction task is installed.
149    pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
150        let (tx, rx) = oneshot::channel();
151        let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
152        rx.await.unwrap()
153    }
154
155    /// Send message to gracefully shutdown node.
156    ///
157    /// This will disconnect all active and pending sessions and prevent
158    /// new connections to be established.
159    pub async fn shutdown(&self) -> Result<(), oneshot::error::RecvError> {
160        let (tx, rx) = oneshot::channel();
161        self.send_message(NetworkHandleMessage::Shutdown(tx));
162        rx.await
163    }
164
165    /// Set network connection state to Active.
166    ///
167    /// New outbound connections will be established if there's capacity.
168    pub fn set_network_active(&self) {
169        self.set_network_conn(NetworkConnectionState::Active);
170    }
171
172    /// Set network connection state to Hibernate.
173    ///
174    /// No new outbound connections will be established.
175    pub fn set_network_hibernate(&self) {
176        self.set_network_conn(NetworkConnectionState::Hibernate);
177    }
178
179    /// Set network connection state.
180    fn set_network_conn(&self, network_conn: NetworkConnectionState) {
181        self.send_message(NetworkHandleMessage::SetNetworkState(network_conn));
182    }
183
184    /// Whether tx gossip is disabled
185    pub fn tx_gossip_disabled(&self) -> bool {
186        self.inner.tx_gossip_disabled
187    }
188
189    /// Returns the secret key used for authenticating sessions.
190    pub fn secret_key(&self) -> &SecretKey {
191        &self.inner.secret_key
192    }
193
194    /// Returns the [`Discv4`] handle if discv4 is enabled.
195    pub fn discv4(&self) -> Option<&Discv4> {
196        self.inner.discv4.as_ref()
197    }
198
199    /// Returns the [`Discv5`] handle if discv5 is enabled.
200    pub fn discv5(&self) -> Option<&Discv5> {
201        self.inner.discv5.as_ref()
202    }
203}
204
205// === API Implementations ===
206
207impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
208    /// Returns an event stream of peer-specific network events.
209    fn peer_events(&self) -> PeerEventStream {
210        let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
211            NetworkEvent::Peer(peer_event) => peer_event,
212            NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
213        });
214        PeerEventStream::new(peer_events)
215    }
216}
217
218impl<N: NetworkPrimitives> NetworkEventListenerProvider for NetworkHandle<N> {
219    type Primitives = N;
220
221    fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>> {
222        self.inner.event_sender.new_listener()
223    }
224
225    fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
226        let (tx, rx) = mpsc::unbounded_channel();
227        let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
228        UnboundedReceiverStream::new(rx)
229    }
230}
231
232impl<N: NetworkPrimitives> NetworkProtocols for NetworkHandle<N> {
233    fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol) {
234        self.send_message(NetworkHandleMessage::AddRlpxSubProtocol(protocol))
235    }
236}
237
238impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
239    fn num_connected_peers(&self) -> usize {
240        self.inner.num_active_peers.load(Ordering::Relaxed)
241    }
242
243    fn local_node_record(&self) -> NodeRecord {
244        if let Some(discv4) = &self.inner.discv4 {
245            // Note: the discv4 services uses the same `nat` so we can directly return the node
246            // record here
247            discv4.node_record()
248        } else if let Some(discv5) = self.inner.discv5.as_ref() {
249            // for disv5 we must check if we have an external ip configured
250            if let Some(external) =
251                self.inner.nat.clone().and_then(|nat| nat.as_external_ip(discv5.local_port()))
252            {
253                NodeRecord::new((external, discv5.local_port()).into(), *self.peer_id())
254            } else {
255                // use the node record that discv5 tracks or use localhost
256                self.inner.discv5.as_ref().and_then(|d| d.node_record()).unwrap_or_else(|| {
257                    NodeRecord::new(
258                        (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), discv5.local_port())
259                            .into(),
260                        *self.peer_id(),
261                    )
262                })
263            }
264            // also use the tcp port
265            .with_tcp_port(self.inner.listener_address.lock().port())
266        } else {
267            let mut socket_addr = *self.inner.listener_address.lock();
268
269            let external_ip =
270                self.inner.nat.clone().and_then(|nat| nat.as_external_ip(socket_addr.port()));
271
272            if let Some(ip) = external_ip {
273                // if able to resolve external ip, use it instead and also set the local address
274                socket_addr.set_ip(ip)
275            } else if socket_addr.ip().is_unspecified() {
276                // zero address is invalid
277                if socket_addr.ip().is_ipv4() {
278                    socket_addr.set_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
279                } else {
280                    socket_addr.set_ip(std::net::IpAddr::V6(std::net::Ipv6Addr::LOCALHOST));
281                }
282            }
283
284            NodeRecord::new(socket_addr, *self.peer_id())
285        }
286    }
287
288    fn local_enr(&self) -> Enr<SecretKey> {
289        let local_node_record = self.local_node_record();
290        let mut builder = Enr::builder();
291        builder.ip(local_node_record.address);
292        if local_node_record.address.is_ipv4() {
293            builder.udp4(local_node_record.udp_port);
294            builder.tcp4(local_node_record.tcp_port);
295
296            // add IPv6 fields from discv5 for dual-stack support
297            if let Some(discv5) = self.inner.discv5.as_ref() {
298                let discv5_enr = discv5.local_enr();
299                if let Some(ip6) = discv5_enr.ip6() {
300                    builder.ip6(ip6);
301                }
302                if let Some(udp6) = discv5_enr.udp6() {
303                    builder.udp6(udp6);
304                }
305                if let Some(tcp6) = discv5_enr.tcp6() {
306                    builder.tcp6(tcp6);
307                }
308            }
309        } else {
310            builder.udp6(local_node_record.udp_port);
311            builder.tcp6(local_node_record.tcp_port);
312        }
313
314        builder.build(&self.inner.secret_key).expect("valid enr")
315    }
316}
317
318impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
319    fn add_trusted_peer_id(&self, peer: PeerId) {
320        self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer));
321    }
322
323    /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
324    /// set, with the given kind.
325    fn add_peer_kind(
326        &self,
327        peer: PeerId,
328        kind: Option<PeerKind>,
329        tcp_addr: SocketAddr,
330        udp_addr: Option<SocketAddr>,
331    ) {
332        let addr = PeerAddr::new(tcp_addr, udp_addr);
333        self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
334    }
335
336    async fn get_peers_by_kind(&self, kind: PeerKind) -> Result<Vec<PeerInfo>, NetworkError> {
337        let (tx, rx) = oneshot::channel();
338        let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx));
339        Ok(rx.await?)
340    }
341
342    async fn get_all_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
343        let (tx, rx) = oneshot::channel();
344        let _ = self.manager().send(NetworkHandleMessage::GetPeerInfos(tx));
345        Ok(rx.await?)
346    }
347
348    async fn get_peer_by_id(&self, peer_id: PeerId) -> Result<Option<PeerInfo>, NetworkError> {
349        let (tx, rx) = oneshot::channel();
350        let _ = self.manager().send(NetworkHandleMessage::GetPeerInfoById(peer_id, tx));
351        Ok(rx.await?)
352    }
353
354    async fn get_peers_by_id(&self, peer_ids: Vec<PeerId>) -> Result<Vec<PeerInfo>, NetworkError> {
355        let (tx, rx) = oneshot::channel();
356        let _ = self.manager().send(NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx));
357        Ok(rx.await?)
358    }
359
360    /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the
361    /// set corresponding to given kind.
362    fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
363        self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
364    }
365
366    /// Sends a message to the [`NetworkManager`](crate::NetworkManager)  to disconnect an existing
367    /// connection to the given peer.
368    fn disconnect_peer(&self, peer: PeerId) {
369        self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
370    }
371
372    /// Sends a message to the [`NetworkManager`](crate::NetworkManager)  to disconnect an existing
373    /// connection to the given peer using the provided reason
374    fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
375        self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
376    }
377
378    /// Sends a message to the [`NetworkManager`](crate::NetworkManager) to connect to the given
379    /// peer.
380    ///
381    /// This will add a new entry for the given peer if it isn't tracked yet.
382    /// If it is tracked then the peer is updated with the given information.
383    fn connect_peer_kind(
384        &self,
385        peer_id: PeerId,
386        kind: PeerKind,
387        tcp_addr: SocketAddr,
388        udp_addr: Option<SocketAddr>,
389    ) {
390        self.send_message(NetworkHandleMessage::ConnectPeer(
391            peer_id,
392            kind,
393            PeerAddr::new(tcp_addr, udp_addr),
394        ))
395    }
396
397    /// Send a reputation change for the given peer.
398    fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
399        self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
400    }
401
402    async fn reputation_by_id(&self, peer_id: PeerId) -> Result<Option<Reputation>, NetworkError> {
403        let (tx, rx) = oneshot::channel();
404        let _ = self.manager().send(NetworkHandleMessage::GetReputationById(peer_id, tx));
405        Ok(rx.await?)
406    }
407}
408
409impl<N: NetworkPrimitives> PeersHandleProvider for NetworkHandle<N> {
410    fn peers_handle(&self) -> &PeersHandle {
411        &self.inner.peers
412    }
413}
414
415impl<N: NetworkPrimitives> NetworkInfo for NetworkHandle<N> {
416    fn local_addr(&self) -> SocketAddr {
417        *self.inner.listener_address.lock()
418    }
419
420    async fn network_status(&self) -> Result<NetworkStatus, NetworkError> {
421        let (tx, rx) = oneshot::channel();
422        let _ = self.manager().send(NetworkHandleMessage::GetStatus(tx));
423        rx.await.map_err(Into::into)
424    }
425
426    fn chain_id(&self) -> u64 {
427        self.inner.chain_id.load(Ordering::Relaxed)
428    }
429
430    fn is_syncing(&self) -> bool {
431        SyncStateProvider::is_syncing(self)
432    }
433
434    fn is_initially_syncing(&self) -> bool {
435        SyncStateProvider::is_initially_syncing(self)
436    }
437}
438
439impl<N: NetworkPrimitives> SyncStateProvider for NetworkHandle<N> {
440    fn is_syncing(&self) -> bool {
441        self.inner.is_syncing.load(Ordering::Relaxed)
442    }
443    // used to guard the txpool
444    fn is_initially_syncing(&self) -> bool {
445        if self.inner.initial_sync_done.load(Ordering::Relaxed) {
446            return false
447        }
448        self.inner.is_syncing.load(Ordering::Relaxed)
449    }
450}
451
452impl<N: NetworkPrimitives> NetworkSyncUpdater for NetworkHandle<N> {
453    fn update_sync_state(&self, state: SyncState) {
454        let future_state = state.is_syncing();
455        let prev_state = self.inner.is_syncing.swap(future_state, Ordering::Relaxed);
456        let syncing_to_idle_state_transition = prev_state && !future_state;
457        if syncing_to_idle_state_transition {
458            self.inner.initial_sync_done.store(true, Ordering::Relaxed);
459        }
460    }
461
462    /// Update the status of the node.
463    fn update_status(&self, head: Head) {
464        self.send_message(NetworkHandleMessage::StatusUpdate { head });
465    }
466
467    /// Updates the advertised block range.
468    fn update_block_range(&self, update: reth_eth_wire::BlockRangeUpdate) {
469        self.send_message(NetworkHandleMessage::InternalBlockRangeUpdate(update));
470    }
471}
472
473impl<N: NetworkPrimitives> BlockDownloaderProvider for NetworkHandle<N> {
474    type Client = FetchClient<N>;
475
476    async fn fetch_client(&self) -> Result<Self::Client, oneshot::error::RecvError> {
477        let (tx, rx) = oneshot::channel();
478        let _ = self.manager().send(NetworkHandleMessage::FetchClient(tx));
479        rx.await
480    }
481}
482
483#[derive(Debug)]
484struct NetworkInner<N: NetworkPrimitives = EthNetworkPrimitives> {
485    /// Number of active peer sessions the node's currently handling.
486    num_active_peers: Arc<AtomicUsize>,
487    /// Sender half of the message channel to the [`crate::NetworkManager`].
488    to_manager_tx: UnboundedSender<NetworkHandleMessage<N>>,
489    /// The local address that accepts incoming connections.
490    listener_address: Arc<Mutex<SocketAddr>>,
491    /// The secret key used for authenticating sessions.
492    secret_key: SecretKey,
493    /// The identifier used by this node.
494    local_peer_id: PeerId,
495    /// Access to all the nodes.
496    peers: PeersHandle,
497    /// The mode of the network
498    network_mode: NetworkMode,
499    /// Represents if the network is currently syncing.
500    is_syncing: Arc<AtomicBool>,
501    /// Used to differentiate between an initial pipeline sync or a live sync
502    initial_sync_done: Arc<AtomicBool>,
503    /// The chain id
504    chain_id: Arc<AtomicU64>,
505    /// Whether to disable transaction gossip
506    tx_gossip_disabled: bool,
507    /// The instance of the discv4 service
508    discv4: Option<Discv4>,
509    /// The instance of the discv5 service
510    discv5: Option<Discv5>,
511    /// Sender for high level network events.
512    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
513    /// The NAT resolver
514    nat: Option<NatResolver>,
515}
516
517/// Provides access to modify the network's additional protocol handlers.
518pub trait NetworkProtocols: Send + Sync {
519    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
520    fn add_rlpx_sub_protocol(&self, protocol: RlpxSubProtocol);
521}
522
523/// Internal messages that can be passed to the  [`NetworkManager`](crate::NetworkManager).
524#[derive(Debug)]
525pub(crate) enum NetworkHandleMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
526    /// Marks a peer as trusted.
527    AddTrustedPeerId(PeerId),
528    /// Adds an address for a peer, including its ID, kind, and socket address.
529    AddPeerAddress(PeerId, Option<PeerKind>, PeerAddr),
530    /// Removes a peer from the peerset corresponding to the given kind.
531    RemovePeer(PeerId, PeerKind),
532    /// Disconnects a connection to a peer if it exists, optionally providing a disconnect reason.
533    DisconnectPeer(PeerId, Option<DisconnectReason>),
534    /// Broadcasts an event to announce a new block to all nodes.
535    AnnounceBlock(N::NewBlockPayload, B256),
536    /// Sends a list of transactions to the given peer.
537    SendTransaction {
538        /// The ID of the peer to which the transactions are sent.
539        peer_id: PeerId,
540        /// The shared transactions to send.
541        msg: SharedTransactions<N::BroadcastedTransaction>,
542    },
543    /// Sends a list of transaction hashes to the given peer.
544    SendPooledTransactionHashes {
545        /// The ID of the peer to which the transaction hashes are sent.
546        peer_id: PeerId,
547        /// The new pooled transaction hashes to send.
548        msg: NewPooledTransactionHashes,
549    },
550    /// Sends an `eth` protocol request to the peer.
551    EthRequest {
552        /// The peer to send the request to.
553        peer_id: PeerId,
554        /// The request to send to the peer's sessions.
555        request: PeerRequest<N>,
556    },
557    /// Sends an `eth` protocol message to the peer.
558    EthMessage {
559        /// The peer to send the message to.
560        peer_id: PeerId,
561        /// The `eth` protocol message to send to the peer's session.
562        message: PeerMessage<N>,
563    },
564    /// Applies a reputation change to the given peer.
565    ReputationChange(PeerId, ReputationChangeKind),
566    /// Returns the client that can be used to interact with the network.
567    FetchClient(oneshot::Sender<FetchClient<N>>),
568    /// Applies a status update.
569    StatusUpdate {
570        /// The head status to apply.
571        head: Head,
572    },
573    /// Retrieves the current status via a oneshot sender.
574    GetStatus(oneshot::Sender<NetworkStatus>),
575    /// Gets `PeerInfo` for the specified peer IDs.
576    GetPeerInfosByIds(Vec<PeerId>, oneshot::Sender<Vec<PeerInfo>>),
577    /// Gets `PeerInfo` from all the peers via a oneshot sender.
578    GetPeerInfos(oneshot::Sender<Vec<PeerInfo>>),
579    /// Gets `PeerInfo` for a specific peer via a oneshot sender.
580    GetPeerInfoById(PeerId, oneshot::Sender<Option<PeerInfo>>),
581    /// Gets `PeerInfo` for a specific peer kind via a oneshot sender.
582    GetPeerInfosByPeerKind(PeerKind, oneshot::Sender<Vec<PeerInfo>>),
583    /// Gets the reputation for a specific peer via a oneshot sender.
584    GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
585    /// Retrieves the `TransactionsHandle` via a oneshot sender.
586    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
587    /// Initiates a graceful shutdown of the network via a oneshot sender.
588    Shutdown(oneshot::Sender<()>),
589    /// Sets the network state between hibernation and active.
590    SetNetworkState(NetworkConnectionState),
591    /// Adds a new listener for `DiscoveryEvent`.
592    DiscoveryListener(UnboundedSender<DiscoveryEvent>),
593    /// Adds an additional `RlpxSubProtocol`.
594    AddRlpxSubProtocol(RlpxSubProtocol),
595    /// Connect to the given peer.
596    ConnectPeer(PeerId, PeerKind, PeerAddr),
597    /// Message to update the node's advertised block range information.
598    InternalBlockRangeUpdate(BlockRangeUpdate),
599}