// reth_network/manager.rs

//! High-level network management.
//!
//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
//! are handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers depending on the capabilities they announce via their `RLPx`
//! sessions, most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (TCP) connection is established, both peers authenticate a
//! [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the
//! handshake is successful, both peers announce their capabilities and are then ready to exchange
//! sub-protocol messages via the `RLPx` session.
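//!
//! A minimal sketch of launching the manager: it is an endless future, so it must be spawned or
//! polled to make progress (spawning here assumes a tokio runtime):
//!
//! ```no_run
//! # async fn launch() -> Result<(), Box<dyn std::error::Error>> {
//! use reth_chainspec::MAINNET;
//! use reth_network::{NetworkConfig, NetworkManager};
//!
//! let config =
//!     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
//! let manager = NetworkManager::eth(config).await?;
//! // the manager is an endless future; spawn it so the network makes progress
//! tokio::spawn(manager);
//! # Ok(()) }
//! ```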

use crate::{
    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
    config::NetworkConfig,
    discovery::Discovery,
    error::{NetworkError, ServiceKind},
    eth_requests::IncomingEthRequest,
    import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
    listener::ConnectionListener,
    message::{NewBlockMessage, PeerMessage},
    metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
    network::{NetworkHandle, NetworkHandleMessage},
    peers::PeersManager,
    poll_nested_stream_with_budget,
    protocol::IntoRlpxSubProtocol,
    session::SessionManager,
    state::NetworkState,
    swarm::{Swarm, SwarmEvent},
    transactions::NetworkTransactionEvent,
    FetchClient, NetworkBuilder,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_chainspec::EnrForkIdEntry;
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
    events::{PeerEvent, SessionInfo},
    test_utils::PeersHandle,
    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part, which includes connections.
    swarm: Swarm<N>,
    /// Underlying network handle that can be shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, since handling an eth request is more I/O intensive than delegating
    /// them from the bounded channel to the eth-request channel, it is possible that this
    /// builds up if the node is flooded with requests.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up to the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. This channel size is set at
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]:
    /// updated by the `NetworkWorker` and loaded by the `NetworkService`.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the Network
    metrics: NetworkMetrics,
    /// Disconnect metrics for the Network
    disconnect_metrics: DisconnectMetrics,
}

impl NetworkManager {
    /// Creates the manager of a new network with [`EthNetworkPrimitives`] types.
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let manager = NetworkManager::eth(config).await;
    /// # }
    /// ```
    pub async fn eth<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, EthNetworkPrimitives>,
    ) -> Result<Self, NetworkError> {
        Self::new(config).await
    }
}

impl<N: NetworkPrimitives> NetworkManager<N> {
    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
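    ///
    /// A minimal sketch of the wiring (hypothetical names; the receiver half `rx` would be
    /// handed to a [`TransactionsManager`](crate::transactions::TransactionsManager)):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    /// let network = network.with_transactions(tx);
    /// ```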
    pub fn with_transactions(
        mut self,
        tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
    ) -> Self {
        self.set_transactions(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
    pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager =
            Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
    pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
        self.set_eth_request_handler(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
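    ///
    /// A minimal sketch (hypothetical wiring; the receiver half `rx` would be handed to an
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler)):
    ///
    /// ```ignore
    /// // 256 is a stand-in; in practice the capacity should match
    /// // `ETH_REQUEST_CHANNEL_CAPACITY`
    /// let (tx, rx) = tokio::sync::mpsc::channel(256);
    /// network.set_eth_request_handler(tx);
    /// ```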
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }

    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }

    /// Returns the [`NetworkHandle`] that can be cloned and shared.
    ///
    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`].
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }

    /// Returns the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }

    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
        let metrics = &self.metrics;

        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;

        // update metrics for whole poll function
        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
        // update poll metrics for nested items
        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
    }

    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // retrieve the tcp address of the socket
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // add the forkid entry for EIP-868, but wrap it in an `EnrForkIdEntry` for proper
            // encoding
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // need to retrieve the addr here since provided port could be `0`
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }

    /// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
    /// components of the network.
    ///
    /// ```
    /// use reth_network::{
    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
    /// };
    /// use reth_network_peers::mainnet_nodes;
    /// use reth_storage_api::noop::NoopProvider;
    /// use reth_transaction_pool::TransactionPool;
    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
    ///     // This block provider implementation is used for testing purposes.
    ///     let client = NoopProvider::default();
    ///
    ///     // The key that's used for encrypting sessions and to identify our node.
    ///     let local_key = rng_secret_key();
    ///
    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
    ///         .boot_nodes(mainnet_nodes())
    ///         .build(client.clone());
    ///     let transactions_manager_config = config.transactions_manager_config.clone();
    ///
    ///     // create the network instance
    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
    ///         .await
    ///         .unwrap()
    ///         .transactions(pool, transactions_manager_config)
    ///         .request_handler(client)
    ///         .split_with_handle();
    /// }
    /// ```
    pub async fn builder<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
        let network = Self::new(config).await?;
        Ok(network.into_builder())
    }

    /// Create a [`NetworkBuilder`] to configure all components of the network.
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }

    /// Returns the [`SocketAddr`] that listens for incoming tcp connections.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }

    /// Returns how many peers we're currently connected to.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }

    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }

    /// Returns an iterator over all peers in the peer set.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.state().peers().iter_peers()
    }

    /// Returns the number of peers in the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.state().peers().num_known_peers()
    }

    /// Returns a new [`PeersHandle`] that can be cloned and shared.
    ///
    /// The [`PeersHandle`] can be used to interact with the network's peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.state().peers().handle()
    }

    /// Collect the peers from the [`NetworkManager`] and write them to the given
    /// `persistent_peers_file`.
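    ///
    /// A minimal sketch, assuming a `NetworkManager` value `network`:
    ///
    /// ```ignore
    /// use std::path::Path;
    /// network.write_peers_to_file(Path::new("known-peers.json"))?;
    /// ```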
    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
        let known_peers = self.all_peers().collect::<Vec<_>>();
        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
        reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
        Ok(())
    }

    /// Returns a new [`FetchClient`] that can be cloned and shared.
    ///
    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }

    /// Returns the current [`NetworkStatus`] for the local node.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
            capabilities: hello_message
                .protocols
                .into_iter()
                .map(|protocol| protocol.cap)
                .collect(),
        }
    }

    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
    /// configured.
    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
        if let Some(ref tx) = self.to_transactions_manager {
            let _ = tx.send(event);
        }
    }

    /// Sends an event to the [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) if
    /// configured.
    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
        if let Some(ref reqs) = self.to_eth_request_handler {
            let _ = reqs.try_send(event).map_err(|e| {
                if let TrySendError::Full(_) = e {
                    debug!(target:"net", "EthRequestHandler channel is full!");
                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
                }
            });
        }
    }

    /// Handles an incoming request from the peer.
    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
        match req {
            PeerRequest::GetBlockHeaders { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetBlockBodies { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetNodeData { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts69 { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetPooledTransactions { request, response } => {
                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
                    peer_id,
                    request,
                    response,
                });
            }
        }
    }

    /// Invoked after a `NewBlock` message from the peer was validated
    fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
        match event {
            BlockImportEvent::Announcement(validation) => match validation {
                BlockValidation::ValidHeader { block } => {
                    self.swarm.state_mut().announce_new_block(block);
                }
                BlockValidation::ValidBlock { block } => {
                    self.swarm.state_mut().announce_new_block_hash(block);
                }
            },
            BlockImportEvent::Outcome(outcome) => {
                let BlockImportOutcome { peer, result } = outcome;
                match result {
                    Ok(validated_block) => match validated_block {
                        BlockValidation::ValidHeader { block } => {
                            self.swarm.state_mut().update_peer_block(
                                &peer,
                                block.hash,
                                block.number(),
                            );
                            self.swarm.state_mut().announce_new_block(block);
                        }
                        BlockValidation::ValidBlock { block } => {
                            self.swarm.state_mut().announce_new_block_hash(block);
                        }
                    },
                    Err(_err) => {
                        self.swarm
                            .state_mut()
                            .peers_mut()
                            .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
                    }
                }
            }
        }
    }

    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol
    ///
    /// Depending on the mode of the network:
    ///    - disconnect peer if in POS
    ///    - execute the closure if in POW
    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
    where
        F: FnOnce(&mut Self),
    {
        // reject message in POS
        if self.handle.mode().is_stake() {
            // connections to peers which send invalid messages should be terminated
            self.swarm
                .sessions_mut()
                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
        } else {
            only_pow(self);
        }
    }

    /// Handles a received message from the peer's session.
    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
        match msg {
            PeerMessage::NewBlockHashes(hashes) => {
                self.within_pow_or_disconnect(peer_id, |this| {
                    // update peer's state, to track what blocks this peer has seen
                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
                    // start block import process for the hashes
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
                })
            }
            PeerMessage::NewBlock(block) => {
                self.within_pow_or_disconnect(peer_id, move |this| {
                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
                    // start block import process
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
                });
            }
            PeerMessage::PooledTransactions(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::EthRequest(req) => {
                self.on_eth_request(peer_id, req);
            }
            PeerMessage::ReceivedTransaction(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::SendTransactions(_) => {
                unreachable!("Not emitted by session")
            }
            PeerMessage::BlockRangeUpdated(_) => {}
            PeerMessage::Other(other) => {
                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
            }
        }
    }

    /// Handler for received messages from a handle.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                if self.handle.mode().is_stake() {
                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // only add peer if we are not shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                // Sets the network connection state to either Active or Hibernate.
                // Hibernate stops the node from filling new outbound connections, which is
                // beneficial for sync stages that do not require a network connection.
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }

    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        // handle event
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                let reason = if let Some(ref err) = error {
                    // If the connection was closed due to an error, we report the peer
                    self.swarm.state_mut().peers_mut().on_active_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    err.as_disconnected()
                } else {
                    // Gracefully disconnected
                    self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
                    None
                };
                self.metrics.closed_sessions.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    self.disconnect_metrics.increment(reason);
                }
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.metrics.closed_sessions.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_pending_connection_metrics();

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }

    /// Returns [`PeerInfo`] for all connected peers.
    fn get_peer_infos(&self) -> Vec<PeerInfo> {
        self.swarm
            .sessions()
            .active_sessions()
            .iter()
            .filter_map(|(&peer_id, session)| {
                self.swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(record, kind)| session.peer_info(&record, kind))
            })
            .collect()
    }

    /// Returns [`PeerInfo`] for a given peer.
    ///
    /// Returns `None` if there's no active session to the peer.
    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
            self.swarm
                .state()
                .peers()
                .peer_by_id(peer_id)
                .map(|(record, kind)| session.peer_info(&record, kind))
        })
    }

    /// Returns [`PeerInfo`] for the given peers.
    ///
    /// Non-active peers are ignored.
    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
    }

    /// Updates the metrics for active, established connections.
    #[inline]
    fn update_active_connection_metrics(&self) {
        self.metrics
            .incoming_connections
            .set(self.swarm.state().peers().num_inbound_connections() as f64);
        self.metrics
            .outgoing_connections
            .set(self.swarm.state().peers().num_outbound_connections() as f64);
    }

    /// Updates the metrics for pending connections.
    #[inline]
    fn update_pending_connection_metrics(&self) {
        self.metrics
            .pending_outgoing_connections
            .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
        self.metrics
            .total_pending_connections
            .set(self.swarm.sessions().num_pending_connections() as f64);
    }

    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
    ///
    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
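    ///
    /// A sketch of typical usage, assuming a [`GracefulShutdown`] signal `shutdown` obtained from
    /// a task manager and a `NetworkManager` value `network` (hypothetical names):
    ///
    /// ```ignore
    /// tokio::spawn(async move {
    ///     network
    ///         .run_until_graceful_shutdown(shutdown, |this| {
    ///             // e.g. persist the known peers before the process exits
    ///             let _ = this.write_peers_to_file(std::path::Path::new("known-peers.json"));
    ///         })
    ///         .await
    /// });
    /// ```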
    pub async fn run_until_graceful_shutdown<F, R>(
        mut self,
        shutdown: GracefulShutdown,
        shutdown_hook: F,
    ) -> R
    where
        F: FnOnce(Self) -> R,
    {
        let mut graceful_guard = None;
        tokio::select! {
            _ = &mut self => {},
            guard = shutdown => {
                graceful_guard = Some(guard);
            },
        }

        self.perform_network_shutdown();
        let res = shutdown_hook(self);
        drop(graceful_guard);
        res
    }

    /// Performs a graceful network shutdown by stopping new connections from being accepted while
    /// draining current and pending connections.
    fn perform_network_shutdown(&mut self) {
        // Set connection status to `Shutdown`. Stops node from accepting
        // new incoming connections as well as sending connection requests to newly
        // discovered nodes.
        self.swarm.on_shutdown_requested();
        // Disconnect all active connections
        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
        // drop pending connections
        self.swarm.sessions_mut().disconnect_all_pending();
    }
}

impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // poll new block imports (expected to be a noop for POS)
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // These loops drive the entire state of the network and do a lot of work. Under heavy
        // load (many messages/events), data may arrive faster than it can be processed (incoming
        // messages/requests -> events), and more data may already have arrived by the time an
        // internal event is processed, which could turn this loop into a busy loop. Without
        // yielding back to the executor, it can starve other tasks waiting on that executor to
        // execute them or to drive underlying resources. To prevent this, we preemptively return
        // control when the `budget` is exhausted. The value itself is chosen somewhat
        // arbitrarily: it is high enough so the swarm can make meaningful progress but low enough
        // that this loop does not starve other tasks for too long. If the budget is exhausted we
        // manually yield back control to the (coop) scheduler. This manual yield point should
        // prevent situations where polling appears to be frozen. See also
        // <https://tokio.rs/blog/2020-04-preemption> and tokio's docs on cooperative scheduling
        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>.
        //
        // Testing has shown that this loop naturally reaches the pending state within 1-5
        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
        // range of what's recommended as rule of thumb.
        // <https://ryhl.io/blog/async-what-is-blocking/>

        // process incoming messages from a handle (`TransactionsManager` has one)
        //
        // will only be closed if the channel was deliberately closed since we always have an
        // instance of `NetworkHandle`
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // process incoming messages from the network
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // if a budget was exhausted, the corresponding stream may still hold buffered items
        if maybe_more_handle_messages || maybe_more_swarm_events {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // all streams are fully drained and the import futures are pending
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}

#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}