Skip to main content

reth_network/
manager.rs

1//! High level network management.
2//!
3//! The [`NetworkManager`] contains the state of the network as a whole. It controls how connections
4//! are handled and keeps track of connections to peers.
5//!
6//! ## Capabilities
7//!
//! The network manages peers depending on the capabilities they announce via their `RLPx` sessions, most importantly the [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md) (`eth`).
9//!
10//! ## Overview
11//!
12//! The [`NetworkManager`] is responsible for advancing the state of the `network`. The `network` is
13//! made up of peer-to-peer connections between nodes that are available on the same network.
//! Peer discovery is handled by Ethereum's discovery protocols (discv4, discv5). If the address
15//! (IP+port) of our node is published via discovery, remote peers can initiate inbound connections
//! to the local node. Once a TCP connection is established, both peers authenticate an [RLPx session](https://github.com/ethereum/devp2p/blob/master/rlpx.md) via a handshake. If the handshake succeeds, both peers announce their capabilities and are then ready to exchange sub-protocol messages via the `RLPx` session.
17
18use crate::{
19    budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20    config::NetworkConfig,
21    discovery::Discovery,
22    error::{NetworkError, ServiceKind},
23    eth_requests::IncomingEthRequest,
24    import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25    listener::ConnectionListener,
26    message::{NewBlockMessage, PeerMessage},
27    metrics::{
28        BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29        PendingSessionFailureMetrics,
30    },
31    network::{NetworkHandle, NetworkHandleMessage},
32    peers::{BackoffReason, PeersManager},
33    poll_nested_stream_with_budget,
34    protocol::IntoRlpxSubProtocol,
35    required_block_filter::RequiredBlockFilter,
36    session::SessionManager,
37    state::NetworkState,
38    swarm::{Swarm, SwarmEvent},
39    transactions::NetworkTransactionEvent,
40    FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::MemoryBoundedSender;
48use reth_network_api::{
49    events::{PeerEvent, SessionInfo},
50    test_utils::PeersHandle,
51    EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60    net::SocketAddr,
61    path::Path,
62    pin::Pin,
63    sync::{
64        atomic::{AtomicU64, AtomicUsize, Ordering},
65        Arc,
66    },
67    task::{Context, Poll},
68    time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
#[cfg_attr(doc, aquamarine::aquamarine)]
// TODO: Inlined diagram due to a bug in aquamarine library, should become an include when it's
// fixed. See https://github.com/mersinvald/aquamarine/issues/50
// include_mmd!("docs/mermaid/network-manager.mmd")
/// Manages the _entire_ state of the network.
///
/// This is an endless [`Future`] that continuously drives the state of the entire network forward.
///
/// The [`NetworkManager`] is the container type for all parts involved with advancing the network.
///
/// ```mermaid
/// graph TB
///   handle(NetworkHandle)
///   events(NetworkEvents)
///   transactions(Transactions Task)
///   ethrequest(ETH Request Task)
///   discovery(Discovery Task)
///   subgraph NetworkManager
///     direction LR
///     subgraph Swarm
///         direction TB
///         B1[(Session Manager)]
///         B2[(Connection Listener)]
///         B3[(Network State)]
///     end
///  end
///  handle <--> |request response channel| NetworkManager
///  NetworkManager --> |Network events| events
///  transactions <--> |transactions| NetworkManager
///  ethrequest <--> |ETH request handling| NetworkManager
///  discovery --> |Discovered peers| NetworkManager
/// ```
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The type that manages the actual network part: it is assembled from the connection
    /// listener, the session manager and the network state.
    swarm: Swarm<N>,
    /// Underlying network handle that can be cloned and shared.
    handle: NetworkHandle<N>,
    /// Receiver half of the command channel set up between this type and the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Handles block imports according to the `eth` protocol.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender for high level network events.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender half to send events to the
    /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
    ///
    /// Starts out as `None`; while unset, transaction events are not forwarded anywhere.
    to_transactions_manager: Option<MemoryBoundedSender<NetworkTransactionEvent<N>>>,
    /// Sender half to send events to the
    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
    ///
    /// The channel that originally receives and bundles all requests from all sessions is already
    /// bounded. However, handling an eth request is more I/O intensive than delegating it from
    /// the bounded channel to the eth-request channel, so requests can still build up here if
    /// the node is flooded with them.
    ///
    /// Even though non-malicious requests are relatively cheap, it's possible to craft
    /// body requests with bogus data up to the allowed max message size limit.
    /// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
    /// requests. The channel size is set by
    /// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY).
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Tracks the number of active sessions (connected peers).
    ///
    /// The same `Arc` is handed to the network state and to the [`NetworkHandle`] when the
    /// manager is constructed, so the handle can observe the count without going through this
    /// manager.
    num_active_peers: Arc<AtomicUsize>,
    /// Metrics for the Network
    metrics: NetworkMetrics,
    /// Disconnect metrics for the Network, split by connection direction.
    disconnect_metrics: DirectionalDisconnectMetrics,
    /// Closed sessions metrics, split by direction.
    closed_sessions_metrics: ClosedSessionsMetrics,
    /// Pending session failure metrics, split by direction.
    pending_session_failure_metrics: PendingSessionFailureMetrics,
    /// Backed off peers metrics, split by reason.
    backed_off_peers_metrics: BackedOffPeersMetrics,
}
152
153impl NetworkManager {
154    /// Creates the manager of a new network with [`EthNetworkPrimitives`] types.
155    ///
156    /// ```no_run
157    /// # async fn f() {
158    /// use reth_chainspec::MAINNET;
159    /// use reth_network::{NetworkConfig, NetworkManager};
160    /// use reth_tasks::Runtime;
161    /// let config = NetworkConfig::builder_with_rng_secret_key(Runtime::test())
162    ///     .build_with_noop_provider(MAINNET.clone());
163    /// let manager = NetworkManager::eth(config).await;
164    /// # }
165    /// ```
166    pub async fn eth<C: BlockNumReader + 'static>(
167        config: NetworkConfig<C, EthNetworkPrimitives>,
168    ) -> Result<Self, NetworkError> {
169        Self::new(config).await
170    }
171}
172
173impl<N: NetworkPrimitives> NetworkManager<N> {
174    /// Sets the dedicated channel for events intended for the
175    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
176    pub fn with_transactions(
177        mut self,
178        tx: MemoryBoundedSender<NetworkTransactionEvent<N>>,
179    ) -> Self {
180        self.set_transactions(tx);
181        self
182    }
183
184    /// Sets the dedicated channel for events intended for the
185    /// [`TransactionsManager`](crate::transactions::TransactionsManager).
186    pub fn set_transactions(&mut self, tx: MemoryBoundedSender<NetworkTransactionEvent<N>>) {
187        self.to_transactions_manager = Some(tx);
188    }
189
190    /// Sets the dedicated channel for events intended for the
191    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
192    pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
193        self.set_eth_request_handler(tx);
194        self
195    }
196
197    /// Sets the dedicated channel for events intended for the
198    /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
199    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
200        self.to_eth_request_handler = Some(tx);
201    }
202
203    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
204    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
205        self.swarm.add_rlpx_sub_protocol(protocol)
206    }
207
208    /// Returns the [`NetworkHandle`] that can be cloned and shared.
209    ///
210    /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
211    pub const fn handle(&self) -> &NetworkHandle<N> {
212        &self.handle
213    }
214
215    /// Returns the secret key used for authenticating sessions.
216    pub const fn secret_key(&self) -> SecretKey {
217        self.swarm.sessions().secret_key()
218    }
219
220    #[inline]
221    fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
222        let metrics = &self.metrics;
223
224        let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
225
226        // update metrics for whole poll function
227        metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
228        // update poll metrics for nested items
229        metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
230        metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
231    }
232
    /// Creates the manager of a new network.
    ///
    /// The [`NetworkManager`] is an endless future that needs to be polled in order to advance the
    /// state of the entire network.
    ///
    /// Construction happens in this order:
    /// 1. The TCP connection listener is bound.
    /// 2. The configured boot nodes are resolved and merged into the discv4/discv5 configs.
    /// 3. Discovery is started.
    /// 4. The session manager, network state, swarm and [`NetworkHandle`] are assembled.
    ///
    /// If `required_block_hashes` is non-empty, a required-block peer filter is spawned as well.
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkError`] in any of these cases:
    /// - binding the listener fails;
    /// - resolving any boot node fails;
    /// - starting discovery fails.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
            eth_max_message_size,
            required_block_hashes,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // Retrieve the tcp address the socket is actually bound to: the configured port could be
        // `0`, so from here on use the bound address rather than the configured one.
        let listener_addr = incoming.local_address();

        // resolve boot nodes
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge configured boot nodes
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // add the forkid entry for EIP-868, but wrap it in an `EnrForkIdEntry` for proper
            // encoding
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // merge configured boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // Grab the local id and the discovery service handles before `discovery` is moved into
        // the network state; they are handed to the `NetworkHandle` below.
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        // Shared between the network state and the handle (see `Arc::clone` calls below).
        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
            eth_max_message_size,
            network_mode.is_stake(),
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        // Commands from the handle are sent over this unbounded channel to the manager.
        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // Spawn required block peer filter if configured
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        // The transactions and eth-request channels start unset; they are installed later via
        // the `with_*`/`set_*` methods.
        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
            closed_sessions_metrics: Default::default(),
            pending_session_failure_metrics: Default::default(),
            backed_off_peers_metrics: Default::default(),
        })
    }
374
375    /// Create a new [`NetworkManager`] instance and start a [`NetworkBuilder`] to configure all
376    /// components of the network
377    ///
378    /// ```
379    /// use reth_network::{
380    ///     config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
381    /// };
382    /// use reth_network_peers::mainnet_nodes;
383    /// use reth_storage_api::noop::NoopProvider;
384    /// use reth_tasks::Runtime;
385    /// use reth_transaction_pool::TransactionPool;
386    /// async fn launch<Pool: TransactionPool>(pool: Pool) {
387    ///     // This block provider implementation is used for testing purposes.
388    ///     let client = NoopProvider::default();
389    ///
390    ///     // The key that's used for encrypting sessions and to identify our node.
391    ///     let local_key = rng_secret_key();
392    ///
393    ///     let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key, Runtime::test())
394    ///         .boot_nodes(mainnet_nodes())
395    ///         .build(client.clone());
396    ///     let transactions_manager_config = config.transactions_manager_config.clone();
397    ///
398    ///     // create the network instance
399    ///     let (handle, network, transactions, request_handler) = NetworkManager::builder(config)
400    ///         .await
401    ///         .unwrap()
402    ///         .transactions(pool, transactions_manager_config)
403    ///         .request_handler(client)
404    ///         .split_with_handle();
405    /// }
406    /// ```
407    pub async fn builder<C: BlockNumReader + 'static>(
408        config: NetworkConfig<C, N>,
409    ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
410        let network = Self::new(config).await?;
411        Ok(network.into_builder())
412    }
413
414    /// Create a [`NetworkBuilder`] to configure all components of the network
415    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
416        NetworkBuilder { network: self, transactions: (), request_handler: () }
417    }
418
419    /// Returns the [`SocketAddr`] that listens for incoming tcp connections.
420    pub const fn local_addr(&self) -> SocketAddr {
421        self.swarm.listener().local_address()
422    }
423
424    /// How many peers we're currently connected to.
425    pub fn num_connected_peers(&self) -> usize {
426        self.swarm.state().num_active_peers()
427    }
428
429    /// Returns the [`PeerId`] used in the network.
430    pub fn peer_id(&self) -> &PeerId {
431        self.handle.peer_id()
432    }
433
434    /// Returns an iterator over all peers in the peer set.
435    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
436        self.swarm.peers().iter_peers()
437    }
438
439    /// Returns the number of peers in the peer set.
440    pub fn num_known_peers(&self) -> usize {
441        self.swarm.peers().num_known_peers()
442    }
443
444    /// Returns a new [`PeersHandle`] that can be cloned and shared.
445    ///
446    /// The [`PeersHandle`] can be used to interact with the network's peer set.
447    pub fn peers_handle(&self) -> PeersHandle {
448        self.swarm.peers().handle()
449    }
450
451    /// Collect the peers from the [`NetworkManager`] and write them to the given
452    /// `persistent_peers_file`.
453    ///
454    /// Only persists peers that are not currently backed off or banned. Includes metadata like
455    /// peer kind, fork ID, and reputation.
456    pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
457        let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
458        persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
459        reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
460        Ok(())
461    }
462
463    /// Returns a new [`FetchClient`] that can be cloned and shared.
464    ///
465    /// The [`FetchClient`] is the entrypoint for sending requests to the network.
466    pub fn fetch_client(&self) -> FetchClient<N> {
467        self.swarm.state().fetch_client()
468    }
469
470    /// Returns the current [`NetworkStatus`] for the local node.
471    pub fn status(&self) -> NetworkStatus {
472        let sessions = self.swarm.sessions();
473        let status = sessions.status();
474        let hello_message = sessions.hello_message();
475
476        #[expect(deprecated)]
477        NetworkStatus {
478            client_version: hello_message.client_version,
479            protocol_version: hello_message.protocol_version as u64,
480            eth_protocol_info: EthProtocolInfo {
481                difficulty: None,
482                head: status.blockhash,
483                network: status.chain.id(),
484                genesis: status.genesis,
485                config: Default::default(),
486            },
487            capabilities: hello_message
488                .protocols
489                .into_iter()
490                .map(|protocol| protocol.cap)
491                .collect(),
492        }
493    }
494
495    /// Sends an event to the [`TransactionsManager`](crate::transactions::TransactionsManager) if
496    /// configured.
497    fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
498        if let Some(ref tx) = self.to_transactions_manager &&
499            let Err(e) = tx.try_send(event)
500        {
501            match e {
502                TrySendError::Full(_) => {
503                    trace!(target: "net", "Transaction events channel at capacity, dropping event");
504                    self.metrics.total_dropped_tx_events_at_full_capacity.increment(1);
505                }
506                TrySendError::Closed(_) => {}
507            }
508        }
509    }
510
511    /// Sends an event to the [`EthRequestManager`](crate::eth_requests::EthRequestHandler) if
512    /// configured.
513    fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
514        if let Some(ref reqs) = self.to_eth_request_handler {
515            let _ = reqs.try_send(event).map_err(|e| {
516                if let TrySendError::Full(_) = e {
517                    debug!(target:"net", "EthRequestHandler channel is full!");
518                    self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
519                }
520            });
521        }
522    }
523
524    /// Handle an incoming request from the peer
525    fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
526        match req {
527            PeerRequest::GetBlockHeaders { request, response } => {
528                self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
529                    peer_id,
530                    request,
531                    response,
532                })
533            }
534            PeerRequest::GetBlockBodies { request, response } => {
535                self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
536                    peer_id,
537                    request,
538                    response,
539                })
540            }
541            PeerRequest::GetNodeData { request, response } => {
542                self.delegate_eth_request(IncomingEthRequest::GetNodeData {
543                    peer_id,
544                    request,
545                    response,
546                })
547            }
548            PeerRequest::GetReceipts { request, response } => {
549                self.delegate_eth_request(IncomingEthRequest::GetReceipts {
550                    peer_id,
551                    request,
552                    response,
553                })
554            }
555            PeerRequest::GetReceipts69 { request, response } => {
556                self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
557                    peer_id,
558                    request,
559                    response,
560                })
561            }
562            PeerRequest::GetReceipts70 { request, response } => {
563                self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
564                    peer_id,
565                    request,
566                    response,
567                })
568            }
569            PeerRequest::GetBlockAccessLists { request, response } => {
570                self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
571                    peer_id,
572                    request,
573                    response,
574                })
575            }
576            PeerRequest::GetCells { request, response } => self
577                .delegate_eth_request(IncomingEthRequest::GetCells { peer_id, request, response }),
578            PeerRequest::GetPooledTransactions { request, response } => {
579                self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
580                    peer_id,
581                    request,
582                    response,
583                });
584            }
585        }
586    }
587
588    /// Invoked after a `NewBlock` message from the peer was validated
589    fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
590        match event {
591            BlockImportEvent::Announcement(validation) => match validation {
592                BlockValidation::ValidHeader { block } => {
593                    self.swarm.state_mut().announce_new_block(block);
594                }
595                BlockValidation::ValidBlock { block } => {
596                    self.swarm.state_mut().announce_new_block_hash(block);
597                }
598            },
599            BlockImportEvent::Outcome(outcome) => {
600                let BlockImportOutcome { peer, result } = outcome;
601                match result {
602                    Ok(validated_block) => match validated_block {
603                        BlockValidation::ValidHeader { block } => {
604                            self.swarm.state_mut().update_peer_block(
605                                &peer,
606                                block.hash,
607                                block.number(),
608                            );
609                            self.swarm.state_mut().announce_new_block(block);
610                        }
611                        BlockValidation::ValidBlock { block } => {
612                            self.swarm.state_mut().announce_new_block_hash(block);
613                        }
614                    },
615                    Err(_err) => {
616                        self.swarm
617                            .state_mut()
618                            .peers_mut()
619                            .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
620                    }
621                }
622            }
623        }
624    }
625
626    /// Enforces [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) consensus rules for the network protocol
627    ///
628    /// Depending on the mode of the network:
629    ///    - disconnect peer if in POS
630    ///    - execute the closure if in POW
631    fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
632    where
633        F: FnOnce(&mut Self),
634    {
635        // reject message in POS
636        if self.handle.mode().is_stake() {
637            // connections to peers which send invalid messages should be terminated
638            self.swarm
639                .sessions_mut()
640                .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
641        } else {
642            only_pow(self);
643        }
644    }
645
646    /// Handles a received Message from the peer's session.
647    fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
648        match msg {
649            PeerMessage::NewBlockHashes(hashes) => {
650                self.within_pow_or_disconnect(peer_id, |this| {
651                    // update peer's state, to track what blocks this peer has seen
652                    this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.to_vec());
653                    // start block import process for the hashes
654                    this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
655                })
656            }
657            PeerMessage::NewBlock(block) => {
658                self.within_pow_or_disconnect(peer_id, move |this| {
659                    this.swarm.state_mut().on_new_block(peer_id, block.hash);
660                    // start block import process
661                    this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
662                });
663            }
664            PeerMessage::PooledTransactions(msg) => {
665                self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
666                    peer_id,
667                    msg,
668                });
669            }
670            PeerMessage::EthRequest(req) => {
671                self.on_eth_request(peer_id, req);
672            }
673            PeerMessage::ReceivedTransaction(msg) => {
674                self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
675                    peer_id,
676                    msg,
677                });
678            }
679            PeerMessage::SendTransactions(_) => {
680                unreachable!("Not emitted by session")
681            }
682            PeerMessage::BlockRangeUpdated(_) => {}
683            PeerMessage::Other(other) => {
684                debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
685            }
686        }
687    }
688
689    /// Handler for received messages from a handle
690    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
691        match msg {
692            NetworkHandleMessage::DiscoveryListener(tx) => {
693                self.swarm.state_mut().discovery_mut().add_listener(tx);
694            }
695            NetworkHandleMessage::AnnounceBlock(block, hash) => {
696                if self.handle.mode().is_stake() {
697                    // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
698                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
699                    return
700                }
701                let msg = NewBlockMessage { hash, block: Arc::new(block) };
702                self.swarm.state_mut().announce_new_block(msg);
703            }
704            NetworkHandleMessage::EthRequest { peer_id, request } => {
705                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
706            }
707            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
708                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
709            }
710            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
711                .swarm
712                .sessions_mut()
713                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
714            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
715                self.swarm.state_mut().add_trusted_peer_id(peer_id);
716            }
717            NetworkHandleMessage::AddTrustedPeerNode(trusted_peer) => {
718                if !self.swarm.is_shutting_down() {
719                    self.swarm.state_mut().add_trusted_peer_node(trusted_peer);
720                }
721            }
722            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
723                // only add peer if we are not shutting down
724                if !self.swarm.is_shutting_down() {
725                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
726                }
727            }
728            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
729                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
730            }
731            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
732                self.swarm.sessions_mut().disconnect(peer_id, reason);
733            }
734            NetworkHandleMessage::BanPeer(peer_id) => {
735                self.swarm.peers_mut().ban_peer_by_admin(peer_id);
736            }
737            NetworkHandleMessage::UnbanPeer(peer_id) => {
738                self.swarm.peers_mut().unban_peer_by_admin(peer_id);
739            }
740            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
741                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
742            }
743            NetworkHandleMessage::SetNetworkState(net_state) => {
744                // Sets network connection state between Active and Hibernate.
745                // If hibernate stops the node to fill new outbound
746                // connections, this is beneficial for sync stages that do not require a network
747                // connection.
748                self.swarm.on_network_state_change(net_state);
749            }
750
751            NetworkHandleMessage::Shutdown(tx) => {
752                self.perform_network_shutdown();
753                let _ = tx.send(());
754            }
755            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
756                self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
757            }
758            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
759                let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
760            }
761            NetworkHandleMessage::FetchClient(tx) => {
762                let _ = tx.send(self.fetch_client());
763            }
764            NetworkHandleMessage::GetStatus(tx) => {
765                let _ = tx.send(self.status());
766            }
767            NetworkHandleMessage::StatusUpdate { head } => {
768                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
769                    self.swarm.state_mut().update_fork_id(transition.current);
770                }
771            }
772            NetworkHandleMessage::GetPeerInfos(tx) => {
773                let _ = tx.send(self.get_peer_infos());
774            }
775            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
776                let _ = tx.send(self.get_peer_info_by_id(peer_id));
777            }
778            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
779                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
780            }
781            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
782                let peer_ids = self.swarm.peers().peers_by_kind(kind);
783                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
784            }
785            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
786            NetworkHandleMessage::GetTransactionsHandle(tx) => {
787                if let Some(ref tx_inner) = self.to_transactions_manager {
788                    let _ = tx_inner.try_send(NetworkTransactionEvent::GetTransactionsHandle(tx));
789                } else {
790                    let _ = tx.send(None);
791                }
792            }
793            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
794                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
795            }
796            NetworkHandleMessage::EthMessage { peer_id, message } => {
797                self.swarm.sessions_mut().send_message(&peer_id, message)
798            }
799        }
800    }
801
802    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
803        // handle event
804        match event {
805            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
806            SwarmEvent::TcpListenerClosed { remote_addr } => {
807                trace!(target: "net", ?remote_addr, "TCP listener closed.");
808            }
809            SwarmEvent::TcpListenerError(err) => {
810                trace!(target: "net", %err, "TCP connection error.");
811            }
812            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
813                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
814                self.metrics.total_incoming_connections.increment(1);
815                self.metrics
816                    .incoming_connections
817                    .set(self.swarm.peers().num_inbound_connections() as f64);
818            }
819            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
820                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
821                self.metrics.total_outgoing_connections.increment(1);
822                self.update_pending_connection_metrics()
823            }
824            SwarmEvent::SessionEstablished {
825                peer_id,
826                remote_addr,
827                client_version,
828                capabilities,
829                version,
830                messages,
831                status,
832                direction,
833            } => {
834                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
835                self.metrics.connected_peers.set(total_active as f64);
836                debug!(
837                    target: "net",
838                    ?remote_addr,
839                    %client_version,
840                    ?peer_id,
841                    ?total_active,
842                    kind=%direction,
843                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
844                    "Session established"
845                );
846
847                if direction.is_incoming() {
848                    self.swarm
849                        .state_mut()
850                        .peers_mut()
851                        .on_incoming_session_established(peer_id, remote_addr);
852                }
853
854                if direction.is_outgoing() {
855                    self.swarm.peers_mut().on_active_outgoing_established(peer_id);
856                }
857
858                self.update_active_connection_metrics();
859
860                let peer_kind = self
861                    .swarm
862                    .state()
863                    .peers()
864                    .peer_by_id(peer_id)
865                    .map(|(_, kind)| kind)
866                    .unwrap_or_default();
867                let session_info = SessionInfo {
868                    peer_id,
869                    remote_addr,
870                    client_version,
871                    capabilities,
872                    status,
873                    version,
874                    peer_kind,
875                };
876
877                self.event_sender
878                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
879            }
880            SwarmEvent::PeerAdded(peer_id) => {
881                trace!(target: "net", ?peer_id, "Peer added");
882                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
883                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
884            }
885            SwarmEvent::PeerRemoved(peer_id) => {
886                trace!(target: "net", ?peer_id, "Peer dropped");
887                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
888                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
889            }
890            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
891                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
892                self.metrics.connected_peers.set(total_active as f64);
893                trace!(
894                    target: "net",
895                    ?remote_addr,
896                    ?peer_id,
897                    ?total_active,
898                    ?error,
899                    "Session disconnected"
900                );
901
902                // Capture direction before state is reset to Idle
903                let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);
904
905                let reason = if let Some(ref err) = error {
906                    // If the connection was closed due to an error, we report
907                    // the peer
908                    self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
909                    self.backed_off_peers_metrics.increment_for_reason(
910                        BackoffReason::from_disconnect(err.as_disconnected()),
911                    );
912                    err.as_disconnected()
913                } else {
914                    // Gracefully disconnected
915                    self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
916                    self.backed_off_peers_metrics
917                        .increment_for_reason(BackoffReason::GracefulClose);
918                    None
919                };
920                self.closed_sessions_metrics.active.increment(1);
921                self.update_active_connection_metrics();
922
923                if let Some(reason) = reason {
924                    if is_inbound {
925                        self.disconnect_metrics.increment_inbound(reason);
926                    } else {
927                        self.disconnect_metrics.increment_outbound(reason);
928                    }
929                }
930                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
931                self.event_sender
932                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
933            }
934            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
935                trace!(
936                    target: "net",
937                    ?remote_addr,
938                    ?error,
939                    "Incoming pending session failed"
940                );
941
942                if let Some(ref err) = error {
943                    self.swarm
944                        .state_mut()
945                        .peers_mut()
946                        .on_incoming_pending_session_dropped(remote_addr, err);
947                    self.pending_session_failure_metrics.inbound.increment(1);
948                    if let Some(reason) = err.as_disconnected() {
949                        self.disconnect_metrics.increment_inbound(reason);
950                    }
951                } else {
952                    self.swarm
953                        .state_mut()
954                        .peers_mut()
955                        .on_incoming_pending_session_gracefully_closed();
956                }
957                self.closed_sessions_metrics.incoming_pending.increment(1);
958                self.metrics
959                    .incoming_connections
960                    .set(self.swarm.peers().num_inbound_connections() as f64);
961            }
962            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
963                trace!(
964                    target: "net",
965                    ?remote_addr,
966                    ?peer_id,
967                    ?error,
968                    "Outgoing pending session failed"
969                );
970
971                if let Some(ref err) = error {
972                    self.swarm.peers_mut().on_outgoing_pending_session_dropped(
973                        &remote_addr,
974                        &peer_id,
975                        err,
976                    );
977                    self.pending_session_failure_metrics.outbound.increment(1);
978                    self.backed_off_peers_metrics.increment_for_reason(
979                        BackoffReason::from_disconnect(err.as_disconnected()),
980                    );
981                    if let Some(reason) = err.as_disconnected() {
982                        self.disconnect_metrics.increment_outbound(reason);
983                    }
984                } else {
985                    self.swarm
986                        .state_mut()
987                        .peers_mut()
988                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
989                }
990                self.closed_sessions_metrics.outgoing_pending.increment(1);
991                self.update_pending_connection_metrics();
992                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
993            }
994            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
995                trace!(
996                    target: "net",
997                    ?remote_addr,
998                    ?peer_id,
999                    %error,
1000                    "Outgoing connection error"
1001                );
1002
1003                self.swarm.peers_mut().on_outgoing_connection_failure(
1004                    &remote_addr,
1005                    &peer_id,
1006                    &error,
1007                );
1008
1009                self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
1010                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
1011                self.update_pending_connection_metrics();
1012            }
1013            SwarmEvent::BadMessage { peer_id } => {
1014                self.swarm
1015                    .state_mut()
1016                    .peers_mut()
1017                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
1018                self.metrics.invalid_messages_received.increment(1);
1019            }
1020            SwarmEvent::ProtocolBreach { peer_id } => {
1021                self.swarm
1022                    .state_mut()
1023                    .peers_mut()
1024                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
1025            }
1026        }
1027    }
1028
1029    /// Returns [`PeerInfo`] for all connected peers
1030    fn get_peer_infos(&self) -> Vec<PeerInfo> {
1031        self.swarm
1032            .sessions()
1033            .active_sessions()
1034            .iter()
1035            .filter_map(|(&peer_id, session)| {
1036                self.swarm
1037                    .state()
1038                    .peers()
1039                    .peer_by_id(peer_id)
1040                    .map(|(record, kind)| session.peer_info(&record, kind))
1041            })
1042            .collect()
1043    }
1044
1045    /// Returns [`PeerInfo`] for a given peer.
1046    ///
1047    /// Returns `None` if there's no active session to the peer.
1048    fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1049        self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1050            self.swarm
1051                .state()
1052                .peers()
1053                .peer_by_id(peer_id)
1054                .map(|(record, kind)| session.peer_info(&record, kind))
1055        })
1056    }
1057
1058    /// Returns [`PeerInfo`] for a given peers.
1059    ///
1060    /// Ignore the non-active peer.
1061    fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1062        peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1063    }
1064
1065    /// Updates the metrics for active,established connections
1066    #[inline]
1067    fn update_active_connection_metrics(&self) {
1068        self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
1069        self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
1070    }
1071
1072    /// Updates the metrics for pending connections
1073    #[inline]
1074    fn update_pending_connection_metrics(&self) {
1075        self.metrics
1076            .pending_outgoing_connections
1077            .set(self.swarm.peers().num_pending_outbound_connections() as f64);
1078        self.metrics
1079            .total_pending_connections
1080            .set(self.swarm.sessions().num_pending_connections() as f64);
1081    }
1082
1083    /// Drives the [`NetworkManager`] future until a [`GracefulShutdown`] signal is received.
1084    ///
1085    /// This invokes the given function `shutdown_hook` while holding the graceful shutdown guard.
1086    pub async fn run_until_graceful_shutdown<F, R>(
1087        mut self,
1088        shutdown: GracefulShutdown,
1089        shutdown_hook: F,
1090    ) -> R
1091    where
1092        F: FnOnce(Self) -> R,
1093    {
1094        let mut graceful_guard = None;
1095        tokio::select! {
1096            _ = &mut self => {},
1097            guard = shutdown => {
1098                graceful_guard = Some(guard);
1099            },
1100        }
1101
1102        self.perform_network_shutdown();
1103        let res = shutdown_hook(self);
1104        drop(graceful_guard);
1105        res
1106    }
1107
1108    /// Performs a graceful network shutdown by stopping new connections from being accepted while
1109    /// draining current and pending connections.
1110    fn perform_network_shutdown(&mut self) {
1111        // Set connection status to `Shutdown`. Stops node from accepting
1112        // new incoming connections as well as sending connection requests to newly
1113        // discovered nodes.
1114        self.swarm.on_shutdown_requested();
1115        // Disconnect all active connections
1116        self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1117        // drop pending connections
1118        self.swarm.sessions_mut().disconnect_all_pending();
1119    }
1120}
1121
1122impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1123    type Output = ();
1124
1125    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1126        let start = Instant::now();
1127        let mut poll_durations = NetworkManagerPollDurations::default();
1128
1129        let this = self.get_mut();
1130
1131        // poll new block imports (expected to be a noop for POS)
1132        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1133            this.on_block_import_result(outcome);
1134        }
1135
1136        // These loops drive the entire state of network and does a lot of work. Under heavy load
1137        // (many messages/events), data may arrive faster than it can be processed (incoming
1138        // messages/requests -> events), and it is possible that more data has already arrived by
1139        // the time an internal event is processed. Which could turn this loop into a busy loop.
1140        // Without yielding back to the executor, it can starve other tasks waiting on that
1141        // executor to execute them, or drive underlying resources To prevent this, we
1142        // preemptively return control when the `budget` is exhausted. The value itself is chosen
1143        // somewhat arbitrarily, it is high enough so the swarm can make meaningful progress but
1144        // low enough that this loop does not starve other tasks for too long. If the budget is
1145        // exhausted we manually yield back control to the (coop) scheduler. This manual yield
1146        // point should prevent situations where polling appears to be frozen. See also
1147        // <https://tokio.rs/blog/2020-04-preemption> And tokio's docs on cooperative scheduling
1148        // <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
1149        //
1150        // Testing has shown that this loop naturally reaches the pending state within 1-5
1151        // iterations in << 100µs in most cases. On average it requires ~50µs, which is inside the
1152        // range of what's recommended as rule of thumb.
1153        // <https://ryhl.io/blog/async-what-is-blocking/>
1154
1155        // process incoming messages from a handle (`TransactionsManager` has one)
1156        //
1157        // will only be closed if the channel was deliberately closed since we always have an
1158        // instance of `NetworkHandle`
1159        let start_network_handle = Instant::now();
1160        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1161            "net",
1162            "Network message channel",
1163            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1164            this.from_handle_rx.poll_next_unpin(cx),
1165            |msg| this.on_handle_message(msg),
1166            error!("Network channel closed");
1167        );
1168        poll_durations.acc_network_handle = start_network_handle.elapsed();
1169
1170        // process incoming messages from the network
1171        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1172            "net",
1173            "Swarm events stream",
1174            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1175            this.swarm.poll_next_unpin(cx),
1176            |event| this.on_swarm_event(event),
1177        );
1178        poll_durations.acc_swarm =
1179            start_network_handle.elapsed() - poll_durations.acc_network_handle;
1180
1181        // all streams are fully drained and import futures pending
1182        if maybe_more_handle_messages || maybe_more_swarm_events {
1183            // make sure we're woken up again
1184            cx.waker().wake_by_ref();
1185            return Poll::Pending
1186        }
1187
1188        this.update_poll_metrics(start, poll_durations);
1189
1190        Poll::Pending
1191    }
1192}
1193
/// Time spent in the nested sections of a single `NetworkManager` poll.
///
/// Starts at zero (`Default`) and is filled in by the poll function before being reported.
#[derive(Default, Debug)]
struct NetworkManagerPollDurations {
    /// Time spent draining the network-handle message channel.
    acc_network_handle: Duration,
    /// Time spent draining the swarm event stream.
    acc_swarm: Duration,
}