reth_network/test_utils/
testnet.rs

1//! A network implementation for testing purposes.
2
3use crate::{
4    builder::ETH_REQUEST_CHANNEL_CAPACITY,
5    error::NetworkError,
6    eth_requests::EthRequestHandler,
7    protocol::IntoRlpxSubProtocol,
8    transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
9    NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
10};
11use futures::{FutureExt, StreamExt};
12use pin_project::pin_project;
13use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
14use reth_eth_wire::{
15    protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
16};
17use reth_network_api::{
18    events::{PeerEvent, SessionInfo},
19    test_utils::{PeersHandle, PeersHandleProvider},
20    NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
21};
22use reth_network_peers::PeerId;
23use reth_primitives::{PooledTransaction, TransactionSigned};
24use reth_storage_api::{
25    noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
26};
27use reth_tasks::TokioTaskExecutor;
28use reth_tokio_util::EventStream;
29use reth_transaction_pool::{
30    blobstore::InMemoryBlobStore,
31    test_utils::{TestPool, TestPoolBuilder},
32    EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
33};
34use secp256k1::SecretKey;
35use std::{
36    fmt,
37    future::Future,
38    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
39    pin::Pin,
40    task::{Context, Poll},
41};
42use tokio::{
43    sync::{
44        mpsc::{channel, unbounded_channel},
45        oneshot,
46    },
47    task::JoinHandle,
48};
49
50/// A test network consisting of multiple peers.
51pub struct Testnet<C, Pool> {
52    /// All running peers in the network.
53    peers: Vec<Peer<C, Pool>>,
54}
55
56// === impl Testnet ===
57
58impl<C> Testnet<C, TestPool>
59where
60    C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
61{
62    /// Same as [`Self::try_create_with`] but panics on error
63    pub async fn create_with(num_peers: usize, provider: C) -> Self {
64        Self::try_create_with(num_peers, provider).await.unwrap()
65    }
66
67    /// Creates a new [`Testnet`] with the given number of peers and the provider.
68    pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
69        let mut this = Self { peers: Vec::with_capacity(num_peers) };
70        for _ in 0..num_peers {
71            let config = PeerConfig::new(provider.clone());
72            this.add_peer_with_config(config).await?;
73        }
74        Ok(this)
75    }
76
77    /// Extend the list of peers with new peers that are configured with each of the given
78    /// [`PeerConfig`]s.
79    pub async fn extend_peer_with_config(
80        &mut self,
81        configs: impl IntoIterator<Item = PeerConfig<C>>,
82    ) -> Result<(), NetworkError> {
83        let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
84        let peers = futures::future::join_all(peers).await;
85        for peer in peers {
86            self.peers.push(peer?);
87        }
88        Ok(())
89    }
90}
91
92impl<C, Pool> Testnet<C, Pool>
93where
94    C: BlockReader + HeaderProvider + Clone + 'static,
95    Pool: TransactionPool,
96{
97    /// Return a mutable slice of all peers.
98    pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
99        &mut self.peers
100    }
101
102    /// Return a slice of all peers.
103    pub fn peers(&self) -> &[Peer<C, Pool>] {
104        &self.peers
105    }
106
107    /// Remove a peer from the [`Testnet`] and return it.
108    ///
109    /// # Panics
110    /// If the index is out of bounds.
111    pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
112        self.peers.remove(index)
113    }
114
115    /// Return a mutable iterator over all peers.
116    pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
117        self.peers.iter_mut()
118    }
119
120    /// Return an iterator over all peers.
121    pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
122        self.peers.iter()
123    }
124
125    /// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
126    pub async fn add_peer_with_config(
127        &mut self,
128        config: PeerConfig<C>,
129    ) -> Result<(), NetworkError> {
130        let PeerConfig { config, client, secret_key } = config;
131
132        let network = NetworkManager::new(config).await?;
133        let peer = Peer {
134            network,
135            client,
136            secret_key,
137            request_handler: None,
138            transactions_manager: None,
139            pool: None,
140        };
141        self.peers.push(peer);
142        Ok(())
143    }
144
145    /// Returns all handles to the networks
146    pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
147        self.peers.iter().map(|p| p.handle())
148    }
149
150    /// Maps the pool of each peer with the given closure
151    pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
152    where
153        F: Fn(Peer<C, Pool>) -> Peer<C, P>,
154        P: TransactionPool,
155    {
156        Testnet { peers: self.peers.into_iter().map(f).collect() }
157    }
158
159    /// Apply a closure on each peer
160    pub fn for_each<F>(&self, f: F)
161    where
162        F: Fn(&Peer<C, Pool>),
163    {
164        self.peers.iter().for_each(f)
165    }
166
167    /// Apply a closure on each peer
168    pub fn for_each_mut<F>(&mut self, f: F)
169    where
170        F: FnMut(&mut Peer<C, Pool>),
171    {
172        self.peers.iter_mut().for_each(f)
173    }
174}
175
176impl<C, Pool> Testnet<C, Pool>
177where
178    C: ChainSpecProvider<ChainSpec: EthereumHardforks>
179        + StateProviderFactory
180        + BlockReaderIdExt
181        + HeaderProvider
182        + Clone
183        + 'static,
184    Pool: TransactionPool,
185{
186    /// Installs an eth pool on each peer
187    pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
188        self.map_pool(|peer| {
189            let blob_store = InMemoryBlobStore::default();
190            let pool = TransactionValidationTaskExecutor::eth(
191                peer.client.clone(),
192                blob_store.clone(),
193                TokioTaskExecutor::default(),
194            );
195            peer.map_transactions_manager(EthTransactionPool::eth_pool(
196                pool,
197                blob_store,
198                Default::default(),
199            ))
200        })
201    }
202
203    /// Installs an eth pool on each peer with custom transaction manager config
204    pub fn with_eth_pool_config(
205        self,
206        tx_manager_config: TransactionsManagerConfig,
207    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
208        self.map_pool(|peer| {
209            let blob_store = InMemoryBlobStore::default();
210            let pool = TransactionValidationTaskExecutor::eth(
211                peer.client.clone(),
212                blob_store.clone(),
213                TokioTaskExecutor::default(),
214            );
215
216            peer.map_transactions_manager_with_config(
217                EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
218                tx_manager_config.clone(),
219            )
220        })
221    }
222}
223
224impl<C, Pool> Testnet<C, Pool>
225where
226    C: BlockReader<
227            Block = reth_primitives::Block,
228            Receipt = reth_primitives::Receipt,
229            Header = reth_primitives::Header,
230        > + HeaderProvider
231        + Clone
232        + Unpin
233        + 'static,
234    Pool: TransactionPool<
235            Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
236        > + Unpin
237        + 'static,
238{
239    /// Spawns the testnet to a separate task
240    pub fn spawn(self) -> TestnetHandle<C, Pool> {
241        let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
242        let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
243        let mut net = self;
244        let handle = tokio::task::spawn(async move {
245            let mut tx = None;
246            tokio::select! {
247                _ = &mut net => {}
248                inc = rx => {
249                    tx = inc.ok();
250                }
251            }
252            if let Some(tx) = tx {
253                let _ = tx.send(net);
254            }
255        });
256
257        TestnetHandle { _handle: handle, peers, terminate: tx }
258    }
259}
260
261impl Testnet<NoopProvider, TestPool> {
262    /// Same as [`Self::try_create`] but panics on error
263    pub async fn create(num_peers: usize) -> Self {
264        Self::try_create(num_peers).await.unwrap()
265    }
266
267    /// Creates a new [`Testnet`] with the given number of peers
268    pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
269        let mut this = Self::default();
270
271        this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
272        Ok(this)
273    }
274
275    /// Add a peer to the [`Testnet`]
276    pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
277        self.add_peer_with_config(Default::default()).await
278    }
279}
280
281impl<C, Pool> Default for Testnet<C, Pool> {
282    fn default() -> Self {
283        Self { peers: Vec::new() }
284    }
285}
286
287impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.debug_struct("Testnet {{}}").finish_non_exhaustive()
290    }
291}
292
293impl<C, Pool> Future for Testnet<C, Pool>
294where
295    C: BlockReader<
296            Block = reth_primitives::Block,
297            Receipt = reth_primitives::Receipt,
298            Header = reth_primitives::Header,
299        > + HeaderProvider
300        + Unpin
301        + 'static,
302    Pool: TransactionPool<
303            Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
304        > + Unpin
305        + 'static,
306{
307    type Output = ();
308
309    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
310        let this = self.get_mut();
311        for peer in &mut this.peers {
312            let _ = peer.poll_unpin(cx);
313        }
314        Poll::Pending
315    }
316}
317
318/// A handle to a [`Testnet`] that can be shared.
319#[derive(Debug)]
320pub struct TestnetHandle<C, Pool> {
321    _handle: JoinHandle<()>,
322    peers: Vec<PeerHandle<Pool>>,
323    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
324}
325
326// === impl TestnetHandle ===
327
328impl<C, Pool> TestnetHandle<C, Pool> {
329    /// Terminates the task and returns the [`Testnet`] back.
330    pub async fn terminate(self) -> Testnet<C, Pool> {
331        let (tx, rx) = oneshot::channel();
332        self.terminate.send(tx).unwrap();
333        rx.await.unwrap()
334    }
335
336    /// Returns the [`PeerHandle`]s of this [`Testnet`].
337    pub fn peers(&self) -> &[PeerHandle<Pool>] {
338        &self.peers
339    }
340
341    /// Connects all peers with each other.
342    ///
343    /// This establishes sessions concurrently between all peers.
344    ///
345    /// Returns once all sessions are established.
346    pub async fn connect_peers(&self) {
347        if self.peers.len() < 2 {
348            return
349        }
350
351        // add an event stream for _each_ peer
352        let streams =
353            self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
354
355        // add all peers to each other
356        for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
357            for idx in (idx + 1)..self.peers.len() {
358                let neighbour = &self.peers[idx];
359                handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
360            }
361        }
362
363        // await all sessions to be established
364        let num_sessions_per_peer = self.peers.len() - 1;
365        let fut = streams.into_iter().map(|mut stream| async move {
366            stream.take_session_established(num_sessions_per_peer).await
367        });
368
369        futures::future::join_all(fut).await;
370    }
371}
372
373/// A peer in the [`Testnet`].
374#[pin_project]
375#[derive(Debug)]
376pub struct Peer<C, Pool = TestPool> {
377    #[pin]
378    network: NetworkManager<EthNetworkPrimitives>,
379    #[pin]
380    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
381    #[pin]
382    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
383    pool: Option<Pool>,
384    client: C,
385    secret_key: SecretKey,
386}
387
388// === impl Peer ===
389
390impl<C, Pool> Peer<C, Pool>
391where
392    C: BlockReader + HeaderProvider + Clone + 'static,
393    Pool: TransactionPool,
394{
395    /// Returns the number of connected peers.
396    pub fn num_peers(&self) -> usize {
397        self.network.num_connected_peers()
398    }
399
400    /// Adds an additional protocol handler to the peer.
401    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
402        self.network.add_rlpx_sub_protocol(protocol);
403    }
404
405    /// Returns a handle to the peer's network.
406    pub fn peer_handle(&self) -> PeerHandle<Pool> {
407        PeerHandle {
408            network: self.network.handle().clone(),
409            pool: self.pool.clone(),
410            transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
411        }
412    }
413
414    /// The address that listens for incoming connections.
415    pub const fn local_addr(&self) -> SocketAddr {
416        self.network.local_addr()
417    }
418
419    /// The [`PeerId`] of this peer.
420    pub fn peer_id(&self) -> PeerId {
421        *self.network.peer_id()
422    }
423
424    /// Returns mutable access to the network.
425    pub fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
426        &mut self.network
427    }
428
429    /// Returns the [`NetworkHandle`] of this peer.
430    pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
431        self.network.handle().clone()
432    }
433
434    /// Returns the [`TestPool`] of this peer.
435    pub const fn pool(&self) -> Option<&Pool> {
436        self.pool.as_ref()
437    }
438
439    /// Set a new request handler that's connected to the peer's network
440    pub fn install_request_handler(&mut self) {
441        let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
442        self.network.set_eth_request_handler(tx);
443        let peers = self.network.peers_handle();
444        let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
445        self.request_handler = Some(request_handler);
446    }
447
448    /// Set a new transactions manager that's connected to the peer's network
449    pub fn install_transactions_manager(&mut self, pool: Pool) {
450        let (tx, rx) = unbounded_channel();
451        self.network.set_transactions(tx);
452        let transactions_manager = TransactionsManager::new(
453            self.handle(),
454            pool.clone(),
455            rx,
456            TransactionsManagerConfig::default(),
457        );
458        self.transactions_manager = Some(transactions_manager);
459        self.pool = Some(pool);
460    }
461
462    /// Set a new transactions manager that's connected to the peer's network
463    pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
464    where
465        P: TransactionPool,
466    {
467        let Self { mut network, request_handler, client, secret_key, .. } = self;
468        let (tx, rx) = unbounded_channel();
469        network.set_transactions(tx);
470        let transactions_manager = TransactionsManager::new(
471            network.handle().clone(),
472            pool.clone(),
473            rx,
474            TransactionsManagerConfig::default(),
475        );
476        Peer {
477            network,
478            request_handler,
479            transactions_manager: Some(transactions_manager),
480            pool: Some(pool),
481            client,
482            secret_key,
483        }
484    }
485
486    /// Map transactions manager with custom config
487    pub fn map_transactions_manager_with_config<P>(
488        self,
489        pool: P,
490        config: TransactionsManagerConfig,
491    ) -> Peer<C, P>
492    where
493        P: TransactionPool,
494    {
495        let Self { mut network, request_handler, client, secret_key, .. } = self;
496        let (tx, rx) = unbounded_channel();
497        network.set_transactions(tx);
498
499        let transactions_manager = TransactionsManager::new(
500            network.handle().clone(),
501            pool.clone(),
502            rx,
503            config, // Use provided config
504        );
505
506        Peer {
507            network,
508            request_handler,
509            transactions_manager: Some(transactions_manager),
510            pool: Some(pool),
511            client,
512            secret_key,
513        }
514    }
515}
516
517impl<C> Peer<C>
518where
519    C: BlockReader + HeaderProvider + Clone + 'static,
520{
521    /// Installs a new [`TestPool`]
522    pub fn install_test_pool(&mut self) {
523        self.install_transactions_manager(TestPoolBuilder::default().into())
524    }
525}
526
527impl<C, Pool> Future for Peer<C, Pool>
528where
529    C: BlockReader<
530            Block = reth_primitives::Block,
531            Receipt = reth_primitives::Receipt,
532            Header = reth_primitives::Header,
533        > + HeaderProvider
534        + Unpin
535        + 'static,
536    Pool: TransactionPool<
537            Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
538        > + Unpin
539        + 'static,
540{
541    type Output = ();
542
543    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544        let this = self.project();
545
546        if let Some(request) = this.request_handler.as_pin_mut() {
547            let _ = request.poll(cx);
548        }
549
550        if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
551            let _ = tx_manager.poll(cx);
552        }
553
554        this.network.poll(cx)
555    }
556}
557
558/// A helper config for setting up the reth networking stack.
559#[derive(Debug)]
560pub struct PeerConfig<C = NoopProvider> {
561    config: NetworkConfig<C>,
562    client: C,
563    secret_key: SecretKey,
564}
565
566/// A handle to a peer in the [`Testnet`].
567#[derive(Debug)]
568pub struct PeerHandle<Pool> {
569    network: NetworkHandle<EthNetworkPrimitives>,
570    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
571    pool: Option<Pool>,
572}
573
574// === impl PeerHandle ===
575
576impl<Pool> PeerHandle<Pool> {
577    /// Returns the [`PeerId`] used in the network.
578    pub fn peer_id(&self) -> &PeerId {
579        self.network.peer_id()
580    }
581
582    /// Returns the [`PeersHandle`] from the network.
583    pub fn peer_handle(&self) -> &PeersHandle {
584        self.network.peers_handle()
585    }
586
587    /// Returns the local socket as configured for the network.
588    pub fn local_addr(&self) -> SocketAddr {
589        self.network.local_addr()
590    }
591
592    /// Creates a new [`NetworkEvent`] listener channel.
593    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
594        self.network.event_listener()
595    }
596
597    /// Returns the [`TransactionsHandle`] of this peer.
598    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
599        self.transactions.as_ref()
600    }
601
602    /// Returns the [`TestPool`] of this peer.
603    pub const fn pool(&self) -> Option<&Pool> {
604        self.pool.as_ref()
605    }
606
607    /// Returns the [`NetworkHandle`] of this peer.
608    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
609        &self.network
610    }
611}
612
613// === impl PeerConfig ===
614
615impl<C> PeerConfig<C>
616where
617    C: BlockReader + HeaderProvider + Clone + 'static,
618{
619    /// Launches the network and returns the [Peer] that manages it
620    pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
621        let Self { config, client, secret_key } = self;
622        let network = NetworkManager::new(config).await?;
623        let peer = Peer {
624            network,
625            client,
626            secret_key,
627            request_handler: None,
628            transactions_manager: None,
629            pool: None,
630        };
631        Ok(peer)
632    }
633
634    /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
635    /// to any available IP and port.
636    pub fn new(client: C) -> Self
637    where
638        C: ChainSpecProvider<ChainSpec: Hardforks>,
639    {
640        let secret_key = SecretKey::new(&mut rand::thread_rng());
641        let config = Self::network_config_builder(secret_key).build(client.clone());
642        Self { config, client, secret_key }
643    }
644
645    /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
646    /// available IP and port.
647    pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
648    where
649        C: ChainSpecProvider<ChainSpec: Hardforks>,
650    {
651        let config = Self::network_config_builder(secret_key).build(client.clone());
652        Self { config, client, secret_key }
653    }
654
655    /// Initialize the network with a given capabilities.
656    pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
657    where
658        C: ChainSpecProvider<ChainSpec: Hardforks>,
659    {
660        let secret_key = SecretKey::new(&mut rand::thread_rng());
661
662        let builder = Self::network_config_builder(secret_key);
663        let hello_message =
664            HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
665        let config = builder.hello_message(hello_message).build(client.clone());
666
667        Self { config, client, secret_key }
668    }
669
670    fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
671        NetworkConfigBuilder::new(secret_key)
672            .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
673            .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
674            .disable_dns_discovery()
675            .disable_discv4_discovery()
676    }
677}
678
679impl Default for PeerConfig {
680    fn default() -> Self {
681        Self::new(NoopProvider::default())
682    }
683}
684
685/// A helper type to await network events
686///
687/// This makes it easier to await established connections
688#[derive(Debug)]
689pub struct NetworkEventStream {
690    inner: EventStream<NetworkEvent>,
691}
692
693// === impl NetworkEventStream ===
694
695impl NetworkEventStream {
696    /// Create a new [`NetworkEventStream`] from the given network event receiver stream.
697    pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
698        Self { inner }
699    }
700
701    /// Awaits the next event for a session to be closed
702    pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
703        while let Some(ev) = self.inner.next().await {
704            if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
705                return Some((peer_id, reason))
706            }
707        }
708        None
709    }
710
711    /// Awaits the next event for an established session
712    pub async fn next_session_established(&mut self) -> Option<PeerId> {
713        while let Some(ev) = self.inner.next().await {
714            match ev {
715                NetworkEvent::ActivePeerSession { info, .. } |
716                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
717                    return Some(info.peer_id)
718                }
719                _ => {}
720            }
721        }
722        None
723    }
724
725    /// Awaits the next `num` events for an established session
726    pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
727        if num == 0 {
728            return Vec::new();
729        }
730        let mut peers = Vec::with_capacity(num);
731        while let Some(ev) = self.inner.next().await {
732            if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
733                peers.push(peer_id);
734                num -= 1;
735                if num == 0 {
736                    return peers;
737                }
738            }
739        }
740        peers
741    }
742
743    /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and
744    /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established
745    /// session.
746    pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
747        let peer_id = match self.inner.next().await {
748            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
749            _ => return None,
750        };
751
752        match self.inner.next().await {
753            Some(NetworkEvent::ActivePeerSession {
754                info: SessionInfo { peer_id: peer_id2, .. },
755                ..
756            }) => {
757                debug_assert_eq!(
758                    peer_id, peer_id2,
759                    "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
760                );
761                Some(peer_id)
762            }
763            _ => None,
764        }
765    }
766
767    /// Awaits the next event for a peer added.
768    pub async fn peer_added(&mut self) -> Option<PeerId> {
769        let peer_id = match self.inner.next().await {
770            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
771            _ => return None,
772        };
773
774        Some(peer_id)
775    }
776
777    /// Awaits the next event for a peer removed.
778    pub async fn peer_removed(&mut self) -> Option<PeerId> {
779        let peer_id = match self.inner.next().await {
780            Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
781            _ => return None,
782        };
783
784        Some(peer_id)
785    }
786}