1use crate::{
4 builder::ETH_REQUEST_CHANNEL_CAPACITY,
5 error::NetworkError,
6 eth_requests::EthRequestHandler,
7 protocol::IntoRlpxSubProtocol,
8 transactions::{
9 config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
10 policy::NetworkPolicies,
11 TransactionsHandle, TransactionsManager, TransactionsManagerConfig,
12 },
13 NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, PeersConfig,
14};
15use futures::{FutureExt, StreamExt};
16use pin_project::pin_project;
17use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
18use reth_eth_wire::{
19 protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
20};
21use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
22use reth_evm_ethereum::EthEvmConfig;
23use reth_metrics::common::mpsc::memory_bounded_channel;
24use reth_network_api::{
25 events::{PeerEvent, SessionInfo},
26 test_utils::{PeersHandle, PeersHandleProvider},
27 NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
28};
29use reth_network_peers::PeerId;
30use reth_storage_api::{
31 noop::NoopProvider, BalProvider, BlockReader, BlockReaderIdExt, HeaderProvider,
32 StateProviderFactory,
33};
34use reth_tasks::Runtime;
35use reth_tokio_util::EventStream;
36use reth_transaction_pool::{
37 blobstore::InMemoryBlobStore,
38 test_utils::{TestPool, TestPoolBuilder},
39 EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
40};
41use secp256k1::SecretKey;
42use std::{
43 fmt,
44 future::Future,
45 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
46 pin::Pin,
47 task::{Context, Poll},
48};
49use tokio::{
50 sync::{mpsc::channel, oneshot},
51 task::JoinHandle,
52};
53
54use crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES;
55
56pub struct Testnet<C, Pool> {
58 peers: Vec<Peer<C, Pool>>,
60}
61
62impl<C> Testnet<C, TestPool>
65where
66 C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
67{
68 pub async fn create_with(num_peers: usize, provider: C) -> Self {
70 Self::try_create_with(num_peers, provider).await.unwrap()
71 }
72
73 pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
75 let mut this = Self { peers: Vec::with_capacity(num_peers) };
76 for _ in 0..num_peers {
77 let config = PeerConfig::new(provider.clone());
78 this.add_peer_with_config(config).await?;
79 }
80 Ok(this)
81 }
82
83 pub async fn extend_peer_with_config(
86 &mut self,
87 configs: impl IntoIterator<Item = PeerConfig<C>>,
88 ) -> Result<(), NetworkError> {
89 let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
90 let peers = futures::future::join_all(peers).await;
91 for peer in peers {
92 self.peers.push(peer?);
93 }
94 Ok(())
95 }
96}
97
98impl<C, Pool> Testnet<C, Pool>
99where
100 C: BlockReader + HeaderProvider + Clone + 'static,
101 Pool: TransactionPool,
102{
103 pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
105 &mut self.peers
106 }
107
108 pub fn peers(&self) -> &[Peer<C, Pool>] {
110 &self.peers
111 }
112
113 pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
118 self.peers.remove(index)
119 }
120
121 pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
123 self.peers.iter_mut()
124 }
125
126 pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
128 self.peers.iter()
129 }
130
131 pub async fn add_peer_with_config(
133 &mut self,
134 config: PeerConfig<C>,
135 ) -> Result<(), NetworkError> {
136 let PeerConfig { config, client, secret_key } = config;
137
138 let network = NetworkManager::new(config).await?;
139 let peer = Peer {
140 network,
141 client,
142 secret_key,
143 request_handler: None,
144 transactions_manager: None,
145 pool: None,
146 };
147 self.peers.push(peer);
148 Ok(())
149 }
150
151 pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
153 self.peers.iter().map(|p| p.handle())
154 }
155
156 pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
158 where
159 F: Fn(Peer<C, Pool>) -> Peer<C, P>,
160 P: TransactionPool,
161 {
162 Testnet { peers: self.peers.into_iter().map(f).collect() }
163 }
164
165 pub fn for_each<F>(&self, f: F)
167 where
168 F: Fn(&Peer<C, Pool>),
169 {
170 self.peers.iter().for_each(f)
171 }
172
173 pub fn for_each_mut<F>(&mut self, f: F)
175 where
176 F: FnMut(&mut Peer<C, Pool>),
177 {
178 self.peers.iter_mut().for_each(f)
179 }
180}
181
182impl<C, Pool> Testnet<C, Pool>
183where
184 C: ChainSpecProvider<ChainSpec: EthereumHardforks>
185 + StateProviderFactory
186 + BlockReaderIdExt
187 + HeaderProvider<Header = alloy_consensus::Header>
188 + Clone
189 + 'static,
190 Pool: TransactionPool,
191{
192 pub fn with_eth_pool(
194 self,
195 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
196 self.map_pool(|peer| {
197 let blob_store = InMemoryBlobStore::default();
198 let pool = TransactionValidationTaskExecutor::eth(
199 peer.client.clone(),
200 EthEvmConfig::mainnet(),
201 blob_store.clone(),
202 Runtime::test(),
203 );
204 peer.map_transactions_manager(EthTransactionPool::eth_pool(
205 pool,
206 blob_store,
207 Default::default(),
208 ))
209 })
210 }
211
212 pub fn with_eth_pool_config(
214 self,
215 tx_manager_config: TransactionsManagerConfig,
216 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
217 self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
218 }
219
220 pub fn with_eth_pool_config_and_policy(
222 self,
223 tx_manager_config: TransactionsManagerConfig,
224 policy: TransactionPropagationKind,
225 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
226 self.map_pool(|peer| {
227 let blob_store = InMemoryBlobStore::default();
228 let pool = TransactionValidationTaskExecutor::eth(
229 peer.client.clone(),
230 EthEvmConfig::mainnet(),
231 blob_store.clone(),
232 Runtime::test(),
233 );
234
235 peer.map_transactions_manager_with(
236 EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
237 tx_manager_config.clone(),
238 policy,
239 )
240 })
241 }
242}
243
244impl<C, Pool> Testnet<C, Pool>
245where
246 C: BlockReader<
247 Block = reth_ethereum_primitives::Block,
248 Receipt = reth_ethereum_primitives::Receipt,
249 Header = alloy_consensus::Header,
250 > + HeaderProvider
251 + BalProvider
252 + Clone
253 + Unpin
254 + 'static,
255 Pool: TransactionPool<
256 Transaction: PoolTransaction<
257 Consensus = TransactionSigned,
258 Pooled = PooledTransactionVariant,
259 >,
260 > + Unpin
261 + 'static,
262{
263 pub fn spawn(self) -> TestnetHandle<C, Pool> {
265 let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
266 let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
267 let mut net = self;
268 let handle = tokio::task::spawn(async move {
269 let mut tx = None;
270 tokio::select! {
271 _ = &mut net => {}
272 inc = rx => {
273 tx = inc.ok();
274 }
275 }
276 if let Some(tx) = tx {
277 let _ = tx.send(net);
278 }
279 });
280
281 TestnetHandle { _handle: handle, peers, terminate: tx }
282 }
283}
284
285impl Testnet<NoopProvider, TestPool> {
286 pub async fn create(num_peers: usize) -> Self {
288 Self::try_create(num_peers).await.unwrap()
289 }
290
291 pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
293 let mut this = Self::default();
294
295 this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
296 Ok(this)
297 }
298
299 pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
301 self.add_peer_with_config(Default::default()).await
302 }
303}
304
305impl<C, Pool> Default for Testnet<C, Pool> {
306 fn default() -> Self {
307 Self { peers: Vec::new() }
308 }
309}
310
311impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
312 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313 f.debug_struct("Testnet {{}}").finish_non_exhaustive()
314 }
315}
316
317impl<C, Pool> Future for Testnet<C, Pool>
318where
319 C: BlockReader<
320 Block = reth_ethereum_primitives::Block,
321 Receipt = reth_ethereum_primitives::Receipt,
322 Header = alloy_consensus::Header,
323 > + HeaderProvider
324 + BalProvider
325 + Unpin
326 + 'static,
327 Pool: TransactionPool<
328 Transaction: PoolTransaction<
329 Consensus = TransactionSigned,
330 Pooled = PooledTransactionVariant,
331 >,
332 > + Unpin
333 + 'static,
334{
335 type Output = ();
336
337 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338 let this = self.get_mut();
339 for peer in &mut this.peers {
340 let _ = peer.poll_unpin(cx);
341 }
342 Poll::Pending
343 }
344}
345
346#[derive(Debug)]
348pub struct TestnetHandle<C, Pool> {
349 _handle: JoinHandle<()>,
350 peers: Vec<PeerHandle<Pool>>,
351 terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
352}
353
354impl<C, Pool> TestnetHandle<C, Pool> {
357 pub async fn terminate(self) -> Testnet<C, Pool> {
359 let (tx, rx) = oneshot::channel();
360 self.terminate.send(tx).unwrap();
361 rx.await.unwrap()
362 }
363
364 pub fn peers(&self) -> &[PeerHandle<Pool>] {
366 &self.peers
367 }
368
369 pub async fn connect_peers(&self) {
375 if self.peers.len() < 2 {
376 return
377 }
378
379 let streams =
381 self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
382
383 for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
385 for idx in (idx + 1)..self.peers.len() {
386 let neighbour = &self.peers[idx];
387 handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
388 }
389 }
390
391 let num_sessions_per_peer = self.peers.len() - 1;
393 let fut = streams.into_iter().map(|mut stream| async move {
394 stream.take_session_established(num_sessions_per_peer).await
395 });
396
397 futures::future::join_all(fut).await;
398 }
399}
400
401#[pin_project]
403#[derive(Debug)]
404pub struct Peer<C, Pool = TestPool> {
405 #[pin]
406 network: NetworkManager<EthNetworkPrimitives>,
407 #[pin]
408 request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
409 #[pin]
410 transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
411 pool: Option<Pool>,
412 client: C,
413 secret_key: SecretKey,
414}
415
416impl<C, Pool> Peer<C, Pool>
419where
420 C: BlockReader + HeaderProvider + Clone + 'static,
421 Pool: TransactionPool,
422{
423 pub fn num_peers(&self) -> usize {
425 self.network.num_connected_peers()
426 }
427
428 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
430 self.network.add_rlpx_sub_protocol(protocol);
431 }
432
433 pub fn peer_handle(&self) -> PeerHandle<Pool> {
435 PeerHandle {
436 network: self.network.handle().clone(),
437 pool: self.pool.clone(),
438 transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
439 }
440 }
441
442 pub const fn local_addr(&self) -> SocketAddr {
444 self.network.local_addr()
445 }
446
447 pub fn peer_id(&self) -> PeerId {
449 *self.network.peer_id()
450 }
451
452 pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
454 &mut self.network
455 }
456
457 pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
459 self.network.handle().clone()
460 }
461
462 pub const fn pool(&self) -> Option<&Pool> {
464 self.pool.as_ref()
465 }
466
467 pub fn install_request_handler(&mut self)
469 where
470 C: BalProvider,
471 {
472 let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
473 self.network.set_eth_request_handler(tx);
474 let peers = self.network.peers_handle();
475 let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
476 self.request_handler = Some(request_handler);
477 }
478
479 pub fn install_transactions_manager(&mut self, pool: Pool) {
481 let (tx, rx) = memory_bounded_channel(
482 DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
483 "test_tx_channel",
484 );
485 self.network.set_transactions(tx);
486 let transactions_manager = TransactionsManager::new(
487 self.handle(),
488 pool.clone(),
489 rx,
490 TransactionsManagerConfig::default(),
491 );
492 self.transactions_manager = Some(transactions_manager);
493 self.pool = Some(pool);
494 }
495
496 pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
498 where
499 P: TransactionPool,
500 {
501 let Self { mut network, request_handler, client, secret_key, .. } = self;
502 let (tx, rx) = memory_bounded_channel(
503 DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
504 "test_tx_channel",
505 );
506 network.set_transactions(tx);
507 let transactions_manager = TransactionsManager::new(
508 network.handle().clone(),
509 pool.clone(),
510 rx,
511 TransactionsManagerConfig::default(),
512 );
513 Peer {
514 network,
515 request_handler,
516 transactions_manager: Some(transactions_manager),
517 pool: Some(pool),
518 client,
519 secret_key,
520 }
521 }
522
523 pub fn map_transactions_manager_with_config<P>(
525 self,
526 pool: P,
527 config: TransactionsManagerConfig,
528 ) -> Peer<C, P>
529 where
530 P: TransactionPool,
531 {
532 self.map_transactions_manager_with(pool, config, Default::default())
533 }
534
535 pub fn map_transactions_manager_with<P>(
537 self,
538 pool: P,
539 config: TransactionsManagerConfig,
540 policy: TransactionPropagationKind,
541 ) -> Peer<C, P>
542 where
543 P: TransactionPool,
544 {
545 let Self { mut network, request_handler, client, secret_key, .. } = self;
546 let (tx, rx) = memory_bounded_channel(
547 DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
548 "test_tx_channel",
549 );
550 network.set_transactions(tx);
551
552 let announcement_policy = StrictEthAnnouncementFilter::default();
553 let policies = NetworkPolicies::new(policy, announcement_policy);
554
555 let transactions_manager = TransactionsManager::with_policy(
556 network.handle().clone(),
557 pool.clone(),
558 rx,
559 config,
560 policies,
561 );
562
563 Peer {
564 network,
565 request_handler,
566 transactions_manager: Some(transactions_manager),
567 pool: Some(pool),
568 client,
569 secret_key,
570 }
571 }
572}
573
574impl<C> Peer<C>
575where
576 C: BlockReader + HeaderProvider + Clone + 'static,
577{
578 pub fn install_test_pool(&mut self) {
580 self.install_transactions_manager(TestPoolBuilder::default().into())
581 }
582}
583
584impl<C, Pool> Future for Peer<C, Pool>
585where
586 C: BlockReader<
587 Block = reth_ethereum_primitives::Block,
588 Receipt = reth_ethereum_primitives::Receipt,
589 Header = alloy_consensus::Header,
590 > + HeaderProvider
591 + BalProvider
592 + Unpin
593 + 'static,
594 Pool: TransactionPool<
595 Transaction: PoolTransaction<
596 Consensus = TransactionSigned,
597 Pooled = PooledTransactionVariant,
598 >,
599 > + Unpin
600 + 'static,
601{
602 type Output = ();
603
604 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
605 let this = self.project();
606
607 if let Some(request) = this.request_handler.as_pin_mut() {
608 let _ = request.poll(cx);
609 }
610
611 if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
612 let _ = tx_manager.poll(cx);
613 }
614
615 this.network.poll(cx)
616 }
617}
618
619#[derive(Debug)]
621pub struct PeerConfig<C = NoopProvider> {
622 config: NetworkConfig<C>,
623 client: C,
624 secret_key: SecretKey,
625}
626
627#[derive(Debug)]
629pub struct PeerHandle<Pool> {
630 network: NetworkHandle<EthNetworkPrimitives>,
631 transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
632 pool: Option<Pool>,
633}
634
635impl<Pool> PeerHandle<Pool> {
638 pub fn peer_id(&self) -> &PeerId {
640 self.network.peer_id()
641 }
642
643 pub fn peer_handle(&self) -> &PeersHandle {
645 self.network.peers_handle()
646 }
647
648 pub fn local_addr(&self) -> SocketAddr {
650 self.network.local_addr()
651 }
652
653 pub fn event_listener(&self) -> EventStream<NetworkEvent> {
655 self.network.event_listener()
656 }
657
658 pub const fn transactions(&self) -> Option<&TransactionsHandle> {
660 self.transactions.as_ref()
661 }
662
663 pub const fn pool(&self) -> Option<&Pool> {
665 self.pool.as_ref()
666 }
667
668 pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
670 &self.network
671 }
672}
673
674impl<C> PeerConfig<C>
677where
678 C: BlockReader + HeaderProvider + Clone + 'static,
679{
680 pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
682 let Self { config, client, secret_key } = self;
683 let network = NetworkManager::new(config).await?;
684 let peer = Peer {
685 network,
686 client,
687 secret_key,
688 request_handler: None,
689 transactions_manager: None,
690 pool: None,
691 };
692 Ok(peer)
693 }
694
695 pub fn new(client: C) -> Self
698 where
699 C: ChainSpecProvider<ChainSpec: Hardforks>,
700 {
701 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
702 let config = Self::network_config_builder(secret_key).build(client.clone());
703 Self { config, client, secret_key }
704 }
705
706 pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
709 where
710 C: ChainSpecProvider<ChainSpec: Hardforks>,
711 {
712 let config = Self::network_config_builder(secret_key).build(client.clone());
713 Self { config, client, secret_key }
714 }
715
716 pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
718 where
719 C: ChainSpecProvider<ChainSpec: Hardforks>,
720 {
721 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
722
723 let builder = Self::network_config_builder(secret_key);
724 let hello_message =
725 HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
726 let config = builder.hello_message(hello_message).build(client.clone());
727
728 Self { config, client, secret_key }
729 }
730
731 fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
732 NetworkConfigBuilder::new(secret_key, Runtime::test())
733 .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
734 .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
735 .disable_dns_discovery()
736 .disable_discv4_discovery()
737 .peer_config(PeersConfig::test())
738 }
739}
740
741impl Default for PeerConfig {
742 fn default() -> Self {
743 Self::new(NoopProvider::default())
744 }
745}
746
747#[derive(Debug)]
751pub struct NetworkEventStream {
752 inner: EventStream<NetworkEvent>,
753}
754
755impl NetworkEventStream {
758 pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
760 Self { inner }
761 }
762
763 pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
765 while let Some(ev) = self.inner.next().await {
766 if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
767 return Some((peer_id, reason))
768 }
769 }
770 None
771 }
772
773 pub async fn next_session_established(&mut self) -> Option<PeerId> {
775 while let Some(ev) = self.inner.next().await {
776 match ev {
777 NetworkEvent::ActivePeerSession { info, .. } |
778 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
779 return Some(info.peer_id)
780 }
781 _ => {}
782 }
783 }
784 None
785 }
786
787 pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
789 if num == 0 {
790 return Vec::new();
791 }
792 let mut peers = Vec::with_capacity(num);
793 while let Some(ev) = self.inner.next().await {
794 if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
795 peers.push(peer_id);
796 num -= 1;
797 if num == 0 {
798 return peers;
799 }
800 }
801 }
802 peers
803 }
804
805 pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
809 let peer_id = match self.inner.next().await {
810 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
811 _ => return None,
812 };
813
814 match self.inner.next().await {
815 Some(NetworkEvent::ActivePeerSession {
816 info: SessionInfo { peer_id: peer_id2, .. },
817 ..
818 }) => {
819 debug_assert_eq!(
820 peer_id, peer_id2,
821 "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
822 );
823 Some(peer_id)
824 }
825 _ => None,
826 }
827 }
828
829 pub async fn peer_added(&mut self) -> Option<PeerId> {
831 let peer_id = match self.inner.next().await {
832 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
833 _ => return None,
834 };
835
836 Some(peer_id)
837 }
838
839 pub async fn peer_removed(&mut self) -> Option<PeerId> {
841 let peer_id = match self.inner.next().await {
842 Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
843 _ => return None,
844 };
845
846 Some(peer_id)
847 }
848}