reth_network_api/test_utils/
peers_manager.rs1use std::net::SocketAddr;
5
6use derive_more::Constructor;
7use reth_network_peers::{NodeRecord, PeerId};
8use reth_network_types::{Peer, ReputationChangeKind};
9use tokio::sync::{mpsc, oneshot};
10
11#[auto_impl::auto_impl(&, Arc)]
13pub trait PeersHandleProvider {
14 fn peers_handle(&self) -> &PeersHandle;
18}
19
20#[derive(Clone, Debug, Constructor)]
22pub struct PeersHandle {
23 manager_tx: mpsc::UnboundedSender<PeerCommand>,
25}
26
27impl PeersHandle {
30 fn send(&self, cmd: PeerCommand) {
31 let _ = self.manager_tx.send(cmd);
32 }
33
34 pub fn add_peer(&self, peer_id: PeerId, addr: SocketAddr) {
39 self.send(PeerCommand::Add(peer_id, addr));
40 }
41
42 pub fn remove_peer(&self, peer_id: PeerId) {
44 self.send(PeerCommand::Remove(peer_id));
45 }
46
47 pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
49 self.send(PeerCommand::ReputationChange(peer_id, kind));
50 }
51
52 pub async fn peer_by_id(&self, peer_id: PeerId) -> Option<Peer> {
54 let (tx, rx) = oneshot::channel();
55 self.send(PeerCommand::GetPeer(peer_id, tx));
56
57 rx.await.unwrap_or(None)
58 }
59
60 pub async fn all_peers(&self) -> Vec<NodeRecord> {
62 let (tx, rx) = oneshot::channel();
63 self.send(PeerCommand::GetPeers(tx));
64
65 rx.await.unwrap_or_default()
66 }
67}
68
69#[derive(Debug)]
71pub enum PeerCommand {
72 Add(PeerId, SocketAddr),
74 Remove(PeerId),
78 ReputationChange(PeerId, ReputationChangeKind),
80 GetPeer(PeerId, oneshot::Sender<Option<Peer>>),
82 GetPeers(oneshot::Sender<Vec<NodeRecord>>),
84}