Skip to main content

reth_network/test_utils/
testnet.rs

1//! A network implementation for testing purposes.
2
3use crate::{
4    builder::ETH_REQUEST_CHANNEL_CAPACITY,
5    error::NetworkError,
6    eth_requests::EthRequestHandler,
7    protocol::IntoRlpxSubProtocol,
8    transactions::{
9        config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
10        policy::NetworkPolicies,
11        TransactionsHandle, TransactionsManager, TransactionsManagerConfig,
12    },
13    NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, PeersConfig,
14};
15use futures::{FutureExt, StreamExt};
16use pin_project::pin_project;
17use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
18use reth_eth_wire::{
19    protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
20};
21use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
22use reth_evm_ethereum::EthEvmConfig;
23use reth_metrics::common::mpsc::memory_bounded_channel;
24use reth_network_api::{
25    events::{PeerEvent, SessionInfo},
26    test_utils::{PeersHandle, PeersHandleProvider},
27    NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
28};
29use reth_network_peers::PeerId;
30use reth_storage_api::{
31    noop::NoopProvider, BalProvider, BlockReader, BlockReaderIdExt, HeaderProvider,
32    StateProviderFactory,
33};
34use reth_tasks::Runtime;
35use reth_tokio_util::EventStream;
36use reth_transaction_pool::{
37    blobstore::InMemoryBlobStore,
38    test_utils::{TestPool, TestPoolBuilder},
39    EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
40};
41use secp256k1::SecretKey;
42use std::{
43    fmt,
44    future::Future,
45    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
46    pin::Pin,
47    task::{Context, Poll},
48};
49use tokio::{
50    sync::{mpsc::channel, oneshot},
51    task::JoinHandle,
52};
53
54use crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES;
55
56/// A test network consisting of multiple peers.
57pub struct Testnet<C, Pool> {
58    /// All running peers in the network.
59    peers: Vec<Peer<C, Pool>>,
60}
61
62// === impl Testnet ===
63
64impl<C> Testnet<C, TestPool>
65where
66    C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
67{
68    /// Same as [`Self::try_create_with`] but panics on error
69    pub async fn create_with(num_peers: usize, provider: C) -> Self {
70        Self::try_create_with(num_peers, provider).await.unwrap()
71    }
72
73    /// Creates a new [`Testnet`] with the given number of peers and the provider.
74    pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
75        let mut this = Self { peers: Vec::with_capacity(num_peers) };
76        for _ in 0..num_peers {
77            let config = PeerConfig::new(provider.clone());
78            this.add_peer_with_config(config).await?;
79        }
80        Ok(this)
81    }
82
83    /// Extend the list of peers with new peers that are configured with each of the given
84    /// [`PeerConfig`]s.
85    pub async fn extend_peer_with_config(
86        &mut self,
87        configs: impl IntoIterator<Item = PeerConfig<C>>,
88    ) -> Result<(), NetworkError> {
89        let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
90        let peers = futures::future::join_all(peers).await;
91        for peer in peers {
92            self.peers.push(peer?);
93        }
94        Ok(())
95    }
96}
97
98impl<C, Pool> Testnet<C, Pool>
99where
100    C: BlockReader + HeaderProvider + Clone + 'static,
101    Pool: TransactionPool,
102{
103    /// Return a mutable slice of all peers.
104    pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
105        &mut self.peers
106    }
107
108    /// Return a slice of all peers.
109    pub fn peers(&self) -> &[Peer<C, Pool>] {
110        &self.peers
111    }
112
113    /// Remove a peer from the [`Testnet`] and return it.
114    ///
115    /// # Panics
116    /// If the index is out of bounds.
117    pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
118        self.peers.remove(index)
119    }
120
121    /// Return a mutable iterator over all peers.
122    pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
123        self.peers.iter_mut()
124    }
125
126    /// Return an iterator over all peers.
127    pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
128        self.peers.iter()
129    }
130
131    /// Add a peer to the [`Testnet`] with the given [`PeerConfig`].
132    pub async fn add_peer_with_config(
133        &mut self,
134        config: PeerConfig<C>,
135    ) -> Result<(), NetworkError> {
136        let PeerConfig { config, client, secret_key } = config;
137
138        let network = NetworkManager::new(config).await?;
139        let peer = Peer {
140            network,
141            client,
142            secret_key,
143            request_handler: None,
144            transactions_manager: None,
145            pool: None,
146        };
147        self.peers.push(peer);
148        Ok(())
149    }
150
151    /// Returns all handles to the networks
152    pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
153        self.peers.iter().map(|p| p.handle())
154    }
155
156    /// Maps the pool of each peer with the given closure
157    pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
158    where
159        F: Fn(Peer<C, Pool>) -> Peer<C, P>,
160        P: TransactionPool,
161    {
162        Testnet { peers: self.peers.into_iter().map(f).collect() }
163    }
164
165    /// Apply a closure on each peer
166    pub fn for_each<F>(&self, f: F)
167    where
168        F: Fn(&Peer<C, Pool>),
169    {
170        self.peers.iter().for_each(f)
171    }
172
173    /// Apply a closure on each peer
174    pub fn for_each_mut<F>(&mut self, f: F)
175    where
176        F: FnMut(&mut Peer<C, Pool>),
177    {
178        self.peers.iter_mut().for_each(f)
179    }
180}
181
182impl<C, Pool> Testnet<C, Pool>
183where
184    C: ChainSpecProvider<ChainSpec: EthereumHardforks>
185        + StateProviderFactory
186        + BlockReaderIdExt
187        + HeaderProvider<Header = alloy_consensus::Header>
188        + Clone
189        + 'static,
190    Pool: TransactionPool,
191{
192    /// Installs an eth pool on each peer
193    pub fn with_eth_pool(
194        self,
195    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
196        self.map_pool(|peer| {
197            let blob_store = InMemoryBlobStore::default();
198            let pool = TransactionValidationTaskExecutor::eth(
199                peer.client.clone(),
200                EthEvmConfig::mainnet(),
201                blob_store.clone(),
202                Runtime::test(),
203            );
204            peer.map_transactions_manager(EthTransactionPool::eth_pool(
205                pool,
206                blob_store,
207                Default::default(),
208            ))
209        })
210    }
211
212    /// Installs an eth pool on each peer with custom transaction manager config
213    pub fn with_eth_pool_config(
214        self,
215        tx_manager_config: TransactionsManagerConfig,
216    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
217        self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
218    }
219
220    /// Installs an eth pool on each peer with custom transaction manager config and policy.
221    pub fn with_eth_pool_config_and_policy(
222        self,
223        tx_manager_config: TransactionsManagerConfig,
224        policy: TransactionPropagationKind,
225    ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
226        self.map_pool(|peer| {
227            let blob_store = InMemoryBlobStore::default();
228            let pool = TransactionValidationTaskExecutor::eth(
229                peer.client.clone(),
230                EthEvmConfig::mainnet(),
231                blob_store.clone(),
232                Runtime::test(),
233            );
234
235            peer.map_transactions_manager_with(
236                EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
237                tx_manager_config.clone(),
238                policy,
239            )
240        })
241    }
242}
243
244impl<C, Pool> Testnet<C, Pool>
245where
246    C: BlockReader<
247            Block = reth_ethereum_primitives::Block,
248            Receipt = reth_ethereum_primitives::Receipt,
249            Header = alloy_consensus::Header,
250        > + HeaderProvider
251        + BalProvider
252        + Clone
253        + Unpin
254        + 'static,
255    Pool: TransactionPool<
256            Transaction: PoolTransaction<
257                Consensus = TransactionSigned,
258                Pooled = PooledTransactionVariant,
259            >,
260        > + Unpin
261        + 'static,
262{
263    /// Spawns the testnet to a separate task
264    pub fn spawn(self) -> TestnetHandle<C, Pool> {
265        let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
266        let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
267        let mut net = self;
268        let handle = tokio::task::spawn(async move {
269            let mut tx = None;
270            tokio::select! {
271                _ = &mut net => {}
272                inc = rx => {
273                    tx = inc.ok();
274                }
275            }
276            if let Some(tx) = tx {
277                let _ = tx.send(net);
278            }
279        });
280
281        TestnetHandle { _handle: handle, peers, terminate: tx }
282    }
283}
284
285impl Testnet<NoopProvider, TestPool> {
286    /// Same as [`Self::try_create`] but panics on error
287    pub async fn create(num_peers: usize) -> Self {
288        Self::try_create(num_peers).await.unwrap()
289    }
290
291    /// Creates a new [`Testnet`] with the given number of peers
292    pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
293        let mut this = Self::default();
294
295        this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
296        Ok(this)
297    }
298
299    /// Add a peer to the [`Testnet`]
300    pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
301        self.add_peer_with_config(Default::default()).await
302    }
303}
304
305impl<C, Pool> Default for Testnet<C, Pool> {
306    fn default() -> Self {
307        Self { peers: Vec::new() }
308    }
309}
310
311impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
312    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313        f.debug_struct("Testnet {{}}").finish_non_exhaustive()
314    }
315}
316
317impl<C, Pool> Future for Testnet<C, Pool>
318where
319    C: BlockReader<
320            Block = reth_ethereum_primitives::Block,
321            Receipt = reth_ethereum_primitives::Receipt,
322            Header = alloy_consensus::Header,
323        > + HeaderProvider
324        + BalProvider
325        + Unpin
326        + 'static,
327    Pool: TransactionPool<
328            Transaction: PoolTransaction<
329                Consensus = TransactionSigned,
330                Pooled = PooledTransactionVariant,
331            >,
332        > + Unpin
333        + 'static,
334{
335    type Output = ();
336
337    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
338        let this = self.get_mut();
339        for peer in &mut this.peers {
340            let _ = peer.poll_unpin(cx);
341        }
342        Poll::Pending
343    }
344}
345
346/// A handle to a [`Testnet`] that can be shared.
347#[derive(Debug)]
348pub struct TestnetHandle<C, Pool> {
349    _handle: JoinHandle<()>,
350    peers: Vec<PeerHandle<Pool>>,
351    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
352}
353
354// === impl TestnetHandle ===
355
356impl<C, Pool> TestnetHandle<C, Pool> {
357    /// Terminates the task and returns the [`Testnet`] back.
358    pub async fn terminate(self) -> Testnet<C, Pool> {
359        let (tx, rx) = oneshot::channel();
360        self.terminate.send(tx).unwrap();
361        rx.await.unwrap()
362    }
363
364    /// Returns the [`PeerHandle`]s of this [`Testnet`].
365    pub fn peers(&self) -> &[PeerHandle<Pool>] {
366        &self.peers
367    }
368
369    /// Connects all peers with each other.
370    ///
371    /// This establishes sessions concurrently between all peers.
372    ///
373    /// Returns once all sessions are established.
374    pub async fn connect_peers(&self) {
375        if self.peers.len() < 2 {
376            return
377        }
378
379        // add an event stream for _each_ peer
380        let streams =
381            self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
382
383        // add all peers to each other
384        for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
385            for idx in (idx + 1)..self.peers.len() {
386                let neighbour = &self.peers[idx];
387                handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
388            }
389        }
390
391        // await all sessions to be established
392        let num_sessions_per_peer = self.peers.len() - 1;
393        let fut = streams.into_iter().map(|mut stream| async move {
394            stream.take_session_established(num_sessions_per_peer).await
395        });
396
397        futures::future::join_all(fut).await;
398    }
399}
400
401/// A peer in the [`Testnet`].
402#[pin_project]
403#[derive(Debug)]
404pub struct Peer<C, Pool = TestPool> {
405    #[pin]
406    network: NetworkManager<EthNetworkPrimitives>,
407    #[pin]
408    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
409    #[pin]
410    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
411    pool: Option<Pool>,
412    client: C,
413    secret_key: SecretKey,
414}
415
416// === impl Peer ===
417
418impl<C, Pool> Peer<C, Pool>
419where
420    C: BlockReader + HeaderProvider + Clone + 'static,
421    Pool: TransactionPool,
422{
423    /// Returns the number of connected peers.
424    pub fn num_peers(&self) -> usize {
425        self.network.num_connected_peers()
426    }
427
428    /// Adds an additional protocol handler to the peer.
429    pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
430        self.network.add_rlpx_sub_protocol(protocol);
431    }
432
433    /// Returns a handle to the peer's network.
434    pub fn peer_handle(&self) -> PeerHandle<Pool> {
435        PeerHandle {
436            network: self.network.handle().clone(),
437            pool: self.pool.clone(),
438            transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
439        }
440    }
441
442    /// The address that listens for incoming connections.
443    pub const fn local_addr(&self) -> SocketAddr {
444        self.network.local_addr()
445    }
446
447    /// The [`PeerId`] of this peer.
448    pub fn peer_id(&self) -> PeerId {
449        *self.network.peer_id()
450    }
451
452    /// Returns mutable access to the network.
453    pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
454        &mut self.network
455    }
456
457    /// Returns the [`NetworkHandle`] of this peer.
458    pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
459        self.network.handle().clone()
460    }
461
462    /// Returns the [`TestPool`] of this peer.
463    pub const fn pool(&self) -> Option<&Pool> {
464        self.pool.as_ref()
465    }
466
467    /// Set a new request handler that's connected to the peer's network
468    pub fn install_request_handler(&mut self)
469    where
470        C: BalProvider,
471    {
472        let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
473        self.network.set_eth_request_handler(tx);
474        let peers = self.network.peers_handle();
475        let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
476        self.request_handler = Some(request_handler);
477    }
478
479    /// Set a new transactions manager that's connected to the peer's network
480    pub fn install_transactions_manager(&mut self, pool: Pool) {
481        let (tx, rx) = memory_bounded_channel(
482            DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
483            "test_tx_channel",
484        );
485        self.network.set_transactions(tx);
486        let transactions_manager = TransactionsManager::new(
487            self.handle(),
488            pool.clone(),
489            rx,
490            TransactionsManagerConfig::default(),
491        );
492        self.transactions_manager = Some(transactions_manager);
493        self.pool = Some(pool);
494    }
495
496    /// Set a new transactions manager that's connected to the peer's network
497    pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
498    where
499        P: TransactionPool,
500    {
501        let Self { mut network, request_handler, client, secret_key, .. } = self;
502        let (tx, rx) = memory_bounded_channel(
503            DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
504            "test_tx_channel",
505        );
506        network.set_transactions(tx);
507        let transactions_manager = TransactionsManager::new(
508            network.handle().clone(),
509            pool.clone(),
510            rx,
511            TransactionsManagerConfig::default(),
512        );
513        Peer {
514            network,
515            request_handler,
516            transactions_manager: Some(transactions_manager),
517            pool: Some(pool),
518            client,
519            secret_key,
520        }
521    }
522
523    /// Map transactions manager with custom config
524    pub fn map_transactions_manager_with_config<P>(
525        self,
526        pool: P,
527        config: TransactionsManagerConfig,
528    ) -> Peer<C, P>
529    where
530        P: TransactionPool,
531    {
532        self.map_transactions_manager_with(pool, config, Default::default())
533    }
534
535    /// Map transactions manager with custom config and the given policy.
536    pub fn map_transactions_manager_with<P>(
537        self,
538        pool: P,
539        config: TransactionsManagerConfig,
540        policy: TransactionPropagationKind,
541    ) -> Peer<C, P>
542    where
543        P: TransactionPool,
544    {
545        let Self { mut network, request_handler, client, secret_key, .. } = self;
546        let (tx, rx) = memory_bounded_channel(
547            DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
548            "test_tx_channel",
549        );
550        network.set_transactions(tx);
551
552        let announcement_policy = StrictEthAnnouncementFilter::default();
553        let policies = NetworkPolicies::new(policy, announcement_policy);
554
555        let transactions_manager = TransactionsManager::with_policy(
556            network.handle().clone(),
557            pool.clone(),
558            rx,
559            config,
560            policies,
561        );
562
563        Peer {
564            network,
565            request_handler,
566            transactions_manager: Some(transactions_manager),
567            pool: Some(pool),
568            client,
569            secret_key,
570        }
571    }
572}
573
574impl<C> Peer<C>
575where
576    C: BlockReader + HeaderProvider + Clone + 'static,
577{
578    /// Installs a new [`TestPool`]
579    pub fn install_test_pool(&mut self) {
580        self.install_transactions_manager(TestPoolBuilder::default().into())
581    }
582}
583
584impl<C, Pool> Future for Peer<C, Pool>
585where
586    C: BlockReader<
587            Block = reth_ethereum_primitives::Block,
588            Receipt = reth_ethereum_primitives::Receipt,
589            Header = alloy_consensus::Header,
590        > + HeaderProvider
591        + BalProvider
592        + Unpin
593        + 'static,
594    Pool: TransactionPool<
595            Transaction: PoolTransaction<
596                Consensus = TransactionSigned,
597                Pooled = PooledTransactionVariant,
598            >,
599        > + Unpin
600        + 'static,
601{
602    type Output = ();
603
604    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
605        let this = self.project();
606
607        if let Some(request) = this.request_handler.as_pin_mut() {
608            let _ = request.poll(cx);
609        }
610
611        if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
612            let _ = tx_manager.poll(cx);
613        }
614
615        this.network.poll(cx)
616    }
617}
618
619/// A helper config for setting up the reth networking stack.
620#[derive(Debug)]
621pub struct PeerConfig<C = NoopProvider> {
622    config: NetworkConfig<C>,
623    client: C,
624    secret_key: SecretKey,
625}
626
627/// A handle to a peer in the [`Testnet`].
628#[derive(Debug)]
629pub struct PeerHandle<Pool> {
630    network: NetworkHandle<EthNetworkPrimitives>,
631    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
632    pool: Option<Pool>,
633}
634
635// === impl PeerHandle ===
636
637impl<Pool> PeerHandle<Pool> {
638    /// Returns the [`PeerId`] used in the network.
639    pub fn peer_id(&self) -> &PeerId {
640        self.network.peer_id()
641    }
642
643    /// Returns the [`PeersHandle`] from the network.
644    pub fn peer_handle(&self) -> &PeersHandle {
645        self.network.peers_handle()
646    }
647
648    /// Returns the local socket as configured for the network.
649    pub fn local_addr(&self) -> SocketAddr {
650        self.network.local_addr()
651    }
652
653    /// Creates a new [`NetworkEvent`] listener channel.
654    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
655        self.network.event_listener()
656    }
657
658    /// Returns the [`TransactionsHandle`] of this peer.
659    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
660        self.transactions.as_ref()
661    }
662
663    /// Returns the [`TestPool`] of this peer.
664    pub const fn pool(&self) -> Option<&Pool> {
665        self.pool.as_ref()
666    }
667
668    /// Returns the [`NetworkHandle`] of this peer.
669    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
670        &self.network
671    }
672}
673
674// === impl PeerConfig ===
675
676impl<C> PeerConfig<C>
677where
678    C: BlockReader + HeaderProvider + Clone + 'static,
679{
680    /// Launches the network and returns the [Peer] that manages it
681    pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
682        let Self { config, client, secret_key } = self;
683        let network = NetworkManager::new(config).await?;
684        let peer = Peer {
685            network,
686            client,
687            secret_key,
688            request_handler: None,
689            transactions_manager: None,
690            pool: None,
691        };
692        Ok(peer)
693    }
694
695    /// Initialize the network with a random secret key, allowing the devp2p and discovery to bind
696    /// to any available IP and port.
697    pub fn new(client: C) -> Self
698    where
699        C: ChainSpecProvider<ChainSpec: Hardforks>,
700    {
701        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
702        let config = Self::network_config_builder(secret_key).build(client.clone());
703        Self { config, client, secret_key }
704    }
705
706    /// Initialize the network with a given secret key, allowing devp2p and discovery to bind any
707    /// available IP and port.
708    pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
709    where
710        C: ChainSpecProvider<ChainSpec: Hardforks>,
711    {
712        let config = Self::network_config_builder(secret_key).build(client.clone());
713        Self { config, client, secret_key }
714    }
715
716    /// Initialize the network with a given capabilities.
717    pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
718    where
719        C: ChainSpecProvider<ChainSpec: Hardforks>,
720    {
721        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
722
723        let builder = Self::network_config_builder(secret_key);
724        let hello_message =
725            HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
726        let config = builder.hello_message(hello_message).build(client.clone());
727
728        Self { config, client, secret_key }
729    }
730
731    fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
732        NetworkConfigBuilder::new(secret_key, Runtime::test())
733            .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
734            .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
735            .disable_dns_discovery()
736            .disable_discv4_discovery()
737            .peer_config(PeersConfig::test())
738    }
739}
740
741impl Default for PeerConfig {
742    fn default() -> Self {
743        Self::new(NoopProvider::default())
744    }
745}
746
747/// A helper type to await network events
748///
749/// This makes it easier to await established connections
750#[derive(Debug)]
751pub struct NetworkEventStream {
752    inner: EventStream<NetworkEvent>,
753}
754
755// === impl NetworkEventStream ===
756
757impl NetworkEventStream {
758    /// Create a new [`NetworkEventStream`] from the given network event receiver stream.
759    pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
760        Self { inner }
761    }
762
763    /// Awaits the next event for a session to be closed
764    pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
765        while let Some(ev) = self.inner.next().await {
766            if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
767                return Some((peer_id, reason))
768            }
769        }
770        None
771    }
772
773    /// Awaits the next event for an established session
774    pub async fn next_session_established(&mut self) -> Option<PeerId> {
775        while let Some(ev) = self.inner.next().await {
776            match ev {
777                NetworkEvent::ActivePeerSession { info, .. } |
778                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
779                    return Some(info.peer_id)
780                }
781                _ => {}
782            }
783        }
784        None
785    }
786
787    /// Awaits the next `num` events for an established session
788    pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
789        if num == 0 {
790            return Vec::new();
791        }
792        let mut peers = Vec::with_capacity(num);
793        while let Some(ev) = self.inner.next().await {
794            if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
795                peers.push(peer_id);
796                num -= 1;
797                if num == 0 {
798                    return peers;
799                }
800            }
801        }
802        peers
803    }
804
805    /// Ensures that the first two events are a [`NetworkEvent::Peer`] and
806    /// [`PeerEvent::PeerAdded`][`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the
807    /// established session.
808    pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
809        let peer_id = match self.inner.next().await {
810            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
811            _ => return None,
812        };
813
814        match self.inner.next().await {
815            Some(NetworkEvent::ActivePeerSession {
816                info: SessionInfo { peer_id: peer_id2, .. },
817                ..
818            }) => {
819                debug_assert_eq!(
820                    peer_id, peer_id2,
821                    "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
822                );
823                Some(peer_id)
824            }
825            _ => None,
826        }
827    }
828
829    /// Awaits the next event for a peer added.
830    pub async fn peer_added(&mut self) -> Option<PeerId> {
831        let peer_id = match self.inner.next().await {
832            Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
833            _ => return None,
834        };
835
836        Some(peer_id)
837    }
838
839    /// Awaits the next event for a peer removed.
840    pub async fn peer_removed(&mut self) -> Option<PeerId> {
841        let peer_id = match self.inner.next().await {
842            Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
843            _ => return None,
844        };
845
846        Some(peer_id)
847    }
848}