1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
28 network::{NetworkHandle, NetworkHandleMessage},
29 peers::PeersManager,
30 poll_nested_stream_with_budget,
31 protocol::IntoRlpxSubProtocol,
32 required_block_filter::RequiredBlockFilter,
33 session::SessionManager,
34 state::NetworkState,
35 swarm::{Swarm, SwarmEvent},
36 transactions::NetworkTransactionEvent,
37 FetchClient, NetworkBuilder,
38};
39use futures::{Future, StreamExt};
40use parking_lot::Mutex;
41use reth_chainspec::EnrForkIdEntry;
42use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
43use reth_fs_util::{self as fs, FsPathError};
44use reth_metrics::common::mpsc::UnboundedMeteredSender;
45use reth_network_api::{
46 events::{PeerEvent, SessionInfo},
47 test_utils::PeersHandle,
48 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
49};
50use reth_network_peers::{NodeRecord, PeerId};
51use reth_network_types::ReputationChangeKind;
52use reth_storage_api::BlockNumReader;
53use reth_tasks::shutdown::GracefulShutdown;
54use reth_tokio_util::EventSender;
55use secp256k1::SecretKey;
56use std::{
57 net::SocketAddr,
58 path::Path,
59 pin::Pin,
60 sync::{
61 atomic::{AtomicU64, AtomicUsize, Ordering},
62 Arc,
63 },
64 task::{Context, Poll},
65 time::{Duration, Instant},
66};
67use tokio::sync::mpsc::{self, error::TrySendError};
68use tokio_stream::wrappers::UnboundedReceiverStream;
69use tracing::{debug, error, trace, warn};
70
71#[cfg_attr(doc, aquamarine::aquamarine)]
72#[derive(Debug)]
104#[must_use = "The NetworkManager does nothing unless polled"]
105pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
106 swarm: Swarm<N>,
108 handle: NetworkHandle<N>,
110 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
112 block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
114 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
116 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
119 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
133 num_active_peers: Arc<AtomicUsize>,
138 metrics: NetworkMetrics,
140 disconnect_metrics: DisconnectMetrics,
142}
143
144impl NetworkManager {
145 pub async fn eth<C: BlockNumReader + 'static>(
157 config: NetworkConfig<C, EthNetworkPrimitives>,
158 ) -> Result<Self, NetworkError> {
159 Self::new(config).await
160 }
161}
162
163impl<N: NetworkPrimitives> NetworkManager<N> {
164 pub fn with_transactions(
167 mut self,
168 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
169 ) -> Self {
170 self.set_transactions(tx);
171 self
172 }
173
174 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
177 self.to_transactions_manager =
178 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
179 }
180
181 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
184 self.set_eth_request_handler(tx);
185 self
186 }
187
188 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
191 self.to_eth_request_handler = Some(tx);
192 }
193
194 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
196 self.swarm.add_rlpx_sub_protocol(protocol)
197 }
198
199 pub const fn handle(&self) -> &NetworkHandle<N> {
203 &self.handle
204 }
205
206 pub const fn secret_key(&self) -> SecretKey {
208 self.swarm.sessions().secret_key()
209 }
210
211 #[inline]
212 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
213 let metrics = &self.metrics;
214
215 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
216
217 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
219 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
221 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
222 }
223
224 pub async fn new<C: BlockNumReader + 'static>(
229 config: NetworkConfig<C, N>,
230 ) -> Result<Self, NetworkError> {
231 let NetworkConfig {
232 client,
233 secret_key,
234 discovery_v4_addr,
235 mut discovery_v4_config,
236 mut discovery_v5_config,
237 listener_addr,
238 peers_config,
239 sessions_config,
240 chain_id,
241 block_import,
242 network_mode,
243 boot_nodes,
244 executor,
245 hello_message,
246 status,
247 fork_filter,
248 dns_discovery_config,
249 extra_protocols,
250 tx_gossip_disabled,
251 transactions_manager_config: _,
252 nat,
253 handshake,
254 required_block_hashes,
255 } = config;
256
257 let peers_manager = PeersManager::new(peers_config);
258 let peers_handle = peers_manager.handle();
259
260 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
261 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
262 })?;
263
264 let listener_addr = incoming.local_address();
266
267 let resolved_boot_nodes =
269 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
270
271 if let Some(disc_config) = discovery_v4_config.as_mut() {
272 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
274 disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
277 }
278
279 if let Some(discv5) = discovery_v5_config.as_mut() {
280 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
282 }
283
284 let discovery = Discovery::new(
285 listener_addr,
286 discovery_v4_addr,
287 secret_key,
288 discovery_v4_config,
289 discovery_v5_config,
290 dns_discovery_config,
291 )
292 .await?;
293 let local_peer_id = discovery.local_id();
295 let discv4 = discovery.discv4();
296 let discv5 = discovery.discv5();
297
298 let num_active_peers = Arc::new(AtomicUsize::new(0));
299
300 let sessions = SessionManager::new(
301 secret_key,
302 sessions_config,
303 executor,
304 status,
305 hello_message,
306 fork_filter,
307 extra_protocols,
308 handshake,
309 );
310
311 let state = NetworkState::new(
312 crate::state::BlockNumReader::new(client),
313 discovery,
314 peers_manager,
315 Arc::clone(&num_active_peers),
316 );
317
318 let swarm = Swarm::new(incoming, sessions, state);
319
320 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
321
322 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
323
324 let handle = NetworkHandle::new(
325 Arc::clone(&num_active_peers),
326 Arc::new(Mutex::new(listener_addr)),
327 to_manager_tx,
328 secret_key,
329 local_peer_id,
330 peers_handle,
331 network_mode,
332 Arc::new(AtomicU64::new(chain_id)),
333 tx_gossip_disabled,
334 discv4,
335 discv5,
336 event_sender.clone(),
337 nat,
338 );
339
340 if !required_block_hashes.is_empty() {
342 let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
343 filter.spawn();
344 }
345
346 Ok(Self {
347 swarm,
348 handle,
349 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
350 block_import,
351 event_sender,
352 to_transactions_manager: None,
353 to_eth_request_handler: None,
354 num_active_peers,
355 metrics: Default::default(),
356 disconnect_metrics: Default::default(),
357 })
358 }
359
360 pub async fn builder<C: BlockNumReader + 'static>(
392 config: NetworkConfig<C, N>,
393 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
394 let network = Self::new(config).await?;
395 Ok(network.into_builder())
396 }
397
398 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
400 NetworkBuilder { network: self, transactions: (), request_handler: () }
401 }
402
403 pub const fn local_addr(&self) -> SocketAddr {
405 self.swarm.listener().local_address()
406 }
407
408 pub fn num_connected_peers(&self) -> usize {
410 self.swarm.state().num_active_peers()
411 }
412
413 pub fn peer_id(&self) -> &PeerId {
415 self.handle.peer_id()
416 }
417
418 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
420 self.swarm.state().peers().iter_peers()
421 }
422
423 pub fn num_known_peers(&self) -> usize {
425 self.swarm.state().peers().num_known_peers()
426 }
427
428 pub fn peers_handle(&self) -> PeersHandle {
432 self.swarm.state().peers().handle()
433 }
434
435 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
438 let known_peers = self.all_peers().collect::<Vec<_>>();
439 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
440 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
441 Ok(())
442 }
443
444 pub fn fetch_client(&self) -> FetchClient<N> {
448 self.swarm.state().fetch_client()
449 }
450
451 pub fn status(&self) -> NetworkStatus {
453 let sessions = self.swarm.sessions();
454 let status = sessions.status();
455 let hello_message = sessions.hello_message();
456
457 #[expect(deprecated)]
458 NetworkStatus {
459 client_version: hello_message.client_version,
460 protocol_version: hello_message.protocol_version as u64,
461 eth_protocol_info: EthProtocolInfo {
462 difficulty: None,
463 head: status.blockhash,
464 network: status.chain.id(),
465 genesis: status.genesis,
466 config: Default::default(),
467 },
468 capabilities: hello_message
469 .protocols
470 .into_iter()
471 .map(|protocol| protocol.cap)
472 .collect(),
473 }
474 }
475
476 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
479 if let Some(ref tx) = self.to_transactions_manager {
480 let _ = tx.send(event);
481 }
482 }
483
484 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
487 if let Some(ref reqs) = self.to_eth_request_handler {
488 let _ = reqs.try_send(event).map_err(|e| {
489 if let TrySendError::Full(_) = e {
490 debug!(target:"net", "EthRequestHandler channel is full!");
491 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
492 }
493 });
494 }
495 }
496
497 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
499 match req {
500 PeerRequest::GetBlockHeaders { request, response } => {
501 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
502 peer_id,
503 request,
504 response,
505 })
506 }
507 PeerRequest::GetBlockBodies { request, response } => {
508 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
509 peer_id,
510 request,
511 response,
512 })
513 }
514 PeerRequest::GetNodeData { request, response } => {
515 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
516 peer_id,
517 request,
518 response,
519 })
520 }
521 PeerRequest::GetReceipts { request, response } => {
522 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
523 peer_id,
524 request,
525 response,
526 })
527 }
528 PeerRequest::GetReceipts69 { request, response } => {
529 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
530 peer_id,
531 request,
532 response,
533 })
534 }
535 PeerRequest::GetPooledTransactions { request, response } => {
536 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
537 peer_id,
538 request,
539 response,
540 });
541 }
542 }
543 }
544
545 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
547 match event {
548 BlockImportEvent::Announcement(validation) => match validation {
549 BlockValidation::ValidHeader { block } => {
550 self.swarm.state_mut().announce_new_block(block);
551 }
552 BlockValidation::ValidBlock { block } => {
553 self.swarm.state_mut().announce_new_block_hash(block);
554 }
555 },
556 BlockImportEvent::Outcome(outcome) => {
557 let BlockImportOutcome { peer, result } = outcome;
558 match result {
559 Ok(validated_block) => match validated_block {
560 BlockValidation::ValidHeader { block } => {
561 self.swarm.state_mut().update_peer_block(
562 &peer,
563 block.hash,
564 block.number(),
565 );
566 self.swarm.state_mut().announce_new_block(block);
567 }
568 BlockValidation::ValidBlock { block } => {
569 self.swarm.state_mut().announce_new_block_hash(block);
570 }
571 },
572 Err(_err) => {
573 self.swarm
574 .state_mut()
575 .peers_mut()
576 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
577 }
578 }
579 }
580 }
581 }
582
583 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
589 where
590 F: FnOnce(&mut Self),
591 {
592 if self.handle.mode().is_stake() {
594 self.swarm
596 .sessions_mut()
597 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
598 } else {
599 only_pow(self);
600 }
601 }
602
603 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
605 match msg {
606 PeerMessage::NewBlockHashes(hashes) => {
607 self.within_pow_or_disconnect(peer_id, |this| {
608 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
610 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
612 })
613 }
614 PeerMessage::NewBlock(block) => {
615 self.within_pow_or_disconnect(peer_id, move |this| {
616 this.swarm.state_mut().on_new_block(peer_id, block.hash);
617 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
619 });
620 }
621 PeerMessage::PooledTransactions(msg) => {
622 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
623 peer_id,
624 msg,
625 });
626 }
627 PeerMessage::EthRequest(req) => {
628 self.on_eth_request(peer_id, req);
629 }
630 PeerMessage::ReceivedTransaction(msg) => {
631 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
632 peer_id,
633 msg,
634 });
635 }
636 PeerMessage::SendTransactions(_) => {
637 unreachable!("Not emitted by session")
638 }
639 PeerMessage::BlockRangeUpdated(_) => {}
640 PeerMessage::Other(other) => {
641 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
642 }
643 }
644 }
645
646 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
648 match msg {
649 NetworkHandleMessage::DiscoveryListener(tx) => {
650 self.swarm.state_mut().discovery_mut().add_listener(tx);
651 }
652 NetworkHandleMessage::AnnounceBlock(block, hash) => {
653 if self.handle.mode().is_stake() {
654 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
656 return
657 }
658 let msg = NewBlockMessage { hash, block: Arc::new(block) };
659 self.swarm.state_mut().announce_new_block(msg);
660 }
661 NetworkHandleMessage::EthRequest { peer_id, request } => {
662 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
663 }
664 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
665 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
666 }
667 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
668 .swarm
669 .sessions_mut()
670 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
671 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
672 self.swarm.state_mut().add_trusted_peer_id(peer_id);
673 }
674 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
675 if !self.swarm.is_shutting_down() {
677 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
678 }
679 }
680 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
681 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
682 }
683 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
684 self.swarm.sessions_mut().disconnect(peer_id, reason);
685 }
686 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
687 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
688 }
689 NetworkHandleMessage::SetNetworkState(net_state) => {
690 self.swarm.on_network_state_change(net_state);
695 }
696
697 NetworkHandleMessage::Shutdown(tx) => {
698 self.perform_network_shutdown();
699 let _ = tx.send(());
700 }
701 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
702 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
703 }
704 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
705 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
706 }
707 NetworkHandleMessage::FetchClient(tx) => {
708 let _ = tx.send(self.fetch_client());
709 }
710 NetworkHandleMessage::GetStatus(tx) => {
711 let _ = tx.send(self.status());
712 }
713 NetworkHandleMessage::StatusUpdate { head } => {
714 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
715 self.swarm.state_mut().update_fork_id(transition.current);
716 }
717 }
718 NetworkHandleMessage::GetPeerInfos(tx) => {
719 let _ = tx.send(self.get_peer_infos());
720 }
721 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
722 let _ = tx.send(self.get_peer_info_by_id(peer_id));
723 }
724 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
725 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
726 }
727 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
728 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
729 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
730 }
731 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
732 NetworkHandleMessage::GetTransactionsHandle(tx) => {
733 if let Some(ref tx_inner) = self.to_transactions_manager {
734 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
735 } else {
736 let _ = tx.send(None);
737 }
738 }
739 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
740 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
741 }
742 NetworkHandleMessage::EthMessage { peer_id, message } => {
743 self.swarm.sessions_mut().send_message(&peer_id, message)
744 }
745 }
746 }
747
748 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
749 match event {
751 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
752 SwarmEvent::TcpListenerClosed { remote_addr } => {
753 trace!(target: "net", ?remote_addr, "TCP listener closed.");
754 }
755 SwarmEvent::TcpListenerError(err) => {
756 trace!(target: "net", %err, "TCP connection error.");
757 }
758 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
759 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
760 self.metrics.total_incoming_connections.increment(1);
761 self.metrics
762 .incoming_connections
763 .set(self.swarm.state().peers().num_inbound_connections() as f64);
764 }
765 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
766 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
767 self.metrics.total_outgoing_connections.increment(1);
768 self.update_pending_connection_metrics()
769 }
770 SwarmEvent::SessionEstablished {
771 peer_id,
772 remote_addr,
773 client_version,
774 capabilities,
775 version,
776 messages,
777 status,
778 direction,
779 } => {
780 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
781 self.metrics.connected_peers.set(total_active as f64);
782 debug!(
783 target: "net",
784 ?remote_addr,
785 %client_version,
786 ?peer_id,
787 ?total_active,
788 kind=%direction,
789 peer_enode=%NodeRecord::new(remote_addr, peer_id),
790 "Session established"
791 );
792
793 if direction.is_incoming() {
794 self.swarm
795 .state_mut()
796 .peers_mut()
797 .on_incoming_session_established(peer_id, remote_addr);
798 }
799
800 if direction.is_outgoing() {
801 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
802 }
803
804 self.update_active_connection_metrics();
805
806 let peer_kind = self
807 .swarm
808 .state()
809 .peers()
810 .peer_by_id(peer_id)
811 .map(|(_, kind)| kind)
812 .unwrap_or_default();
813 let session_info = SessionInfo {
814 peer_id,
815 remote_addr,
816 client_version,
817 capabilities,
818 status,
819 version,
820 peer_kind,
821 };
822
823 self.event_sender
824 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
825 }
826 SwarmEvent::PeerAdded(peer_id) => {
827 trace!(target: "net", ?peer_id, "Peer added");
828 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
829 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
830 }
831 SwarmEvent::PeerRemoved(peer_id) => {
832 trace!(target: "net", ?peer_id, "Peer dropped");
833 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
834 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
835 }
836 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
837 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
838 self.metrics.connected_peers.set(total_active as f64);
839 trace!(
840 target: "net",
841 ?remote_addr,
842 ?peer_id,
843 ?total_active,
844 ?error,
845 "Session disconnected"
846 );
847
848 let reason = if let Some(ref err) = error {
849 self.swarm.state_mut().peers_mut().on_active_session_dropped(
852 &remote_addr,
853 &peer_id,
854 err,
855 );
856 err.as_disconnected()
857 } else {
858 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
860 None
861 };
862 self.metrics.closed_sessions.increment(1);
863 self.update_active_connection_metrics();
864
865 if let Some(reason) = reason {
866 self.disconnect_metrics.increment(reason);
867 }
868 self.metrics.backed_off_peers.set(
869 self.swarm
870 .state()
871 .peers()
872 .num_backed_off_peers()
873 .saturating_sub(1)
874 as f64,
875 );
876 self.event_sender
877 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
878 }
879 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
880 trace!(
881 target: "net",
882 ?remote_addr,
883 ?error,
884 "Incoming pending session failed"
885 );
886
887 if let Some(ref err) = error {
888 self.swarm
889 .state_mut()
890 .peers_mut()
891 .on_incoming_pending_session_dropped(remote_addr, err);
892 self.metrics.pending_session_failures.increment(1);
893 if let Some(reason) = err.as_disconnected() {
894 self.disconnect_metrics.increment(reason);
895 }
896 } else {
897 self.swarm
898 .state_mut()
899 .peers_mut()
900 .on_incoming_pending_session_gracefully_closed();
901 }
902 self.metrics.closed_sessions.increment(1);
903 self.metrics
904 .incoming_connections
905 .set(self.swarm.state().peers().num_inbound_connections() as f64);
906 self.metrics.backed_off_peers.set(
907 self.swarm
908 .state()
909 .peers()
910 .num_backed_off_peers()
911 .saturating_sub(1)
912 as f64,
913 );
914 }
915 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
916 trace!(
917 target: "net",
918 ?remote_addr,
919 ?peer_id,
920 ?error,
921 "Outgoing pending session failed"
922 );
923
924 if let Some(ref err) = error {
925 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
926 &remote_addr,
927 &peer_id,
928 err,
929 );
930 self.metrics.pending_session_failures.increment(1);
931 if let Some(reason) = err.as_disconnected() {
932 self.disconnect_metrics.increment(reason);
933 }
934 } else {
935 self.swarm
936 .state_mut()
937 .peers_mut()
938 .on_outgoing_pending_session_gracefully_closed(&peer_id);
939 }
940 self.metrics.closed_sessions.increment(1);
941 self.update_pending_connection_metrics();
942
943 self.metrics.backed_off_peers.set(
944 self.swarm
945 .state()
946 .peers()
947 .num_backed_off_peers()
948 .saturating_sub(1)
949 as f64,
950 );
951 }
952 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
953 trace!(
954 target: "net",
955 ?remote_addr,
956 ?peer_id,
957 %error,
958 "Outgoing connection error"
959 );
960
961 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
962 &remote_addr,
963 &peer_id,
964 &error,
965 );
966
967 self.metrics.backed_off_peers.set(
968 self.swarm
969 .state()
970 .peers()
971 .num_backed_off_peers()
972 .saturating_sub(1)
973 as f64,
974 );
975 self.update_pending_connection_metrics();
976 }
977 SwarmEvent::BadMessage { peer_id } => {
978 self.swarm
979 .state_mut()
980 .peers_mut()
981 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
982 self.metrics.invalid_messages_received.increment(1);
983 }
984 SwarmEvent::ProtocolBreach { peer_id } => {
985 self.swarm
986 .state_mut()
987 .peers_mut()
988 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
989 }
990 }
991 }
992
993 fn get_peer_infos(&self) -> Vec<PeerInfo> {
995 self.swarm
996 .sessions()
997 .active_sessions()
998 .iter()
999 .filter_map(|(&peer_id, session)| {
1000 self.swarm
1001 .state()
1002 .peers()
1003 .peer_by_id(peer_id)
1004 .map(|(record, kind)| session.peer_info(&record, kind))
1005 })
1006 .collect()
1007 }
1008
1009 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1013 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1014 self.swarm
1015 .state()
1016 .peers()
1017 .peer_by_id(peer_id)
1018 .map(|(record, kind)| session.peer_info(&record, kind))
1019 })
1020 }
1021
1022 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1026 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1027 }
1028
1029 #[inline]
1031 fn update_active_connection_metrics(&self) {
1032 self.metrics
1033 .incoming_connections
1034 .set(self.swarm.state().peers().num_inbound_connections() as f64);
1035 self.metrics
1036 .outgoing_connections
1037 .set(self.swarm.state().peers().num_outbound_connections() as f64);
1038 }
1039
1040 #[inline]
1042 fn update_pending_connection_metrics(&self) {
1043 self.metrics
1044 .pending_outgoing_connections
1045 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
1046 self.metrics
1047 .total_pending_connections
1048 .set(self.swarm.sessions().num_pending_connections() as f64);
1049 }
1050
1051 pub async fn run_until_graceful_shutdown<F, R>(
1055 mut self,
1056 shutdown: GracefulShutdown,
1057 shutdown_hook: F,
1058 ) -> R
1059 where
1060 F: FnOnce(Self) -> R,
1061 {
1062 let mut graceful_guard = None;
1063 tokio::select! {
1064 _ = &mut self => {},
1065 guard = shutdown => {
1066 graceful_guard = Some(guard);
1067 },
1068 }
1069
1070 self.perform_network_shutdown();
1071 let res = shutdown_hook(self);
1072 drop(graceful_guard);
1073 res
1074 }
1075
1076 fn perform_network_shutdown(&mut self) {
1079 self.swarm.on_shutdown_requested();
1083 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1085 self.swarm.sessions_mut().disconnect_all_pending();
1087 }
1088}
1089
1090impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1091 type Output = ();
1092
1093 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1094 let start = Instant::now();
1095 let mut poll_durations = NetworkManagerPollDurations::default();
1096
1097 let this = self.get_mut();
1098
1099 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1101 this.on_block_import_result(outcome);
1102 }
1103
1104 let start_network_handle = Instant::now();
1128 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1129 "net",
1130 "Network message channel",
1131 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1132 this.from_handle_rx.poll_next_unpin(cx),
1133 |msg| this.on_handle_message(msg),
1134 error!("Network channel closed");
1135 );
1136 poll_durations.acc_network_handle = start_network_handle.elapsed();
1137
1138 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1140 "net",
1141 "Swarm events stream",
1142 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1143 this.swarm.poll_next_unpin(cx),
1144 |event| this.on_swarm_event(event),
1145 );
1146 poll_durations.acc_swarm =
1147 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1148
1149 if maybe_more_handle_messages || maybe_more_swarm_events {
1151 cx.waker().wake_by_ref();
1153 return Poll::Pending
1154 }
1155
1156 this.update_poll_metrics(start, poll_durations);
1157
1158 Poll::Pending
1159 }
1160}
1161
1162#[derive(Debug, Default)]
1163struct NetworkManagerPollDurations {
1164 acc_network_handle: Duration,
1165 acc_swarm: Duration,
1166}