1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
28 network::{NetworkHandle, NetworkHandleMessage},
29 peers::PeersManager,
30 poll_nested_stream_with_budget,
31 protocol::IntoRlpxSubProtocol,
32 required_block_filter::RequiredBlockFilter,
33 session::SessionManager,
34 state::NetworkState,
35 swarm::{Swarm, SwarmEvent},
36 transactions::NetworkTransactionEvent,
37 FetchClient, NetworkBuilder,
38};
39use futures::{Future, StreamExt};
40use parking_lot::Mutex;
41use reth_chainspec::EnrForkIdEntry;
42use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
43use reth_fs_util::{self as fs, FsPathError};
44use reth_metrics::common::mpsc::UnboundedMeteredSender;
45use reth_network_api::{
46 events::{PeerEvent, SessionInfo},
47 test_utils::PeersHandle,
48 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
49};
50use reth_network_peers::{NodeRecord, PeerId};
51use reth_network_types::ReputationChangeKind;
52use reth_storage_api::BlockNumReader;
53use reth_tasks::shutdown::GracefulShutdown;
54use reth_tokio_util::EventSender;
55use secp256k1::SecretKey;
56use std::{
57 net::SocketAddr,
58 path::Path,
59 pin::Pin,
60 sync::{
61 atomic::{AtomicU64, AtomicUsize, Ordering},
62 Arc,
63 },
64 task::{Context, Poll},
65 time::{Duration, Instant},
66};
67use tokio::sync::mpsc::{self, error::TrySendError};
68use tokio_stream::wrappers::UnboundedReceiverStream;
69use tracing::{debug, error, trace, warn};
70
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Drives the networking stack: it owns the [`Swarm`], processes commands sent through the
/// [`NetworkHandle`], forwards peer messages to the configured tasks and publishes
/// [`NetworkEvent`]s.
///
/// The manager implements [`Future`] and only makes progress while it is being polled.
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The swarm that is polled for [`SwarmEvent`]s and through which sessions, peers and
    /// discovery state are accessed.
    swarm: Swarm<N>,
    /// Handle to this manager; a reference is exposed via [`Self::handle`].
    handle: NetworkHandle<N>,
    /// Receiving half of the unbounded command channel whose sending half is held by the
    /// [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Block import implementation. It is notified about blocks and block hashes announced by
    /// peers and polled for [`BlockImportEvent`]s.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender used to publish [`NetworkEvent`]s such as established/closed sessions and
    /// added/removed peers.
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Sender for [`NetworkTransactionEvent`]s. `None` until installed via
    /// [`Self::set_transactions`]; while unset, transaction events are dropped.
    to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
    /// Bounded sender for [`IncomingEthRequest`]s. `None` until installed via
    /// [`Self::set_eth_request_handler`]; requests are dropped while unset or when the channel
    /// cannot accept them.
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Number of active sessions. Incremented when a session is established, decremented when
    /// one is closed, and shared (via `Arc`) with the [`NetworkHandle`] and the network state.
    num_active_peers: Arc<AtomicUsize>,
    /// General network metrics updated while handling events and polling.
    metrics: NetworkMetrics,
    /// Metrics counting disconnects by reason.
    disconnect_metrics: DisconnectMetrics,
}
143
144impl NetworkManager {
145 pub async fn eth<C: BlockNumReader + 'static>(
157 config: NetworkConfig<C, EthNetworkPrimitives>,
158 ) -> Result<Self, NetworkError> {
159 Self::new(config).await
160 }
161}
162
163impl<N: NetworkPrimitives> NetworkManager<N> {
164 pub fn with_transactions(
167 mut self,
168 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
169 ) -> Self {
170 self.set_transactions(tx);
171 self
172 }
173
174 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
177 self.to_transactions_manager =
178 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
179 }
180
181 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
184 self.set_eth_request_handler(tx);
185 self
186 }
187
    /// Installs the bounded sender used to forward [`IncomingEthRequest`]s.
    ///
    /// A previously installed sender is replaced.
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }
193
    /// Registers an additional `RLPx` sub-protocol by forwarding it to the swarm.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }
198
    /// Returns a reference to the [`NetworkHandle`] owned by this manager.
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }
205
206 pub const fn secret_key(&self) -> SecretKey {
208 self.swarm.sessions().secret_key()
209 }
210
211 #[inline]
212 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
213 let metrics = &self.metrics;
214
215 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
216
217 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
219 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
221 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
222 }
223
    /// Creates the manager and all of its components from the given [`NetworkConfig`].
    ///
    /// This binds the connection listener, resolves the configured boot nodes, starts discovery
    /// and wires up the session manager, network state, swarm and [`NetworkHandle`].
    ///
    /// The returned manager has no transactions or eth-request sender installed; see
    /// [`Self::set_transactions`] and [`Self::set_eth_request_handler`].
    ///
    /// # Errors
    ///
    /// Returns a [`NetworkError`] if binding the listener, resolving a boot node or creating the
    /// discovery services fails.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        // Take the config apart; `transactions_manager_config` is not used by the manager.
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
            required_block_hashes,
        } = config;

        // Grab the peers handle before the manager is moved into the network state below.
        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        // Bind the listener; I/O errors are reported as listener errors for the requested address.
        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // From here on use the address reported by the bound listener.
        let listener_addr = incoming.local_address();

        // Resolve all boot nodes concurrently; the first failure aborts construction.
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // Add the resolved boot nodes to the discv4 bootstrap set.
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // Advertise the fork id of the current status under the "eth" key.
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // Hand the resolved boot nodes to discv5 as well.
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // Capture what the handle needs before `discovery` is moved into the network state.
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        // Shared counter of active sessions; cloned into the state, the handle and the manager.
        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        // Command channel: the handle holds the sender, the manager drains the receiver.
        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // Only spawn the required-block filter when required block hashes were configured.
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
        })
    }
359
360 pub async fn builder<C: BlockNumReader + 'static>(
392 config: NetworkConfig<C, N>,
393 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
394 let network = Self::new(config).await?;
395 Ok(network.into_builder())
396 }
397
398 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
400 NetworkBuilder { network: self, transactions: (), request_handler: () }
401 }
402
403 pub const fn local_addr(&self) -> SocketAddr {
405 self.swarm.listener().local_address()
406 }
407
408 pub fn num_connected_peers(&self) -> usize {
410 self.swarm.state().num_active_peers()
411 }
412
    /// Returns the [`PeerId`] stored in the [`NetworkHandle`].
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }
417
418 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
420 self.swarm.state().peers().iter_peers()
421 }
422
423 pub fn num_known_peers(&self) -> usize {
425 self.swarm.state().peers().num_known_peers()
426 }
427
428 pub fn peers_handle(&self) -> PeersHandle {
432 self.swarm.state().peers().handle()
433 }
434
435 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
438 let known_peers = self.all_peers().collect::<Vec<_>>();
439 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
440 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
441 Ok(())
442 }
443
444 pub fn fetch_client(&self) -> FetchClient<N> {
448 self.swarm.state().fetch_client()
449 }
450
    /// Builds a [`NetworkStatus`] snapshot from the session manager's current status and hello
    /// message.
    pub fn status(&self) -> NetworkStatus {
        let sessions = self.swarm.sessions();
        let status = sessions.status();
        let hello_message = sessions.hello_message();

        // The literal below touches a deprecated item, hence the lint expectation.
        #[expect(deprecated)]
        NetworkStatus {
            client_version: hello_message.client_version,
            protocol_version: hello_message.protocol_version as u64,
            eth_protocol_info: EthProtocolInfo {
                // No difficulty is reported.
                difficulty: None,
                head: status.blockhash,
                network: status.chain.id(),
                genesis: status.genesis,
                config: Default::default(),
            },
            // Report only the capability of each protocol advertised in the hello message.
            capabilities: hello_message
                .protocols
                .into_iter()
                .map(|protocol| protocol.cap)
                .collect(),
        }
    }
475
476 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
479 if let Some(ref tx) = self.to_transactions_manager {
480 let _ = tx.send(event);
481 }
482 }
483
484 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
487 if let Some(ref reqs) = self.to_eth_request_handler {
488 let _ = reqs.try_send(event).map_err(|e| {
489 if let TrySendError::Full(_) = e {
490 debug!(target:"net", "EthRequestHandler channel is full!");
491 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
492 }
493 });
494 }
495 }
496
497 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
499 match req {
500 PeerRequest::GetBlockHeaders { request, response } => {
501 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
502 peer_id,
503 request,
504 response,
505 })
506 }
507 PeerRequest::GetBlockBodies { request, response } => {
508 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
509 peer_id,
510 request,
511 response,
512 })
513 }
514 PeerRequest::GetNodeData { request, response } => {
515 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
516 peer_id,
517 request,
518 response,
519 })
520 }
521 PeerRequest::GetReceipts { request, response } => {
522 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
523 peer_id,
524 request,
525 response,
526 })
527 }
528 PeerRequest::GetReceipts69 { request, response } => {
529 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
530 peer_id,
531 request,
532 response,
533 })
534 }
535 PeerRequest::GetReceipts70 { request, response } => {
536 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
537 peer_id,
538 request,
539 response,
540 })
541 }
542 PeerRequest::GetPooledTransactions { request, response } => {
543 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
544 peer_id,
545 request,
546 response,
547 });
548 }
549 }
550 }
551
552 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
554 match event {
555 BlockImportEvent::Announcement(validation) => match validation {
556 BlockValidation::ValidHeader { block } => {
557 self.swarm.state_mut().announce_new_block(block);
558 }
559 BlockValidation::ValidBlock { block } => {
560 self.swarm.state_mut().announce_new_block_hash(block);
561 }
562 },
563 BlockImportEvent::Outcome(outcome) => {
564 let BlockImportOutcome { peer, result } = outcome;
565 match result {
566 Ok(validated_block) => match validated_block {
567 BlockValidation::ValidHeader { block } => {
568 self.swarm.state_mut().update_peer_block(
569 &peer,
570 block.hash,
571 block.number(),
572 );
573 self.swarm.state_mut().announce_new_block(block);
574 }
575 BlockValidation::ValidBlock { block } => {
576 self.swarm.state_mut().announce_new_block_hash(block);
577 }
578 },
579 Err(_err) => {
580 self.swarm
581 .state_mut()
582 .peers_mut()
583 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
584 }
585 }
586 }
587 }
588 }
589
590 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
596 where
597 F: FnOnce(&mut Self),
598 {
599 if self.handle.mode().is_stake() {
601 self.swarm
603 .sessions_mut()
604 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
605 } else {
606 only_pow(self);
607 }
608 }
609
610 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
612 match msg {
613 PeerMessage::NewBlockHashes(hashes) => {
614 self.within_pow_or_disconnect(peer_id, |this| {
615 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
617 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
619 })
620 }
621 PeerMessage::NewBlock(block) => {
622 self.within_pow_or_disconnect(peer_id, move |this| {
623 this.swarm.state_mut().on_new_block(peer_id, block.hash);
624 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
626 });
627 }
628 PeerMessage::PooledTransactions(msg) => {
629 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
630 peer_id,
631 msg,
632 });
633 }
634 PeerMessage::EthRequest(req) => {
635 self.on_eth_request(peer_id, req);
636 }
637 PeerMessage::ReceivedTransaction(msg) => {
638 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
639 peer_id,
640 msg,
641 });
642 }
643 PeerMessage::SendTransactions(_) => {
644 unreachable!("Not emitted by session")
645 }
646 PeerMessage::BlockRangeUpdated(_) => {}
647 PeerMessage::Other(other) => {
648 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
649 }
650 }
651 }
652
653 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
655 match msg {
656 NetworkHandleMessage::DiscoveryListener(tx) => {
657 self.swarm.state_mut().discovery_mut().add_listener(tx);
658 }
659 NetworkHandleMessage::AnnounceBlock(block, hash) => {
660 if self.handle.mode().is_stake() {
661 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
663 return
664 }
665 let msg = NewBlockMessage { hash, block: Arc::new(block) };
666 self.swarm.state_mut().announce_new_block(msg);
667 }
668 NetworkHandleMessage::EthRequest { peer_id, request } => {
669 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
670 }
671 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
672 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
673 }
674 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
675 .swarm
676 .sessions_mut()
677 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
678 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
679 self.swarm.state_mut().add_trusted_peer_id(peer_id);
680 }
681 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
682 if !self.swarm.is_shutting_down() {
684 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
685 }
686 }
687 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
688 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
689 }
690 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
691 self.swarm.sessions_mut().disconnect(peer_id, reason);
692 }
693 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
694 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
695 }
696 NetworkHandleMessage::SetNetworkState(net_state) => {
697 self.swarm.on_network_state_change(net_state);
702 }
703
704 NetworkHandleMessage::Shutdown(tx) => {
705 self.perform_network_shutdown();
706 let _ = tx.send(());
707 }
708 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
709 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
710 }
711 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
712 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
713 }
714 NetworkHandleMessage::FetchClient(tx) => {
715 let _ = tx.send(self.fetch_client());
716 }
717 NetworkHandleMessage::GetStatus(tx) => {
718 let _ = tx.send(self.status());
719 }
720 NetworkHandleMessage::StatusUpdate { head } => {
721 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
722 self.swarm.state_mut().update_fork_id(transition.current);
723 }
724 }
725 NetworkHandleMessage::GetPeerInfos(tx) => {
726 let _ = tx.send(self.get_peer_infos());
727 }
728 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
729 let _ = tx.send(self.get_peer_info_by_id(peer_id));
730 }
731 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
732 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
733 }
734 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
735 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
736 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
737 }
738 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
739 NetworkHandleMessage::GetTransactionsHandle(tx) => {
740 if let Some(ref tx_inner) = self.to_transactions_manager {
741 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
742 } else {
743 let _ = tx.send(None);
744 }
745 }
746 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
747 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
748 }
749 NetworkHandleMessage::EthMessage { peer_id, message } => {
750 self.swarm.sessions_mut().send_message(&peer_id, message)
751 }
752 }
753 }
754
755 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
756 match event {
758 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
759 SwarmEvent::TcpListenerClosed { remote_addr } => {
760 trace!(target: "net", ?remote_addr, "TCP listener closed.");
761 }
762 SwarmEvent::TcpListenerError(err) => {
763 trace!(target: "net", %err, "TCP connection error.");
764 }
765 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
766 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
767 self.metrics.total_incoming_connections.increment(1);
768 self.metrics
769 .incoming_connections
770 .set(self.swarm.state().peers().num_inbound_connections() as f64);
771 }
772 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
773 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
774 self.metrics.total_outgoing_connections.increment(1);
775 self.update_pending_connection_metrics()
776 }
777 SwarmEvent::SessionEstablished {
778 peer_id,
779 remote_addr,
780 client_version,
781 capabilities,
782 version,
783 messages,
784 status,
785 direction,
786 } => {
787 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
788 self.metrics.connected_peers.set(total_active as f64);
789 debug!(
790 target: "net",
791 ?remote_addr,
792 %client_version,
793 ?peer_id,
794 ?total_active,
795 kind=%direction,
796 peer_enode=%NodeRecord::new(remote_addr, peer_id),
797 "Session established"
798 );
799
800 if direction.is_incoming() {
801 self.swarm
802 .state_mut()
803 .peers_mut()
804 .on_incoming_session_established(peer_id, remote_addr);
805 }
806
807 if direction.is_outgoing() {
808 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
809 }
810
811 self.update_active_connection_metrics();
812
813 let peer_kind = self
814 .swarm
815 .state()
816 .peers()
817 .peer_by_id(peer_id)
818 .map(|(_, kind)| kind)
819 .unwrap_or_default();
820 let session_info = SessionInfo {
821 peer_id,
822 remote_addr,
823 client_version,
824 capabilities,
825 status,
826 version,
827 peer_kind,
828 };
829
830 self.event_sender
831 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
832 }
833 SwarmEvent::PeerAdded(peer_id) => {
834 trace!(target: "net", ?peer_id, "Peer added");
835 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
836 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
837 }
838 SwarmEvent::PeerRemoved(peer_id) => {
839 trace!(target: "net", ?peer_id, "Peer dropped");
840 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
841 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
842 }
843 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
844 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
845 self.metrics.connected_peers.set(total_active as f64);
846 trace!(
847 target: "net",
848 ?remote_addr,
849 ?peer_id,
850 ?total_active,
851 ?error,
852 "Session disconnected"
853 );
854
855 let reason = if let Some(ref err) = error {
856 self.swarm.state_mut().peers_mut().on_active_session_dropped(
859 &remote_addr,
860 &peer_id,
861 err,
862 );
863 err.as_disconnected()
864 } else {
865 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
867 None
868 };
869 self.metrics.closed_sessions.increment(1);
870 self.update_active_connection_metrics();
871
872 if let Some(reason) = reason {
873 self.disconnect_metrics.increment(reason);
874 }
875 self.metrics.backed_off_peers.set(
876 self.swarm
877 .state()
878 .peers()
879 .num_backed_off_peers()
880 .saturating_sub(1)
881 as f64,
882 );
883 self.event_sender
884 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
885 }
886 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
887 trace!(
888 target: "net",
889 ?remote_addr,
890 ?error,
891 "Incoming pending session failed"
892 );
893
894 if let Some(ref err) = error {
895 self.swarm
896 .state_mut()
897 .peers_mut()
898 .on_incoming_pending_session_dropped(remote_addr, err);
899 self.metrics.pending_session_failures.increment(1);
900 if let Some(reason) = err.as_disconnected() {
901 self.disconnect_metrics.increment(reason);
902 }
903 } else {
904 self.swarm
905 .state_mut()
906 .peers_mut()
907 .on_incoming_pending_session_gracefully_closed();
908 }
909 self.metrics.closed_sessions.increment(1);
910 self.metrics
911 .incoming_connections
912 .set(self.swarm.state().peers().num_inbound_connections() as f64);
913 self.metrics.backed_off_peers.set(
914 self.swarm
915 .state()
916 .peers()
917 .num_backed_off_peers()
918 .saturating_sub(1)
919 as f64,
920 );
921 }
922 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
923 trace!(
924 target: "net",
925 ?remote_addr,
926 ?peer_id,
927 ?error,
928 "Outgoing pending session failed"
929 );
930
931 if let Some(ref err) = error {
932 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
933 &remote_addr,
934 &peer_id,
935 err,
936 );
937 self.metrics.pending_session_failures.increment(1);
938 if let Some(reason) = err.as_disconnected() {
939 self.disconnect_metrics.increment(reason);
940 }
941 } else {
942 self.swarm
943 .state_mut()
944 .peers_mut()
945 .on_outgoing_pending_session_gracefully_closed(&peer_id);
946 }
947 self.metrics.closed_sessions.increment(1);
948 self.update_pending_connection_metrics();
949
950 self.metrics.backed_off_peers.set(
951 self.swarm
952 .state()
953 .peers()
954 .num_backed_off_peers()
955 .saturating_sub(1)
956 as f64,
957 );
958 }
959 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
960 trace!(
961 target: "net",
962 ?remote_addr,
963 ?peer_id,
964 %error,
965 "Outgoing connection error"
966 );
967
968 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
969 &remote_addr,
970 &peer_id,
971 &error,
972 );
973
974 self.metrics.backed_off_peers.set(
975 self.swarm
976 .state()
977 .peers()
978 .num_backed_off_peers()
979 .saturating_sub(1)
980 as f64,
981 );
982 self.update_pending_connection_metrics();
983 }
984 SwarmEvent::BadMessage { peer_id } => {
985 self.swarm
986 .state_mut()
987 .peers_mut()
988 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
989 self.metrics.invalid_messages_received.increment(1);
990 }
991 SwarmEvent::ProtocolBreach { peer_id } => {
992 self.swarm
993 .state_mut()
994 .peers_mut()
995 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
996 }
997 }
998 }
999
1000 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1002 self.swarm
1003 .sessions()
1004 .active_sessions()
1005 .iter()
1006 .filter_map(|(&peer_id, session)| {
1007 self.swarm
1008 .state()
1009 .peers()
1010 .peer_by_id(peer_id)
1011 .map(|(record, kind)| session.peer_info(&record, kind))
1012 })
1013 .collect()
1014 }
1015
1016 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1020 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1021 self.swarm
1022 .state()
1023 .peers()
1024 .peer_by_id(peer_id)
1025 .map(|(record, kind)| session.peer_info(&record, kind))
1026 })
1027 }
1028
1029 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1033 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1034 }
1035
1036 #[inline]
1038 fn update_active_connection_metrics(&self) {
1039 self.metrics
1040 .incoming_connections
1041 .set(self.swarm.state().peers().num_inbound_connections() as f64);
1042 self.metrics
1043 .outgoing_connections
1044 .set(self.swarm.state().peers().num_outbound_connections() as f64);
1045 }
1046
1047 #[inline]
1049 fn update_pending_connection_metrics(&self) {
1050 self.metrics
1051 .pending_outgoing_connections
1052 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
1053 self.metrics
1054 .total_pending_connections
1055 .set(self.swarm.sessions().num_pending_connections() as f64);
1056 }
1057
1058 pub async fn run_until_graceful_shutdown<F, R>(
1062 mut self,
1063 shutdown: GracefulShutdown,
1064 shutdown_hook: F,
1065 ) -> R
1066 where
1067 F: FnOnce(Self) -> R,
1068 {
1069 let mut graceful_guard = None;
1070 tokio::select! {
1071 _ = &mut self => {},
1072 guard = shutdown => {
1073 graceful_guard = Some(guard);
1074 },
1075 }
1076
1077 self.perform_network_shutdown();
1078 let res = shutdown_hook(self);
1079 drop(graceful_guard);
1080 res
1081 }
1082
1083 fn perform_network_shutdown(&mut self) {
1086 self.swarm.on_shutdown_requested();
1090 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1092 self.swarm.sessions_mut().disconnect_all_pending();
1094 }
1095}
1096
/// Drives the network: each poll processes block import results, then commands from the
/// [`NetworkHandle`], then events from the swarm.
impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    /// Advances the manager. Every return statement visible in this function yields
    /// [`Poll::Pending`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // Process every block import result that is ready right now.
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // Drain commands sent through the handle, limited by the configured budget.
        // NOTE(review): the macro presumably evaluates to `true` when the budget ran out while
        // more items may still be ready; confirm against its definition.
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // Drain swarm events, limited by the configured budget.
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        // Time spent on the swarm = time since the handle phase started minus the handle phase.
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        if maybe_more_handle_messages || maybe_more_swarm_events {
            // There may be more work: request another poll right away. Poll metrics are not
            // updated on this path.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1168
/// Durations measured during a single poll of the [`NetworkManager`].
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    /// Time spent draining the channel of messages from the network handle.
    acc_network_handle: Duration,
    /// Time spent draining the swarm's event stream.
    acc_swarm: Duration,
}