//! High level network management.
//!
//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
//! are handled and keeps track of connections to peers.
//!
//! ## Capabilities
//!
//! The network manages peers based on the capabilities they announce via their `RLPx` sessions,
//! most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
//!
//! ## Overview
//!
//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a (TCP) connection is established, both peers authenticate an
//! [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the
//! handshake is successful, both peers announce their capabilities and are then ready to exchange
//! sub-protocol messages via the `RLPx` session.
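//!
//! ## Usage
//!
//! A minimal sketch of launching the manager (assuming a tokio runtime); the manager is an
//! endless future and must be spawned or polled to make progress:
//!
//! ```no_run
//! # async fn launch() {
//! use reth_chainspec::MAINNET;
//! use reth_network::{NetworkConfig, NetworkManager};
//!
//! let config =
//!     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
//! let manager = NetworkManager::eth(config).await.unwrap();
//! // Keep a handle for interacting with the network, then drive the manager as a task.
//! let handle = manager.handle().clone();
//! tokio::spawn(manager);
//! # }
//! ```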

use crate::{
    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
    config::NetworkConfig,
    discovery::Discovery,
    error::{NetworkError, ServiceKind},
    eth_requests::IncomingEthRequest,
    import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
    listener::ConnectionListener,
    message::{NewBlockMessage, PeerMessage},
    metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
    network::{NetworkHandle, NetworkHandleMessage},
    peers::PeersManager,
    poll_nested_stream_with_budget,
    protocol::IntoRlpxSubProtocol,
    required_block_filter::RequiredBlockFilter,
    session::SessionManager,
    state::NetworkState,
    swarm::{Swarm, SwarmEvent},
    transactions::NetworkTransactionEvent,
    FetchClient, NetworkBuilder,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_chainspec::EnrForkIdEntry;
use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
    events::{PeerEvent, SessionInfo},
    test_utils::PeersHandle,
    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
use reth_storage_api::BlockNumReader;
use reth_tasks::shutdown::GracefulShutdown;
use reth_tokio_util::EventSender;
use secp256k1::SecretKey;
use std::{
    net::SocketAddr,
    path::Path,
    pin::Pin,
    sync::{
        atomic::{AtomicU64, AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
    time::{Duration, Instant},
};
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, trace, warn};

#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part, which includes connections.
    swarm: Swarm<N>,
    /// Underlying network handle that can be shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, since handling an eth request is more I/O intensive than delegating
    /// them from the bounded channel to the eth-request channel, it is possible that this
    /// builds up if the node is flooded with requests.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up until the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. This channel size is set at
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]:
    /// updated by this manager and read by the handle.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the Network
    metrics: NetworkMetrics,
    /// Disconnect metrics for the Network
    disconnect_metrics: DisconnectMetrics,
}

impl NetworkManager {
    /// Creates the manager of a new network with [`EthNetworkPrimitives`] types.
    ///
    /// ```no_run
    /// # async fn f() {
    /// use reth_chainspec::MAINNET;
    /// use reth_network::{NetworkConfig, NetworkManager};
    /// let config =
    ///     NetworkConfig::builder_with_rng_secret_key().build_with_noop_provider(MAINNET.clone());
    /// let manager = NetworkManager::eth(config).await;
    /// # }
    /// ```
    pub async fn eth<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, EthNetworkPrimitives>,
    ) -> Result<Self, NetworkError> {
        Self::new(config).await
    }
}

impl<N: NetworkPrimitives> NetworkManager<N> {
    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
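    ///
    /// A sketch of the wiring (the receiver half goes to the transactions task):
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
    /// let manager = manager.with_transactions(tx);
    /// // `rx` is then consumed by the `TransactionsManager` task.
    /// ```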
    pub fn with_transactions(
        mut self,
        tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
    ) -> Self {
        self.set_transactions(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
    pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager =
            Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
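    ///
    /// A sketch of the wiring; this channel is bounded (see
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY)), so the
    /// capacity below is illustrative:
    ///
    /// ```ignore
    /// let (tx, rx) = tokio::sync::mpsc::channel(256);
    /// let manager = manager.with_eth_request_handler(tx);
    /// // `rx` is then consumed by the `EthRequestHandler` task.
    /// ```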
    pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
        self.set_eth_request_handler(tx);
        self
    }

    /// Sets the dedicated channel for events intended for the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }

    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }

    /// Returns the [`NetworkHandle`] that can be cloned and shared.
    ///
    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`].
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }

    /// Returns the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }

    #[inline]
    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
        let metrics = &self.metrics;

        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;

        // update metrics for whole poll function
        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
        // update poll metrics for nested items
        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
    }

    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
            required_block_hashes,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // retrieve the tcp address of the socket
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // add the forkid entry for EIP-868, but wrap it in an `EnrForkIdEntry` for proper
            // encoding
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // need to retrieve the addr here since provided port could be `0`
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // Spawn required block peer filter if configured
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }

    /// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
    /// components of the network.
    ///
    /// ```
    /// use reth_network::{
    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
    /// };
    /// use reth_network_peers::mainnet_nodes;
    /// use reth_storage_api::noop::NoopProvider;
    /// use reth_transaction_pool::TransactionPool;
    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
    ///     // This block provider implementation is used for testing purposes.
    ///     let client = NoopProvider::default();
    ///
    ///     // The key that's used for encrypting sessions and to identify our node.
    ///     let local_key = rng_secret_key();
    ///
    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
    ///         .boot_nodes(mainnet_nodes())
    ///         .build(client.clone());
    ///     let transactions_manager_config = config.transactions_manager_config.clone();
    ///
    ///     // create the network instance
    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
    ///         .await
    ///         .unwrap()
    ///         .transactions(pool, transactions_manager_config)
    ///         .request_handler(client)
    ///         .split_with_handle();
    /// }
    /// ```
    pub async fn builder<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
        let network = Self::new(config).await?;
        Ok(network.into_builder())
    }

    /// Create a [`NetworkBuilder`] to configure all components of the network.
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }

    /// Returns the [`SocketAddr`] that listens for incoming TCP connections.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }

    /// How many peers we're currently connected to.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }

    /// Returns the [`PeerId`] used in the network.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }

    /// Returns an iterator over all peers in the peer set.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.state().peers().iter_peers()
    }

    /// Returns the number of peers in the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.state().peers().num_known_peers()
    }

    /// Returns a new [`PeersHandle`] that can be cloned and shared.
    ///
    /// The [`PeersHandle`] can be used to interact with the network's peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.state().peers().handle()
    }

    /// Collect the peers from the [`NetworkManager`] and write them to the given
    /// `persistent_peers_file`.
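    ///
    /// A usage sketch (the path is illustrative):
    ///
    /// ```ignore
    /// use std::path::Path;
    /// manager.write_peers_to_file(Path::new("known-peers.json"))?;
    /// ```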
    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
        let known_peers = self.all_peers().collect::<Vec<_>>();
        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
        reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
        Ok(())
    }

    /// Returns a new [`FetchClient`] that can be cloned and shared.
    ///
    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
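    ///
    /// A sketch; the client can be handed to other tasks:
    ///
    /// ```ignore
    /// let fetch_client = manager.fetch_client();
    /// let for_other_task = fetch_client.clone();
    /// ```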
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }

    /// Returns the current [`NetworkStatus`] for the local node.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
            capabilities: hello_message
                .protocols
                .into_iter()
                .map(|protocol| protocol.cap)
                .collect(),
        }
    }

    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
    /// configured.
    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
        if let Some(ref tx) = self.to_transactions_manager {
            let _ = tx.send(event);
        }
    }

    /// Sends an event to the [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) if
    /// configured.
    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
        if let Some(ref reqs) = self.to_eth_request_handler {
            let _ = reqs.try_send(event).map_err(|e| {
                if let TrySendError::Full(_) = e {
                    debug!(target: "net", "EthRequestHandler channel is full!");
                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
                }
            });
        }
    }

    /// Handles an incoming request from the peer.
    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
        match req {
            PeerRequest::GetBlockHeaders { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetBlockBodies { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetNodeData { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts69 { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetReceipts70 { request, response } => {
                self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
                    peer_id,
                    request,
                    response,
                })
            }
            PeerRequest::GetPooledTransactions { request, response } => {
                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
                    peer_id,
                    request,
                    response,
                });
            }
        }
    }

    /// Invoked after a `NewBlock` message from the peer was validated.
    fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
        match event {
            BlockImportEvent::Announcement(validation) => match validation {
                BlockValidation::ValidHeader { block } => {
                    self.swarm.state_mut().announce_new_block(block);
                }
                BlockValidation::ValidBlock { block } => {
                    self.swarm.state_mut().announce_new_block_hash(block);
                }
            },
            BlockImportEvent::Outcome(outcome) => {
                let BlockImportOutcome { peer, result } = outcome;
                match result {
                    Ok(validated_block) => match validated_block {
                        BlockValidation::ValidHeader { block } => {
                            self.swarm.state_mut().update_peer_block(
                                &peer,
                                block.hash,
                                block.number(),
                            );
                            self.swarm.state_mut().announce_new_block(block);
                        }
                        BlockValidation::ValidBlock { block } => {
                            self.swarm.state_mut().announce_new_block_hash(block);
                        }
                    },
                    Err(_err) => {
                        self.swarm
                            .state_mut()
                            .peers_mut()
                            .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
                    }
                }
            }
        }
    }

    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol.
    ///
    /// Depending on the mode of the network:
    ///    - disconnect peer if in POS
    ///    - execute the closure if in POW
    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
    where
        F: FnOnce(&mut Self),
    {
        // reject message in POS
        if self.handle.mode().is_stake() {
            // connections to peers which send invalid messages should be terminated
            self.swarm
                .sessions_mut()
                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
        } else {
            only_pow(self);
        }
    }

    /// Handles a received message from the peer's session.
    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
        match msg {
            PeerMessage::NewBlockHashes(hashes) => {
                self.within_pow_or_disconnect(peer_id, |this| {
                    // update peer's state, to track what blocks this peer has seen
                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
                    // start block import process for the hashes
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
                })
            }
            PeerMessage::NewBlock(block) => {
                self.within_pow_or_disconnect(peer_id, move |this| {
                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
                    // start block import process
                    this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
                });
            }
            PeerMessage::PooledTransactions(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::EthRequest(req) => {
                self.on_eth_request(peer_id, req);
            }
            PeerMessage::ReceivedTransaction(msg) => {
                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
                    peer_id,
                    msg,
                });
            }
            PeerMessage::SendTransactions(_) => {
                unreachable!("Not emitted by session")
            }
            PeerMessage::BlockRangeUpdated(_) => {}
            PeerMessage::Other(other) => {
                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
            }
        }
    }

    /// Handler for received messages from a handle.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                if self.handle.mode().is_stake() {
                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // only add peer if we are not shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                // Sets the network connection state to either Active or Hibernate.
                // Hibernation stops the node from filling new outbound connections, which is
                // beneficial for sync stages that do not require a network connection.
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }

    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        // handle event
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                let reason = if let Some(ref err) = error {
                    // If the connection was closed due to an error, we report
                    // the peer
                    self.swarm.state_mut().peers_mut().on_active_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    err.as_disconnected()
                } else {
                    // Gracefully disconnected
                    self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
                    None
                };
                self.metrics.closed_sessions.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    self.disconnect_metrics.increment(reason);
                }
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.metrics.closed_sessions.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.state().peers().num_inbound_connections() as f64);
                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.metrics.pending_session_failures.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.metrics.closed_sessions.increment(1);
                self.update_pending_connection_metrics();

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.metrics.backed_off_peers.set(
                    self.swarm.state().peers().num_backed_off_peers().saturating_sub(1) as f64,
                );
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }

    /// Returns [`PeerInfo`] for all connected peers.
    fn get_peer_infos(&self) -> Vec<PeerInfo> {
        self.swarm
            .sessions()
            .active_sessions()
            .iter()
            .filter_map(|(&peer_id, session)| {
                self.swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(record, kind)| session.peer_info(&record, kind))
            })
            .collect()
    }

    /// Returns [`PeerInfo`] for a given peer.
    ///
    /// Returns `None` if there's no active session to the peer.
    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
            self.swarm
                .state()
                .peers()
                .peer_by_id(peer_id)
                .map(|(record, kind)| session.peer_info(&record, kind))
        })
    }

    /// Returns [`PeerInfo`] for the given peers.
    ///
    /// Peers without an active session are ignored.
    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
    }

    /// Updates the metrics for active, established connections.
    #[inline]
    fn update_active_connection_metrics(&self) {
        self.metrics
            .incoming_connections
            .set(self.swarm.state().peers().num_inbound_connections() as f64);
        self.metrics
            .outgoing_connections
            .set(self.swarm.state().peers().num_outbound_connections() as f64);
    }

    /// Updates the metrics for pending connections.
    #[inline]
    fn update_pending_connection_metrics(&self) {
        self.metrics
            .pending_outgoing_connections
            .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
        self.metrics
            .total_pending_connections
            .set(self.swarm.sessions().num_pending_connections() as f64);
    }

    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
    ///
    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
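    ///
    /// A usage sketch, assuming a `GracefulShutdown` signal obtained from the node's task
    /// manager:
    ///
    /// ```ignore
    /// network.run_until_graceful_shutdown(shutdown, |_network| ()).await;
    /// ```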
    pub async fn run_until_graceful_shutdown<F, R>(
        mut self,
        shutdown: GracefulShutdown,
        shutdown_hook: F,
    ) -> R
    where
        F: FnOnce(Self) -> R,
    {
        let mut graceful_guard = None;
        tokio::select! {
            _ = &mut self => {},
            guard = shutdown => {
                graceful_guard = Some(guard);
            },
        }

        self.perform_network_shutdown();
        let res = shutdown_hook(self);
        drop(graceful_guard);
        res
    }

    /// Performs a graceful network shutdown by stopping new connections from being accepted while
    /// draining current and pending connections.
    fn perform_network_shutdown(&mut self) {
        // Set connection status to `Shutdown`. Stops node from accepting
        // new incoming connections as well as sending connection requests to newly
        // discovered nodes.
        self.swarm.on_shutdown_requested();
        // Disconnect all active connections
        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
        // drop pending connections
        self.swarm.sessions_mut().disconnect_all_pending();
    }
}

impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // poll new block imports (expected to be a noop for POS)
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // These loops drive the entire state of the network and do a lot of work. Under heavy
        // load (many messages/events), data may arrive faster than it can be processed (incoming
        // messages/requests -> events), and it is possible that more data has already arrived by
        // the time an internal event is processed, which could turn this loop into a busy loop.
        // Without yielding back to the executor, it can starve other tasks waiting on that
        // executor to execute them, or to drive underlying resources. To prevent this, we
        // preemptively return control when the `budget` is exhausted. The value itself is chosen
        // somewhat arbitrarily: it is high enough so the swarm can make meaningful progress but
        // low enough that this loop does not starve other tasks for too long. If the budget is
        // exhausted we manually yield back control to the (coop) scheduler. This manual yield
        // point should prevent situations where polling appears to be frozen. See also
        // <https://tokio.rs/blog/2020-04-preemption> and tokio's docs on cooperative scheduling
        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>.
        //
        // Testing has shown that this loop naturally reaches the pending state within 1-5
        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
        // range of what's recommended as rule of thumb.
        // <https://ryhl.io/blog/async-what-is-blocking/>

        // process incoming messages from a handle (`TransactionsManager` has one)
        //
        // will only be closed if the channel was deliberately closed since we always have an
        // instance of `NetworkHandle`
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // process incoming messages from the network
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // if a budget was exhausted, the streams may not be fully drained yet
        if maybe_more_handle_messages || maybe_more_swarm_events {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}

#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    acc_network_handle: Duration,
    acc_swarm: Duration,
}