1use crate::{
19 budget::{DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL, DEFAULT_BUDGET_TRY_DRAIN_SWARM},
20 config::NetworkConfig,
21 discovery::Discovery,
22 error::{NetworkError, ServiceKind},
23 eth_requests::IncomingEthRequest,
24 import::{BlockImport, BlockImportEvent, BlockImportOutcome, BlockValidation, NewBlockEvent},
25 listener::ConnectionListener,
26 message::{NewBlockMessage, PeerMessage},
27 metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
28 network::{NetworkHandle, NetworkHandleMessage},
29 peers::PeersManager,
30 poll_nested_stream_with_budget,
31 protocol::IntoRlpxSubProtocol,
32 session::SessionManager,
33 state::NetworkState,
34 swarm::{Swarm, SwarmEvent},
35 transactions::NetworkTransactionEvent,
36 FetchClient, NetworkBuilder,
37};
38use futures::{Future, StreamExt};
39use parking_lot::Mutex;
40use reth_chainspec::EnrForkIdEntry;
41use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, NetworkPrimitives};
42use reth_fs_util::{self as fs, FsPathError};
43use reth_metrics::common::mpsc::UnboundedMeteredSender;
44use reth_network_api::{
45 events::{PeerEvent, SessionInfo},
46 test_utils::PeersHandle,
47 EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
48};
49use reth_network_peers::{NodeRecord, PeerId};
50use reth_network_types::ReputationChangeKind;
51use reth_storage_api::BlockNumReader;
52use reth_tasks::shutdown::GracefulShutdown;
53use reth_tokio_util::EventSender;
54use secp256k1::SecretKey;
55use std::{
56 net::SocketAddr,
57 path::Path,
58 pin::Pin,
59 sync::{
60 atomic::{AtomicU64, AtomicUsize, Ordering},
61 Arc,
62 },
63 task::{Context, Poll},
64 time::{Duration, Instant},
65};
66use tokio::sync::mpsc::{self, error::TrySendError};
67use tokio_stream::wrappers::UnboundedReceiverStream;
68use tracing::{debug, error, trace, warn};
69
70#[cfg_attr(doc, aquamarine::aquamarine)]
71#[derive(Debug)]
103#[must_use = "The NetworkManager does nothing unless polled"]
104pub struct NetworkManager<N: NetworkPrimitives = EthNetworkPrimitives> {
105 swarm: Swarm<N>,
107 handle: NetworkHandle<N>,
109 from_handle_rx: UnboundedReceiverStream<NetworkHandleMessage<N>>,
111 block_import: Box<dyn BlockImport<N::NewBlockPayload>>,
113 event_sender: EventSender<NetworkEvent<PeerRequest<N>>>,
115 to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent<N>>>,
118 to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest<N>>>,
132 num_active_peers: Arc<AtomicUsize>,
137 metrics: NetworkMetrics,
139 disconnect_metrics: DisconnectMetrics,
141}
142
143impl NetworkManager {
144 pub async fn eth<C: BlockNumReader + 'static>(
156 config: NetworkConfig<C, EthNetworkPrimitives>,
157 ) -> Result<Self, NetworkError> {
158 Self::new(config).await
159 }
160}
161
162impl<N: NetworkPrimitives> NetworkManager<N> {
163 pub fn with_transactions(
166 mut self,
167 tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>,
168 ) -> Self {
169 self.set_transactions(tx);
170 self
171 }
172
173 pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent<N>>) {
176 self.to_transactions_manager =
177 Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
178 }
179
180 pub fn with_eth_request_handler(mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) -> Self {
183 self.set_eth_request_handler(tx);
184 self
185 }
186
187 pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest<N>>) {
190 self.to_eth_request_handler = Some(tx);
191 }
192
193 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
195 self.swarm.add_rlpx_sub_protocol(protocol)
196 }
197
198 pub const fn handle(&self) -> &NetworkHandle<N> {
202 &self.handle
203 }
204
205 pub const fn secret_key(&self) -> SecretKey {
207 self.swarm.sessions().secret_key()
208 }
209
210 #[inline]
211 fn update_poll_metrics(&self, start: Instant, poll_durations: NetworkManagerPollDurations) {
212 let metrics = &self.metrics;
213
214 let NetworkManagerPollDurations { acc_network_handle, acc_swarm } = poll_durations;
215
216 metrics.duration_poll_network_manager.set(start.elapsed().as_secs_f64());
218 metrics.acc_duration_poll_network_handle.set(acc_network_handle.as_secs_f64());
220 metrics.acc_duration_poll_swarm.set(acc_swarm.as_secs_f64());
221 }
222
223 pub async fn new<C: BlockNumReader + 'static>(
228 config: NetworkConfig<C, N>,
229 ) -> Result<Self, NetworkError> {
230 let NetworkConfig {
231 client,
232 secret_key,
233 discovery_v4_addr,
234 mut discovery_v4_config,
235 mut discovery_v5_config,
236 listener_addr,
237 peers_config,
238 sessions_config,
239 chain_id,
240 block_import,
241 network_mode,
242 boot_nodes,
243 executor,
244 hello_message,
245 status,
246 fork_filter,
247 dns_discovery_config,
248 extra_protocols,
249 tx_gossip_disabled,
250 transactions_manager_config: _,
251 nat,
252 handshake,
253 } = config;
254
255 let peers_manager = PeersManager::new(peers_config);
256 let peers_handle = peers_manager.handle();
257
258 let incoming = ConnectionListener::bind(listener_addr).await.map_err(|err| {
259 NetworkError::from_io_error(err, ServiceKind::Listener(listener_addr))
260 })?;
261
262 let listener_addr = incoming.local_address();
264
265 let resolved_boot_nodes =
267 futures::future::try_join_all(boot_nodes.iter().map(|record| record.resolve())).await?;
268
269 if let Some(disc_config) = discovery_v4_config.as_mut() {
270 disc_config.bootstrap_nodes.extend(resolved_boot_nodes.clone());
272 disc_config.add_eip868_pair("eth", EnrForkIdEntry::from(status.forkid));
275 }
276
277 if let Some(discv5) = discovery_v5_config.as_mut() {
278 discv5.extend_unsigned_boot_nodes(resolved_boot_nodes)
280 }
281
282 let discovery = Discovery::new(
283 listener_addr,
284 discovery_v4_addr,
285 secret_key,
286 discovery_v4_config,
287 discovery_v5_config,
288 dns_discovery_config,
289 )
290 .await?;
291 let local_peer_id = discovery.local_id();
293 let discv4 = discovery.discv4();
294 let discv5 = discovery.discv5();
295
296 let num_active_peers = Arc::new(AtomicUsize::new(0));
297
298 let sessions = SessionManager::new(
299 secret_key,
300 sessions_config,
301 executor,
302 status,
303 hello_message,
304 fork_filter,
305 extra_protocols,
306 handshake,
307 );
308
309 let state = NetworkState::new(
310 crate::state::BlockNumReader::new(client),
311 discovery,
312 peers_manager,
313 Arc::clone(&num_active_peers),
314 );
315
316 let swarm = Swarm::new(incoming, sessions, state);
317
318 let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel();
319
320 let event_sender: EventSender<NetworkEvent<PeerRequest<N>>> = Default::default();
321
322 let handle = NetworkHandle::new(
323 Arc::clone(&num_active_peers),
324 Arc::new(Mutex::new(listener_addr)),
325 to_manager_tx,
326 secret_key,
327 local_peer_id,
328 peers_handle,
329 network_mode,
330 Arc::new(AtomicU64::new(chain_id)),
331 tx_gossip_disabled,
332 discv4,
333 discv5,
334 event_sender.clone(),
335 nat,
336 );
337
338 Ok(Self {
339 swarm,
340 handle,
341 from_handle_rx: UnboundedReceiverStream::new(from_handle_rx),
342 block_import,
343 event_sender,
344 to_transactions_manager: None,
345 to_eth_request_handler: None,
346 num_active_peers,
347 metrics: Default::default(),
348 disconnect_metrics: Default::default(),
349 })
350 }
351
352 pub async fn builder<C: BlockNumReader + 'static>(
384 config: NetworkConfig<C, N>,
385 ) -> Result<NetworkBuilder<(), (), N>, NetworkError> {
386 let network = Self::new(config).await?;
387 Ok(network.into_builder())
388 }
389
390 pub const fn into_builder(self) -> NetworkBuilder<(), (), N> {
392 NetworkBuilder { network: self, transactions: (), request_handler: () }
393 }
394
395 pub const fn local_addr(&self) -> SocketAddr {
397 self.swarm.listener().local_address()
398 }
399
400 pub fn num_connected_peers(&self) -> usize {
402 self.swarm.state().num_active_peers()
403 }
404
405 pub fn peer_id(&self) -> &PeerId {
407 self.handle.peer_id()
408 }
409
410 pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
412 self.swarm.state().peers().iter_peers()
413 }
414
415 pub fn num_known_peers(&self) -> usize {
417 self.swarm.state().peers().num_known_peers()
418 }
419
420 pub fn peers_handle(&self) -> PeersHandle {
424 self.swarm.state().peers().handle()
425 }
426
427 pub fn write_peers_to_file(&self, persistent_peers_file: &Path) -> Result<(), FsPathError> {
430 let known_peers = self.all_peers().collect::<Vec<_>>();
431 persistent_peers_file.parent().map(fs::create_dir_all).transpose()?;
432 reth_fs_util::write_json_file(persistent_peers_file, &known_peers)?;
433 Ok(())
434 }
435
436 pub fn fetch_client(&self) -> FetchClient<N> {
440 self.swarm.state().fetch_client()
441 }
442
443 pub fn status(&self) -> NetworkStatus {
445 let sessions = self.swarm.sessions();
446 let status = sessions.status();
447 let hello_message = sessions.hello_message();
448
449 #[expect(deprecated)]
450 NetworkStatus {
451 client_version: hello_message.client_version,
452 protocol_version: hello_message.protocol_version as u64,
453 eth_protocol_info: EthProtocolInfo {
454 difficulty: None,
455 head: status.blockhash,
456 network: status.chain.id(),
457 genesis: status.genesis,
458 config: Default::default(),
459 },
460 capabilities: hello_message
461 .protocols
462 .into_iter()
463 .map(|protocol| protocol.cap)
464 .collect(),
465 }
466 }
467
468 fn notify_tx_manager(&self, event: NetworkTransactionEvent<N>) {
471 if let Some(ref tx) = self.to_transactions_manager {
472 let _ = tx.send(event);
473 }
474 }
475
476 fn delegate_eth_request(&self, event: IncomingEthRequest<N>) {
479 if let Some(ref reqs) = self.to_eth_request_handler {
480 let _ = reqs.try_send(event).map_err(|e| {
481 if let TrySendError::Full(_) = e {
482 debug!(target:"net", "EthRequestHandler channel is full!");
483 self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
484 }
485 });
486 }
487 }
488
489 fn on_eth_request(&self, peer_id: PeerId, req: PeerRequest<N>) {
491 match req {
492 PeerRequest::GetBlockHeaders { request, response } => {
493 self.delegate_eth_request(IncomingEthRequest::GetBlockHeaders {
494 peer_id,
495 request,
496 response,
497 })
498 }
499 PeerRequest::GetBlockBodies { request, response } => {
500 self.delegate_eth_request(IncomingEthRequest::GetBlockBodies {
501 peer_id,
502 request,
503 response,
504 })
505 }
506 PeerRequest::GetNodeData { request, response } => {
507 self.delegate_eth_request(IncomingEthRequest::GetNodeData {
508 peer_id,
509 request,
510 response,
511 })
512 }
513 PeerRequest::GetReceipts { request, response } => {
514 self.delegate_eth_request(IncomingEthRequest::GetReceipts {
515 peer_id,
516 request,
517 response,
518 })
519 }
520 PeerRequest::GetReceipts69 { request, response } => {
521 self.delegate_eth_request(IncomingEthRequest::GetReceipts69 {
522 peer_id,
523 request,
524 response,
525 })
526 }
527 PeerRequest::GetPooledTransactions { request, response } => {
528 self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
529 peer_id,
530 request,
531 response,
532 });
533 }
534 }
535 }
536
537 fn on_block_import_result(&mut self, event: BlockImportEvent<N::NewBlockPayload>) {
539 match event {
540 BlockImportEvent::Announcement(validation) => match validation {
541 BlockValidation::ValidHeader { block } => {
542 self.swarm.state_mut().announce_new_block(block);
543 }
544 BlockValidation::ValidBlock { block } => {
545 self.swarm.state_mut().announce_new_block_hash(block);
546 }
547 },
548 BlockImportEvent::Outcome(outcome) => {
549 let BlockImportOutcome { peer, result } = outcome;
550 match result {
551 Ok(validated_block) => match validated_block {
552 BlockValidation::ValidHeader { block } => {
553 self.swarm.state_mut().update_peer_block(
554 &peer,
555 block.hash,
556 block.number(),
557 );
558 self.swarm.state_mut().announce_new_block(block);
559 }
560 BlockValidation::ValidBlock { block } => {
561 self.swarm.state_mut().announce_new_block_hash(block);
562 }
563 },
564 Err(_err) => {
565 self.swarm
566 .state_mut()
567 .peers_mut()
568 .apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
569 }
570 }
571 }
572 }
573 }
574
575 fn within_pow_or_disconnect<F>(&mut self, peer_id: PeerId, only_pow: F)
581 where
582 F: FnOnce(&mut Self),
583 {
584 if self.handle.mode().is_stake() {
586 self.swarm
588 .sessions_mut()
589 .disconnect(peer_id, Some(DisconnectReason::SubprotocolSpecific));
590 } else {
591 only_pow(self);
592 }
593 }
594
595 fn on_peer_message(&mut self, peer_id: PeerId, msg: PeerMessage<N>) {
597 match msg {
598 PeerMessage::NewBlockHashes(hashes) => {
599 self.within_pow_or_disconnect(peer_id, |this| {
600 this.swarm.state_mut().on_new_block_hashes(peer_id, hashes.0.clone());
602 this.block_import.on_new_block(peer_id, NewBlockEvent::Hashes(hashes));
604 })
605 }
606 PeerMessage::NewBlock(block) => {
607 self.within_pow_or_disconnect(peer_id, move |this| {
608 this.swarm.state_mut().on_new_block(peer_id, block.hash);
609 this.block_import.on_new_block(peer_id, NewBlockEvent::Block(block));
611 });
612 }
613 PeerMessage::PooledTransactions(msg) => {
614 self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
615 peer_id,
616 msg,
617 });
618 }
619 PeerMessage::EthRequest(req) => {
620 self.on_eth_request(peer_id, req);
621 }
622 PeerMessage::ReceivedTransaction(msg) => {
623 self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
624 peer_id,
625 msg,
626 });
627 }
628 PeerMessage::SendTransactions(_) => {
629 unreachable!("Not emitted by session")
630 }
631 PeerMessage::BlockRangeUpdated(_) => {}
632 PeerMessage::Other(other) => {
633 debug!(target: "net", message_id=%other.id, "Ignoring unsupported message");
634 }
635 }
636 }
637
638 fn on_handle_message(&mut self, msg: NetworkHandleMessage<N>) {
640 match msg {
641 NetworkHandleMessage::DiscoveryListener(tx) => {
642 self.swarm.state_mut().discovery_mut().add_listener(tx);
643 }
644 NetworkHandleMessage::AnnounceBlock(block, hash) => {
645 if self.handle.mode().is_stake() {
646 warn!(target: "net", "Peer performed block propagation, but it is not supported in proof of stake (EIP-3675)");
648 return
649 }
650 let msg = NewBlockMessage { hash, block: Arc::new(block) };
651 self.swarm.state_mut().announce_new_block(msg);
652 }
653 NetworkHandleMessage::EthRequest { peer_id, request } => {
654 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::EthRequest(request))
655 }
656 NetworkHandleMessage::SendTransaction { peer_id, msg } => {
657 self.swarm.sessions_mut().send_message(&peer_id, PeerMessage::SendTransactions(msg))
658 }
659 NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg } => self
660 .swarm
661 .sessions_mut()
662 .send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
663 NetworkHandleMessage::AddTrustedPeerId(peer_id) => {
664 self.swarm.state_mut().add_trusted_peer_id(peer_id);
665 }
666 NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
667 if !self.swarm.is_shutting_down() {
669 self.swarm.state_mut().add_peer_kind(peer, kind, addr);
670 }
671 }
672 NetworkHandleMessage::RemovePeer(peer_id, kind) => {
673 self.swarm.state_mut().remove_peer_kind(peer_id, kind);
674 }
675 NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
676 self.swarm.sessions_mut().disconnect(peer_id, reason);
677 }
678 NetworkHandleMessage::ConnectPeer(peer_id, kind, addr) => {
679 self.swarm.state_mut().add_and_connect(peer_id, kind, addr);
680 }
681 NetworkHandleMessage::SetNetworkState(net_state) => {
682 self.swarm.on_network_state_change(net_state);
687 }
688
689 NetworkHandleMessage::Shutdown(tx) => {
690 self.perform_network_shutdown();
691 let _ = tx.send(());
692 }
693 NetworkHandleMessage::ReputationChange(peer_id, kind) => {
694 self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
695 }
696 NetworkHandleMessage::GetReputationById(peer_id, tx) => {
697 let _ = tx.send(self.swarm.state_mut().peers().get_reputation(&peer_id));
698 }
699 NetworkHandleMessage::FetchClient(tx) => {
700 let _ = tx.send(self.fetch_client());
701 }
702 NetworkHandleMessage::GetStatus(tx) => {
703 let _ = tx.send(self.status());
704 }
705 NetworkHandleMessage::StatusUpdate { head } => {
706 if let Some(transition) = self.swarm.sessions_mut().on_status_update(head) {
707 self.swarm.state_mut().update_fork_id(transition.current);
708 }
709 }
710 NetworkHandleMessage::GetPeerInfos(tx) => {
711 let _ = tx.send(self.get_peer_infos());
712 }
713 NetworkHandleMessage::GetPeerInfoById(peer_id, tx) => {
714 let _ = tx.send(self.get_peer_info_by_id(peer_id));
715 }
716 NetworkHandleMessage::GetPeerInfosByIds(peer_ids, tx) => {
717 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
718 }
719 NetworkHandleMessage::GetPeerInfosByPeerKind(kind, tx) => {
720 let peer_ids = self.swarm.state().peers().peers_by_kind(kind);
721 let _ = tx.send(self.get_peer_infos_by_ids(peer_ids));
722 }
723 NetworkHandleMessage::AddRlpxSubProtocol(proto) => self.add_rlpx_sub_protocol(proto),
724 NetworkHandleMessage::GetTransactionsHandle(tx) => {
725 if let Some(ref tx_inner) = self.to_transactions_manager {
726 let _ = tx_inner.send(NetworkTransactionEvent::GetTransactionsHandle(tx));
727 } else {
728 let _ = tx.send(None);
729 }
730 }
731 NetworkHandleMessage::InternalBlockRangeUpdate(block_range_update) => {
732 self.swarm.sessions_mut().update_advertised_block_range(block_range_update);
733 }
734 NetworkHandleMessage::EthMessage { peer_id, message } => {
735 self.swarm.sessions_mut().send_message(&peer_id, message)
736 }
737 }
738 }
739
740 fn on_swarm_event(&mut self, event: SwarmEvent<N>) {
741 match event {
743 SwarmEvent::ValidMessage { peer_id, message } => self.on_peer_message(peer_id, message),
744 SwarmEvent::TcpListenerClosed { remote_addr } => {
745 trace!(target: "net", ?remote_addr, "TCP listener closed.");
746 }
747 SwarmEvent::TcpListenerError(err) => {
748 trace!(target: "net", %err, "TCP connection error.");
749 }
750 SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
751 trace!(target: "net", ?session_id, ?remote_addr, "Incoming connection");
752 self.metrics.total_incoming_connections.increment(1);
753 self.metrics
754 .incoming_connections
755 .set(self.swarm.state().peers().num_inbound_connections() as f64);
756 }
757 SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
758 trace!(target: "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
759 self.metrics.total_outgoing_connections.increment(1);
760 self.update_pending_connection_metrics()
761 }
762 SwarmEvent::SessionEstablished {
763 peer_id,
764 remote_addr,
765 client_version,
766 capabilities,
767 version,
768 messages,
769 status,
770 direction,
771 } => {
772 let total_active = self.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
773 self.metrics.connected_peers.set(total_active as f64);
774 debug!(
775 target: "net",
776 ?remote_addr,
777 %client_version,
778 ?peer_id,
779 ?total_active,
780 kind=%direction,
781 peer_enode=%NodeRecord::new(remote_addr, peer_id),
782 "Session established"
783 );
784
785 if direction.is_incoming() {
786 self.swarm
787 .state_mut()
788 .peers_mut()
789 .on_incoming_session_established(peer_id, remote_addr);
790 }
791
792 if direction.is_outgoing() {
793 self.swarm.state_mut().peers_mut().on_active_outgoing_established(peer_id);
794 }
795
796 self.update_active_connection_metrics();
797
798 let peer_kind = self
799 .swarm
800 .state()
801 .peers()
802 .peer_by_id(peer_id)
803 .map(|(_, kind)| kind)
804 .unwrap_or_default();
805 let session_info = SessionInfo {
806 peer_id,
807 remote_addr,
808 client_version,
809 capabilities,
810 status,
811 version,
812 peer_kind,
813 };
814
815 self.event_sender
816 .notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
817 }
818 SwarmEvent::PeerAdded(peer_id) => {
819 trace!(target: "net", ?peer_id, "Peer added");
820 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
821 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
822 }
823 SwarmEvent::PeerRemoved(peer_id) => {
824 trace!(target: "net", ?peer_id, "Peer dropped");
825 self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
826 self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
827 }
828 SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
829 let total_active = self.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
830 self.metrics.connected_peers.set(total_active as f64);
831 trace!(
832 target: "net",
833 ?remote_addr,
834 ?peer_id,
835 ?total_active,
836 ?error,
837 "Session disconnected"
838 );
839
840 let reason = if let Some(ref err) = error {
841 self.swarm.state_mut().peers_mut().on_active_session_dropped(
844 &remote_addr,
845 &peer_id,
846 err,
847 );
848 err.as_disconnected()
849 } else {
850 self.swarm.state_mut().peers_mut().on_active_session_gracefully_closed(peer_id);
852 None
853 };
854 self.metrics.closed_sessions.increment(1);
855 self.update_active_connection_metrics();
856
857 if let Some(reason) = reason {
858 self.disconnect_metrics.increment(reason);
859 }
860 self.metrics.backed_off_peers.set(
861 self.swarm
862 .state()
863 .peers()
864 .num_backed_off_peers()
865 .saturating_sub(1)
866 as f64,
867 );
868 self.event_sender
869 .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
870 }
871 SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
872 trace!(
873 target: "net",
874 ?remote_addr,
875 ?error,
876 "Incoming pending session failed"
877 );
878
879 if let Some(ref err) = error {
880 self.swarm
881 .state_mut()
882 .peers_mut()
883 .on_incoming_pending_session_dropped(remote_addr, err);
884 self.metrics.pending_session_failures.increment(1);
885 if let Some(reason) = err.as_disconnected() {
886 self.disconnect_metrics.increment(reason);
887 }
888 } else {
889 self.swarm
890 .state_mut()
891 .peers_mut()
892 .on_incoming_pending_session_gracefully_closed();
893 }
894 self.metrics.closed_sessions.increment(1);
895 self.metrics
896 .incoming_connections
897 .set(self.swarm.state().peers().num_inbound_connections() as f64);
898 self.metrics.backed_off_peers.set(
899 self.swarm
900 .state()
901 .peers()
902 .num_backed_off_peers()
903 .saturating_sub(1)
904 as f64,
905 );
906 }
907 SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
908 trace!(
909 target: "net",
910 ?remote_addr,
911 ?peer_id,
912 ?error,
913 "Outgoing pending session failed"
914 );
915
916 if let Some(ref err) = error {
917 self.swarm.state_mut().peers_mut().on_outgoing_pending_session_dropped(
918 &remote_addr,
919 &peer_id,
920 err,
921 );
922 self.metrics.pending_session_failures.increment(1);
923 if let Some(reason) = err.as_disconnected() {
924 self.disconnect_metrics.increment(reason);
925 }
926 } else {
927 self.swarm
928 .state_mut()
929 .peers_mut()
930 .on_outgoing_pending_session_gracefully_closed(&peer_id);
931 }
932 self.metrics.closed_sessions.increment(1);
933 self.update_pending_connection_metrics();
934
935 self.metrics.backed_off_peers.set(
936 self.swarm
937 .state()
938 .peers()
939 .num_backed_off_peers()
940 .saturating_sub(1)
941 as f64,
942 );
943 }
944 SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
945 trace!(
946 target: "net",
947 ?remote_addr,
948 ?peer_id,
949 %error,
950 "Outgoing connection error"
951 );
952
953 self.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
954 &remote_addr,
955 &peer_id,
956 &error,
957 );
958
959 self.metrics.backed_off_peers.set(
960 self.swarm
961 .state()
962 .peers()
963 .num_backed_off_peers()
964 .saturating_sub(1)
965 as f64,
966 );
967 self.update_pending_connection_metrics();
968 }
969 SwarmEvent::BadMessage { peer_id } => {
970 self.swarm
971 .state_mut()
972 .peers_mut()
973 .apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
974 self.metrics.invalid_messages_received.increment(1);
975 }
976 SwarmEvent::ProtocolBreach { peer_id } => {
977 self.swarm
978 .state_mut()
979 .peers_mut()
980 .apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
981 }
982 }
983 }
984
985 fn get_peer_infos(&self) -> Vec<PeerInfo> {
987 self.swarm
988 .sessions()
989 .active_sessions()
990 .iter()
991 .filter_map(|(&peer_id, session)| {
992 self.swarm
993 .state()
994 .peers()
995 .peer_by_id(peer_id)
996 .map(|(record, kind)| session.peer_info(&record, kind))
997 })
998 .collect()
999 }
1000
1001 fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
1005 self.swarm.sessions().active_sessions().get(&peer_id).and_then(|session| {
1006 self.swarm
1007 .state()
1008 .peers()
1009 .peer_by_id(peer_id)
1010 .map(|(record, kind)| session.peer_info(&record, kind))
1011 })
1012 }
1013
1014 fn get_peer_infos_by_ids(&self, peer_ids: impl IntoIterator<Item = PeerId>) -> Vec<PeerInfo> {
1018 peer_ids.into_iter().filter_map(|peer_id| self.get_peer_info_by_id(peer_id)).collect()
1019 }
1020
1021 #[inline]
1023 fn update_active_connection_metrics(&self) {
1024 self.metrics
1025 .incoming_connections
1026 .set(self.swarm.state().peers().num_inbound_connections() as f64);
1027 self.metrics
1028 .outgoing_connections
1029 .set(self.swarm.state().peers().num_outbound_connections() as f64);
1030 }
1031
1032 #[inline]
1034 fn update_pending_connection_metrics(&self) {
1035 self.metrics
1036 .pending_outgoing_connections
1037 .set(self.swarm.state().peers().num_pending_outbound_connections() as f64);
1038 self.metrics
1039 .total_pending_connections
1040 .set(self.swarm.sessions().num_pending_connections() as f64);
1041 }
1042
1043 pub async fn run_until_graceful_shutdown<F, R>(
1047 mut self,
1048 shutdown: GracefulShutdown,
1049 shutdown_hook: F,
1050 ) -> R
1051 where
1052 F: FnOnce(Self) -> R,
1053 {
1054 let mut graceful_guard = None;
1055 tokio::select! {
1056 _ = &mut self => {},
1057 guard = shutdown => {
1058 graceful_guard = Some(guard);
1059 },
1060 }
1061
1062 self.perform_network_shutdown();
1063 let res = shutdown_hook(self);
1064 drop(graceful_guard);
1065 res
1066 }
1067
1068 fn perform_network_shutdown(&mut self) {
1071 self.swarm.on_shutdown_requested();
1075 self.swarm.sessions_mut().disconnect_all(Some(DisconnectReason::ClientQuitting));
1077 self.swarm.sessions_mut().disconnect_all_pending();
1079 }
1080}
1081
1082impl<N: NetworkPrimitives> Future for NetworkManager<N> {
1083 type Output = ();
1084
1085 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1086 let start = Instant::now();
1087 let mut poll_durations = NetworkManagerPollDurations::default();
1088
1089 let this = self.get_mut();
1090
1091 while let Poll::Ready(outcome) = this.block_import.poll(cx) {
1093 this.on_block_import_result(outcome);
1094 }
1095
1096 let start_network_handle = Instant::now();
1120 let maybe_more_handle_messages = poll_nested_stream_with_budget!(
1121 "net",
1122 "Network message channel",
1123 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_HANDLE_CHANNEL,
1124 this.from_handle_rx.poll_next_unpin(cx),
1125 |msg| this.on_handle_message(msg),
1126 error!("Network channel closed");
1127 );
1128 poll_durations.acc_network_handle = start_network_handle.elapsed();
1129
1130 let maybe_more_swarm_events = poll_nested_stream_with_budget!(
1132 "net",
1133 "Swarm events stream",
1134 DEFAULT_BUDGET_TRY_DRAIN_SWARM,
1135 this.swarm.poll_next_unpin(cx),
1136 |event| this.on_swarm_event(event),
1137 );
1138 poll_durations.acc_swarm =
1139 start_network_handle.elapsed() - poll_durations.acc_network_handle;
1140
1141 if maybe_more_handle_messages || maybe_more_swarm_events {
1143 cx.waker().wake_by_ref();
1145 return Poll::Pending
1146 }
1147
1148 this.update_poll_metrics(start, poll_durations);
1149
1150 Poll::Pending
1151 }
1152}
1153
1154#[derive(Debug, Default)]
1155struct NetworkManagerPollDurations {
1156 acc_network_handle: Duration,
1157 acc_swarm: Duration,
1158}