1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{
28 BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29 PendingSessionFailureMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
30 },
31 network::{NetworkHandle, NetworkHandleMessage},
32 peers::{BackoffReason, PeersManager},
33 poll_nested_stream_with_budget,
34 protocol::IntoRlpxSubProtocol,
35 required_block_filter::RequiredBlockFilter,
36 session::SessionManager,
37 state::NetworkState,
38 swarm::{Swarm, SwarmEvent},
39 transactions::NetworkTransactionEvent,
40 FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::UnboundedMeteredSender;
48use reth_network_api::{
49 events::{PeerEvent, SessionInfo},
50 test_utils::PeersHandle,
51 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60 net::SocketAddr,
61 path::Path,
62 pin::Pin,
63 sync::{
64 atomic::{AtomicU64, AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
74#[cfg_attr(doc, aquamarine::aquamarine)]
75#[derive(Debug)]
107#[must_use = "The NetworkManager does nothing unless polled"]
108pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
109 swarm: Swarm<N>,
111 handle: NetworkHandle<N>,
113 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
115 block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
117 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
119 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
122 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
136 num_active_peers: Arc<AtomicUsize>,
141 metrics: NetworkMetrics,
143 disconnect_metrics: DirectionalDisconnectMetrics,
145 closed_sessions_metrics: ClosedSessionsMetrics,
147 pending_session_failure_metrics: PendingSessionFailureMetrics,
149 backed_off_peers_metrics: BackedOffPeersMetrics,
151}
152
153impl NetworkManager {
154 pub async fn eth<C: BlockNumReader + 'static>(
166 config: NetworkConfig<C, EthNetworkPrimitives>,
167 ) -> Result<Self, NetworkError> {
168 Self::new(config).await
169 }
170}
171
172impl<N: NetworkPrimitives> NetworkManager<N> {
173 pub fn with_transactions(
176 mut self,
177 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
178 ) -> Self {
179 self.set_transactions(tx);
180 self
181 }
182
183 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
186 self.to_transactions_manager =
187 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
188 }
189
190 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
193 self.set_eth_request_handler(tx);
194 self
195 }
196
197 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
200 self.to_eth_request_handler = Some(tx);
201 }
202
203 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
205 self.swarm.add_rlpx_sub_protocol(protocol)
206 }
207
208 pub const fn handle(&self) -> &NetworkHandle<N> {
212 &self.handle
213 }
214
215 pub const fn secret_key(&self) -> SecretKey {
217 self.swarm.sessions().secret_key()
218 }
219
220 #[inline]
221 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
222 let metrics = &self.metrics;
223
224 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
225
226 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
228 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
230 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
231 }
232
233 pub async fn new<C: BlockNumReader + 'static>(
238 config: NetworkConfig<C, N>,
239 ) -> Result<Self, NetworkError> {
240 let NetworkConfig {
241 client,
242 secret_key,
243 discovery_v4_addr,
244 mut discovery_v4_config,
245 mut discovery_v5_config,
246 listener_addr,
247 peers_config,
248 sessions_config,
249 chain_id,
250 block_import,
251 network_mode,
252 boot_nodes,
253 executor,
254 hello_message,
255 status,
256 fork_filter,
257 dns_discovery_config,
258 extra_protocols,
259 tx_gossip_disabled,
260 transactions_manager_config: _,
261 nat,
262 handshake,
263 required_block_hashes,
264 } = config;
265
266 let peers_manager = PeersManager::new(peers_config);
267 let peers_handle = peers_manager.handle();
268
269 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
270 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
271 })?;
272
273 let listener_addr = incoming.local_address();
275
276 let resolved_boot_nodes =
278 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
279
280 if let Some(disc_config) = discovery_v4_config.as_mut() {
281 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
283 disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
286 }
287
288 if let Some(discv5) = discovery_v5_config.as_mut() {
289 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
291 }
292
293 let discovery = Discovery::new(
294 listener_addr,
295 discovery_v4_addr,
296 secret_key,
297 discovery_v4_config,
298 discovery_v5_config,
299 dns_discovery_config,
300 )
301 .await?;
302 let local_peer_id = discovery.local_id();
304 let discv4 = discovery.discv4();
305 let discv5 = discovery.discv5();
306
307 let num_active_peers = Arc::new(AtomicUsize::new(0));
308
309 let sessions = SessionManager::new(
310 secret_key,
311 sessions_config,
312 executor,
313 status,
314 hello_message,
315 fork_filter,
316 extra_protocols,
317 handshake,
318 );
319
320 let state = NetworkState::new(
321 crate::state::BlockNumReader::new(client),
322 discovery,
323 peers_manager,
324 Arc::clone(&num_active_peers),
325 );
326
327 let swarm = Swarm::new(incoming, sessions, state);
328
329 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
330
331 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
332
333 let handle = NetworkHandle::new(
334 Arc::clone(&num_active_peers),
335 Arc::new(Mutex::new(listener_addr)),
336 to_manager_tx,
337 secret_key,
338 local_peer_id,
339 peers_handle,
340 network_mode,
341 Arc::new(AtomicU64::new(chain_id)),
342 tx_gossip_disabled,
343 discv4,
344 discv5,
345 event_sender.clone(),
346 nat,
347 );
348
349 if !required_block_hashes.is_empty() {
351 let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
352 filter.spawn();
353 }
354
355 Ok(Self {
356 swarm,
357 handle,
358 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
359 block_import,
360 event_sender,
361 to_transactions_manager: None,
362 to_eth_request_handler: None,
363 num_active_peers,
364 metrics: Default::default(),
365 disconnect_metrics: Default::default(),
366 closed_sessions_metrics: Default::default(),
367 pending_session_failure_metrics: Default::default(),
368 backed_off_peers_metrics: Default::default(),
369 })
370 }
371
372 pub async fn builder<C: BlockNumReader + 'static>(
404 config: NetworkConfig<C, N>,
405 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
406 let network = Self::new(config).await?;
407 Ok(network.into_builder())
408 }
409
410 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
412 NetworkBuilder { network: self, transactions: (), request_handler: () }
413 }
414
415 pub const fn local_addr(&self) -> SocketAddr {
417 self.swarm.listener().local_address()
418 }
419
420 pub fn num_connected_peers(&self) -> usize {
422 self.swarm.state().num_active_peers()
423 }
424
425 pub fn peer_id(&self) -> &PeerId {
427 self.handle.peer_id()
428 }
429
430 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
432 self.swarm.state().peers().iter_peers()
433 }
434
435 pub fn num_known_peers(&self) -> usize {
437 self.swarm.state().peers().num_known_peers()
438 }
439
440 pub fn peers_handle(&self) -> PeersHandle {
444 self.swarm.state().peers().handle()
445 }
446
447 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
450 let known_peers = self.all_peers().collect::<Vec<_>>();
451 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
452 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
453 Ok(())
454 }
455
456 pub fn fetch_client(&self) -> FetchClient<N> {
460 self.swarm.state().fetch_client()
461 }
462
463 pub fn status(&self) -> NetworkStatus {
465 let sessions = self.swarm.sessions();
466 let status = sessions.status();
467 let hello_message = sessions.hello_message();
468
469 #[expect(deprecated)]
470 NetworkStatus {
471 client_version: hello_message.client_version,
472 protocol_version: hello_message.protocol_version as u64,
473 eth_protocol_info: EthProtocolInfo {
474 difficulty: None,
475 head: status.blockhash,
476 network: status.chain.id(),
477 genesis: status.genesis,
478 config: Default::default(),
479 },
480 capabilities: hello_message
481 .protocols
482 .into_iter()
483 .map(|protocol| protocol.cap)
484 .collect(),
485 }
486 }
487
488 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
491 if let Some(ref tx) = self.to_transactions_manager {
492 let _ = tx.send(event);
493 }
494 }
495
496 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
499 if let Some(ref reqs) = self.to_eth_request_handler {
500 let _ = reqs.try_send(event).map_err(|e| {
501 if let TrySendError::Full(_) = e {
502 debug!(target:"net", "EthRequestHandler channel is full!");
503 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
504 }
505 });
506 }
507 }
508
509 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
511 match req {
512 PeerRequest::GetBlockHeaders { request, response } => {
513 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
514 peer_id,
515 request,
516 response,
517 })
518 }
519 PeerRequest::GetBlockBodies { request, response } => {
520 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
521 peer_id,
522 request,
523 response,
524 })
525 }
526 PeerRequest::GetNodeData { request, response } => {
527 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
528 peer_id,
529 request,
530 response,
531 })
532 }
533 PeerRequest::GetReceipts { request, response } => {
534 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
535 peer_id,
536 request,
537 response,
538 })
539 }
540 PeerRequest::GetReceipts69 { request, response } => {
541 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
542 peer_id,
543 request,
544 response,
545 })
546 }
547 PeerRequest::GetReceipts70 { request, response } => {
548 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
549 peer_id,
550 request,
551 response,
552 })
553 }
554 PeerRequest::GetPooledTransactions { request, response } => {
555 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
556 peer_id,
557 request,
558 response,
559 });
560 }
561 }
562 }
563
564 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
566 match event {
567 BlockImportEvent::Announcement(validation) => match validation {
568 BlockValidation::ValidHeader { block } => {
569 self.swarm.state_mut().announce_new_block(block);
570 }
571 BlockValidation::ValidBlock { block } => {
572 self.swarm.state_mut().announce_new_block_hash(block);
573 }
574 },
575 BlockImportEvent::Outcome(outcome) => {
576 let BlockImportOutcome { peer, result } = outcome;
577 match result {
578 Ok(validated_block) => match validated_block {
579 BlockValidation::ValidHeader { block } => {
580 self.swarm.state_mut().update_peer_block(
581 &peer,
582 block.hash,
583 block.number(),
584 );
585 self.swarm.state_mut().announce_new_block(block);
586 }
587 BlockValidation::ValidBlock { block } => {
588 self.swarm.state_mut().announce_new_block_hash(block);
589 }
590 },
591 Err(_err) => {
592 self.swarm
593 .state_mut()
594 .peers_mut()
595 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
596 }
597 }
598 }
599 }
600 }
601
602 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
608 where
609 F: FnOnce(&mut Self),
610 {
611 if self.handle.mode().is_stake() {
613 self.swarm
615 .sessions_mut()
616 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
617 } else {
618 only_pow(self);
619 }
620 }
621
622 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
624 match msg {
625 PeerMessage::NewBlockHashes(hashes) => {
626 self.within_pow_or_disconnect(peer_id, |this| {
627 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
629 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
631 })
632 }
633 PeerMessage::NewBlock(block) => {
634 self.within_pow_or_disconnect(peer_id, move |this| {
635 this.swarm.state_mut().on_new_block(peer_id, block.hash);
636 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
638 });
639 }
640 PeerMessage::PooledTransactions(msg) => {
641 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
642 peer_id,
643 msg,
644 });
645 }
646 PeerMessage::EthRequest(req) => {
647 self.on_eth_request(peer_id, req);
648 }
649 PeerMessage::ReceivedTransaction(msg) => {
650 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
651 peer_id,
652 msg,
653 });
654 }
655 PeerMessage::SendTransactions(_) => {
656 unreachable!("Not emitted by session")
657 }
658 PeerMessage::BlockRangeUpdated(_) => {}
659 PeerMessage::Other(other) => {
660 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
661 }
662 }
663 }
664
665 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
667 match msg {
668 NetworkHandleMessage::DiscoveryListener(tx) => {
669 self.swarm.state_mut().discovery_mut().add_listener(tx);
670 }
671 NetworkHandleMessage::AnnounceBlock(block, hash) => {
672 if self.handle.mode().is_stake() {
673 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
675 return
676 }
677 let msg = NewBlockMessage { hash, block: Arc::new(block) };
678 self.swarm.state_mut().announce_new_block(msg);
679 }
680 NetworkHandleMessage::EthRequest { peer_id, request } => {
681 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
682 }
683 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
684 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
685 }
686 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
687 .swarm
688 .sessions_mut()
689 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
690 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
691 self.swarm.state_mut().add_trusted_peer_id(peer_id);
692 }
693 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
694 if !self.swarm.is_shutting_down() {
696 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
697 }
698 }
699 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
700 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
701 }
702 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
703 self.swarm.sessions_mut().disconnect(peer_id, reason);
704 }
705 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
706 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
707 }
708 NetworkHandleMessage::SetNetworkState(net_state) => {
709 self.swarm.on_network_state_change(net_state);
714 }
715
716 NetworkHandleMessage::Shutdown(tx) => {
717 self.perform_network_shutdown();
718 let _ = tx.send(());
719 }
720 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
721 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
722 }
723 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
724 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
725 }
726 NetworkHandleMessage::FetchClient(tx) => {
727 let _ = tx.send(self.fetch_client());
728 }
729 NetworkHandleMessage::GetStatus(tx) => {
730 let _ = tx.send(self.status());
731 }
732 NetworkHandleMessage::StatusUpdate { head } => {
733 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
734 self.swarm.state_mut().update_fork_id(transition.current);
735 }
736 }
737 NetworkHandleMessage::GetPeerInfos(tx) => {
738 let _ = tx.send(self.get_peer_infos());
739 }
740 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
741 let _ = tx.send(self.get_peer_info_by_id(peer_id));
742 }
743 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
744 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
745 }
746 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
747 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
748 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
749 }
750 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
751 NetworkHandleMessage::GetTransactionsHandle(tx) => {
752 if let Some(ref tx_inner) = self.to_transactions_manager {
753 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
754 } else {
755 let _ = tx.send(None);
756 }
757 }
758 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
759 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
760 }
761 NetworkHandleMessage::EthMessage { peer_id, message } => {
762 self.swarm.sessions_mut().send_message(&peer_id, message)
763 }
764 }
765 }
766
767 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
768 match event {
770 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
771 SwarmEvent::TcpListenerClosed { remote_addr } => {
772 trace!(target: "net", ?remote_addr, "TCP listener closed.");
773 }
774 SwarmEvent::TcpListenerError(err) => {
775 trace!(target: "net", %err, "TCP connection error.");
776 }
777 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
778 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
779 self.metrics.total_incoming_connections.increment(1);
780 self.metrics
781 .incoming_connections
782 .set(self.swarm.state().peers().num_inbound_connections() as f64);
783 }
784 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
785 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
786 self.metrics.total_outgoing_connections.increment(1);
787 self.update_pending_connection_metrics()
788 }
789 SwarmEvent::SessionEstablished {
790 peer_id,
791 remote_addr,
792 client_version,
793 capabilities,
794 version,
795 messages,
796 status,
797 direction,
798 } => {
799 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
800 self.metrics.connected_peers.set(total_active as f64);
801 debug!(
802 target: "net",
803 ?remote_addr,
804 %client_version,
805 ?peer_id,
806 ?total_active,
807 kind=%direction,
808 peer_enode=%NodeRecord::new(remote_addr, peer_id),
809 "Session established"
810 );
811
812 if direction.is_incoming() {
813 self.swarm
814 .state_mut()
815 .peers_mut()
816 .on_incoming_session_established(peer_id, remote_addr);
817 }
818
819 if direction.is_outgoing() {
820 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
821 }
822
823 self.update_active_connection_metrics();
824
825 let peer_kind = self
826 .swarm
827 .state()
828 .peers()
829 .peer_by_id(peer_id)
830 .map(|(_, kind)| kind)
831 .unwrap_or_default();
832 let session_info = SessionInfo {
833 peer_id,
834 remote_addr,
835 client_version,
836 capabilities,
837 status,
838 version,
839 peer_kind,
840 };
841
842 self.event_sender
843 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
844 }
845 SwarmEvent::PeerAdded(peer_id) => {
846 trace!(target: "net", ?peer_id, "Peer added");
847 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
848 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
849 }
850 SwarmEvent::PeerRemoved(peer_id) => {
851 trace!(target: "net", ?peer_id, "Peer dropped");
852 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
853 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
854 }
855 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
856 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
857 self.metrics.connected_peers.set(total_active as f64);
858 trace!(
859 target: "net",
860 ?remote_addr,
861 ?peer_id,
862 ?total_active,
863 ?error,
864 "Session disconnected"
865 );
866
867 let is_inbound = self.swarm.state().peers().is_inbound_peer(&peer_id);
869
870 let reason = if let Some(ref err) = error {
871 self.swarm.state_mut().peers_mut().on_active_session_dropped(
874 &remote_addr,
875 &peer_id,
876 err,
877 );
878 self.backed_off_peers_metrics.increment_for_reason(
879 BackoffReason::from_disconnect(err.as_disconnected()),
880 );
881 err.as_disconnected()
882 } else {
883 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
885 self.backed_off_peers_metrics
886 .increment_for_reason(BackoffReason::GracefulClose);
887 None
888 };
889 self.closed_sessions_metrics.active.increment(1);
890 self.update_active_connection_metrics();
891
892 if let Some(reason) = reason {
893 if is_inbound {
894 self.disconnect_metrics.increment_inbound(reason);
895 } else {
896 self.disconnect_metrics.increment_outbound(reason);
897 }
898 }
899 self.metrics
900 .backed_off_peers
901 .set(self.swarm.state().peers().num_backed_off_peers() as f64);
902 self.event_sender
903 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
904 }
905 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
906 trace!(
907 target: "net",
908 ?remote_addr,
909 ?error,
910 "Incoming pending session failed"
911 );
912
913 if let Some(ref err) = error {
914 self.swarm
915 .state_mut()
916 .peers_mut()
917 .on_incoming_pending_session_dropped(remote_addr, err);
918 self.pending_session_failure_metrics.inbound.increment(1);
919 if let Some(reason) = err.as_disconnected() {
920 self.disconnect_metrics.increment_inbound(reason);
921 }
922 } else {
923 self.swarm
924 .state_mut()
925 .peers_mut()
926 .on_incoming_pending_session_gracefully_closed();
927 }
928 self.closed_sessions_metrics.incoming_pending.increment(1);
929 self.metrics
930 .incoming_connections
931 .set(self.swarm.state().peers().num_inbound_connections() as f64);
932 }
933 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
934 trace!(
935 target: "net",
936 ?remote_addr,
937 ?peer_id,
938 ?error,
939 "Outgoing pending session failed"
940 );
941
942 if let Some(ref err) = error {
943 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
944 &remote_addr,
945 &peer_id,
946 err,
947 );
948 self.pending_session_failure_metrics.outbound.increment(1);
949 self.backed_off_peers_metrics.increment_for_reason(
950 BackoffReason::from_disconnect(err.as_disconnected()),
951 );
952 if let Some(reason) = err.as_disconnected() {
953 self.disconnect_metrics.increment_outbound(reason);
954 }
955 } else {
956 self.swarm
957 .state_mut()
958 .peers_mut()
959 .on_outgoing_pending_session_gracefully_closed(&peer_id);
960 }
961 self.closed_sessions_metrics.outgoing_pending.increment(1);
962 self.update_pending_connection_metrics();
963 self.metrics
964 .backed_off_peers
965 .set(self.swarm.state().peers().num_backed_off_peers() as f64);
966 }
967 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
968 trace!(
969 target: "net",
970 ?remote_addr,
971 ?peer_id,
972 %error,
973 "Outgoing connection error"
974 );
975
976 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
977 &remote_addr,
978 &peer_id,
979 &error,
980 );
981
982 self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
983 self.metrics
984 .backed_off_peers
985 .set(self.swarm.state().peers().num_backed_off_peers() as f64);
986 self.update_pending_connection_metrics();
987 }
988 SwarmEvent::BadMessage { peer_id } => {
989 self.swarm
990 .state_mut()
991 .peers_mut()
992 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
993 self.metrics.invalid_messages_received.increment(1);
994 }
995 SwarmEvent::ProtocolBreach { peer_id } => {
996 self.swarm
997 .state_mut()
998 .peers_mut()
999 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
1000 }
1001 }
1002 }
1003
1004 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1006 self.swarm
1007 .sessions()
1008 .active_sessions()
1009 .iter()
1010 .filter_map(|(&peer_id, session)| {
1011 self.swarm
1012 .state()
1013 .peers()
1014 .peer_by_id(peer_id)
1015 .map(|(record, kind)| session.peer_info(&record, kind))
1016 })
1017 .collect()
1018 }
1019
1020 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1024 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1025 self.swarm
1026 .state()
1027 .peers()
1028 .peer_by_id(peer_id)
1029 .map(|(record, kind)| session.peer_info(&record, kind))
1030 })
1031 }
1032
1033 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1037 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1038 }
1039
1040 #[inline]
1042 fn update_active_connection_metrics(&self) {
1043 self.metrics
1044 .incoming_connections
1045 .set(self.swarm.state().peers().num_inbound_connections() as f64);
1046 self.metrics
1047 .outgoing_connections
1048 .set(self.swarm.state().peers().num_outbound_connections() as f64);
1049 }
1050
1051 #[inline]
1053 fn update_pending_connection_metrics(&self) {
1054 self.metrics
1055 .pending_outgoing_connections
1056 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
1057 self.metrics
1058 .total_pending_connections
1059 .set(self.swarm.sessions().num_pending_connections() as f64);
1060 }
1061
1062 pub async fn run_until_graceful_shutdown<F, R>(
1066 mut self,
1067 shutdown: GracefulShutdown,
1068 shutdown_hook: F,
1069 ) -> R
1070 where
1071 F: FnOnce(Self) -> R,
1072 {
1073 let mut graceful_guard = None;
1074 tokio::select! {
1075 _ = &mut self => {},
1076 guard = shutdown => {
1077 graceful_guard = Some(guard);
1078 },
1079 }
1080
1081 self.perform_network_shutdown();
1082 let res = shutdown_hook(self);
1083 drop(graceful_guard);
1084 res
1085 }
1086
1087 fn perform_network_shutdown(&mut self) {
1090 self.swarm.on_shutdown_requested();
1094 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1096 self.swarm.sessions_mut().disconnect_all_pending();
1098 }
1099}
1100
1101impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1102 type Output = ();
1103
1104 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1105 let start = Instant::now();
1106 let mut poll_durations = NetworkManagerPollDurations::default();
1107
1108 let this = self.get_mut();
1109
1110 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1112 this.on_block_import_result(outcome);
1113 }
1114
1115 let start_network_handle = Instant::now();
1139 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1140 "net",
1141 "Network message channel",
1142 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1143 this.from_handle_rx.poll_next_unpin(cx),
1144 |msg| this.on_handle_message(msg),
1145 error!("Network channel closed");
1146 );
1147 poll_durations.acc_network_handle = start_network_handle.elapsed();
1148
1149 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1151 "net",
1152 "Swarm events stream",
1153 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1154 this.swarm.poll_next_unpin(cx),
1155 |event| this.on_swarm_event(event),
1156 );
1157 poll_durations.acc_swarm =
1158 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1159
1160 if maybe_more_handle_messages || maybe_more_swarm_events {
1162 cx.waker().wake_by_ref();
1164 return Poll::Pending
1165 }
1166
1167 this.update_poll_metrics(start, poll_durations);
1168
1169 Poll::Pending
1170 }
1171}
1172
1173#[derive(Debug, Default)]
1174struct NetworkManagerPollDurations {
1175 acc_network_handle: Duration,
1176 acc_swarm: Duration,
1177}