1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{
28 BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29 PendingSessionFailureMetrics,
30 },
31 network::{NetworkHandle, NetworkHandleMessage},
32 peers::{BackoffReason, PeersManager},
33 poll_nested_stream_with_budget,
34 protocol::IntoRlpxSubProtocol,
35 required_block_filter::RequiredBlockFilter,
36 session::SessionManager,
37 state::NetworkState,
38 swarm::{Swarm, SwarmEvent},
39 transactions::NetworkTransactionEvent,
40 FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::MemoryBoundedSender;
48use reth_network_api::{
49 events::{PeerEvent, SessionInfo},
50 test_utils::PeersHandle,
51 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60 net::SocketAddr,
61 path::Path,
62 pin::Pin,
63 sync::{
64 atomic::{AtomicU64, AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
/// Central driver of the networking stack.
///
/// The manager is a [`Future`] and does nothing unless polled. Each poll advances the block
/// import, drains the commands sent through the [`NetworkHandle`] and processes the events
/// produced by the [`Swarm`].
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug)]
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Source of [`SwarmEvent`]s; also gives access to the listener, the sessions and the
    /// network state.
    swarm: Swarm<N>,
    /// The handle belonging to this manager, exposed via [`Self::handle`].
    handle: NetworkHandle<N>,
    /// Receiving half of the channel whose sender is owned by the [`NetworkHandle`].
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Receives block announcements from peers and yields import events when polled.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Used to emit [`NetworkEvent`]s; a clone is also held by the [`NetworkHandle`].
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Channel for transaction related events. `None` until installed with
    /// [`Self::set_transactions`].
    to_transactions_manager: Option<MemoryBoundedSender<NetworkTransactionEvent<N>>>,
    /// Channel for incoming eth requests. `None` until installed with
    /// [`Self::set_eth_request_handler`].
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Counter of active sessions, shared with the [`NetworkHandle`] and the [`NetworkState`].
    ///
    /// Incremented when a session is established and decremented when one is closed.
    num_active_peers: Arc<AtomicUsize>,
    /// General network metrics.
    metrics: NetworkMetrics,
    /// Disconnect reasons, recorded separately for inbound and outbound connections.
    disconnect_metrics: DirectionalDisconnectMetrics,
    /// Counters for closed sessions (active, incoming pending, outgoing pending).
    closed_sessions_metrics: ClosedSessionsMetrics,
    /// Counters for failed pending sessions, per direction.
    pending_session_failure_metrics: PendingSessionFailureMetrics,
    /// Counters for peer backoffs, keyed by [`BackoffReason`].
    backed_off_peers_metrics: BackedOffPeersMetrics,
}
152
153impl NetworkManager {
154 pub async fn eth<C: BlockNumReader + 'static>(
167 config: NetworkConfig<C, EthNetworkPrimitives>,
168 ) -> Result<Self, NetworkError> {
169 Self::new(config).await
170 }
171}
172
173impl<N: NetworkPrimitives> NetworkManager<N> {
174 pub fn with_transactions(
177 mut self,
178 tx: MemoryBoundedSender<NetworkTransactionEvent<N>>,
179 ) -> Self {
180 self.set_transactions(tx);
181 self
182 }
183
    /// Installs the sender used to forward transaction related events, replacing any
    /// previously configured one.
    pub fn set_transactions(&mut self, tx: MemoryBoundedSender<NetworkTransactionEvent<N>>) {
        self.to_transactions_manager = Some(tx);
    }
189
190 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
193 self.set_eth_request_handler(tx);
194 self
195 }
196
    /// Installs the sender used to delegate incoming eth requests, replacing any previously
    /// configured one.
    pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
        self.to_eth_request_handler = Some(tx);
    }
202
    /// Registers an additional `RLPx` sub-protocol by forwarding it to the swarm.
    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.swarm.add_rlpx_sub_protocol(protocol)
    }
207
    /// Returns a reference to the [`NetworkHandle`] owned by this manager.
    pub const fn handle(&self) -> &NetworkHandle<N> {
        &self.handle
    }
214
    /// Returns the [`SecretKey`] as reported by the swarm's sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.swarm.sessions().secret_key()
    }
219
220 #[inline]
221 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
222 let metrics = &self.metrics;
223
224 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
225
226 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
228 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
230 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
231 }
232
    /// Creates the manager from the given [`NetworkConfig`].
    ///
    /// This binds the connection listener, resolves the configured boot nodes, sets up
    /// discovery and wires the [`Swarm`] together with the [`NetworkHandle`].
    ///
    /// The channels to the transactions manager and the eth request handler start out unset;
    /// see [`Self::set_transactions`] and [`Self::set_eth_request_handler`].
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener, resolving a boot node or creating the
    /// discovery service fails.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            transactions_manager_config: _,
            nat,
            handshake,
            eth_max_message_size,
            required_block_hashes,
        } = config;

        // the handle must be obtained before the manager is moved into the network state
        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // from here on use the address reported by the bound listener instead of the configured
        // one
        let listener_addr = incoming.local_address();

        // resolve all boot nodes concurrently; the first failure aborts construction
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // cloned because the resolved nodes are also handed to discv5 below
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // advertise the fork id under the "eth" key
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // grab everything the handle needs before `discovery` is moved into the network state
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        // shared between the network state, the handle and this manager
        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
            eth_max_message_size,
            network_mode.is_stake(),
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        // channel over which the handle sends commands to this manager
        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // the filter is only spawned if at least one required block hash is configured
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
            closed_sessions_metrics: Default::default(),
            pending_session_failure_metrics: Default::default(),
            backed_off_peers_metrics: Default::default(),
        })
    }
374
375 pub async fn builder<C: BlockNumReader + 'static>(
408 config: NetworkConfig<C, N>,
409 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
410 let network = Self::new(config).await?;
411 Ok(network.into_builder())
412 }
413
    /// Wraps this manager in a [`NetworkBuilder`] that has neither a transactions component nor
    /// a request handler configured yet (both slots are `()`).
    pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
        NetworkBuilder { network: self, transactions: (), request_handler: () }
    }
418
    /// Returns the local address reported by the swarm's connection listener.
    pub const fn local_addr(&self) -> SocketAddr {
        self.swarm.listener().local_address()
    }
423
    /// Returns the number of active peers as tracked by the network state.
    pub fn num_connected_peers(&self) -> usize {
        self.swarm.state().num_active_peers()
    }
428
    /// Returns the [`PeerId`] stored in this manager's [`NetworkHandle`].
    pub fn peer_id(&self) -> &PeerId {
        self.handle.peer_id()
    }
433
    /// Returns an iterator over the [`NodeRecord`]s of the peers known to the peers manager.
    ///
    /// The iterator borrows from `self`.
    pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
        self.swarm.peers().iter_peers()
    }
438
    /// Returns the number of peers known to the peers manager.
    pub fn num_known_peers(&self) -> usize {
        self.swarm.peers().num_known_peers()
    }
443
    /// Returns a new [`PeersHandle`] obtained from the peers manager.
    pub fn peers_handle(&self) -> PeersHandle {
        self.swarm.peers().handle()
    }
450
451 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
457 let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
458 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
459 reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
460 Ok(())
461 }
462
    /// Returns a new [`FetchClient`] obtained from the network state.
    pub fn fetch_client(&self) -> FetchClient<N> {
        self.swarm.state().fetch_client()
    }
469
470 pub fn status(&self) -> NetworkStatus {
472 let sessions = self.swarm.sessions();
473 let status = sessions.status();
474 let hello_message = sessions.hello_message();
475
476 #[expect(deprecated)]
477 NetworkStatus {
478 client_version: hello_message.client_version,
479 protocol_version: hello_message.protocol_version as u64,
480 eth_protocol_info: EthProtocolInfo {
481 difficulty: None,
482 head: status.blockhash,
483 network: status.chain.id(),
484 genesis: status.genesis,
485 config: Default::default(),
486 },
487 capabilities: hello_message
488 .protocols
489 .into_iter()
490 .map(|protocol| protocol.cap)
491 .collect(),
492 }
493 }
494
495 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
498 if let Some(ref tx) = self.to_transactions_manager &&
499 let Err(e) = tx.try_send(event)
500 {
501 match e {
502 TrySendError::Full(_) => {
503 trace!(target: "net", "Transaction events channel at capacity, dropping event");
504 self.metrics.total_dropped_tx_events_at_full_capacity.increment(1);
505 }
506 TrySendError::Closed(_) => {}
507 }
508 }
509 }
510
511 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
514 if let Some(ref reqs) = self.to_eth_request_handler {
515 let _ = reqs.try_send(event).map_err(|e| {
516 if let TrySendError::Full(_) = e {
517 debug!(target:"net", "EthRequestHandler channel is full!");
518 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
519 }
520 });
521 }
522 }
523
524 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
526 match req {
527 PeerRequest::GetBlockHeaders { request, response } => {
528 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
529 peer_id,
530 request,
531 response,
532 })
533 }
534 PeerRequest::GetBlockBodies { request, response } => {
535 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
536 peer_id,
537 request,
538 response,
539 })
540 }
541 PeerRequest::GetNodeData { request, response } => {
542 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
543 peer_id,
544 request,
545 response,
546 })
547 }
548 PeerRequest::GetReceipts { request, response } => {
549 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
550 peer_id,
551 request,
552 response,
553 })
554 }
555 PeerRequest::GetReceipts69 { request, response } => {
556 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
557 peer_id,
558 request,
559 response,
560 })
561 }
562 PeerRequest::GetReceipts70 { request, response } => {
563 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
564 peer_id,
565 request,
566 response,
567 })
568 }
569 PeerRequest::GetBlockAccessLists { request, response } => {
570 self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
571 peer_id,
572 request,
573 response,
574 })
575 }
576 PeerRequest::GetCells { request, response } => self
577 .delegate_eth_request(IncomingEthRequest::GetCells { peer_id, request, response }),
578 PeerRequest::GetPooledTransactions { request, response } => {
579 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
580 peer_id,
581 request,
582 response,
583 });
584 }
585 }
586 }
587
588 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
590 match event {
591 BlockImportEvent::Announcement(validation) => match validation {
592 BlockValidation::ValidHeader { block } => {
593 self.swarm.state_mut().announce_new_block(block);
594 }
595 BlockValidation::ValidBlock { block } => {
596 self.swarm.state_mut().announce_new_block_hash(block);
597 }
598 },
599 BlockImportEvent::Outcome(outcome) => {
600 let BlockImportOutcome { peer, result } = outcome;
601 match result {
602 Ok(validated_block) => match validated_block {
603 BlockValidation::ValidHeader { block } => {
604 self.swarm.state_mut().update_peer_block(
605 &peer,
606 block.hash,
607 block.number(),
608 );
609 self.swarm.state_mut().announce_new_block(block);
610 }
611 BlockValidation::ValidBlock { block } => {
612 self.swarm.state_mut().announce_new_block_hash(block);
613 }
614 },
615 Err(_err) => {
616 self.swarm
617 .state_mut()
618 .peers_mut()
619 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
620 }
621 }
622 }
623 }
624 }
625
626 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
632 where
633 F: FnOnce(&mut Self),
634 {
635 if self.handle.mode().is_stake() {
637 self.swarm
639 .sessions_mut()
640 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
641 } else {
642 only_pow(self);
643 }
644 }
645
646 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
648 match msg {
649 PeerMessage::NewBlockHashes(hashes) => {
650 self.within_pow_or_disconnect(peer_id, |this| {
651 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.to_vec());
653 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
655 })
656 }
657 PeerMessage::NewBlock(block) => {
658 self.within_pow_or_disconnect(peer_id, move |this| {
659 this.swarm.state_mut().on_new_block(peer_id, block.hash);
660 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
662 });
663 }
664 PeerMessage::PooledTransactions(msg) => {
665 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
666 peer_id,
667 msg,
668 });
669 }
670 PeerMessage::EthRequest(req) => {
671 self.on_eth_request(peer_id, req);
672 }
673 PeerMessage::ReceivedTransaction(msg) => {
674 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
675 peer_id,
676 msg,
677 });
678 }
679 PeerMessage::SendTransactions(_) => {
680 unreachable!("Not emitted by session")
681 }
682 PeerMessage::BlockRangeUpdated(_) => {}
683 PeerMessage::Other(other) => {
684 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
685 }
686 }
687 }
688
    /// Executes a command that was sent through the [`NetworkHandle`].
    ///
    /// Commands that carry a response sender (`tx`) are answered immediately; the result of
    /// sending the response is deliberately ignored.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                // block propagation is rejected in proof-of-stake mode
                if self.handle.mode().is_stake() {
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // new peer addresses are ignored once the swarm is shutting down
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                // acknowledge only after the shutdown steps have been carried out
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                // the fork id in the network state is only updated if the sessions report a
                // transition for the new head
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                // forward the request to the transactions manager if one is configured,
                // otherwise answer with `None` right away
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    let _ = tx_inner.try_send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }
790
    /// Processes an event emitted by the [`Swarm`].
    ///
    /// Besides forwarding valid peer messages, this keeps the peers state, the shared active
    /// peer counter and the metrics in sync with the lifecycle of connections and sessions, and
    /// emits the corresponding [`NetworkEvent`]s.
    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                // `fetch_add` returns the previous value, hence the `+ 1`
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                // inform the peers state according to the direction of the session
                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.peers_mut().on_active_outgoing_established(peer_id);
                }

                self.update_active_connection_metrics();

                // falls back to the default kind if the peer is not known to the peers state
                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                // `fetch_sub` returns the previous value, hence the `- 1`
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                // the direction is read before the peers state is updated below
                // NOTE(review): presumably because that update may change what is tracked for
                // the peer - confirm against `PeersManager`
                let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);

                // `reason` is only `Some` if the session was closed with an error that maps to
                // a disconnect reason
                let reason = if let Some(ref err) = error {
                    self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    err.as_disconnected()
                } else {
                    self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
                    self.backed_off_peers_metrics
                        .increment_for_reason(BackoffReason::GracefulClose);
                    None
                };
                self.closed_sessions_metrics.active.increment(1);
                self.update_active_connection_metrics();

                if let Some(reason) = reason {
                    if is_inbound {
                        self.disconnect_metrics.increment_inbound(reason);
                    } else {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                }
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                // only a close with an error counts as a pending session failure
                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.pending_session_failure_metrics.inbound.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_inbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.closed_sessions_metrics.incoming_pending.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                // only a close with an error counts as a failure and as a backoff
                if let Some(ref err) = error {
                    self.swarm.peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.pending_session_failure_metrics.outbound.increment(1);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.closed_sessions_metrics.outgoing_pending.increment(1);
                self.update_pending_connection_metrics();
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }
1017
1018 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1020 self.swarm
1021 .sessions()
1022 .active_sessions()
1023 .iter()
1024 .filter_map(|(&peer_id, session)| {
1025 self.swarm
1026 .state()
1027 .peers()
1028 .peer_by_id(peer_id)
1029 .map(|(record, kind)| session.peer_info(&record, kind))
1030 })
1031 .collect()
1032 }
1033
1034 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1038 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1039 self.swarm
1040 .state()
1041 .peers()
1042 .peer_by_id(peer_id)
1043 .map(|(record, kind)| session.peer_info(&record, kind))
1044 })
1045 }
1046
1047 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1051 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1052 }
1053
1054 #[inline]
1056 fn update_active_connection_metrics(&self) {
1057 self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
1058 self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
1059 }
1060
1061 #[inline]
1063 fn update_pending_connection_metrics(&self) {
1064 self.metrics
1065 .pending_outgoing_connections
1066 .set(self.swarm.peers().num_pending_outbound_connections() as f64);
1067 self.metrics
1068 .total_pending_connections
1069 .set(self.swarm.sessions().num_pending_connections() as f64);
1070 }
1071
1072 pub async fn run_until_graceful_shutdown<F, R>(
1076 mut self,
1077 shutdown: GracefulShutdown,
1078 shutdown_hook: F,
1079 ) -> R
1080 where
1081 F: FnOnce(Self) -> R,
1082 {
1083 let mut graceful_guard = None;
1084 tokio::select! {
1085 _ = &mut self => {},
1086 guard = shutdown => {
1087 graceful_guard = Some(guard);
1088 },
1089 }
1090
1091 self.perform_network_shutdown();
1092 let res = shutdown_hook(self);
1093 drop(graceful_guard);
1094 res
1095 }
1096
1097 fn perform_network_shutdown(&mut self) {
1100 self.swarm.on_shutdown_requested();
1104 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1106 self.swarm.sessions_mut().disconnect_all_pending();
1108 }
1109}
1110
/// Drives the network: every poll advances the block import, drains commands from the
/// [`NetworkHandle`] channel and processes events from the [`Swarm`].
impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    /// Polls all components once.
    ///
    /// Every `return` visible in this function yields [`Poll::Pending`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // advance the block import for as long as it yields events
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        // drain commands sent through the handle, limited by the configured budget
        let start_network_handle = Instant::now();
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // drain events from the swarm, limited by the configured budget
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        // the timer started before the handle channel was drained, so the share of the swarm is
        // the elapsed time minus the share of the handle channel
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        // NOTE(review): presumably the macro evaluates to `true` if the budget ran out while the
        // stream still had items - confirm against the macro definition
        if maybe_more_handle_messages || maybe_more_swarm_events {
            // request another poll right away; note that the poll metrics are not updated on
            // this path
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1182
/// Durations accumulated during a single poll of the [`NetworkManager`].
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    /// Time spent draining the channel of the network handle.
    acc_network_handle: Duration,
    /// Time spent draining the events of the swarm.
    acc_swarm: Duration,
}