1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{
28 BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29 PendingSessionFailureMetrics,
30 },
31 network::{NetworkHandle, NetworkHandleMessage},
32 peers::{BackoffReason, PeersManager},
33 poll_nested_stream_with_budget,
34 protocol::IntoRlpxSubProtocol,
35 required_block_filter::RequiredBlockFilter,
36 session::SessionManager,
37 state::NetworkState,
38 swarm::{Swarm, SwarmEvent},
39 transactions::NetworkTransactionEvent,
40 FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::MemoryBoundedSender;
48use reth_network_api::{
49 events::{PeerEvent, SessionInfo},
50 test_utils::PeersHandle,
51 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60 net::SocketAddr,
61 path::Path,
62 pin::Pin,
63 sync::{
64 atomic::{AtomicU64, AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Drives the networking stack: it owns the [`Swarm`], the command channel fed by
/// [`NetworkHandle`]s, the block import hook and the metrics that are updated while handling
/// events.
///
/// The manager implements [`Future`] and only makes progress while it is being polled.
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The swarm, built from the connection listener, the session manager and the network
    /// state; it is the source of [`SwarmEvent`]s.
    swarm: Swarm<N>,
    /// Handle to this manager that is handed out via [`Self::handle`].
    handle: NetworkHandle<N>,
    /// Receiving half of the unbounded command channel whose sender is stored in the
    /// [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Block import hook; it receives new block announcements from peers and is polled for
    /// [`BlockImportEvent`]s.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender used to broadcast [`NetworkEvent`]s; a clone is also stored in the handle.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender for [`NetworkTransactionEvent`]s; `None` until one of the transaction setters is
    /// called.
    to_transactions_manager: Option<MemoryBoundedSender<NetworkTransactionEvent<N>>>,
    /// Sender for [`IncomingEthRequest`]s; `None` until one of the request handler setters is
    /// called.
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Counter of established sessions: incremented on `SessionEstablished` and decremented on
    /// `SessionClosed`. Shared with the handle and the network state.
    num_active_peers: Arc<AtomicUsize>,
    /// General network metrics (connection gauges, poll durations, dropped event counters).
    metrics: NetworkMetrics,
    /// Disconnect reasons, tracked separately for inbound and outbound peers.
    disconnect_metrics: DirectionalDisconnectMetrics,
    /// Counters for closed active and pending sessions.
    closed_sessions_metrics: ClosedSessionsMetrics,
    /// Counters for pending sessions that closed with an error, split by direction.
    pending_session_failure_metrics: PendingSessionFailureMetrics,
    /// Counters of peer backoffs, keyed by [`BackoffReason`].
    backed_off_peers_metrics: BackedOffPeersMetrics,
}
152
impl NetworkManager {
    /// Creates a manager that uses [`EthNetworkPrimitives`].
    ///
    /// This simply forwards to [`NetworkManager::new`] with the primitives type fixed.
    ///
    /// # Errors
    ///
    /// Returns whatever [`NetworkError`] [`NetworkManager::new`] returns.
    pub async fn eth<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, EthNetworkPrimitives>,
    ) -> Result<Self, NetworkError> {
        Self::new(config).await
    }
}
172
173impl<N: NetworkPrimitives> NetworkManager<N> {
174 pub fn with_transactions(
177 mut self,
178 tx: MemoryBoundedSender<NetworkTransactionEvent<N>>,
179 ) -> Self {
180 self.set_transactions(tx);
181 self
182 }
183
    /// Installs `tx` as the channel for [`NetworkTransactionEvent`]s, replacing any sender that
    /// was set before.
    pub fn set_transactions(&mut self, tx: MemoryBoundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager = Some(tx);
    }
189
190 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
193 self.set_eth_request_handler(tx);
194 self
195 }
196
    /// Installs `tx` as the channel for [`IncomingEthRequest`]s, replacing any sender that was
    /// set before.
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }
202
    /// Forwards an additional `RLPx` sub-protocol to the swarm.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }
207
    /// Returns a reference to the [`NetworkHandle`] owned by this manager.
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }
214
    /// Returns the [`SecretKey`] held by the session manager.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }
219
220 #[inline]
221 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
222 let metrics = &self.metrics;
223
224 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
225
226 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
228 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
230 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
231 }
232
    /// Creates the manager from the given [`NetworkConfig`].
    ///
    /// Construction binds the connection listener, resolves the boot nodes, starts discovery
    /// and then wires the session manager, network state, swarm and [`NetworkHandle`] together.
    /// The transactions and eth request channels start out unset.
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkError`] if binding the listener, resolving a boot node, or creating
    /// the discovery services fails.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        // Destructure exhaustively so that a newly added config field must be handled here.
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            // Not used by the manager itself.
            transactions_manager_config: _,
            nat,
            handshake,
            eth_max_message_size,
            required_block_hashes,
        } = config;

        let peers_manager = PeersManager::new(peers_config);
        // Grab the handle first; the manager itself is moved into the network state below.
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // From here on use the address the listener reports rather than the configured one
        // (presumably they differ when the OS picks the port — confirm).
        let listener_addr = incoming.local_address();

        // Resolve all boot node records concurrently; the first failure aborts construction.
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // Cloned because the same nodes are moved into the discv5 config below.
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // Register the fork id under the "eth" key of the EIP-868 entries.
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // Read these before `discovery` is moved into the network state.
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        // Shared between the manager, the handle and the network state.
        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
            eth_max_message_size,
            network_mode.is_stake(),
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        // Command channel: the sender goes into the handle, the receiver stays with the manager.
        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // Only spawn the required-block filter when there are hashes to check.
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
            closed_sessions_metrics: Default::default(),
            pending_session_failure_metrics: Default::default(),
            backed_off_peers_metrics: Default::default(),
        })
    }
374
375 pub async fn builder<C: BlockNumReader + 'static>(
408 config: NetworkConfig<C, N>,
409 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
410 let network = Self::new(config).await?;
411 Ok(network.into_builder())
412 }
413
    /// Wraps this manager in a [`NetworkBuilder`] whose `transactions` and `request_handler`
    /// slots are still the unit placeholder.
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }
418
    /// Returns the local [`SocketAddr`] reported by the swarm's connection listener.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }
423
    /// Returns the number of active peers as reported by the network state.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }
428
    /// Returns the [`PeerId`] stored in the network handle, i.e. the local id obtained from
    /// discovery when the manager was created.
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }
433
    /// Returns an iterator over the [`NodeRecord`]s yielded by the peer set's `iter_peers`.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.peers().iter_peers()
    }
438
    /// Returns the number of peers known to the peer set.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.peers().num_known_peers()
    }
443
    /// Returns a [`PeersHandle`] obtained from the peer set.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.peers().handle()
    }
450
451 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
457 let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
458 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
459 reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
460 Ok(())
461 }
462
    /// Returns a [`FetchClient`] obtained from the network state.
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }
469
    /// Assembles the current [`NetworkStatus`] from the session manager's status and hello
    /// message.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        // NOTE(review): the literal below touches a deprecated item; which one is not visible
        // from this file.
        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                // Always reported as unset.
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
            // Only the capability of each announced protocol is reported.
            capabilities: hello_message
                .protocols
                .into_iter()
                .map(|protocol| protocol.cap)
                .collect(),
        }
    }
494
495 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
498 if let Some(ref tx) = self.to_transactions_manager &&
499 let Err(e) = tx.try_send(event)
500 {
501 match e {
502 TrySendError::Full(_) => {
503 trace!(target: "net", "Transaction events channel at capacity, dropping event");
504 self.metrics.total_dropped_tx_events_at_full_capacity.increment(1);
505 }
506 TrySendError::Closed(_) => {}
507 }
508 }
509 }
510
511 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
514 if let Some(ref reqs) = self.to_eth_request_handler {
515 let _ = reqs.try_send(event).map_err(|e| {
516 if let TrySendError::Full(_) = e {
517 debug!(target:"net", "EthRequestHandler channel is full!");
518 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
519 }
520 });
521 }
522 }
523
524 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
526 match req {
527 PeerRequest::GetBlockHeaders { request, response } => {
528 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
529 peer_id,
530 request,
531 response,
532 })
533 }
534 PeerRequest::GetBlockBodies { request, response } => {
535 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
536 peer_id,
537 request,
538 response,
539 })
540 }
541 PeerRequest::GetNodeData { request, response } => {
542 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
543 peer_id,
544 request,
545 response,
546 })
547 }
548 PeerRequest::GetReceipts { request, response } => {
549 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
550 peer_id,
551 request,
552 response,
553 })
554 }
555 PeerRequest::GetReceipts69 { request, response } => {
556 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
557 peer_id,
558 request,
559 response,
560 })
561 }
562 PeerRequest::GetReceipts70 { request, response } => {
563 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
564 peer_id,
565 request,
566 response,
567 })
568 }
569 PeerRequest::GetBlockAccessLists { request, response } => {
570 self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
571 peer_id,
572 request,
573 response,
574 })
575 }
576 PeerRequest::GetCells { request, response } => self
577 .delegate_eth_request(IncomingEthRequest::GetCells { peer_id, request, response }),
578 PeerRequest::GetPooledTransactions { request, response } => {
579 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
580 peer_id,
581 request,
582 response,
583 });
584 }
585 }
586 }
587
588 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
590 match event {
591 BlockImportEvent::Announcement(validation) => match validation {
592 BlockValidation::ValidHeader { block } => {
593 self.swarm.state_mut().announce_new_block(block);
594 }
595 BlockValidation::ValidBlock { block } => {
596 self.swarm.state_mut().announce_new_block_hash(block);
597 }
598 },
599 BlockImportEvent::Outcome(outcome) => {
600 let BlockImportOutcome { peer, result } = outcome;
601 match result {
602 Ok(validated_block) => match validated_block {
603 BlockValidation::ValidHeader { block } => {
604 self.swarm.state_mut().update_peer_block(
605 &peer,
606 block.hash,
607 block.number(),
608 );
609 self.swarm.state_mut().announce_new_block(block);
610 }
611 BlockValidation::ValidBlock { block } => {
612 self.swarm.state_mut().announce_new_block_hash(block);
613 }
614 },
615 Err(_err) => {
616 self.swarm
617 .state_mut()
618 .peers_mut()
619 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
620 }
621 }
622 }
623 }
624 }
625
626 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
632 where
633 F: FnOnce(&mut Self),
634 {
635 if self.handle.mode().is_stake() {
637 self.swarm
639 .sessions_mut()
640 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
641 } else {
642 only_pow(self);
643 }
644 }
645
646 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
648 match msg {
649 PeerMessage::NewBlockHashes(hashes) => {
650 self.within_pow_or_disconnect(peer_id, |this| {
651 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.to_vec());
653 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
655 })
656 }
657 PeerMessage::NewBlock(block) => {
658 self.within_pow_or_disconnect(peer_id, move |this| {
659 this.swarm.state_mut().on_new_block(peer_id, block.hash);
660 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
662 });
663 }
664 PeerMessage::PooledTransactions(msg) => {
665 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
666 peer_id,
667 msg,
668 });
669 }
670 PeerMessage::EthRequest(req) => {
671 self.on_eth_request(peer_id, req);
672 }
673 PeerMessage::ReceivedTransaction(msg) => {
674 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
675 peer_id,
676 msg,
677 });
678 }
679 PeerMessage::SendTransactions(_) => {
680 unreachable!("Not emitted by session")
681 }
682 PeerMessage::BlockRangeUpdated(_) => {}
683 PeerMessage::Other(other) => {
684 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
685 }
686 }
687 }
688
    /// Handles a command received from a [`NetworkHandle`].
    ///
    /// Each arm either mutates swarm/session/peer state, forwards a message to a peer session,
    /// or answers a query through the response sender carried by the command. Failures to
    /// deliver a response are ignored.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                // Block announcements are refused in proof-of-stake mode.
                if self.handle.mode().is_stake() {
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddTrustedPeerNode(trusted_peer) => {
                // Ignored once shutdown has been requested.
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_trusted_peer_node(trusted_peer);
                }
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // Ignored once shutdown has been requested.
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                // Acknowledge only after the shutdown steps have run.
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                // Only when the session manager reports a transition is the fork id in the
                // network state updated.
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                // With a transactions channel configured the request is forwarded to it;
                // otherwise the caller is answered with `None` right away.
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.try_send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }
795
    /// Handles an event emitted by the [`Swarm`].
    ///
    /// Valid peer messages are dispatched to [`Self::on_peer_message`]. Connection and session
    /// lifecycle events update the peer set, the metrics and, where applicable, notify
    /// listeners through the event sender. Bad messages and protocol breaches lower the
    /// reputation of the offending peer.
    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                // `fetch_add` returns the previous value, hence the `+ 1`.
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.peers_mut().on_active_outgoing_established(peer_id);
                }

                // Refresh the gauges after the peer set has been updated.
                self.update_active_connection_metrics();

                // Falls back to the default kind if the peer is not in the peer set.
                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                // `fetch_sub` returns the previous value, hence the `- 1`.
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                // Read the direction before the peer set is notified about the closed session
                // (presumably that notification may change what is recorded for the peer —
                // confirm).
                let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);

                let reason = if let Some(ref err) = error {
                    // Closed with an error: report the drop to the peer set.
                    self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    err.as_disconnected()
                } else {
                    // Closed without an error.
                    self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
                    self.backed_off_peers_metrics
                        .increment_for_reason(BackoffReason::GracefulClose);
                    None
                };
                self.closed_sessions_metrics.active.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    if is_inbound {
                        self.disconnect_metrics.increment_inbound(reason);
                    } else {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                }
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.pending_session_failure_metrics.inbound.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_inbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.closed_sessions_metrics.incoming_pending.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.pending_session_failure_metrics.outbound.increment(1);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.closed_sessions_metrics.outgoing_pending.increment(1);
                self.update_pending_connection_metrics();
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }
1022
1023 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1025 self.swarm
1026 .sessions()
1027 .active_sessions()
1028 .iter()
1029 .filter_map(|(&peer_id, session)| {
1030 self.swarm
1031 .state()
1032 .peers()
1033 .peer_by_id(peer_id)
1034 .map(|(record, kind)| session.peer_info(&record, kind))
1035 })
1036 .collect()
1037 }
1038
1039 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1043 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1044 self.swarm
1045 .state()
1046 .peers()
1047 .peer_by_id(peer_id)
1048 .map(|(record, kind)| session.peer_info(&record, kind))
1049 })
1050 }
1051
1052 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1056 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1057 }
1058
1059 #[inline]
1061 fn update_active_connection_metrics(&self) {
1062 self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
1063 self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
1064 }
1065
1066 #[inline]
1068 fn update_pending_connection_metrics(&self) {
1069 self.metrics
1070 .pending_outgoing_connections
1071 .set(self.swarm.peers().num_pending_outbound_connections() as f64);
1072 self.metrics
1073 .total_pending_connections
1074 .set(self.swarm.sessions().num_pending_connections() as f64);
1075 }
1076
1077 pub async fn run_until_graceful_shutdown<F, R>(
1081 mut self,
1082 shutdown: GracefulShutdown,
1083 shutdown_hook: F,
1084 ) -> R
1085 where
1086 F: FnOnce(Self) -> R,
1087 {
1088 let mut graceful_guard = None;
1089 tokio::select! {
1090 _ = &mut self => {},
1091 guard = shutdown => {
1092 graceful_guard = Some(guard);
1093 },
1094 }
1095
1096 self.perform_network_shutdown();
1097 let res = shutdown_hook(self);
1098 drop(graceful_guard);
1099 res
1100 }
1101
1102 fn perform_network_shutdown(&mut self) {
1105 self.swarm.on_shutdown_requested();
1109 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1111 self.swarm.sessions_mut().disconnect_all_pending();
1113 }
1114}
1115
/// Drives the network: every poll processes block import results, commands from the handle
/// channel and swarm events.
///
/// The future never resolves; every path through `poll` returns [`Poll::Pending`].
impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // Drain the block import until it has nothing ready.
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // Process commands sent through the handle, limited by a budget.
        // NOTE(review): the flag is presumably `true` when the budget ran out while more items
        // might be available — confirm against the macro definition.
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // Process events from the swarm, limited by a budget of its own.
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        // Time spent on the swarm = time since the handle section started minus the handle
        // section itself.
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        if maybe_more_handle_messages || maybe_more_swarm_events {
            // Request another poll right away; poll metrics are not recorded on this path.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1187
/// Durations measured during a single poll of the [`NetworkManager`]; both default to zero.
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    /// Time spent draining the handle command channel.
    acc_network_handle: Duration,
    /// Time spent draining the swarm event stream.
    acc_swarm: Duration,
}