1use reth_eth_wire_types::{
4 message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
5 EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
6 GetPooledTransactions, GetReceipts, GetReceipts70, NetworkPrimitives, NodeData,
7 PooledTransactions, Receipts, Receipts69, Receipts70, UnifiedStatus,
8};
9use reth_ethereum_forks::ForkId;
10use reth_network_p2p::error::{RequestError, RequestResult};
11use reth_network_peers::{NodeRecord, PeerId};
12use reth_network_types::{PeerAddr, PeerKind};
13use reth_tokio_util::EventStream;
14use std::{
15 fmt,
16 net::SocketAddr,
17 pin::Pin,
18 sync::Arc,
19 task::{Context, Poll},
20};
21use tokio::sync::{mpsc, oneshot};
22use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
23
24pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
26
27impl fmt::Debug for PeerEventStream {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("PeerEventStream").finish_non_exhaustive()
30 }
31}
32
33impl PeerEventStream {
34 pub fn new<S, T>(stream: S) -> Self
37 where
38 S: Stream<Item = T> + Send + Sync + 'static,
39 T: Into<PeerEvent> + 'static,
40 {
41 let mapped_stream = stream.map(Into::into);
42 Self(Box::pin(mapped_stream))
43 }
44}
45
46impl Stream for PeerEventStream {
47 type Item = PeerEvent;
48
49 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 self.0.as_mut().poll_next(cx)
51 }
52}
53
54#[derive(Debug, Clone)]
56pub struct SessionInfo {
57 pub peer_id: PeerId,
59 pub remote_addr: SocketAddr,
61 pub client_version: Arc<str>,
63 pub capabilities: Arc<Capabilities>,
65 pub status: Arc<UnifiedStatus>,
67 pub version: EthVersion,
69 pub peer_kind: PeerKind,
71}
72
73#[derive(Debug, Clone)]
79pub enum PeerEvent {
80 SessionClosed {
82 peer_id: PeerId,
84 reason: Option<DisconnectReason>,
86 },
87 SessionEstablished(SessionInfo),
89 PeerAdded(PeerId),
91 PeerRemoved(PeerId),
93}
94
95#[derive(Debug)]
97pub enum NetworkEvent<R = PeerRequest> {
98 Peer(PeerEvent),
100 ActivePeerSession {
102 info: SessionInfo,
104 messages: PeerRequestSender<R>,
106 },
107}
108
109impl<R> Clone for NetworkEvent<R> {
110 fn clone(&self) -> Self {
111 match self {
112 Self::Peer(event) => Self::Peer(event.clone()),
113 Self::ActivePeerSession { info, messages } => {
114 Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
115 }
116 }
117 }
118}
119
120impl<R> From<NetworkEvent<R>> for PeerEvent {
121 fn from(event: NetworkEvent<R>) -> Self {
122 match event {
123 NetworkEvent::Peer(peer_event) => peer_event,
124 NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
125 }
126 }
127}
128
129#[auto_impl::auto_impl(&, Arc)]
131pub trait NetworkPeersEvents: Send + Sync {
132 fn peer_events(&self) -> PeerEventStream;
134}
135
136#[auto_impl::auto_impl(&, Arc)]
138pub trait NetworkEventListenerProvider: NetworkPeersEvents {
139 type Primitives: NetworkPrimitives;
141
142 fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
144 fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
148}
149
150#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum DiscoveryEvent {
153 NewNode(DiscoveredEvent),
155 EnrForkId(NodeRecord, ForkId),
162}
163
164#[derive(Debug, Clone, PartialEq, Eq)]
166pub enum DiscoveredEvent {
167 EventQueued {
179 peer_id: PeerId,
181 addr: PeerAddr,
183 fork_id: Option<ForkId>,
186 },
187}
188
189#[derive(Debug)]
191pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
192 GetBlockHeaders {
196 request: GetBlockHeaders,
198 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
200 },
201 GetBlockBodies {
205 request: GetBlockBodies,
207 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
209 },
210 GetPooledTransactions {
214 request: GetPooledTransactions,
216 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
218 },
219 GetNodeData {
223 request: GetNodeData,
225 response: oneshot::Sender<RequestResult<NodeData>>,
227 },
228 GetReceipts {
232 request: GetReceipts,
234 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
236 },
237 GetReceipts69 {
241 request: GetReceipts,
243 response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
245 },
246 GetReceipts70 {
250 request: GetReceipts70,
252 response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
254 },
255}
256
257impl<N: NetworkPrimitives> PeerRequest<N> {
260 pub fn send_bad_response(self) {
262 self.send_err_response(RequestError::BadResponse)
263 }
264
265 pub fn send_err_response(self, err: RequestError) {
267 let _ = match self {
268 Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
269 Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
270 Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
271 Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
272 Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
273 Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
274 Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
275 };
276 }
277
278 pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
280 match self {
281 Self::GetBlockHeaders { request, .. } => {
282 EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
283 }
284 Self::GetBlockBodies { request, .. } => {
285 EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
286 }
287 Self::GetPooledTransactions { request, .. } => {
288 EthMessage::GetPooledTransactions(RequestPair {
289 request_id,
290 message: request.clone(),
291 })
292 }
293 Self::GetNodeData { request, .. } => {
294 EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
295 }
296 Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
297 EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
298 }
299 Self::GetReceipts70 { request, .. } => {
300 EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
301 }
302 }
303 }
304
305 pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
307 match self {
308 Self::GetPooledTransactions { request, .. } => Some(request),
309 _ => None,
310 }
311 }
312}
313
314pub struct PeerRequestSender<R = PeerRequest> {
316 pub peer_id: PeerId,
318 pub to_session_tx: mpsc::Sender<R>,
320}
321
322impl<R> Clone for PeerRequestSender<R> {
323 fn clone(&self) -> Self {
324 Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
325 }
326}
327
328impl<R> PeerRequestSender<R> {
331 pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
333 Self { peer_id, to_session_tx }
334 }
335
336 pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
338 self.to_session_tx.try_send(req)
339 }
340
341 pub const fn peer_id(&self) -> &PeerId {
343 &self.peer_id
344 }
345}
346
347impl<R> fmt::Debug for PeerRequestSender<R> {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
350 }
351}