1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{
28 BackedOffPeersMetrics, ClosedSessionsMetrics, DirectionalDisconnectMetrics, NetworkMetrics,
29 PendingSessionFailureMetrics,
30 },
31 network::{NetworkHandle, NetworkHandleMessage},
32 peers::{BackoffReason, PeersManager},
33 poll_nested_stream_with_budget,
34 protocol::IntoRlpxSubProtocol,
35 required_block_filter::RequiredBlockFilter,
36 session::SessionManager,
37 state::NetworkState,
38 swarm::{Swarm, SwarmEvent},
39 transactions::NetworkTransactionEvent,
40 FetchClient, NetworkBuilder,
41};
42use futures::{Future, StreamExt};
43use parking_lot::Mutex;
44use reth_chainspec::EnrForkIdEntry;
45use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
46use reth_fs_util::{self as fs, FsPathError};
47use reth_metrics::common::mpsc::MemoryBoundedSender;
48use reth_network_api::{
49 events::{PeerEvent, SessionInfo},
50 test_utils::PeersHandle,
51 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
52};
53use reth_network_peers::{NodeRecord, PeerId};
54use reth_network_types::ReputationChangeKind;
55use reth_storage_api::BlockNumReader;
56use reth_tasks::shutdown::GracefulShutdown;
57use reth_tokio_util::EventSender;
58use secp256k1::SecretKey;
59use std::{
60 net::SocketAddr,
61 path::Path,
62 pin::Pin,
63 sync::{
64 atomic::{AtomicU64, AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::mpsc::{self, error::TrySendError};
71use tokio_stream::wrappers::UnboundedReceiverStream;
72use tracing::{debug, error, trace, warn};
73
#[cfg_attr(doc, aquamarine::aquamarine)]
#[derive(Debug)]
/// Central coordinator of the networking stack.
///
/// Owns the [`Swarm`], the receiving half of the [`NetworkHandle`] command channel, the block
/// import and the optional channels towards the transactions and eth-request tasks. The type
/// implements `Future` (see the impl in this file) and makes no progress unless it is polled.
#[must_use = "The NetworkManager does nothing unless polled"]
pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The swarm (listener, sessions, network state); polled for [`SwarmEvent`]s in `poll`.
    swarm: Swarm<N>,
    /// Handle created in `new` around the sending half of the command channel; exposed via
    /// `handle()`.
    handle: NetworkHandle<N>,
    /// Receiving half of the unbounded command channel fed by [`NetworkHandle`]s.
    from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
    /// Block import implementation; receives `NewBlock`/`NewBlockHashes` announcements and is
    /// polled for [`BlockImportEvent`]s.
    block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
    /// Sender used to publish [`NetworkEvent`]s (session established/closed, peer added/removed).
    event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
    /// Channel to the transactions task; `None` until `set_transactions` is called.
    to_transactions_manager: Option<MemoryBoundedSender<NetworkTransactionEvent<N>>>,
    /// Channel to the eth request handler; `None` until `set_eth_request_handler` is called.
    to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
    /// Number of active sessions. The same counter is shared with the [`NetworkHandle`] and the
    /// [`NetworkState`]; it is incremented on `SessionEstablished` and decremented on
    /// `SessionClosed`.
    num_active_peers: Arc<AtomicUsize>,
    /// General network metrics (connections, peers, poll durations, dropped events).
    metrics: NetworkMetrics,
    /// Disconnect reasons, recorded separately for inbound and outbound connections.
    disconnect_metrics: DirectionalDisconnectMetrics,
    /// Counters for closed active and pending sessions.
    closed_sessions_metrics: ClosedSessionsMetrics,
    /// Counters for pending sessions that closed with an error.
    pending_session_failure_metrics: PendingSessionFailureMetrics,
    /// Counters of back-off occurrences, keyed by [`BackoffReason`].
    backed_off_peers_metrics: BackedOffPeersMetrics,
}
152
153impl NetworkManager {
154 pub async fn eth<C: BlockNumReader + 'static>(
167 config: NetworkConfig<C, EthNetworkPrimitives>,
168 ) -> Result<Self, NetworkError> {
169 Self::new(config).await
170 }
171}
172
173impl<N: NetworkPrimitives> NetworkManager<N> {
174 pub fn with_transactions(
177 mut self,
178 tx: MemoryBoundedSender<NetworkTransactionEvent<N>>,
179 ) -> Self {
180 self.set_transactions(tx);
181 self
182 }
183
184 pub fn set_transactions(&mut self, tx: MemoryBoundedSender<NetworkTransactionEvent<N>>) {
187 self.to_transactions_manager = Some(tx);
188 }
189
190 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
193 self.set_eth_request_handler(tx);
194 self
195 }
196
197 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
200 self.to_eth_request_handler = Some(tx);
201 }
202
203 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
205 self.swarm.add_rlpx_sub_protocol(protocol)
206 }
207
208 pub const fn handle(&self) -> &NetworkHandle<N> {
212 &self.handle
213 }
214
215 pub const fn secret_key(&self) -> SecretKey {
217 self.swarm.sessions().secret_key()
218 }
219
220 #[inline]
221 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
222 let metrics = &self.metrics;
223
224 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
225
226 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
228 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
230 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
231 }
232
    /// Creates the manager from the given `config`.
    ///
    /// Binds the TCP listener, resolves the configured boot nodes, starts discovery and then
    /// wires up the [`SessionManager`], [`NetworkState`], [`Swarm`] and [`NetworkHandle`].
    /// The returned manager has no transactions or eth-request channel installed yet.
    ///
    /// # Errors
    ///
    /// - a listener bind failure is converted with [`NetworkError::from_io_error`] and tagged
    ///   with [`ServiceKind::Listener`];
    /// - failures while resolving boot nodes or creating [`Discovery`] are propagated via `?`.
    pub async fn new<C: BlockNumReader + 'static>(
        config: NetworkConfig<C, N>,
    ) -> Result<Self, NetworkError> {
        let NetworkConfig {
            client,
            secret_key,
            discovery_v4_addr,
            mut discovery_v4_config,
            mut discovery_v5_config,
            listener_addr,
            peers_config,
            sessions_config,
            chain_id,
            block_import,
            network_mode,
            boot_nodes,
            executor,
            hello_message,
            status,
            fork_filter,
            dns_discovery_config,
            extra_protocols,
            tx_gossip_disabled,
            // not consumed by the manager itself
            transactions_manager_config: _,
            nat,
            handshake,
            eth_max_message_size,
            required_block_hashes,
        } = config;

        // Grab the peers handle before the manager is moved into the network state below.
        let peers_manager = PeersManager::new(peers_config);
        let peers_handle = peers_manager.handle();

        let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
            NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
        })?;

        // From here on use the address reported by the bound listener instead of the
        // configured one.
        let listener_addr = incoming.local_address();

        // Resolve all boot node records concurrently; the first failure aborts construction.
        let resolved_boot_nodes =
            futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;

        if let Some(disc_config) = discovery_v4_config.as_mut() {
            // merge the resolved boot nodes into the discv4 bootstrap set
            disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
            // advertise the fork id under the "eth" key
            disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
        }

        if let Some(discv5) = discovery_v5_config.as_mut() {
            // discv5 takes ownership of the resolved boot nodes
            discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
        }

        let discovery = Discovery::new(
            listener_addr,
            discovery_v4_addr,
            secret_key,
            discovery_v4_config,
            discovery_v5_config,
            dns_discovery_config,
        )
        .await?;
        // Extract everything the handle needs before `discovery` is moved into the state.
        let local_peer_id = discovery.local_id();
        let discv4 = discovery.discv4();
        let discv5 = discovery.discv5();

        // Shared between the network state, the handle and this manager.
        let num_active_peers = Arc::new(AtomicUsize::new(0));

        let sessions = SessionManager::new(
            secret_key,
            sessions_config,
            executor,
            status,
            hello_message,
            fork_filter,
            extra_protocols,
            handshake,
            eth_max_message_size,
            // NOTE(review): meaning of this flag is defined by `SessionManager::new` — confirm
            network_mode.is_stake(),
        );

        let state = NetworkState::new(
            crate::state::BlockNumReader::new(client),
            discovery,
            peers_manager,
            Arc::clone(&num_active_peers),
        );

        let swarm = Swarm::new(incoming, sessions, state);

        // Command channel: handles send, the manager drains the receiver in `poll`.
        let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();

        let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();

        let handle = NetworkHandle::new(
            Arc::clone(&num_active_peers),
            Arc::new(Mutex::new(listener_addr)),
            to_manager_tx,
            secret_key,
            local_peer_id,
            peers_handle,
            network_mode,
            Arc::new(AtomicU64::new(chain_id)),
            tx_gossip_disabled,
            discv4,
            discv5,
            event_sender.clone(),
            nat,
        );

        // The required-block filter is only spawned when at least one hash was configured.
        if !required_block_hashes.is_empty() {
            let filter = RequiredBlockFilter::new(handle.clone(), required_block_hashes);
            filter.spawn();
        }

        Ok(Self {
            swarm,
            handle,
            from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
            block_import,
            event_sender,
            to_transactions_manager: None,
            to_eth_request_handler: None,
            num_active_peers,
            metrics: Default::default(),
            disconnect_metrics: Default::default(),
            closed_sessions_metrics: Default::default(),
            pending_session_failure_metrics: Default::default(),
            backed_off_peers_metrics: Default::default(),
        })
    }
374
375 pub async fn builder<C: BlockNumReader + 'static>(
408 config: NetworkConfig<C, N>,
409 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
410 let network = Self::new(config).await?;
411 Ok(network.into_builder())
412 }
413
414 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
416 NetworkBuilder { network: self, transactions: (), request_handler: () }
417 }
418
419 pub const fn local_addr(&self) -> SocketAddr {
421 self.swarm.listener().local_address()
422 }
423
424 pub fn num_connected_peers(&self) -> usize {
426 self.swarm.state().num_active_peers()
427 }
428
429 pub fn peer_id(&self) -> &PeerId {
431 self.handle.peer_id()
432 }
433
434 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
436 self.swarm.peers().iter_peers()
437 }
438
439 pub fn num_known_peers(&self) -> usize {
441 self.swarm.peers().num_known_peers()
442 }
443
444 pub fn peers_handle(&self) -> PeersHandle {
448 self.swarm.peers().handle()
449 }
450
451 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
457 let peers = self.swarm.peers().persistable_peers().collect::<Vec<_>>();
458 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
459 reth_fs_util::write_json_file(persistent_peers_file, &peers)?;
460 Ok(())
461 }
462
463 pub fn fetch_client(&self) -> FetchClient<N> {
467 self.swarm.state().fetch_client()
468 }
469
470 pub fn status(&self) -> NetworkStatus {
472 let sessions = self.swarm.sessions();
473 let status = sessions.status();
474 let hello_message = sessions.hello_message();
475
476 #[expect(deprecated)]
477 NetworkStatus {
478 client_version: hello_message.client_version,
479 protocol_version: hello_message.protocol_version as u64,
480 eth_protocol_info: EthProtocolInfo {
481 difficulty: None,
482 head: status.blockhash,
483 network: status.chain.id(),
484 genesis: status.genesis,
485 config: Default::default(),
486 },
487 capabilities: hello_message
488 .protocols
489 .into_iter()
490 .map(|protocol| protocol.cap)
491 .collect(),
492 }
493 }
494
495 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
498 if let Some(ref tx) = self.to_transactions_manager &&
499 let Err(e) = tx.try_send(event)
500 {
501 match e {
502 TrySendError::Full(_) => {
503 trace!(target: "net", "Transaction events channel at capacity, dropping event");
504 self.metrics.total_dropped_tx_events_at_full_capacity.increment(1);
505 }
506 TrySendError::Closed(_) => {}
507 }
508 }
509 }
510
511 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
514 if let Some(ref reqs) = self.to_eth_request_handler {
515 let _ = reqs.try_send(event).map_err(|e| {
516 if let TrySendError::Full(_) = e {
517 debug!(target:"net", "EthRequestHandler channel is full!");
518 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
519 }
520 });
521 }
522 }
523
524 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
526 match req {
527 PeerRequest::GetBlockHeaders { request, response } => {
528 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
529 peer_id,
530 request,
531 response,
532 })
533 }
534 PeerRequest::GetBlockBodies { request, response } => {
535 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
536 peer_id,
537 request,
538 response,
539 })
540 }
541 PeerRequest::GetNodeData { request, response } => {
542 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
543 peer_id,
544 request,
545 response,
546 })
547 }
548 PeerRequest::GetReceipts { request, response } => {
549 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
550 peer_id,
551 request,
552 response,
553 })
554 }
555 PeerRequest::GetReceipts69 { request, response } => {
556 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
557 peer_id,
558 request,
559 response,
560 })
561 }
562 PeerRequest::GetReceipts70 { request, response } => {
563 self.delegate_eth_request(IncomingEthRequest::GetReceipts70 {
564 peer_id,
565 request,
566 response,
567 })
568 }
569 PeerRequest::GetBlockAccessLists { request, response } => {
570 self.delegate_eth_request(IncomingEthRequest::GetBlockAccessLists {
571 peer_id,
572 request,
573 response,
574 })
575 }
576 PeerRequest::GetCells { request, response } => self
577 .delegate_eth_request(IncomingEthRequest::GetCells { peer_id, request, response }),
578 PeerRequest::GetPooledTransactions { request, response } => {
579 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
580 peer_id,
581 request,
582 response,
583 });
584 }
585 }
586 }
587
588 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
590 match event {
591 BlockImportEvent::Announcement(validation) => match validation {
592 BlockValidation::ValidHeader { block } => {
593 self.swarm.state_mut().announce_new_block(block);
594 }
595 BlockValidation::ValidBlock { block } => {
596 self.swarm.state_mut().announce_new_block_hash(block);
597 }
598 },
599 BlockImportEvent::Outcome(outcome) => {
600 let BlockImportOutcome { peer, result } = outcome;
601 match result {
602 Ok(validated_block) => match validated_block {
603 BlockValidation::ValidHeader { block } => {
604 self.swarm.state_mut().update_peer_block(
605 &peer,
606 block.hash,
607 block.number(),
608 );
609 self.swarm.state_mut().announce_new_block(block);
610 }
611 BlockValidation::ValidBlock { block } => {
612 self.swarm.state_mut().announce_new_block_hash(block);
613 }
614 },
615 Err(_err) => {
616 self.swarm
617 .state_mut()
618 .peers_mut()
619 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
620 }
621 }
622 }
623 }
624 }
625
626 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
632 where
633 F: FnOnce(&mut Self),
634 {
635 if self.handle.mode().is_stake() {
637 self.swarm
639 .sessions_mut()
640 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
641 } else {
642 only_pow(self);
643 }
644 }
645
646 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
648 match msg {
649 PeerMessage::NewBlockHashes(hashes) => {
650 self.within_pow_or_disconnect(peer_id, |this| {
651 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.to_vec());
653 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
655 })
656 }
657 PeerMessage::NewBlock(block) => {
658 self.within_pow_or_disconnect(peer_id, move |this| {
659 this.swarm.state_mut().on_new_block(peer_id, block.hash);
660 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
662 });
663 }
664 PeerMessage::PooledTransactions(msg) => {
665 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
666 peer_id,
667 msg,
668 });
669 }
670 PeerMessage::EthRequest(req) => {
671 self.on_eth_request(peer_id, req);
672 }
673 PeerMessage::ReceivedTransaction(msg) => {
674 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
675 peer_id,
676 msg,
677 });
678 }
679 PeerMessage::SendTransactions(_) => {
680 unreachable!("Not emitted by session")
681 }
682 PeerMessage::BlockRangeUpdated(_) => {}
683 PeerMessage::Other(other) => {
684 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
685 }
686 }
687 }
688
    /// Executes a single command received from a [`NetworkHandle`].
    ///
    /// Commands that carry a response sender (`tx`) are answered best-effort: a failed
    /// `tx.send` is ignored.
    fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
        match msg {
            NetworkHandleMessage::DiscoveryListener(tx) => {
                self.swarm.state_mut().discovery_mut().add_listener(tx);
            }
            NetworkHandleMessage::AnnounceBlock(block, hash) => {
                // Block announcements are refused in proof-of-stake mode.
                if self.handle.mode().is_stake() {
                    warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
                    return
                }
                let msg = NewBlockMessage { hash, block: Arc::new(block) };
                self.swarm.state_mut().announce_new_block(msg);
            }
            NetworkHandleMessage::EthRequest { peer_id, request } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
            }
            NetworkHandleMessage::SendTransaction { peer_id, msg } => {
                self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
            }
            NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
                .swarm
                .sessions_mut()
                .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
            NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
                self.swarm.state_mut().add_trusted_peer_id(peer_id);
            }
            NetworkHandleMessage::AddTrustedPeerNode(trusted_peer) => {
                // new peers are not accepted once shutdown has been requested
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_trusted_peer_node(trusted_peer);
                }
            }
            NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
                // new peers are not accepted once shutdown has been requested
                if !self.swarm.is_shutting_down() {
                    self.swarm.state_mut().add_peer_kind(peer, kind, addr);
                }
            }
            NetworkHandleMessage::RemovePeer(peer_id, kind) => {
                self.swarm.state_mut().remove_peer_kind(peer_id, kind);
            }
            NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
                self.swarm.sessions_mut().disconnect(peer_id, reason);
            }
            NetworkHandleMessage::BanPeer(peer_id) => {
                self.swarm.peers_mut().ban_peer_by_admin(peer_id);
            }
            NetworkHandleMessage::UnbanPeer(peer_id) => {
                self.swarm.peers_mut().unban_peer_by_admin(peer_id);
            }
            NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
                self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
            }
            NetworkHandleMessage::SetNetworkState(net_state) => {
                self.swarm.on_network_state_change(net_state);
            }

            NetworkHandleMessage::Shutdown(tx) => {
                // acknowledge only after the shutdown steps have been issued
                self.perform_network_shutdown();
                let _ = tx.send(());
            }
            NetworkHandleMessage::ReputationChange(peer_id, kind) => {
                self.swarm.peers_mut().apply_reputation_change(&peer_id, kind);
            }
            NetworkHandleMessage::GetReputationById(peer_id, tx) => {
                let _ = tx.send(self.swarm.peers().get_reputation(&peer_id));
            }
            NetworkHandleMessage::FetchClient(tx) => {
                let _ = tx.send(self.fetch_client());
            }
            NetworkHandleMessage::GetStatus(tx) => {
                let _ = tx.send(self.status());
            }
            NetworkHandleMessage::StatusUpdate { head } => {
                // the fork id in the network state is only updated when the session manager
                // reports a transition
                if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
                    self.swarm.state_mut().update_fork_id(transition.current);
                }
            }
            NetworkHandleMessage::GetPeerInfos(tx) => {
                let _ = tx.send(self.get_peer_infos());
            }
            NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
                let _ = tx.send(self.get_peer_info_by_id(peer_id));
            }
            NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
                let peer_ids = self.swarm.peers().peers_by_kind(kind);
                let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
            }
            NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
            NetworkHandleMessage::GetTransactionsHandle(tx) => {
                if let Some(ref tx_inner) = self.to_transactions_manager {
                    // NOTE(review): if `try_send` fails, `tx` is dropped together with the
                    // rejected event and the requester never receives a value — confirm the
                    // caller tolerates a dropped sender.
                    let _ = tx_inner.try_send(NetworkTransactionEvent::GetTransactionsHandle(tx));
                } else {
                    // no transactions task attached
                    let _ = tx.send(None);
                }
            }
            NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
                self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
            }
            NetworkHandleMessage::EthMessage { peer_id, message } => {
                self.swarm.sessions_mut().send_message(&peer_id, message)
            }
        }
    }
801
    /// Handles a single [`SwarmEvent`]: updates peer bookkeeping and metrics and publishes
    /// [`NetworkEvent`]s for session and peer changes.
    fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
        match event {
            SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
            SwarmEvent::TcpListenerClosed { remote_addr } => {
                trace!(target: "net", ?remote_addr, "TCP listener closed.");
            }
            SwarmEvent::TcpListenerError(err) => {
                trace!(target: "net", %err, "TCP connection error.");
            }
            SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
                trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
                self.metrics.total_incoming_connections.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
                trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
                self.metrics.total_outgoing_connections.increment(1);
                self.update_pending_connection_metrics()
            }
            SwarmEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                capabilities,
                version,
                messages,
                status,
                direction,
            } => {
                // `fetch_add` returns the previous value, hence the `+ 1`.
                let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
                self.metrics.connected_peers.set(total_active as f64);
                debug!(
                    target: "net",
                    ?remote_addr,
                    %client_version,
                    ?peer_id,
                    ?total_active,
                    kind=%direction,
                    peer_enode=%NodeRecord::new(remote_addr, peer_id),
                    "Session established"
                );

                if direction.is_incoming() {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_session_established(peer_id, remote_addr);
                }

                if direction.is_outgoing() {
                    self.swarm.peers_mut().on_active_outgoing_established(peer_id);
                }

                // refresh the gauges after the peers manager has been updated
                self.update_active_connection_metrics();

                // Falls back to the default kind when the peers manager has no entry.
                let peer_kind = self
                    .swarm
                    .state()
                    .peers()
                    .peer_by_id(peer_id)
                    .map(|(_, kind)| kind)
                    .unwrap_or_default();
                let session_info = SessionInfo {
                    peer_id,
                    remote_addr,
                    client_version,
                    capabilities,
                    status,
                    version,
                    peer_kind,
                };

                self.event_sender
                    .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
            }
            SwarmEvent::PeerAdded(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer added");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::PeerRemoved(peer_id) => {
                trace!(target: "net", ?peer_id, "Peer dropped");
                self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
                self.metrics.tracked_peers.set(self.swarm.peers().num_known_peers() as f64);
            }
            SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
                // `fetch_sub` returns the previous value, hence the `- 1`.
                let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
                self.metrics.connected_peers.set(total_active as f64);
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?total_active,
                    ?error,
                    "Session disconnected"
                );

                // Read the direction before the peers manager is told about the closed session.
                let is_inbound = self.swarm.peers().is_inbound_peer(&peer_id);

                let reason = if let Some(ref err) = error {
                    // closed with an error: report it to the peers manager
                    self.swarm.peers_mut().on_active_session_dropped(&remote_addr, &peer_id, err);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    err.as_disconnected()
                } else {
                    // closed without an error
                    self.swarm.peers_mut().on_active_session_gracefully_closed(peer_id);
                    self.backed_off_peers_metrics
                        .increment_for_reason(BackoffReason::GracefulClose);
                    None
                };
                self.closed_sessions_metrics.active.increment(1);
                self.update_active_connection_metrics();

                // Attribute the disconnect reason to the direction captured above.
                if let Some(reason) = reason {
                    if is_inbound {
                        self.disconnect_metrics.increment_inbound(reason);
                    } else {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                }
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.event_sender
                    .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
            }
            SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?error,
                    "Incoming pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_dropped(remote_addr, err);
                    self.pending_session_failure_metrics.inbound.increment(1);
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_inbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_incoming_pending_session_gracefully_closed();
                }
                self.closed_sessions_metrics.incoming_pending.increment(1);
                self.metrics
                    .incoming_connections
                    .set(self.swarm.peers().num_inbound_connections() as f64);
            }
            SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    ?error,
                    "Outgoing pending session failed"
                );

                if let Some(ref err) = error {
                    self.swarm.peers_mut().on_outgoing_pending_session_dropped(
                        &remote_addr,
                        &peer_id,
                        err,
                    );
                    self.pending_session_failure_metrics.outbound.increment(1);
                    self.backed_off_peers_metrics.increment_for_reason(
                        BackoffReason::from_disconnect(err.as_disconnected()),
                    );
                    if let Some(reason) = err.as_disconnected() {
                        self.disconnect_metrics.increment_outbound(reason);
                    }
                } else {
                    self.swarm
                        .state_mut()
                        .peers_mut()
                        .on_outgoing_pending_session_gracefully_closed(&peer_id);
                }
                self.closed_sessions_metrics.outgoing_pending.increment(1);
                self.update_pending_connection_metrics();
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
            }
            SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
                trace!(
                    target: "net",
                    ?remote_addr,
                    ?peer_id,
                    %error,
                    "Outgoing connection error"
                );

                self.swarm.peers_mut().on_outgoing_connection_failure(
                    &remote_addr,
                    &peer_id,
                    &error,
                );

                self.backed_off_peers_metrics.increment_for_reason(BackoffReason::ConnectionError);
                self.metrics.backed_off_peers.set(self.swarm.peers().num_backed_off_peers() as f64);
                self.update_pending_connection_metrics();
            }
            SwarmEvent::BadMessage { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
                self.metrics.invalid_messages_received.increment(1);
            }
            SwarmEvent::ProtocolBreach { peer_id } => {
                self.swarm
                    .state_mut()
                    .peers_mut()
                    .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
            }
        }
    }
1028
1029 fn get_peer_infos(&self) -> Vec<PeerInfo> {
1031 self.swarm
1032 .sessions()
1033 .active_sessions()
1034 .iter()
1035 .filter_map(|(&peer_id, session)| {
1036 self.swarm
1037 .state()
1038 .peers()
1039 .peer_by_id(peer_id)
1040 .map(|(record, kind)| session.peer_info(&record, kind))
1041 })
1042 .collect()
1043 }
1044
1045 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1049 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1050 self.swarm
1051 .state()
1052 .peers()
1053 .peer_by_id(peer_id)
1054 .map(|(record, kind)| session.peer_info(&record, kind))
1055 })
1056 }
1057
1058 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1062 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1063 }
1064
1065 #[inline]
1067 fn update_active_connection_metrics(&self) {
1068 self.metrics.incoming_connections.set(self.swarm.peers().num_inbound_connections() as f64);
1069 self.metrics.outgoing_connections.set(self.swarm.peers().num_outbound_connections() as f64);
1070 }
1071
1072 #[inline]
1074 fn update_pending_connection_metrics(&self) {
1075 self.metrics
1076 .pending_outgoing_connections
1077 .set(self.swarm.peers().num_pending_outbound_connections() as f64);
1078 self.metrics
1079 .total_pending_connections
1080 .set(self.swarm.sessions().num_pending_connections() as f64);
1081 }
1082
1083 pub async fn run_until_graceful_shutdown<F, R>(
1087 mut self,
1088 shutdown: GracefulShutdown,
1089 shutdown_hook: F,
1090 ) -> R
1091 where
1092 F: FnOnce(Self) -> R,
1093 {
1094 let mut graceful_guard = None;
1095 tokio::select! {
1096 _ = &mut self => {},
1097 guard = shutdown => {
1098 graceful_guard = Some(guard);
1099 },
1100 }
1101
1102 self.perform_network_shutdown();
1103 let res = shutdown_hook(self);
1104 drop(graceful_guard);
1105 res
1106 }
1107
1108 fn perform_network_shutdown(&mut self) {
1111 self.swarm.on_shutdown_requested();
1115 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1117 self.swarm.sessions_mut().disconnect_all_pending();
1119 }
1120}
1121
/// Drives the network: each poll processes block import results, commands from handles and
/// swarm events.
impl<N: NetworkPrimitives> Future for NetworkManager<N> {
    type Output = ();

    /// Performs one round of work and always returns [`Poll::Pending`]; this implementation
    /// never resolves on its own.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = NetworkManagerPollDurations::default();

        let this = self.get_mut();

        // Drain every block import result that is ready right now.
        while let Poll::Ready(outcome) = this.block_import.poll(cx) {
            this.on_block_import_result(outcome);
        }

        let start_network_handle = Instant::now();
        // Commands sent through `NetworkHandle`s, processed under a budget. The trailing
        // `error!` is supplied to the macro for the closed-channel case — see
        // `poll_nested_stream_with_budget!` for the exact semantics.
        let maybe_more_handle_messages = poll_nested_stream_with_budget!(
            "net",
            "Network message channel",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
            this.from_handle_rx.poll_next_unpin(cx),
            |msg| this.on_handle_message(msg),
            error!("Network channel closed");
        );
        poll_durations.acc_network_handle = start_network_handle.elapsed();

        // Events produced by the swarm, processed under their own budget.
        let maybe_more_swarm_events = poll_nested_stream_with_budget!(
            "net",
            "Swarm events stream",
            DEFAULT_BUDGET_TRY_DRAIN_SWARM,
            this.swarm.poll_next_unpin(cx),
            |event| this.on_swarm_event(event),
        );
        // Time since the handle loop started minus the handle loop's own share.
        poll_durations.acc_swarm =
            start_network_handle.elapsed() - poll_durations.acc_network_handle;

        if maybe_more_handle_messages || maybe_more_swarm_events {
            // More work may be waiting: schedule another poll right away. Poll metrics are
            // not recorded on this path.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1193
/// Time spent in the two work loops of a single `NetworkManager::poll` call.
#[derive(Debug, Default)]
struct NetworkManagerPollDurations {
    /// Time spent processing messages from the network handle channel.
    acc_network_handle: Duration,
    /// Time spent processing swarm events.
    acc_swarm: Duration,
}