reth_network/
manager.rs

//! High level network management.
//!
//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
//! are handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers depending on the capabilities they announce via their `RLPx`
//! sessions, most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (TCP) connection is established, both peers authenticate an
//! [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the
//! handshake is successful, both peers announce their capabilities and are then ready to exchange
//! sub-protocol messages via the `RLPx` session.
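//!
//! A minimal end-to-end sketch, mirroring the constructor example on [`NetworkManager::eth`]
//! below (the manager is a future and must be polled, e.g. by spawning it, before it makes
//! progress):
//!
//! ```no_run
//! # async fn f() {
//! use reth_chainspec::MAINNET;
//! use reth_network::{NetworkConfig, NetworkManager};
//! let config =
//!     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
//! let manager = NetworkManager::eth(config).await.unwrap();
//! // keep a handle for interacting with the network, then drive the manager on a task
//! let handle = manager.handle().clone();
//! tokio::spawn(manager);
//! # }
//! ```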

use crate::{
    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
    config::NetworkConfig,
    discovery::Discovery,
    error::{NetworkError, ServiceKind},
    eth_requests::IncomingEthRequest,
    import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
    listener::ConnectionListener,
    message::{NewBlockMessage, PeerMessage},
    metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
    network::{NetworkHandle, NetworkHandleMessage},
    peers::PeersManager,
    poll_nested_stream_with_budget,
    protocol::IntoRlpxSubProtocol,
    required_block_filter::RequiredBlockFilter,
    session::SessionManager,
    state::NetworkState,
    swarm::{Swarm, SwarmEvent},
    transactions::NetworkTransactionEvent,
    FetchClient, NetworkBuilder,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_chainspec::EnrForkIdEntry;
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
    events::{PeerEvent, SessionInfo},
    test_utils::PeersHandle,
    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part, which includes connections.
    swarm: Swarm<N>,
    /// Underlying network handle that can be shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`]
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, since handling an eth request is more I/O intensive than delegating it
    /// from the bounded channel to the eth-request channel, this can build up if the node is
    /// flooded with requests.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up to the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. This channel size is set at
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`].
    /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the Network
    metrics: NetworkMetrics,
    /// Disconnect metrics for the Network
    disconnect_metrics: DisconnectMetrics,
}

impl NetworkManager {
    /// Creates the manager of a new network with [`EthNetworkPrimitives`] types.
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let manager = NetworkManager::eth(config).await;
    /// # }
    /// ```
    pub async fn eth<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, EthNetworkPrimitives>,
    ) -> Result<Self, NetworkError> {
        Self::new(config).await
    }
}

impl<N: NetworkPrimitives> NetworkManager<N> {
    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
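    ///
    /// A minimal wiring sketch; the receiver half would be handed to a `TransactionsManager`
    /// instance (omitted here):
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
    /// let manager = NetworkManager::eth(config).await.unwrap().with_transactions(tx);
    /// # }
    /// ```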
    pub fn with_transactions(
        mut self,
        tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
    ) -> Self {
        self.set_transactions(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
    pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager =
            Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
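    ///
    /// A minimal sketch; the channel capacity here is illustrative (reth's builder uses
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY)):
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let (tx, _rx) = tokio::sync::mpsc::channel(256);
    /// let manager = NetworkManager::eth(config).await.unwrap().with_eth_request_handler(tx);
    /// # }
    /// ```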
    pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
        self.set_eth_request_handler(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }

    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }

    /// Returns the [`NetworkHandle`] that can be cloned and shared.
    ///
    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }

    /// Returns the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }

    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
        let metrics = &self.metrics;

        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;

        // update metrics for whole poll function
        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
        // update poll metrics for nested items
        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
    }

    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
            required_block_hashes,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // retrieve the actual tcp address of the socket, since the configured port could be `0`
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // add the forkid entry for EIP-868, but wrap it in an `EnrForkIdEntry` for proper
            // encoding
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // Spawn required block peer filter if configured
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }

    /// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
    /// components of the network
    ///
    /// ```
    /// use reth_network::{
    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
    /// };
    /// use reth_network_peers::mainnet_nodes;
    /// use reth_storage_api::noop::NoopProvider;
    /// use reth_transaction_pool::TransactionPool;
    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
    ///     // This block provider implementation is used for testing purposes.
    ///     let client = NoopProvider::default();
    ///
    ///     // The key that's used for encrypting sessions and to identify our node.
    ///     let local_key = rng_secret_key();
    ///
    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
    ///         .boot_nodes(mainnet_nodes())
    ///         .build(client.clone());
    ///     let transactions_manager_config = config.transactions_manager_config.clone();
    ///
    ///     // create the network instance
    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
    ///         .await
    ///         .unwrap()
    ///         .transactions(pool, transactions_manager_config)
    ///         .request_handler(client)
    ///         .split_with_handle();
    /// }
    /// ```
    pub async fn builder<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
        let network = Self::new(config).await?;
        Ok(network.into_builder())
    }

    /// Create a [`NetworkBuilder`] to configure all components of the network
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }

    /// Returns the [`SocketAddr`] that listens for incoming tcp connections.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }

    /// Returns the number of peers we're currently connected to.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }

    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }

    /// Returns an iterator over all peers in the peer set.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.state().peers().iter_peers()
    }

    /// Returns the number of peers in the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.state().peers().num_known_peers()
    }

    /// Returns a new [`PeersHandle`] that can be cloned and shared.
    ///
    /// The [`PeersHandle`] can be used to interact with the network's peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.state().peers().handle()
    }

    /// Collect the peers from the [`NetworkManager`] and write them to the given
    /// `persistent_peers_file`.
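    ///
    /// A usage sketch; the path is illustrative:
    ///
    /// ```no_run
    /// # fn f(manager: &reth_network::NetworkManager) {
    /// manager
    ///     .write_peers_to_file("known-peers.json".as_ref())
    ///     .expect("failed to persist peers");
    /// # }
    /// ```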
    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
        let known_peers = self.all_peers().collect::<Vec<_>>();
        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
        reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
        Ok(())
    }

    /// Returns a new [`FetchClient`] that can be cloned and shared.
    ///
    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
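    ///
    /// A sketch of obtaining and sharing a client; the clone can be moved into another task,
    /// e.g. a downloader:
    ///
    /// ```no_run
    /// # fn f(manager: &reth_network::NetworkManager) {
    /// let fetch_client = manager.fetch_client();
    /// let _for_other_task = fetch_client.clone();
    /// # }
    /// ```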
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }

    /// Returns the current [`NetworkStatus`] for the local node.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
            capabilities: hello_message
                .protocols
                .into_iter()
                .map(|protocol| protocol.cap)
                .collect(),
        }
    }

    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
    /// configured.
    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
        if let Some(ref tx) = self.to_transactions_manager {
            let _ = tx.send(event);
        }
    }

    /// Sends an event to the [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) if
    /// configured.
    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
        if let Some(ref reqs) = self.to_eth_request_handler {
            let _ = reqs.try_send(event).map_err(|e| {
                if let TrySendError::Full(_) = e {
                    debug!(target:"net", "EthRequestHandler channel is full!");
                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
                }
            });
        }
    }

    /// Handle an incoming request from the peer
    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
        match req {
            PeerRequest::GetBlockHeaders { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetBlockBodies { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetNodeData { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts69 { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetPooledTransactions { request, response } => {
                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
                    peer_id,
                    request,
                    response,
                });
            }
        }
    }

    /// Invoked after a `NewBlock` message from the peer was validated
    fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
        match event {
            BlockImportEvent::Announcement(validation) => match validation {
                BlockValidation::ValidHeader { block } => {
                    self.swarm.state_mut().announce_new_block(block);
                }
                BlockValidation::ValidBlock { block } => {
                    self.swarm.state_mut().announce_new_block_hash(block);
                }
            },
            BlockImportEvent::Outcome(outcome) => {
                let BlockImportOutcome { peer, result } = outcome;
                match result {
                    Ok(validated_block) => match validated_block {
                        BlockValidation::ValidHeader { block } => {
                            self.swarm.state_mut().update_peer_block(
                                &peer,
                                block.hash,
                                block.number(),
                            );
                            self.swarm.state_mut().announce_new_block(block);
                        }
                        BlockValidation::ValidBlock { block } => {
                            self.swarm.state_mut().announce_new_block_hash(block);
                        }
                    },
                    Err(_err) => {
                        self.swarm
                            .state_mut()
                            .peers_mut()
                            .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
                    }
                }
            }
        }
    }

    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol
    ///
    /// Depending on the mode of the network:
    ///    - disconnect the peer if in POS
    ///    - execute the closure if in POW
    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
    where
        F: FnOnce(&mut Self),
    {
        // reject message in POS
        if self.handle.mode().is_stake() {
            // connections to peers which send invalid messages should be terminated
            self.swarm
                .sessions_mut()
                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
        } else {
            only_pow(self);
        }
    }

    /// Handles a received message from the peer's session.
    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
        match msg {
            PeerMessage::NewBlockHashes(hashes) => {
                self.within_pow_or_disconnect(peer_id, |this| {
                    // update peer's state, to track what blocks this peer has seen
                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
                    // start block import process for the hashes
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
                })
            }
            PeerMessage::NewBlock(block) => {
                self.within_pow_or_disconnect(peer_id, move |this| {
                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
                    // start block import process
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
                });
            }
            PeerMessage::PooledTransactions(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::EthRequest(req) => {
                self.on_eth_request(peer_id, req);
            }
            PeerMessage::ReceivedTransaction(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::SendTransactions(_) => {
                unreachable!("Not emitted by session")
            }
            PeerMessage::BlockRangeUpdated(_) => {}
            PeerMessage::Other(other) => {
                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
            }
        }
    }

    /// Handler for received messages from a handle
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                if self.handle.mode().is_stake() {
                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // only add peer if we are not shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                // Sets the network connection state to either Active or Hibernate.
                // Hibernation stops the node from filling new outbound connections, which is
                // beneficial for sync stages that do not require a network connection.
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }

    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        // handle event
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                let reason = if let Some(ref err) = error {
                    // If the connection was closed due to an error, we report
                    // the peer
                    self.swarm.state_mut().peers_mut().on_active_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    err.as_disconnected()
                } else {
                    // Gracefully disconnected
                    self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
                    None
                };
                self.metrics.closed_sessions.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    self.disconnect_metrics.increment(reason);
                }
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.metrics.closed_sessions.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_pending_connection_metrics();

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }

    /// Returns [`PeerInfo`] for all connected peers
    fn get_peer_infos(&self) -> Vec<PeerInfo> {
        self.swarm
            .sessions()
            .active_sessions()
            .iter()
            .filter_map(|(&peer_id, session)| {
                self.swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(record, kind)| session.peer_info(&record, kind))
            })
            .collect()
    }

    /// Returns [`PeerInfo`] for a given peer.
    ///
    /// Returns `None` if there's no active session to the peer.
    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
            self.swarm
                .state()
                .peers()
                .peer_by_id(peer_id)
                .map(|(record, kind)| session.peer_info(&record, kind))
        })
    }

    /// Returns [`PeerInfo`] for the given peers.
    ///
    /// Non-active peers are ignored.
    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
    }

    /// Updates the metrics for active, established connections
    #[inline]
    fn update_active_connection_metrics(&self) {
        self.metrics
            .incoming_connections
            .set(self.swarm.state().peers().num_inbound_connections() as f64);
        self.metrics
            .outgoing_connections
            .set(self.swarm.state().peers().num_outbound_connections() as f64);
    }

    /// Updates the metrics for pending connections
    #[inline]
    fn update_pending_connection_metrics(&self) {
        self.metrics
            .pending_outgoing_connections
            .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
        self.metrics
            .total_pending_connections
            .set(self.swarm.sessions().num_pending_connections() as f64);
    }

    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
    ///
    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
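    ///
    /// A sketch of how this is typically driven, assuming a `reth_tasks::TaskExecutor` is
    /// available to spawn the task (the hook runs after shutdown, while the guard is held):
    ///
    /// ```no_run
    /// # fn f(executor: reth_tasks::TaskExecutor, network: reth_network::NetworkManager) {
    /// executor.spawn_critical_with_graceful_shutdown_signal("p2p network", |shutdown| {
    ///     network.run_until_graceful_shutdown(shutdown, |_network| ())
    /// });
    /// # }
    /// ```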
    pub async fn run_until_graceful_shutdown<F, R>(
        mut self,
        shutdown: GracefulShutdown,
        shutdown_hook: F,
    ) -> R
    where
        F: FnOnce(Self) -> R,
    {
        let mut graceful_guard = None;
        tokio::select! {
            _ = &mut self => {},
            guard = shutdown => {
                graceful_guard = Some(guard);
            },
        }

        self.perform_network_shutdown();
        let res = shutdown_hook(self);
        drop(graceful_guard);
        res
    }

    /// Performs a graceful network shutdown by stopping new connections from being accepted while
    /// draining current and pending connections.
    fn perform_network_shutdown(&mut self) {
        // Set connection status to `Shutdown`. Stops the node from accepting
        // new incoming connections as well as sending connection requests to newly
        // discovered nodes.
        self.swarm.on_shutdown_requested();
        // Disconnect all active connections
        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
        // drop pending connections
        self.swarm.sessions_mut().disconnect_all_pending();
    }
}

impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // poll new block imports (expected to be a noop for POS)
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // These loops drive the entire state of the network and do a lot of work. Under heavy
        // load (many messages/events), data may arrive faster than it can be processed (incoming
        // messages/requests -> events), and it is possible that more data has already arrived by
        // the time an internal event is processed, which could turn this loop into a busy loop.
        // Without yielding back to the executor, it can starve other tasks waiting on that
        // executor to execute them, or to drive underlying resources. To prevent this, we
        // preemptively return control when the `budget` is exhausted. The value itself is chosen
        // somewhat arbitrarily: it is high enough so the swarm can make meaningful progress but
        // low enough that this loop does not starve other tasks for too long. If the budget is
        // exhausted we manually yield back control to the (coop) scheduler. This manual yield
        // point should prevent situations where polling appears to be frozen. See also
        // <https://tokio.rs/blog/2020-04-preemption> and tokio's docs on cooperative scheduling
        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
        //
        // Testing has shown that this loop naturally reaches the pending state within 1-5
        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
        // range of what's recommended as rule of thumb.
        // <https://ryhl.io/blog/async-what-is-blocking/>

        // process incoming messages from a handle (`TransactionsManager` has one)
        //
        // will only be closed if the channel was deliberately closed since we always have an
        // instance of `NetworkHandle`
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // process incoming messages from the network
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // if either budget was exhausted, the corresponding stream may not be fully drained, so
        // make sure we're woken up again to continue draining it
        if maybe_more_handle_messages || maybe_more_swarm_events {
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}

#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}