1use crate::{
4 builder::ETH_REQUEST_CHANNEL_CAPACITY,
5 error::NetworkError,
6 eth_requests::EthRequestHandler,
7 protocol::IntoRlpxSubProtocol,
8 transactions::{
9 config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
10 policy::NetworkPolicies,
11 TransactionsHandle, TransactionsManager, TransactionsManagerConfig,
12 },
13 NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager,
14};
15use futures::{FutureExt, StreamExt};
16use pin_project::pin_project;
17use reth_chainspec::{ChainSpecProvider, EthereumHardforks, Hardforks};
18use reth_eth_wire::{
19 protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
20};
21use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
22use reth_evm_ethereum::EthEvmConfig;
23use reth_network_api::{
24 events::{PeerEvent, SessionInfo},
25 test_utils::{PeersHandle, PeersHandleProvider},
26 NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
27};
28use reth_network_peers::PeerId;
29use reth_storage_api::{
30 noop::NoopProvider, BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory,
31};
32use reth_tasks::TokioTaskExecutor;
33use reth_tokio_util::EventStream;
34use reth_transaction_pool::{
35 blobstore::InMemoryBlobStore,
36 test_utils::{TestPool, TestPoolBuilder},
37 EthTransactionPool, PoolTransaction, TransactionPool, TransactionValidationTaskExecutor,
38};
39use secp256k1::SecretKey;
40use std::{
41 fmt,
42 future::Future,
43 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
44 pin::Pin,
45 task::{Context, Poll},
46};
47use tokio::{
48 sync::{
49 mpsc::{channel, unbounded_channel},
50 oneshot,
51 },
52 task::JoinHandle,
53};
54
/// A test network made up of a set of [`Peer`]s.
///
/// `C` is the client type stored in every peer and `Pool` is the transaction pool type the
/// peers are parameterised with.
pub struct Testnet<C, Pool> {
    /// All peers that currently belong to this testnet.
    peers: Vec<Peer<C, Pool>>,
}
60
61impl<C> Testnet<C, TestPool>
64where
65 C: BlockReader + HeaderProvider + Clone + 'static + ChainSpecProvider<ChainSpec: Hardforks>,
66{
67 pub async fn create_with(num_peers: usize, provider: C) -> Self {
69 Self::try_create_with(num_peers, provider).await.unwrap()
70 }
71
72 pub async fn try_create_with(num_peers: usize, provider: C) -> Result<Self, NetworkError> {
74 let mut this = Self { peers: Vec::with_capacity(num_peers) };
75 for _ in 0..num_peers {
76 let config = PeerConfig::new(provider.clone());
77 this.add_peer_with_config(config).await?;
78 }
79 Ok(this)
80 }
81
82 pub async fn extend_peer_with_config(
85 &mut self,
86 configs: impl IntoIterator<Item = PeerConfig<C>>,
87 ) -> Result<(), NetworkError> {
88 let peers = configs.into_iter().map(|c| c.launch()).collect::<Vec<_>>();
89 let peers = futures::future::join_all(peers).await;
90 for peer in peers {
91 self.peers.push(peer?);
92 }
93 Ok(())
94 }
95}
96
97impl<C, Pool> Testnet<C, Pool>
98where
99 C: BlockReader + HeaderProvider + Clone + 'static,
100 Pool: TransactionPool,
101{
102 pub fn peers_mut(&mut self) -> &mut [Peer<C, Pool>] {
104 &mut self.peers
105 }
106
107 pub fn peers(&self) -> &[Peer<C, Pool>] {
109 &self.peers
110 }
111
112 pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
117 self.peers.remove(index)
118 }
119
120 pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
122 self.peers.iter_mut()
123 }
124
125 pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C, Pool>> + '_ {
127 self.peers.iter()
128 }
129
130 pub async fn add_peer_with_config(
132 &mut self,
133 config: PeerConfig<C>,
134 ) -> Result<(), NetworkError> {
135 let PeerConfig { config, client, secret_key } = config;
136
137 let network = NetworkManager::new(config).await?;
138 let peer = Peer {
139 network,
140 client,
141 secret_key,
142 request_handler: None,
143 transactions_manager: None,
144 pool: None,
145 };
146 self.peers.push(peer);
147 Ok(())
148 }
149
150 pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
152 self.peers.iter().map(|p| p.handle())
153 }
154
155 pub fn map_pool<F, P>(self, f: F) -> Testnet<C, P>
157 where
158 F: Fn(Peer<C, Pool>) -> Peer<C, P>,
159 P: TransactionPool,
160 {
161 Testnet { peers: self.peers.into_iter().map(f).collect() }
162 }
163
164 pub fn for_each<F>(&self, f: F)
166 where
167 F: Fn(&Peer<C, Pool>),
168 {
169 self.peers.iter().for_each(f)
170 }
171
172 pub fn for_each_mut<F>(&mut self, f: F)
174 where
175 F: FnMut(&mut Peer<C, Pool>),
176 {
177 self.peers.iter_mut().for_each(f)
178 }
179}
180
181impl<C, Pool> Testnet<C, Pool>
182where
183 C: ChainSpecProvider<ChainSpec: EthereumHardforks>
184 + StateProviderFactory
185 + BlockReaderIdExt
186 + HeaderProvider<Header = alloy_consensus::Header>
187 + Clone
188 + 'static,
189 Pool: TransactionPool,
190{
191 pub fn with_eth_pool(
193 self,
194 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
195 self.map_pool(|peer| {
196 let blob_store = InMemoryBlobStore::default();
197 let pool = TransactionValidationTaskExecutor::eth(
198 peer.client.clone(),
199 EthEvmConfig::mainnet(),
200 blob_store.clone(),
201 TokioTaskExecutor::default(),
202 );
203 peer.map_transactions_manager(EthTransactionPool::eth_pool(
204 pool,
205 blob_store,
206 Default::default(),
207 ))
208 })
209 }
210
211 pub fn with_eth_pool_config(
213 self,
214 tx_manager_config: TransactionsManagerConfig,
215 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
216 self.with_eth_pool_config_and_policy(tx_manager_config, Default::default())
217 }
218
219 pub fn with_eth_pool_config_and_policy(
221 self,
222 tx_manager_config: TransactionsManagerConfig,
223 policy: TransactionPropagationKind,
224 ) -> Testnet<C, EthTransactionPool<C, InMemoryBlobStore, EthEvmConfig>> {
225 self.map_pool(|peer| {
226 let blob_store = InMemoryBlobStore::default();
227 let pool = TransactionValidationTaskExecutor::eth(
228 peer.client.clone(),
229 EthEvmConfig::mainnet(),
230 blob_store.clone(),
231 TokioTaskExecutor::default(),
232 );
233
234 peer.map_transactions_manager_with(
235 EthTransactionPool::eth_pool(pool, blob_store, Default::default()),
236 tx_manager_config.clone(),
237 policy,
238 )
239 })
240 }
241}
242
243impl<C, Pool> Testnet<C, Pool>
244where
245 C: BlockReader<
246 Block = reth_ethereum_primitives::Block,
247 Receipt = reth_ethereum_primitives::Receipt,
248 Header = alloy_consensus::Header,
249 > + HeaderProvider
250 + Clone
251 + Unpin
252 + 'static,
253 Pool: TransactionPool<
254 Transaction: PoolTransaction<
255 Consensus = TransactionSigned,
256 Pooled = PooledTransactionVariant,
257 >,
258 > + Unpin
259 + 'static,
260{
261 pub fn spawn(self) -> TestnetHandle<C, Pool> {
263 let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
264 let peers = self.peers.iter().map(|peer| peer.peer_handle()).collect::<Vec<_>>();
265 let mut net = self;
266 let handle = tokio::task::spawn(async move {
267 let mut tx = None;
268 tokio::select! {
269 _ = &mut net => {}
270 inc = rx => {
271 tx = inc.ok();
272 }
273 }
274 if let Some(tx) = tx {
275 let _ = tx.send(net);
276 }
277 });
278
279 TestnetHandle { _handle: handle, peers, terminate: tx }
280 }
281}
282
283impl Testnet<NoopProvider, TestPool> {
284 pub async fn create(num_peers: usize) -> Self {
286 Self::try_create(num_peers).await.unwrap()
287 }
288
289 pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
291 let mut this = Self::default();
292
293 this.extend_peer_with_config((0..num_peers).map(|_| Default::default())).await?;
294 Ok(this)
295 }
296
297 pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
299 self.add_peer_with_config(Default::default()).await
300 }
301}
302
impl<C, Pool> Default for Testnet<C, Pool> {
    /// Returns a testnet without any peers.
    fn default() -> Self {
        Self { peers: Vec::new() }
    }
}
308
309impl<C, Pool> fmt::Debug for Testnet<C, Pool> {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 f.debug_struct("Testnet {{}}").finish_non_exhaustive()
312 }
313}
314
315impl<C, Pool> Future for Testnet<C, Pool>
316where
317 C: BlockReader<
318 Block = reth_ethereum_primitives::Block,
319 Receipt = reth_ethereum_primitives::Receipt,
320 Header = alloy_consensus::Header,
321 > + HeaderProvider
322 + Unpin
323 + 'static,
324 Pool: TransactionPool<
325 Transaction: PoolTransaction<
326 Consensus = TransactionSigned,
327 Pooled = PooledTransactionVariant,
328 >,
329 > + Unpin
330 + 'static,
331{
332 type Output = ();
333
334 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
335 let this = self.get_mut();
336 for peer in &mut this.peers {
337 let _ = peer.poll_unpin(cx);
338 }
339 Poll::Pending
340 }
341}
342
/// Handle to a [`Testnet`] that was moved into a spawned task.
#[derive(Debug)]
pub struct TestnetHandle<C, Pool> {
    /// Join handle of the task that drives the testnet.
    _handle: JoinHandle<()>,
    /// Handles to the individual peers of the spawned testnet.
    peers: Vec<PeerHandle<Pool>>,
    /// Channel used to request termination; the payload is the sender on which the testnet is
    /// handed back.
    terminate: oneshot::Sender<oneshot::Sender<Testnet<C, Pool>>>,
}
350
351impl<C, Pool> TestnetHandle<C, Pool> {
354 pub async fn terminate(self) -> Testnet<C, Pool> {
356 let (tx, rx) = oneshot::channel();
357 self.terminate.send(tx).unwrap();
358 rx.await.unwrap()
359 }
360
361 pub fn peers(&self) -> &[PeerHandle<Pool>] {
363 &self.peers
364 }
365
366 pub async fn connect_peers(&self) {
372 if self.peers.len() < 2 {
373 return
374 }
375
376 let streams =
378 self.peers.iter().map(|handle| NetworkEventStream::new(handle.event_listener()));
379
380 for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) {
382 for idx in (idx + 1)..self.peers.len() {
383 let neighbour = &self.peers[idx];
384 handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr());
385 }
386 }
387
388 let num_sessions_per_peer = self.peers.len() - 1;
390 let fut = streams.into_iter().map(|mut stream| async move {
391 stream.take_session_established(num_sessions_per_peer).await
392 });
393
394 futures::future::join_all(fut).await;
395 }
396}
397
/// A single peer of a [`Testnet`].
///
/// Bundles the network manager with the optional components that can be installed on top of
/// it. Polling the peer polls the installed components and the network manager.
#[pin_project]
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
    /// The network manager of this peer.
    #[pin]
    network: NetworkManager<EthNetworkPrimitives>,
    /// Handler for eth requests, if one was installed.
    #[pin]
    request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
    /// Transactions manager, if one was installed.
    #[pin]
    transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
    /// The pool handed to the transactions manager, if one was installed.
    pool: Option<Pool>,
    /// Client handed to the request handler when it is installed.
    client: C,
    /// Secret key this peer was configured with.
    secret_key: SecretKey,
}
412
413impl<C, Pool> Peer<C, Pool>
416where
417 C: BlockReader + HeaderProvider + Clone + 'static,
418 Pool: TransactionPool,
419{
420 pub fn num_peers(&self) -> usize {
422 self.network.num_connected_peers()
423 }
424
425 pub fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
427 self.network.add_rlpx_sub_protocol(protocol);
428 }
429
430 pub fn peer_handle(&self) -> PeerHandle<Pool> {
432 PeerHandle {
433 network: self.network.handle().clone(),
434 pool: self.pool.clone(),
435 transactions: self.transactions_manager.as_ref().map(|mgr| mgr.handle()),
436 }
437 }
438
439 pub const fn local_addr(&self) -> SocketAddr {
441 self.network.local_addr()
442 }
443
444 pub fn peer_id(&self) -> PeerId {
446 *self.network.peer_id()
447 }
448
449 pub const fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
451 &mut self.network
452 }
453
454 pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
456 self.network.handle().clone()
457 }
458
459 pub const fn pool(&self) -> Option<&Pool> {
461 self.pool.as_ref()
462 }
463
464 pub fn install_request_handler(&mut self) {
466 let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
467 self.network.set_eth_request_handler(tx);
468 let peers = self.network.peers_handle();
469 let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
470 self.request_handler = Some(request_handler);
471 }
472
473 pub fn install_transactions_manager(&mut self, pool: Pool) {
475 let (tx, rx) = unbounded_channel();
476 self.network.set_transactions(tx);
477 let transactions_manager = TransactionsManager::new(
478 self.handle(),
479 pool.clone(),
480 rx,
481 TransactionsManagerConfig::default(),
482 );
483 self.transactions_manager = Some(transactions_manager);
484 self.pool = Some(pool);
485 }
486
487 pub fn map_transactions_manager<P>(self, pool: P) -> Peer<C, P>
489 where
490 P: TransactionPool,
491 {
492 let Self { mut network, request_handler, client, secret_key, .. } = self;
493 let (tx, rx) = unbounded_channel();
494 network.set_transactions(tx);
495 let transactions_manager = TransactionsManager::new(
496 network.handle().clone(),
497 pool.clone(),
498 rx,
499 TransactionsManagerConfig::default(),
500 );
501 Peer {
502 network,
503 request_handler,
504 transactions_manager: Some(transactions_manager),
505 pool: Some(pool),
506 client,
507 secret_key,
508 }
509 }
510
511 pub fn map_transactions_manager_with_config<P>(
513 self,
514 pool: P,
515 config: TransactionsManagerConfig,
516 ) -> Peer<C, P>
517 where
518 P: TransactionPool,
519 {
520 self.map_transactions_manager_with(pool, config, Default::default())
521 }
522
523 pub fn map_transactions_manager_with<P>(
525 self,
526 pool: P,
527 config: TransactionsManagerConfig,
528 policy: TransactionPropagationKind,
529 ) -> Peer<C, P>
530 where
531 P: TransactionPool,
532 {
533 let Self { mut network, request_handler, client, secret_key, .. } = self;
534 let (tx, rx) = unbounded_channel();
535 network.set_transactions(tx);
536
537 let announcement_policy = StrictEthAnnouncementFilter::default();
538 let policies = NetworkPolicies::new(policy, announcement_policy);
539
540 let transactions_manager = TransactionsManager::with_policy(
541 network.handle().clone(),
542 pool.clone(),
543 rx,
544 config,
545 policies,
546 );
547
548 Peer {
549 network,
550 request_handler,
551 transactions_manager: Some(transactions_manager),
552 pool: Some(pool),
553 client,
554 secret_key,
555 }
556 }
557}
558
559impl<C> Peer<C>
560where
561 C: BlockReader + HeaderProvider + Clone + 'static,
562{
563 pub fn install_test_pool(&mut self) {
565 self.install_transactions_manager(TestPoolBuilder::default().into())
566 }
567}
568
569impl<C, Pool> Future for Peer<C, Pool>
570where
571 C: BlockReader<
572 Block = reth_ethereum_primitives::Block,
573 Receipt = reth_ethereum_primitives::Receipt,
574 Header = alloy_consensus::Header,
575 > + HeaderProvider
576 + Unpin
577 + 'static,
578 Pool: TransactionPool<
579 Transaction: PoolTransaction<
580 Consensus = TransactionSigned,
581 Pooled = PooledTransactionVariant,
582 >,
583 > + Unpin
584 + 'static,
585{
586 type Output = ();
587
588 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
589 let this = self.project();
590
591 if let Some(request) = this.request_handler.as_pin_mut() {
592 let _ = request.poll(cx);
593 }
594
595 if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
596 let _ = tx_manager.poll(cx);
597 }
598
599 this.network.poll(cx)
600 }
601}
602
/// Everything required to launch a [`Peer`].
#[derive(Debug)]
pub struct PeerConfig<C = NoopProvider> {
    /// Configuration for the peer's network manager.
    config: NetworkConfig<C>,
    /// Client that is stored in the launched peer.
    client: C,
    /// Secret key the network configuration was built with.
    secret_key: SecretKey,
}
610
/// Handle to a [`Peer`], holding handles to its components rather than the components
/// themselves.
#[derive(Debug)]
pub struct PeerHandle<Pool> {
    /// Handle to the peer's network manager.
    network: NetworkHandle<EthNetworkPrimitives>,
    /// Handle to the peer's transactions manager, if one was installed.
    transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
    /// The peer's pool, if one was installed.
    pool: Option<Pool>,
}
618
impl<Pool> PeerHandle<Pool> {
    /// Returns the [`PeerId`] reported by the network handle.
    pub fn peer_id(&self) -> &PeerId {
        self.network.peer_id()
    }

    /// Returns the [`PeersHandle`] of the network.
    pub fn peer_handle(&self) -> &PeersHandle {
        self.network.peers_handle()
    }

    /// Returns the local address reported by the network handle.
    pub fn local_addr(&self) -> SocketAddr {
        self.network.local_addr()
    }

    /// Creates a new [`NetworkEvent`] stream from the network handle.
    pub fn event_listener(&self) -> EventStream<NetworkEvent> {
        self.network.event_listener()
    }

    /// Returns the transactions handle, or `None` if no transactions manager was installed.
    pub const fn transactions(&self) -> Option<&TransactionsHandle> {
        self.transactions.as_ref()
    }

    /// Returns the pool, or `None` if no pool was installed.
    pub const fn pool(&self) -> Option<&Pool> {
        self.pool.as_ref()
    }

    /// Returns the underlying [`NetworkHandle`].
    pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
        &self.network
    }
}
657
658impl<C> PeerConfig<C>
661where
662 C: BlockReader + HeaderProvider + Clone + 'static,
663{
664 pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
666 let Self { config, client, secret_key } = self;
667 let network = NetworkManager::new(config).await?;
668 let peer = Peer {
669 network,
670 client,
671 secret_key,
672 request_handler: None,
673 transactions_manager: None,
674 pool: None,
675 };
676 Ok(peer)
677 }
678
679 pub fn new(client: C) -> Self
682 where
683 C: ChainSpecProvider<ChainSpec: Hardforks>,
684 {
685 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
686 let config = Self::network_config_builder(secret_key).build(client.clone());
687 Self { config, client, secret_key }
688 }
689
690 pub fn with_secret_key(client: C, secret_key: SecretKey) -> Self
693 where
694 C: ChainSpecProvider<ChainSpec: Hardforks>,
695 {
696 let config = Self::network_config_builder(secret_key).build(client.clone());
697 Self { config, client, secret_key }
698 }
699
700 pub fn with_protocols(client: C, protocols: impl IntoIterator<Item = Protocol>) -> Self
702 where
703 C: ChainSpecProvider<ChainSpec: Hardforks>,
704 {
705 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
706
707 let builder = Self::network_config_builder(secret_key);
708 let hello_message =
709 HelloMessageWithProtocols::builder(builder.get_peer_id()).protocols(protocols).build();
710 let config = builder.hello_message(hello_message).build(client.clone());
711
712 Self { config, client, secret_key }
713 }
714
715 fn network_config_builder(secret_key: SecretKey) -> NetworkConfigBuilder {
716 NetworkConfigBuilder::new(secret_key)
717 .listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
718 .discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
719 .disable_dns_discovery()
720 .disable_discv4_discovery()
721 }
722}
723
impl Default for PeerConfig {
    /// Builds a config via [`PeerConfig::new`] with a default [`NoopProvider`] as client.
    fn default() -> Self {
        Self::new(NoopProvider::default())
    }
}
729
/// Wrapper around an [`EventStream`] of [`NetworkEvent`]s with helpers that wait for specific
/// events.
#[derive(Debug)]
pub struct NetworkEventStream {
    /// The wrapped event stream.
    inner: EventStream<NetworkEvent>,
}
737
738impl NetworkEventStream {
741 pub const fn new(inner: EventStream<NetworkEvent>) -> Self {
743 Self { inner }
744 }
745
746 pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
748 while let Some(ev) = self.inner.next().await {
749 if let NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) = ev {
750 return Some((peer_id, reason))
751 }
752 }
753 None
754 }
755
756 pub async fn next_session_established(&mut self) -> Option<PeerId> {
758 while let Some(ev) = self.inner.next().await {
759 match ev {
760 NetworkEvent::ActivePeerSession { info, .. } |
761 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
762 return Some(info.peer_id)
763 }
764 _ => {}
765 }
766 }
767 None
768 }
769
770 pub async fn take_session_established(&mut self, mut num: usize) -> Vec<PeerId> {
772 if num == 0 {
773 return Vec::new();
774 }
775 let mut peers = Vec::with_capacity(num);
776 while let Some(ev) = self.inner.next().await {
777 if let NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } = ev {
778 peers.push(peer_id);
779 num -= 1;
780 if num == 0 {
781 return peers;
782 }
783 }
784 }
785 peers
786 }
787
788 pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
792 let peer_id = match self.inner.next().await {
793 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
794 _ => return None,
795 };
796
797 match self.inner.next().await {
798 Some(NetworkEvent::ActivePeerSession {
799 info: SessionInfo { peer_id: peer_id2, .. },
800 ..
801 }) => {
802 debug_assert_eq!(
803 peer_id, peer_id2,
804 "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
805 );
806 Some(peer_id)
807 }
808 _ => None,
809 }
810 }
811
812 pub async fn peer_added(&mut self) -> Option<PeerId> {
814 let peer_id = match self.inner.next().await {
815 Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
816 _ => return None,
817 };
818
819 Some(peer_id)
820 }
821
822 pub async fn peer_removed(&mut self) -> Option<PeerId> {
824 let peer_id = match self.inner.next().await {
825 Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id,
826 _ => return None,
827 };
828
829 Some(peer_id)
830 }
831}