1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{
28 BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29 PendingSessionFailureMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
30 },
31 network::{NetworkHandle, NetworkHandleMessage},
32 peers::{BackoffReason, PeersManager},
33 poll_nested_stream_with_budget,
34 protocol::IntoRlpxSubProtocol,
35 required_block_filter::RequiredBlockFilter,
36 session::SessionManager,
37 state::NetworkState,
38 swarm::{Swarm, SwarmEvent},
39 transactions::NetworkTransactionEvent,
40 FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::UnboundedMeteredSender;
48use reth_network_api::{
49 events::{PeerEvent, SessionInfo},
50 test_utils::PeersHandle,
51 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60 net::SocketAddr,
61 path::Path,
62 pin::Pin,
63 sync::{
64 atomic::{AtomicU64, AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
74#[cfg_attr(doc, aquamarine::aquamarine)]
75#[derive(Debug)]
107#[must_use = "The NetworkManager does nothing unless polled"]
108pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
109 swarm: Swarm<N>,
111 handle: NetworkHandle<N>,
113 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
115 block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
117 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
119 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
122 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
136 num_active_peers: Arc<AtomicUsize>,
141 metrics: NetworkMetrics,
143 disconnect_metrics: DirectionalDisconnectMetrics,
145 closed_sessions_metrics: ClosedSessionsMetrics,
147 pending_session_failure_metrics: PendingSessionFailureMetrics,
149 backed_off_peers_metrics: BackedOffPeersMetrics,
151}
152
153impl NetworkManager {
154 pub async fn eth<C: BlockNumReader + 'static>(
167 config: NetworkConfig<C, EthNetworkPrimitives>,
168 ) -> Result<Self, NetworkError> {
169 Self::new(config).await
170 }
171}
172
173impl<N: NetworkPrimitives> NetworkManager<N> {
174 pub fn with_transactions(
177 mut self,
178 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
179 ) -> Self {
180 self.set_transactions(tx);
181 self
182 }
183
184 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
187 self.to_transactions_manager =
188 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
189 }
190
191 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
194 self.set_eth_request_handler(tx);
195 self
196 }
197
198 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
201 self.to_eth_request_handler = Some(tx);
202 }
203
204 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
206 self.swarm.add_rlpx_sub_protocol(protocol)
207 }
208
209 pub const fn handle(&self) -> &NetworkHandle<N> {
213 &self.handle
214 }
215
216 pub const fn secret_key(&self) -> SecretKey {
218 self.swarm.sessions().secret_key()
219 }
220
221 #[inline]
222 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
223 let metrics = &self.metrics;
224
225 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
226
227 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
229 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
231 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
232 }
233
234 pub async fn new<C: BlockNumReader + 'static>(
239 config: NetworkConfig<C, N>,
240 ) -> Result<Self, NetworkError> {
241 let NetworkConfig {
242 client,
243 secret_key,
244 discovery_v4_addr,
245 mut discovery_v4_config,
246 mut discovery_v5_config,
247 listener_addr,
248 peers_config,
249 sessions_config,
250 chain_id,
251 block_import,
252 network_mode,
253 boot_nodes,
254 executor,
255 hello_message,
256 status,
257 fork_filter,
258 dns_discovery_config,
259 extra_protocols,
260 tx_gossip_disabled,
261 transactions_manager_config: _,
262 nat,
263 handshake,
264 required_block_hashes,
265 } = config;
266
267 let peers_manager = PeersManager::new(peers_config);
268 let peers_handle = peers_manager.handle();
269
270 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
271 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
272 })?;
273
274 let listener_addr = incoming.local_address();
276
277 let resolved_boot_nodes =
279 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
280
281 if let Some(disc_config) = discovery_v4_config.as_mut() {
282 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
284 disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
287 }
288
289 if let Some(discv5) = discovery_v5_config.as_mut() {
290 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
292 }
293
294 let discovery = Discovery::new(
295 listener_addr,
296 discovery_v4_addr,
297 secret_key,
298 discovery_v4_config,
299 discovery_v5_config,
300 dns_discovery_config,
301 )
302 .await?;
303 let local_peer_id = discovery.local_id();
305 let discv4 = discovery.discv4();
306 let discv5 = discovery.discv5();
307
308 let num_active_peers = Arc::new(AtomicUsize::new(0));
309
310 let sessions = SessionManager::new(
311 secret_key,
312 sessions_config,
313 executor,
314 status,
315 hello_message,
316 fork_filter,
317 extra_protocols,
318 handshake,
319 );
320
321 let state = NetworkState::new(
322 crate::state::BlockNumReader::new(client),
323 discovery,
324 peers_manager,
325 Arc::clone(&num_active_peers),
326 );
327
328 let swarm = Swarm::new(incoming, sessions, state);
329
330 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
331
332 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
333
334 let handle = NetworkHandle::new(
335 Arc::clone(&num_active_peers),
336 Arc::new(Mutex::new(listener_addr)),
337 to_manager_tx,
338 secret_key,
339 local_peer_id,
340 peers_handle,
341 network_mode,
342 Arc::new(AtomicU64::new(chain_id)),
343 tx_gossip_disabled,
344 discv4,
345 discv5,
346 event_sender.clone(),
347 nat,
348 );
349
350 if !required_block_hashes.is_empty() {
352 let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
353 filter.spawn();
354 }
355
356 Ok(Self {
357 swarm,
358 handle,
359 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
360 block_import,
361 event_sender,
362 to_transactions_manager: None,
363 to_eth_request_handler: None,
364 num_active_peers,
365 metrics: Default::default(),
366 disconnect_metrics: Default::default(),
367 closed_sessions_metrics: Default::default(),
368 pending_session_failure_metrics: Default::default(),
369 backed_off_peers_metrics: Default::default(),
370 })
371 }
372
373 pub async fn builder<C: BlockNumReader + 'static>(
406 config: NetworkConfig<C, N>,
407 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
408 let network = Self::new(config).await?;
409 Ok(network.into_builder())
410 }
411
412 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
414 NetworkBuilder { network: self, transactions: (), request_handler: () }
415 }
416
417 pub const fn local_addr(&self) -> SocketAddr {
419 self.swarm.listener().local_address()
420 }
421
422 pub fn num_connected_peers(&self) -> usize {
424 self.swarm.state().num_active_peers()
425 }
426
427 pub fn peer_id(&self) -> &PeerId {
429 self.handle.peer_id()
430 }
431
432 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
434 self.swarm.peers().iter_peers()
435 }
436
437 pub fn num_known_peers(&self) -> usize {
439 self.swarm.peers().num_known_peers()
440 }
441
442 pub fn peers_handle(&self) -> PeersHandle {
446 self.swarm.peers().handle()
447 }
448
449 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
455 let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
456 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
457 reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
458 Ok(())
459 }
460
461 pub fn fetch_client(&self) -> FetchClient<N> {
465 self.swarm.state().fetch_client()
466 }
467
468 pub fn status(&self) -> NetworkStatus {
470 let sessions = self.swarm.sessions();
471 let status = sessions.status();
472 let hello_message = sessions.hello_message();
473
474 #[expect(deprecated)]
475 NetworkStatus {
476 client_version: hello_message.client_version,
477 protocol_version: hello_message.protocol_version as u64,
478 eth_protocol_info: EthProtocolInfo {
479 difficulty: None,
480 head: status.blockhash,
481 network: status.chain.id(),
482 genesis: status.genesis,
483 config: Default::default(),
484 },
485 capabilities: hello_message
486 .protocols
487 .into_iter()
488 .map(|protocol| protocol.cap)
489 .collect(),
490 }
491 }
492
493 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
496 if let Some(ref tx) = self.to_transactions_manager {
497 let _ = tx.send(event);
498 }
499 }
500
501 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
504 if let Some(ref reqs) = self.to_eth_request_handler {
505 let _ = reqs.try_send(event).map_err(|e| {
506 if let TrySendError::Full(_) = e {
507 debug!(target:"net", "EthRequestHandler channel is full!");
508 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
509 }
510 });
511 }
512 }
513
514 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
516 match req {
517 PeerRequest::GetBlockHeaders { request, response } => {
518 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
519 peer_id,
520 request,
521 response,
522 })
523 }
524 PeerRequest::GetBlockBodies { request, response } => {
525 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
526 peer_id,
527 request,
528 response,
529 })
530 }
531 PeerRequest::GetNodeData { request, response } => {
532 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
533 peer_id,
534 request,
535 response,
536 })
537 }
538 PeerRequest::GetReceipts { request, response } => {
539 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
540 peer_id,
541 request,
542 response,
543 })
544 }
545 PeerRequest::GetReceipts69 { request, response } => {
546 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
547 peer_id,
548 request,
549 response,
550 })
551 }
552 PeerRequest::GetReceipts70 { request, response } => {
553 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
554 peer_id,
555 request,
556 response,
557 })
558 }
559 PeerRequest::GetBlockAccessLists { request, response } => {
560 self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
561 peer_id,
562 request,
563 response,
564 })
565 }
566 PeerRequest::GetPooledTransactions { request, response } => {
567 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
568 peer_id,
569 request,
570 response,
571 });
572 }
573 }
574 }
575
576 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
578 match event {
579 BlockImportEvent::Announcement(validation) => match validation {
580 BlockValidation::ValidHeader { block } => {
581 self.swarm.state_mut().announce_new_block(block);
582 }
583 BlockValidation::ValidBlock { block } => {
584 self.swarm.state_mut().announce_new_block_hash(block);
585 }
586 },
587 BlockImportEvent::Outcome(outcome) => {
588 let BlockImportOutcome { peer, result } = outcome;
589 match result {
590 Ok(validated_block) => match validated_block {
591 BlockValidation::ValidHeader { block } => {
592 self.swarm.state_mut().update_peer_block(
593 &peer,
594 block.hash,
595 block.number(),
596 );
597 self.swarm.state_mut().announce_new_block(block);
598 }
599 BlockValidation::ValidBlock { block } => {
600 self.swarm.state_mut().announce_new_block_hash(block);
601 }
602 },
603 Err(_err) => {
604 self.swarm
605 .state_mut()
606 .peers_mut()
607 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
608 }
609 }
610 }
611 }
612 }
613
614 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
620 where
621 F: FnOnce(&mut Self),
622 {
623 if self.handle.mode().is_stake() {
625 self.swarm
627 .sessions_mut()
628 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
629 } else {
630 only_pow(self);
631 }
632 }
633
634 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
636 match msg {
637 PeerMessage::NewBlockHashes(hashes) => {
638 self.within_pow_or_disconnect(peer_id, |this| {
639 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
641 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
643 })
644 }
645 PeerMessage::NewBlock(block) => {
646 self.within_pow_or_disconnect(peer_id, move |this| {
647 this.swarm.state_mut().on_new_block(peer_id, block.hash);
648 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
650 });
651 }
652 PeerMessage::PooledTransactions(msg) => {
653 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
654 peer_id,
655 msg,
656 });
657 }
658 PeerMessage::EthRequest(req) => {
659 self.on_eth_request(peer_id, req);
660 }
661 PeerMessage::ReceivedTransaction(msg) => {
662 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
663 peer_id,
664 msg,
665 });
666 }
667 PeerMessage::SendTransactions(_) => {
668 unreachable!("Not emitted by session")
669 }
670 PeerMessage::BlockRangeUpdated(_) => {}
671 PeerMessage::Other(other) => {
672 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
673 }
674 }
675 }
676
677 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
679 match msg {
680 NetworkHandleMessage::DiscoveryListener(tx) => {
681 self.swarm.state_mut().discovery_mut().add_listener(tx);
682 }
683 NetworkHandleMessage::AnnounceBlock(block, hash) => {
684 if self.handle.mode().is_stake() {
685 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
687 return
688 }
689 let msg = NewBlockMessage { hash, block: Arc::new(block) };
690 self.swarm.state_mut().announce_new_block(msg);
691 }
692 NetworkHandleMessage::EthRequest { peer_id, request } => {
693 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
694 }
695 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
696 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
697 }
698 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
699 .swarm
700 .sessions_mut()
701 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
702 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
703 self.swarm.state_mut().add_trusted_peer_id(peer_id);
704 }
705 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
706 if !self.swarm.is_shutting_down() {
708 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
709 }
710 }
711 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
712 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
713 }
714 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
715 self.swarm.sessions_mut().disconnect(peer_id, reason);
716 }
717 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
718 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
719 }
720 NetworkHandleMessage::SetNetworkState(net_state) => {
721 self.swarm.on_network_state_change(net_state);
726 }
727
728 NetworkHandleMessage::Shutdown(tx) => {
729 self.perform_network_shutdown();
730 let _ = tx.send(());
731 }
732 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
733 self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
734 }
735 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
736 let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
737 }
738 NetworkHandleMessage::FetchClient(tx) => {
739 let _ = tx.send(self.fetch_client());
740 }
741 NetworkHandleMessage::GetStatus(tx) => {
742 let _ = tx.send(self.status());
743 }
744 NetworkHandleMessage::StatusUpdate { head } => {
745 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
746 self.swarm.state_mut().update_fork_id(transition.current);
747 }
748 }
749 NetworkHandleMessage::GetPeerInfos(tx) => {
750 let _ = tx.send(self.get_peer_infos());
751 }
752 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
753 let _ = tx.send(self.get_peer_info_by_id(peer_id));
754 }
755 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
756 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
757 }
758 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
759 let peer_ids = self.swarm.peers().peers_by_kind(kind);
760 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
761 }
762 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
763 NetworkHandleMessage::GetTransactionsHandle(tx) => {
764 if let Some(ref tx_inner) = self.to_transactions_manager {
765 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
766 } else {
767 let _ = tx.send(None);
768 }
769 }
770 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
771 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
772 }
773 NetworkHandleMessage::EthMessage { peer_id, message } => {
774 self.swarm.sessions_mut().send_message(&peer_id, message)
775 }
776 }
777 }
778
779 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
780 match event {
782 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
783 SwarmEvent::TcpListenerClosed { remote_addr } => {
784 trace!(target: "net", ?remote_addr, "TCP listener closed.");
785 }
786 SwarmEvent::TcpListenerError(err) => {
787 trace!(target: "net", %err, "TCP connection error.");
788 }
789 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
790 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
791 self.metrics.total_incoming_connections.increment(1);
792 self.metrics
793 .incoming_connections
794 .set(self.swarm.peers().num_inbound_connections() as f64);
795 }
796 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
797 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
798 self.metrics.total_outgoing_connections.increment(1);
799 self.update_pending_connection_metrics()
800 }
801 SwarmEvent::SessionEstablished {
802 peer_id,
803 remote_addr,
804 client_version,
805 capabilities,
806 version,
807 messages,
808 status,
809 direction,
810 } => {
811 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
812 self.metrics.connected_peers.set(total_active as f64);
813 debug!(
814 target: "net",
815 ?remote_addr,
816 %client_version,
817 ?peer_id,
818 ?total_active,
819 kind=%direction,
820 peer_enode=%NodeRecord::new(remote_addr, peer_id),
821 "Session established"
822 );
823
824 if direction.is_incoming() {
825 self.swarm
826 .state_mut()
827 .peers_mut()
828 .on_incoming_session_established(peer_id, remote_addr);
829 }
830
831 if direction.is_outgoing() {
832 self.swarm.peers_mut().on_active_outgoing_established(peer_id);
833 }
834
835 self.update_active_connection_metrics();
836
837 let peer_kind = self
838 .swarm
839 .state()
840 .peers()
841 .peer_by_id(peer_id)
842 .map(|(_, kind)| kind)
843 .unwrap_or_default();
844 let session_info = SessionInfo {
845 peer_id,
846 remote_addr,
847 client_version,
848 capabilities,
849 status,
850 version,
851 peer_kind,
852 };
853
854 self.event_sender
855 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
856 }
857 SwarmEvent::PeerAdded(peer_id) => {
858 trace!(target: "net", ?peer_id, "Peer added");
859 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
860 self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
861 }
862 SwarmEvent::PeerRemoved(peer_id) => {
863 trace!(target: "net", ?peer_id, "Peer dropped");
864 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
865 self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
866 }
867 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
868 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
869 self.metrics.connected_peers.set(total_active as f64);
870 trace!(
871 target: "net",
872 ?remote_addr,
873 ?peer_id,
874 ?total_active,
875 ?error,
876 "Session disconnected"
877 );
878
879 let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);
881
882 let reason = if let Some(ref err) = error {
883 self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
886 self.backed_off_peers_metrics.increment_for_reason(
887 BackoffReason::from_disconnect(err.as_disconnected()),
888 );
889 err.as_disconnected()
890 } else {
891 self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
893 self.backed_off_peers_metrics
894 .increment_for_reason(BackoffReason::GracefulClose);
895 None
896 };
897 self.closed_sessions_metrics.active.increment(1);
898 self.update_active_connection_metrics();
899
900 if let Some(reason) = reason {
901 if is_inbound {
902 self.disconnect_metrics.increment_inbound(reason);
903 } else {
904 self.disconnect_metrics.increment_outbound(reason);
905 }
906 }
907 self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
908 self.event_sender
909 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
910 }
911 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
912 trace!(
913 target: "net",
914 ?remote_addr,
915 ?error,
916 "Incoming pending session failed"
917 );
918
919 if let Some(ref err) = error {
920 self.swarm
921 .state_mut()
922 .peers_mut()
923 .on_incoming_pending_session_dropped(remote_addr, err);
924 self.pending_session_failure_metrics.inbound.increment(1);
925 if let Some(reason) = err.as_disconnected() {
926 self.disconnect_metrics.increment_inbound(reason);
927 }
928 } else {
929 self.swarm
930 .state_mut()
931 .peers_mut()
932 .on_incoming_pending_session_gracefully_closed();
933 }
934 self.closed_sessions_metrics.incoming_pending.increment(1);
935 self.metrics
936 .incoming_connections
937 .set(self.swarm.peers().num_inbound_connections() as f64);
938 }
939 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
940 trace!(
941 target: "net",
942 ?remote_addr,
943 ?peer_id,
944 ?error,
945 "Outgoing pending session failed"
946 );
947
948 if let Some(ref err) = error {
949 self.swarm.peers_mut().on_outgoing_pending_session_dropped(
950 &remote_addr,
951 &peer_id,
952 err,
953 );
954 self.pending_session_failure_metrics.outbound.increment(1);
955 self.backed_off_peers_metrics.increment_for_reason(
956 BackoffReason::from_disconnect(err.as_disconnected()),
957 );
958 if let Some(reason) = err.as_disconnected() {
959 self.disconnect_metrics.increment_outbound(reason);
960 }
961 } else {
962 self.swarm
963 .state_mut()
964 .peers_mut()
965 .on_outgoing_pending_session_gracefully_closed(&peer_id);
966 }
967 self.closed_sessions_metrics.outgoing_pending.increment(1);
968 self.update_pending_connection_metrics();
969 self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
970 }
971 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
972 trace!(
973 target: "net",
974 ?remote_addr,
975 ?peer_id,
976 %error,
977 "Outgoing connection error"
978 );
979
980 self.swarm.peers_mut().on_outgoing_connection_failure(
981 &remote_addr,
982 &peer_id,
983 &error,
984 );
985
986 self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
987 self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
988 self.update_pending_connection_metrics();
989 }
990 SwarmEvent::BadMessage { peer_id } => {
991 self.swarm
992 .state_mut()
993 .peers_mut()
994 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
995 self.metrics.invalid_messages_received.increment(1);
996 }
997 SwarmEvent::ProtocolBreach { peer_id } => {
998 self.swarm
999 .state_mut()
1000 .peers_mut()
1001 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
1002 }
1003 }
1004 }
1005
1006 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1008 self.swarm
1009 .sessions()
1010 .active_sessions()
1011 .iter()
1012 .filter_map(|(&peer_id, session)| {
1013 self.swarm
1014 .state()
1015 .peers()
1016 .peer_by_id(peer_id)
1017 .map(|(record, kind)| session.peer_info(&record, kind))
1018 })
1019 .collect()
1020 }
1021
1022 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1026 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1027 self.swarm
1028 .state()
1029 .peers()
1030 .peer_by_id(peer_id)
1031 .map(|(record, kind)| session.peer_info(&record, kind))
1032 })
1033 }
1034
1035 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1039 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1040 }
1041
1042 #[inline]
1044 fn update_active_connection_metrics(&self) {
1045 self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
1046 self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
1047 }
1048
1049 #[inline]
1051 fn update_pending_connection_metrics(&self) {
1052 self.metrics
1053 .pending_outgoing_connections
1054 .set(self.swarm.peers().num_pending_outbound_connections() as f64);
1055 self.metrics
1056 .total_pending_connections
1057 .set(self.swarm.sessions().num_pending_connections() as f64);
1058 }
1059
1060 pub async fn run_until_graceful_shutdown<F, R>(
1064 mut self,
1065 shutdown: GracefulShutdown,
1066 shutdown_hook: F,
1067 ) -> R
1068 where
1069 F: FnOnce(Self) -> R,
1070 {
1071 let mut graceful_guard = None;
1072 tokio::select! {
1073 _ = &mut self => {},
1074 guard = shutdown => {
1075 graceful_guard = Some(guard);
1076 },
1077 }
1078
1079 self.perform_network_shutdown();
1080 let res = shutdown_hook(self);
1081 drop(graceful_guard);
1082 res
1083 }
1084
1085 fn perform_network_shutdown(&mut self) {
1088 self.swarm.on_shutdown_requested();
1092 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1094 self.swarm.sessions_mut().disconnect_all_pending();
1096 }
1097}
1098
1099impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1100 type Output = ();
1101
1102 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1103 let start = Instant::now();
1104 let mut poll_durations = NetworkManagerPollDurations::default();
1105
1106 let this = self.get_mut();
1107
1108 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1110 this.on_block_import_result(outcome);
1111 }
1112
1113 let start_network_handle = Instant::now();
1137 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1138 "net",
1139 "Network message channel",
1140 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1141 this.from_handle_rx.poll_next_unpin(cx),
1142 |msg| this.on_handle_message(msg),
1143 error!("Network channel closed");
1144 );
1145 poll_durations.acc_network_handle = start_network_handle.elapsed();
1146
1147 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1149 "net",
1150 "Swarm events stream",
1151 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1152 this.swarm.poll_next_unpin(cx),
1153 |event| this.on_swarm_event(event),
1154 );
1155 poll_durations.acc_swarm =
1156 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1157
1158 if maybe_more_handle_messages || maybe_more_swarm_events {
1160 cx.waker().wake_by_ref();
1162 return Poll::Pending
1163 }
1164
1165 this.update_poll_metrics(start, poll_durations);
1166
1167 Poll::Pending
1168 }
1169}
1170
1171#[derive(Debug, Default)]
1172struct NetworkManagerPollDurations {
1173 acc_network_handle: Duration,
1174 acc_swarm: Duration,
1175}