Skip to main content

reth_network/test_utils/
testnet.rs

1//! A network implementation for testing purposes.
2
3use crate::{
4    builder::ETH_REQUEST_CHANNEL_CAPACITY,
5    error::NetworkError,
6    eth_requests::EthRequestHandler,
7    protocol::IntoRlpxSubProtocol,
8    transactions::{
9        config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
10        policy::NetworkPolicies,
11        TransactionsHandle, TransactionsManager, TransactionsManagerConfig,
12    },
13    NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
14};
15use futures::{FutureExt, StreamExt};
16use pin_project::pin_project;
17use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
18use reth_eth_wire::{
19    protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
20};
21use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
22use reth_evm_ethereum::EthEvmConfig;
23use reth_network_api::{
24    events::{PeerEvent, SessionInfo},
25    test_utils::{PeersHandle, PeersHandleProvider},
26    NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
27};
28use reth_network_peers::PeerId;
29use reth_storage_api::{
30    noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
31};
32use reth_tasks::TokioTaskExecutor;
33use reth_tokio_util::EventStream;
34use reth_transaction_pool::{
35    blobstore::InMemoryBlobStore,
36    test_utils::{TestPool, TestPoolBuilder},
37    EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
38};
39use secp256k1::SecretKey;
40use std::{
41    fmt,
42    future::Future,
43    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
44    pin::Pin,
45    task::{Context, Poll},
46};
47use tokio::{
48    sync::{
49        mpsc::{channel, unbounded_channel},
50        oneshot,
51    },
52    task::JoinHandle,
53};
54
/// A test network consisting of multiple peers.
///
/// The testnet owns every [`Peer`] it was created with (or that was added later) and can be
/// driven as a whole because it implements [`Future`].
pub struct Testnet<C, Pool> {
    /// All running peers in the network.
    peers: Vec<Peer<C, Pool>>,
}
60
61// === impl Testnet ===
62
63impl<C> Testnet<C, TestPool>
64where
65    C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
66{
67    /// Same as [`Self::try_create_with`] but panics on error
68    pub async fn create_with(num_peers: usize, provider: C) -> Self {
69        Self::try_create_with(num_peers, provider).await.unwrap()
70    }
71
72    /// Creates a new [`Testnet`] with the given number of peers and the provider.
73    pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
74        let mut this = Self { peers: Vec::with_capacity(num_peers) };
75        for _ in 0..num_peers {
76            let config = PeerConfig::new(provider.clone());
77            this.add_peer_with_config(config).await?;
78        }
79        Ok(this)
80    }
81
82    /// Extend the list of peers with new peers that are configured with each of the given
83    /// [`PeerConfig`]s.
84    pub async fn extend_peer_with_config(
85        &mut self,
86        configs: impl IntoIterator<Item = PeerConfig<C>>,
87    ) -> Result<(), NetworkError> {
88        let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
89        let peers = futures::future::join_all(peers).await;
90        for peer in peers {
91            self.peers.push(peer?);
92        }
93        Ok(())
94    }
95}
96
97impl<C, Pool> Testnet<C, Pool>
98where
99    C: BlockReader + HeaderProvider + Clone + 'static,
100    Pool: TransactionPool,
101{
102    /// Return a mutable slice of all peers.
103    pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
104        &mut self.peers
105    }
106
107    /// Return a slice of all peers.
108    pub fn peers(&self) -> &[Peer<C, Pool>] {
109        &self.peers
110    }
111
112    /// Remove a peer from the [`Testnet`] and return it.
113    ///
114    /// # Panics
115    /// If the index is out of bounds.
116    pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
117        self.peers.remove(index)
118    }
119
120    /// Return a mutable iterator over all peers.
121    pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
122        self.peers.iter_mut()
123    }
124
125    /// Return an iterator over all peers.
126    pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
127        self.peers.iter()
128    }
129
130    /// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
131    pub async fn add_peer_with_config(
132        &mut self,
133        config: PeerConfig<C>,
134    ) -> Result<(), NetworkError> {
135        let PeerConfig { config, client, secret_key } = config;
136
137        let network = NetworkManager::new(config).await?;
138        let peer = Peer {
139            network,
140            client,
141            secret_key,
142            request_handler: None,
143            transactions_manager: None,
144            pool: None,
145        };
146        self.peers.push(peer);
147        Ok(())
148    }
149
150    /// Returns all handles to the networks
151    pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
152        self.peers.iter().map(|p| p.handle())
153    }
154
155    /// Maps the pool of each peer with the given closure
156    pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
157    where
158        F: Fn(Peer<C, Pool>) -> Peer<C, P>,
159        P: TransactionPool,
160    {
161        Testnet { peers: self.peers.into_iter().map(f).collect() }
162    }
163
164    /// Apply a closure on each peer
165    pub fn for_each<F>(&self, f: F)
166    where
167        F: Fn(&Peer<C, Pool>),
168    {
169        self.peers.iter().for_each(f)
170    }
171
172    /// Apply a closure on each peer
173    pub fn for_each_mut<F>(&mut self, f: F)
174    where
175        F: FnMut(&mut Peer<C, Pool>),
176    {
177        self.peers.iter_mut().for_each(f)
178    }
179}
180
181impl<C, Pool> Testnet<C, Pool>
182where
183    C: ChainSpecProvider<ChainSpec: EthereumHardforks>
184        + StateProviderFactory
185        + BlockReaderIdExt
186        + HeaderProvider<Header = alloy_consensus::Header>
187        + Clone
188        + 'static,
189    Pool: TransactionPool,
190{
191    /// Installs an eth pool on each peer
192    pub fn with_eth_pool(
193        self,
194    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
195        self.map_pool(|peer| {
196            let blob_store = InMemoryBlobStore::default();
197            let pool = TransactionValidationTaskExecutor::eth(
198                peer.client.clone(),
199                EthEvmConfig::mainnet(),
200                blob_store.clone(),
201                TokioTaskExecutor::default(),
202            );
203            peer.map_transactions_manager(EthTransactionPool::eth_pool(
204                pool,
205                blob_store,
206                Default::default(),
207            ))
208        })
209    }
210
211    /// Installs an eth pool on each peer with custom transaction manager config
212    pub fn with_eth_pool_config(
213        self,
214        tx_manager_config: TransactionsManagerConfig,
215    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
216        self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
217    }
218
219    /// Installs an eth pool on each peer with custom transaction manager config and policy.
220    pub fn with_eth_pool_config_and_policy(
221        self,
222        tx_manager_config: TransactionsManagerConfig,
223        policy: TransactionPropagationKind,
224    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
225        self.map_pool(|peer| {
226            let blob_store = InMemoryBlobStore::default();
227            let pool = TransactionValidationTaskExecutor::eth(
228                peer.client.clone(),
229                EthEvmConfig::mainnet(),
230                blob_store.clone(),
231                TokioTaskExecutor::default(),
232            );
233
234            peer.map_transactions_manager_with(
235                EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
236                tx_manager_config.clone(),
237                policy,
238            )
239        })
240    }
241}
242
243impl<C, Pool> Testnet<C, Pool>
244where
245    C: BlockReader<
246            Block = reth_ethereum_primitives::Block,
247            Receipt = reth_ethereum_primitives::Receipt,
248            Header = alloy_consensus::Header,
249        > + HeaderProvider
250        + Clone
251        + Unpin
252        + 'static,
253    Pool: TransactionPool<
254            Transaction: PoolTransaction<
255                Consensus = TransactionSigned,
256                Pooled = PooledTransactionVariant,
257            >,
258        > + Unpin
259        + 'static,
260{
261    /// Spawns the testnet to a separate task
262    pub fn spawn(self) -> TestnetHandle<C, Pool> {
263        let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
264        let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
265        let mut net = self;
266        let handle = tokio::task::spawn(async move {
267            let mut tx = None;
268            tokio::select! {
269                _ = &mut net => {}
270                inc = rx => {
271                    tx = inc.ok();
272                }
273            }
274            if let Some(tx) = tx {
275                let _ = tx.send(net);
276            }
277        });
278
279        TestnetHandle { _handle: handle, peers, terminate: tx }
280    }
281}
282
283impl Testnet<NoopProvider, TestPool> {
284    /// Same as [`Self::try_create`] but panics on error
285    pub async fn create(num_peers: usize) -> Self {
286        Self::try_create(num_peers).await.unwrap()
287    }
288
289    /// Creates a new [`Testnet`] with the given number of peers
290    pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
291        let mut this = Self::default();
292
293        this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
294        Ok(this)
295    }
296
297    /// Add a peer to the [`Testnet`]
298    pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
299        self.add_peer_with_config(Default::default()).await
300    }
301}
302
303impl<C, Pool> Default for Testnet<C, Pool> {
304    fn default() -> Self {
305        Self { peers: Vec::new() }
306    }
307}
308
309impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        f.debug_struct("Testnet {{}}").finish_non_exhaustive()
312    }
313}
314
315impl<C, Pool> Future for Testnet<C, Pool>
316where
317    C: BlockReader<
318            Block = reth_ethereum_primitives::Block,
319            Receipt = reth_ethereum_primitives::Receipt,
320            Header = alloy_consensus::Header,
321        > + HeaderProvider
322        + Unpin
323        + 'static,
324    Pool: TransactionPool<
325            Transaction: PoolTransaction<
326                Consensus = TransactionSigned,
327                Pooled = PooledTransactionVariant,
328            >,
329        > + Unpin
330        + 'static,
331{
332    type Output = ();
333
334    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
335        let this = self.get_mut();
336        for peer in &mut this.peers {
337            let _ = peer.poll_unpin(cx);
338        }
339        Poll::Pending
340    }
341}
342
/// A handle to a [`Testnet`] that can be shared.
#[derive(Debug)]
pub struct TestnetHandle<C, Pool> {
    /// Join handle of the task the testnet was spawned on; kept but never read.
    _handle: JoinHandle<()>,
    /// Handles to the peers of the spawned testnet.
    peers: Vec<PeerHandle<Pool>>,
    /// Sends the termination request: a channel over which the task returns the testnet.
    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
}
350
351// === impl TestnetHandle ===
352
353impl<C, Pool> TestnetHandle<C, Pool> {
354    /// Terminates the task and returns the [`Testnet`] back.
355    pub async fn terminate(self) -> Testnet<C, Pool> {
356        let (tx, rx) = oneshot::channel();
357        self.terminate.send(tx).unwrap();
358        rx.await.unwrap()
359    }
360
361    /// Returns the [`PeerHandle`]s of this [`Testnet`].
362    pub fn peers(&self) -> &[PeerHandle<Pool>] {
363        &self.peers
364    }
365
366    /// Connects all peers with each other.
367    ///
368    /// This establishes sessions concurrently between all peers.
369    ///
370    /// Returns once all sessions are established.
371    pub async fn connect_peers(&self) {
372        if self.peers.len() < 2 {
373            return
374        }
375
376        // add an event stream for _each_ peer
377        let streams =
378            self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
379
380        // add all peers to each other
381        for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
382            for idx in (idx + 1)..self.peers.len() {
383                let neighbour = &self.peers[idx];
384                handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
385            }
386        }
387
388        // await all sessions to be established
389        let num_sessions_per_peer = self.peers.len() - 1;
390        let fut = streams.into_iter().map(|mut stream| async move {
391            stream.take_session_established(num_sessions_per_peer).await
392        });
393
394        futures::future::join_all(fut).await;
395    }
396}
397
/// A peer in the [`Testnet`].
///
/// Bundles a network manager with the optional components that can be installed on top of it.
#[pin_project]
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
    /// The network manager of this peer.
    #[pin]
    network: NetworkManager<EthNetworkPrimitives>,
    /// Handler for eth requests; `None` until one is installed.
    #[pin]
    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
    /// Transactions manager; `None` until one is installed.
    #[pin]
    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
    /// The pool given to the transactions manager; `None` until one is installed.
    pool: Option<Pool>,
    /// The client the peer was configured with; cloned into the request handler.
    client: C,
    /// The secret key the peer was configured with.
    secret_key: SecretKey,
}
412
413// === impl Peer ===
414
415impl<C, Pool> Peer<C, Pool>
416where
417    C: BlockReader + HeaderProvider + Clone + 'static,
418    Pool: TransactionPool,
419{
420    /// Returns the number of connected peers.
421    pub fn num_peers(&self) -> usize {
422        self.network.num_connected_peers()
423    }
424
425    /// Adds an additional protocol handler to the peer.
426    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
427        self.network.add_rlpx_sub_protocol(protocol);
428    }
429
430    /// Returns a handle to the peer's network.
431    pub fn peer_handle(&self) -> PeerHandle<Pool> {
432        PeerHandle {
433            network: self.network.handle().clone(),
434            pool: self.pool.clone(),
435            transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
436        }
437    }
438
439    /// The address that listens for incoming connections.
440    pub const fn local_addr(&self) -> SocketAddr {
441        self.network.local_addr()
442    }
443
444    /// The [`PeerId`] of this peer.
445    pub fn peer_id(&self) -> PeerId {
446        *self.network.peer_id()
447    }
448
449    /// Returns mutable access to the network.
450    pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
451        &mut self.network
452    }
453
454    /// Returns the [`NetworkHandle`] of this peer.
455    pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
456        self.network.handle().clone()
457    }
458
459    /// Returns the [`TestPool`] of this peer.
460    pub const fn pool(&self) -> Option<&Pool> {
461        self.pool.as_ref()
462    }
463
464    /// Set a new request handler that's connected to the peer's network
465    pub fn install_request_handler(&mut self) {
466        let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
467        self.network.set_eth_request_handler(tx);
468        let peers = self.network.peers_handle();
469        let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
470        self.request_handler = Some(request_handler);
471    }
472
473    /// Set a new transactions manager that's connected to the peer's network
474    pub fn install_transactions_manager(&mut self, pool: Pool) {
475        let (tx, rx) = unbounded_channel();
476        self.network.set_transactions(tx);
477        let transactions_manager = TransactionsManager::new(
478            self.handle(),
479            pool.clone(),
480            rx,
481            TransactionsManagerConfig::default(),
482        );
483        self.transactions_manager = Some(transactions_manager);
484        self.pool = Some(pool);
485    }
486
487    /// Set a new transactions manager that's connected to the peer's network
488    pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
489    where
490        P: TransactionPool,
491    {
492        let Self { mut network, request_handler, client, secret_key, .. } = self;
493        let (tx, rx) = unbounded_channel();
494        network.set_transactions(tx);
495        let transactions_manager = TransactionsManager::new(
496            network.handle().clone(),
497            pool.clone(),
498            rx,
499            TransactionsManagerConfig::default(),
500        );
501        Peer {
502            network,
503            request_handler,
504            transactions_manager: Some(transactions_manager),
505            pool: Some(pool),
506            client,
507            secret_key,
508        }
509    }
510
511    /// Map transactions manager with custom config
512    pub fn map_transactions_manager_with_config<P>(
513        self,
514        pool: P,
515        config: TransactionsManagerConfig,
516    ) -> Peer<C, P>
517    where
518        P: TransactionPool,
519    {
520        self.map_transactions_manager_with(pool, config, Default::default())
521    }
522
523    /// Map transactions manager with custom config and the given policy.
524    pub fn map_transactions_manager_with<P>(
525        self,
526        pool: P,
527        config: TransactionsManagerConfig,
528        policy: TransactionPropagationKind,
529    ) -> Peer<C, P>
530    where
531        P: TransactionPool,
532    {
533        let Self { mut network, request_handler, client, secret_key, .. } = self;
534        let (tx, rx) = unbounded_channel();
535        network.set_transactions(tx);
536
537        let announcement_policy = StrictEthAnnouncementFilter::default();
538        let policies = NetworkPolicies::new(policy, announcement_policy);
539
540        let transactions_manager = TransactionsManager::with_policy(
541            network.handle().clone(),
542            pool.clone(),
543            rx,
544            config,
545            policies,
546        );
547
548        Peer {
549            network,
550            request_handler,
551            transactions_manager: Some(transactions_manager),
552            pool: Some(pool),
553            client,
554            secret_key,
555        }
556    }
557}
558
559impl<C> Peer<C>
560where
561    C: BlockReader + HeaderProvider + Clone + 'static,
562{
563    /// Installs a new [`TestPool`]
564    pub fn install_test_pool(&mut self) {
565        self.install_transactions_manager(TestPoolBuilder::default().into())
566    }
567}
568
569impl<C, Pool> Future for Peer<C, Pool>
570where
571    C: BlockReader<
572            Block = reth_ethereum_primitives::Block,
573            Receipt = reth_ethereum_primitives::Receipt,
574            Header = alloy_consensus::Header,
575        > + HeaderProvider
576        + Unpin
577        + 'static,
578    Pool: TransactionPool<
579            Transaction: PoolTransaction<
580                Consensus = TransactionSigned,
581                Pooled = PooledTransactionVariant,
582            >,
583        > + Unpin
584        + 'static,
585{
586    type Output = ();
587
588    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
589        let this = self.project();
590
591        if let Some(request) = this.request_handler.as_pin_mut() {
592            let _ = request.poll(cx);
593        }
594
595        if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
596            let _ = tx_manager.poll(cx);
597        }
598
599        this.network.poll(cx)
600    }
601}
602
/// A helper config for setting up the reth networking stack.
#[derive(Debug)]
pub struct PeerConfig<C = NoopProvider> {
    /// The network configuration the peer's network manager is created from.
    config: NetworkConfig<C>,
    /// The client handed to the launched peer.
    client: C,
    /// The secret key the network configuration was built with.
    secret_key: SecretKey,
}
610
/// A handle to a peer in the [`Testnet`].
#[derive(Debug)]
pub struct PeerHandle<Pool> {
    /// Handle to the peer's network.
    network: NetworkHandle<EthNetworkPrimitives>,
    /// Handle to the peer's transactions manager, if one was installed.
    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
    /// The peer's pool, if one was installed.
    pool: Option<Pool>,
}
618
619// === impl PeerHandle ===
620
621impl<Pool> PeerHandle<Pool> {
622    /// Returns the [`PeerId`] used in the network.
623    pub fn peer_id(&self) -> &PeerId {
624        self.network.peer_id()
625    }
626
627    /// Returns the [`PeersHandle`] from the network.
628    pub fn peer_handle(&self) -> &PeersHandle {
629        self.network.peers_handle()
630    }
631
632    /// Returns the local socket as configured for the network.
633    pub fn local_addr(&self) -> SocketAddr {
634        self.network.local_addr()
635    }
636
637    /// Creates a new [`NetworkEvent`] listener channel.
638    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
639        self.network.event_listener()
640    }
641
642    /// Returns the [`TransactionsHandle`] of this peer.
643    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
644        self.transactions.as_ref()
645    }
646
647    /// Returns the [`TestPool`] of this peer.
648    pub const fn pool(&self) -> Option<&Pool> {
649        self.pool.as_ref()
650    }
651
652    /// Returns the [`NetworkHandle`] of this peer.
653    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
654        &self.network
655    }
656}
657
658// === impl PeerConfig ===
659
660impl<C> PeerConfig<C>
661where
662    C: BlockReader + HeaderProvider + Clone + 'static,
663{
664    /// Launches the network and returns the [Peer] that manages it
665    pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
666        let Self { config, client, secret_key } = self;
667        let network = NetworkManager::new(config).await?;
668        let peer = Peer {
669            network,
670            client,
671            secret_key,
672            request_handler: None,
673            transactions_manager: None,
674            pool: None,
675        };
676        Ok(peer)
677    }
678
679    /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
680    /// to any available IP and port.
681    pub fn new(client: C) -> Self
682    where
683        C: ChainSpecProvider<ChainSpec: Hardforks>,
684    {
685        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
686        let config = Self::network_config_builder(secret_key).build(client.clone());
687        Self { config, client, secret_key }
688    }
689
690    /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
691    /// available IP and port.
692    pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
693    where
694        C: ChainSpecProvider<ChainSpec: Hardforks>,
695    {
696        let config = Self::network_config_builder(secret_key).build(client.clone());
697        Self { config, client, secret_key }
698    }
699
700    /// Initialize the network with a given capabilities.
701    pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
702    where
703        C: ChainSpecProvider<ChainSpec: Hardforks>,
704    {
705        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
706
707        let builder = Self::network_config_builder(secret_key);
708        let hello_message =
709            HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
710        let config = builder.hello_message(hello_message).build(client.clone());
711
712        Self { config, client, secret_key }
713    }
714
715    fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
716        NetworkConfigBuilder::new(secret_key)
717            .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
718            .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
719            .disable_dns_discovery()
720            .disable_discv4_discovery()
721    }
722}
723
724impl Default for PeerConfig {
725    fn default() -> Self {
726        Self::new(NoopProvider::default())
727    }
728}
729
/// A helper type to await network events
///
/// This makes it easier to await established connections
#[derive(Debug)]
pub struct NetworkEventStream {
    /// The wrapped stream of [`NetworkEvent`]s that all helper methods read from.
    inner: EventStream<NetworkEvent>,
}
737
738// === impl NetworkEventStream ===
739
740impl NetworkEventStream {
741    /// Create a new [`NetworkEventStream`] from the given network event receiver stream.
742    pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
743        Self { inner }
744    }
745
746    /// Awaits the next event for a session to be closed
747    pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
748        while let Some(ev) = self.inner.next().await {
749            if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
750                return Some((peer_id, reason))
751            }
752        }
753        None
754    }
755
756    /// Awaits the next event for an established session
757    pub async fn next_session_established(&mut self) -> Option<PeerId> {
758        while let Some(ev) = self.inner.next().await {
759            match ev {
760                NetworkEvent::ActivePeerSession { info, .. } |
761                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
762                    return Some(info.peer_id)
763                }
764                _ => {}
765            }
766        }
767        None
768    }
769
770    /// Awaits the next `num` events for an established session
771    pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
772        if num == 0 {
773            return Vec::new();
774        }
775        let mut peers = Vec::with_capacity(num);
776        while let Some(ev) = self.inner.next().await {
777            if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
778                peers.push(peer_id);
779                num -= 1;
780                if num == 0 {
781                    return peers;
782                }
783            }
784        }
785        peers
786    }
787
788    /// Ensures that the first two events are a [`NetworkEvent::Peer`] and
789    /// [`PeerEvent::PeerAdded`][`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the
790    /// established session.
791    pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
792        let peer_id = match self.inner.next().await {
793            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
794            _ => return None,
795        };
796
797        match self.inner.next().await {
798            Some(NetworkEvent::ActivePeerSession {
799                info: SessionInfo { peer_id: peer_id2, .. },
800                ..
801            }) => {
802                debug_assert_eq!(
803                    peer_id, peer_id2,
804                    "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
805                );
806                Some(peer_id)
807            }
808            _ => None,
809        }
810    }
811
812    /// Awaits the next event for a peer added.
813    pub async fn peer_added(&mut self) -> Option<PeerId> {
814        let peer_id = match self.inner.next().await {
815            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
816            _ => return None,
817        };
818
819        Some(peer_id)
820    }
821
822    /// Awaits the next event for a peer removed.
823    pub async fn peer_removed(&mut self) -> Option<PeerId> {
824        let peer_id = match self.inner.next().await {
825            Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
826            _ => return None,
827        };
828
829        Some(peer_id)
830    }
831}