1use crate::{
4 builder::ETH_REQUEST_CHANNEL_CAPACITY,
5 error::NetworkError,
6 eth_requests::EthRequestHandler,
7 protocol::IntoRlpxSubProtocol,
8 transactions::{TransactionsHandle, TransactionsManager, TransactionsManagerConfig},
9 NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
10};
11use futures::{FutureExt, StreamExt};
12use pin_project::pin_project;
13use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
14use reth_eth_wire::{
15 protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
16};
17use reth_network_api::{
18 events::{PeerEvent, SessionInfo},
19 test_utils::{PeersHandle, PeersHandleProvider},
20 NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
21};
22use reth_network_peers::PeerId;
23use reth_primitives::{PooledTransaction, TransactionSigned};
24use reth_storage_api::{
25 noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
26};
27use reth_tasks::TokioTaskExecutor;
28use reth_tokio_util::EventStream;
29use reth_transaction_pool::{
30 blobstore::InMemoryBlobStore,
31 test_utils::{TestPool, TestPoolBuilder},
32 EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
33};
34use secp256k1::SecretKey;
35use std::{
36 fmt,
37 future::Future,
38 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
39 pin::Pin,
40 task::{Context, Poll},
41};
42use tokio::{
43 sync::{
44 mpsc::{channel, unbounded_channel},
45 oneshot,
46 },
47 task::JoinHandle,
48};
49
50pub struct Testnet<C, Pool> {
52 peers: Vec<Peer<C, Pool>>,
54}
55
56impl<C> Testnet<C, TestPool>
59where
60 C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
61{
62 pub async fn create_with(num_peers: usize, provider: C) -> Self {
64 Self::try_create_with(num_peers, provider).await.unwrap()
65 }
66
67 pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
69 let mut this = Self { peers: Vec::with_capacity(num_peers) };
70 for _ in 0..num_peers {
71 let config = PeerConfig::new(provider.clone());
72 this.add_peer_with_config(config).await?;
73 }
74 Ok(this)
75 }
76
77 pub async fn extend_peer_with_config(
80 &mut self,
81 configs: impl IntoIterator<Item = PeerConfig<C>>,
82 ) -> Result<(), NetworkError> {
83 let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
84 let peers = futures::future::join_all(peers).await;
85 for peer in peers {
86 self.peers.push(peer?);
87 }
88 Ok(())
89 }
90}
91
92impl<C, Pool> Testnet<C, Pool>
93where
94 C: BlockReader + HeaderProvider + Clone + 'static,
95 Pool: TransactionPool,
96{
97 pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
99 &mut self.peers
100 }
101
102 pub fn peers(&self) -> &[Peer<C, Pool>] {
104 &self.peers
105 }
106
107 pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
112 self.peers.remove(index)
113 }
114
115 pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
117 self.peers.iter_mut()
118 }
119
120 pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
122 self.peers.iter()
123 }
124
125 pub async fn add_peer_with_config(
127 &mut self,
128 config: PeerConfig<C>,
129 ) -> Result<(), NetworkError> {
130 let PeerConfig { config, client, secret_key } = config;
131
132 let network = NetworkManager::new(config).await?;
133 let peer = Peer {
134 network,
135 client,
136 secret_key,
137 request_handler: None,
138 transactions_manager: None,
139 pool: None,
140 };
141 self.peers.push(peer);
142 Ok(())
143 }
144
145 pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
147 self.peers.iter().map(|p| p.handle())
148 }
149
150 pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
152 where
153 F: Fn(Peer<C, Pool>) -> Peer<C, P>,
154 P: TransactionPool,
155 {
156 Testnet { peers: self.peers.into_iter().map(f).collect() }
157 }
158
159 pub fn for_each<F>(&self, f: F)
161 where
162 F: Fn(&Peer<C, Pool>),
163 {
164 self.peers.iter().for_each(f)
165 }
166
167 pub fn for_each_mut<F>(&mut self, f: F)
169 where
170 F: FnMut(&mut Peer<C, Pool>),
171 {
172 self.peers.iter_mut().for_each(f)
173 }
174}
175
176impl<C, Pool> Testnet<C, Pool>
177where
178 C: ChainSpecProvider<ChainSpec: EthereumHardforks>
179 + StateProviderFactory
180 + BlockReaderIdExt
181 + HeaderProvider
182 + Clone
183 + 'static,
184 Pool: TransactionPool,
185{
186 pub fn with_eth_pool(self) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
188 self.map_pool(|peer| {
189 let blob_store = InMemoryBlobStore::default();
190 let pool = TransactionValidationTaskExecutor::eth(
191 peer.client.clone(),
192 blob_store.clone(),
193 TokioTaskExecutor::default(),
194 );
195 peer.map_transactions_manager(EthTransactionPool::eth_pool(
196 pool,
197 blob_store,
198 Default::default(),
199 ))
200 })
201 }
202
203 pub fn with_eth_pool_config(
205 self,
206 tx_manager_config: TransactionsManagerConfig,
207 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore>> {
208 self.map_pool(|peer| {
209 let blob_store = InMemoryBlobStore::default();
210 let pool = TransactionValidationTaskExecutor::eth(
211 peer.client.clone(),
212 blob_store.clone(),
213 TokioTaskExecutor::default(),
214 );
215
216 peer.map_transactions_manager_with_config(
217 EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
218 tx_manager_config.clone(),
219 )
220 })
221 }
222}
223
224impl<C, Pool> Testnet<C, Pool>
225where
226 C: BlockReader<
227 Block = reth_primitives::Block,
228 Receipt = reth_primitives::Receipt,
229 Header = reth_primitives::Header,
230 > + HeaderProvider
231 + Clone
232 + Unpin
233 + 'static,
234 Pool: TransactionPool<
235 Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
236 > + Unpin
237 + 'static,
238{
239 pub fn spawn(self) -> TestnetHandle<C, Pool> {
241 let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
242 let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
243 let mut net = self;
244 let handle = tokio::task::spawn(async move {
245 let mut tx = None;
246 tokio::select! {
247 _ = &mut net => {}
248 inc = rx => {
249 tx = inc.ok();
250 }
251 }
252 if let Some(tx) = tx {
253 let _ = tx.send(net);
254 }
255 });
256
257 TestnetHandle { _handle: handle, peers, terminate: tx }
258 }
259}
260
261impl Testnet<NoopProvider, TestPool> {
262 pub async fn create(num_peers: usize) -> Self {
264 Self::try_create(num_peers).await.unwrap()
265 }
266
267 pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
269 let mut this = Self::default();
270
271 this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
272 Ok(this)
273 }
274
275 pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
277 self.add_peer_with_config(Default::default()).await
278 }
279}
280
281impl<C, Pool> Default for Testnet<C, Pool> {
282 fn default() -> Self {
283 Self { peers: Vec::new() }
284 }
285}
286
287impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.debug_struct("Testnet {{}}").finish_non_exhaustive()
290 }
291}
292
293impl<C, Pool> Future for Testnet<C, Pool>
294where
295 C: BlockReader<
296 Block = reth_primitives::Block,
297 Receipt = reth_primitives::Receipt,
298 Header = reth_primitives::Header,
299 > + HeaderProvider
300 + Unpin
301 + 'static,
302 Pool: TransactionPool<
303 Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
304 > + Unpin
305 + 'static,
306{
307 type Output = ();
308
309 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
310 let this = self.get_mut();
311 for peer in &mut this.peers {
312 let _ = peer.poll_unpin(cx);
313 }
314 Poll::Pending
315 }
316}
317
318#[derive(Debug)]
320pub struct TestnetHandle<C, Pool> {
321 _handle: JoinHandle<()>,
322 peers: Vec<PeerHandle<Pool>>,
323 terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
324}
325
326impl<C, Pool> TestnetHandle<C, Pool> {
329 pub async fn terminate(self) -> Testnet<C, Pool> {
331 let (tx, rx) = oneshot::channel();
332 self.terminate.send(tx).unwrap();
333 rx.await.unwrap()
334 }
335
336 pub fn peers(&self) -> &[PeerHandle<Pool>] {
338 &self.peers
339 }
340
341 pub async fn connect_peers(&self) {
347 if self.peers.len() < 2 {
348 return
349 }
350
351 let streams =
353 self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
354
355 for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
357 for idx in (idx + 1)..self.peers.len() {
358 let neighbour = &self.peers[idx];
359 handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
360 }
361 }
362
363 let num_sessions_per_peer = self.peers.len() - 1;
365 let fut = streams.into_iter().map(|mut stream| async move {
366 stream.take_session_established(num_sessions_per_peer).await
367 });
368
369 futures::future::join_all(fut).await;
370 }
371}
372
373#[pin_project]
375#[derive(Debug)]
376pub struct Peer<C, Pool = TestPool> {
377 #[pin]
378 network: NetworkManager<EthNetworkPrimitives>,
379 #[pin]
380 request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
381 #[pin]
382 transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
383 pool: Option<Pool>,
384 client: C,
385 secret_key: SecretKey,
386}
387
388impl<C, Pool> Peer<C, Pool>
391where
392 C: BlockReader + HeaderProvider + Clone + 'static,
393 Pool: TransactionPool,
394{
395 pub fn num_peers(&self) -> usize {
397 self.network.num_connected_peers()
398 }
399
400 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
402 self.network.add_rlpx_sub_protocol(protocol);
403 }
404
405 pub fn peer_handle(&self) -> PeerHandle<Pool> {
407 PeerHandle {
408 network: self.network.handle().clone(),
409 pool: self.pool.clone(),
410 transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
411 }
412 }
413
414 pub const fn local_addr(&self) -> SocketAddr {
416 self.network.local_addr()
417 }
418
419 pub fn peer_id(&self) -> PeerId {
421 *self.network.peer_id()
422 }
423
424 pub fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
426 &mut self.network
427 }
428
429 pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
431 self.network.handle().clone()
432 }
433
434 pub const fn pool(&self) -> Option<&Pool> {
436 self.pool.as_ref()
437 }
438
439 pub fn install_request_handler(&mut self) {
441 let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
442 self.network.set_eth_request_handler(tx);
443 let peers = self.network.peers_handle();
444 let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
445 self.request_handler = Some(request_handler);
446 }
447
448 pub fn install_transactions_manager(&mut self, pool: Pool) {
450 let (tx, rx) = unbounded_channel();
451 self.network.set_transactions(tx);
452 let transactions_manager = TransactionsManager::new(
453 self.handle(),
454 pool.clone(),
455 rx,
456 TransactionsManagerConfig::default(),
457 );
458 self.transactions_manager = Some(transactions_manager);
459 self.pool = Some(pool);
460 }
461
462 pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
464 where
465 P: TransactionPool,
466 {
467 let Self { mut network, request_handler, client, secret_key, .. } = self;
468 let (tx, rx) = unbounded_channel();
469 network.set_transactions(tx);
470 let transactions_manager = TransactionsManager::new(
471 network.handle().clone(),
472 pool.clone(),
473 rx,
474 TransactionsManagerConfig::default(),
475 );
476 Peer {
477 network,
478 request_handler,
479 transactions_manager: Some(transactions_manager),
480 pool: Some(pool),
481 client,
482 secret_key,
483 }
484 }
485
486 pub fn map_transactions_manager_with_config<P>(
488 self,
489 pool: P,
490 config: TransactionsManagerConfig,
491 ) -> Peer<C, P>
492 where
493 P: TransactionPool,
494 {
495 let Self { mut network, request_handler, client, secret_key, .. } = self;
496 let (tx, rx) = unbounded_channel();
497 network.set_transactions(tx);
498
499 let transactions_manager = TransactionsManager::new(
500 network.handle().clone(),
501 pool.clone(),
502 rx,
503 config, );
505
506 Peer {
507 network,
508 request_handler,
509 transactions_manager: Some(transactions_manager),
510 pool: Some(pool),
511 client,
512 secret_key,
513 }
514 }
515}
516
517impl<C> Peer<C>
518where
519 C: BlockReader + HeaderProvider + Clone + 'static,
520{
521 pub fn install_test_pool(&mut self) {
523 self.install_transactions_manager(TestPoolBuilder::default().into())
524 }
525}
526
527impl<C, Pool> Future for Peer<C, Pool>
528where
529 C: BlockReader<
530 Block = reth_primitives::Block,
531 Receipt = reth_primitives::Receipt,
532 Header = reth_primitives::Header,
533 > + HeaderProvider
534 + Unpin
535 + 'static,
536 Pool: TransactionPool<
537 Transaction: PoolTransaction<Consensus = TransactionSigned, Pooled = PooledTransaction>,
538 > + Unpin
539 + 'static,
540{
541 type Output = ();
542
543 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
544 let this = self.project();
545
546 if let Some(request) = this.request_handler.as_pin_mut() {
547 let _ = request.poll(cx);
548 }
549
550 if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
551 let _ = tx_manager.poll(cx);
552 }
553
554 this.network.poll(cx)
555 }
556}
557
558#[derive(Debug)]
560pub struct PeerConfig<C = NoopProvider> {
561 config: NetworkConfig<C>,
562 client: C,
563 secret_key: SecretKey,
564}
565
566#[derive(Debug)]
568pub struct PeerHandle<Pool> {
569 network: NetworkHandle<EthNetworkPrimitives>,
570 transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
571 pool: Option<Pool>,
572}
573
574impl<Pool> PeerHandle<Pool> {
577 pub fn peer_id(&self) -> &PeerId {
579 self.network.peer_id()
580 }
581
582 pub fn peer_handle(&self) -> &PeersHandle {
584 self.network.peers_handle()
585 }
586
587 pub fn local_addr(&self) -> SocketAddr {
589 self.network.local_addr()
590 }
591
592 pub fn event_listener(&self) -> EventStream<NetworkEvent> {
594 self.network.event_listener()
595 }
596
597 pub const fn transactions(&self) -> Option<&TransactionsHandle> {
599 self.transactions.as_ref()
600 }
601
602 pub const fn pool(&self) -> Option<&Pool> {
604 self.pool.as_ref()
605 }
606
607 pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
609 &self.network
610 }
611}
612
613impl<C> PeerConfig<C>
616where
617 C: BlockReader + HeaderProvider + Clone + 'static,
618{
619 pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
621 let Self { config, client, secret_key } = self;
622 let network = NetworkManager::new(config).await?;
623 let peer = Peer {
624 network,
625 client,
626 secret_key,
627 request_handler: None,
628 transactions_manager: None,
629 pool: None,
630 };
631 Ok(peer)
632 }
633
634 pub fn new(client: C) -> Self
637 where
638 C: ChainSpecProvider<ChainSpec: Hardforks>,
639 {
640 let secret_key = SecretKey::new(&mut rand::thread_rng());
641 let config = Self::network_config_builder(secret_key).build(client.clone());
642 Self { config, client, secret_key }
643 }
644
645 pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
648 where
649 C: ChainSpecProvider<ChainSpec: Hardforks>,
650 {
651 let config = Self::network_config_builder(secret_key).build(client.clone());
652 Self { config, client, secret_key }
653 }
654
655 pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
657 where
658 C: ChainSpecProvider<ChainSpec: Hardforks>,
659 {
660 let secret_key = SecretKey::new(&mut rand::thread_rng());
661
662 let builder = Self::network_config_builder(secret_key);
663 let hello_message =
664 HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
665 let config = builder.hello_message(hello_message).build(client.clone());
666
667 Self { config, client, secret_key }
668 }
669
670 fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
671 NetworkConfigBuilder::new(secret_key)
672 .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
673 .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
674 .disable_dns_discovery()
675 .disable_discv4_discovery()
676 }
677}
678
679impl Default for PeerConfig {
680 fn default() -> Self {
681 Self::new(NoopProvider::default())
682 }
683}
684
685#[derive(Debug)]
689pub struct NetworkEventStream {
690 inner: EventStream<NetworkEvent>,
691}
692
693impl NetworkEventStream {
696 pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
698 Self { inner }
699 }
700
701 pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
703 while let Some(ev) = self.inner.next().await {
704 if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
705 return Some((peer_id, reason))
706 }
707 }
708 None
709 }
710
711 pub async fn next_session_established(&mut self) -> Option<PeerId> {
713 while let Some(ev) = self.inner.next().await {
714 match ev {
715 NetworkEvent::ActivePeerSession { info, .. } |
716 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
717 return Some(info.peer_id)
718 }
719 _ => {}
720 }
721 }
722 None
723 }
724
725 pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
727 if num == 0 {
728 return Vec::new();
729 }
730 let mut peers = Vec::with_capacity(num);
731 while let Some(ev) = self.inner.next().await {
732 if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
733 peers.push(peer_id);
734 num -= 1;
735 if num == 0 {
736 return peers;
737 }
738 }
739 }
740 peers
741 }
742
743 pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
747 let peer_id = match self.inner.next().await {
748 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
749 _ => return None,
750 };
751
752 match self.inner.next().await {
753 Some(NetworkEvent::ActivePeerSession {
754 info: SessionInfo { peer_id: peer_id2, .. },
755 ..
756 }) => {
757 debug_assert_eq!(
758 peer_id, peer_id2,
759 "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
760 );
761 Some(peer_id)
762 }
763 _ => None,
764 }
765 }
766
767 pub async fn peer_added(&mut self) -> Option<PeerId> {
769 let peer_id = match self.inner.next().await {
770 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
771 _ => return None,
772 };
773
774 Some(peer_id)
775 }
776
777 pub async fn peer_removed(&mut self) -> Option<PeerId> {
779 let peer_id = match self.inner.next().await {
780 Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
781 _ => return None,
782 };
783
784 Some(peer_id)
785 }
786}