1use reth_eth_wire_types::{
4 message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
5 DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
6 GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
7 GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
8 Receipts70, UnifiedStatus,
9};
10use reth_ethereum_forks::ForkId;
11use reth_network_p2p::error::{RequestError, RequestResult};
12use reth_network_peers::{NodeRecord, PeerId};
13use reth_network_types::{PeerAddr, PeerKind};
14use reth_tokio_util::EventStream;
15use std::{
16 fmt,
17 net::SocketAddr,
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21};
22use tokio::sync::{mpsc, oneshot};
23use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
24
25pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
27
28impl fmt::Debug for PeerEventStream {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.debug_struct("PeerEventStream").finish_non_exhaustive()
31 }
32}
33
34impl PeerEventStream {
35 pub fn new<S, T>(stream: S) -> Self
38 where
39 S: Stream<Item = T> + Send + Sync + 'static,
40 T: Into<PeerEvent> + 'static,
41 {
42 let mapped_stream = stream.map(Into::into);
43 Self(Box::pin(mapped_stream))
44 }
45}
46
47impl Stream for PeerEventStream {
48 type Item = PeerEvent;
49
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 self.0.as_mut().poll_next(cx)
52 }
53}
54
55#[derive(Debug, Clone)]
57pub struct SessionInfo {
58 pub peer_id: PeerId,
60 pub remote_addr: SocketAddr,
62 pub client_version: Arc<str>,
64 pub capabilities: Arc<Capabilities>,
66 pub status: Arc<UnifiedStatus>,
68 pub version: EthVersion,
70 pub peer_kind: PeerKind,
72}
73
74#[derive(Debug, Clone)]
80pub enum PeerEvent {
81 SessionClosed {
83 peer_id: PeerId,
85 reason: Option<DisconnectReason>,
87 },
88 SessionEstablished(SessionInfo),
90 PeerAdded(PeerId),
92 PeerRemoved(PeerId),
94}
95
96#[derive(Debug)]
98pub enum NetworkEvent<R = PeerRequest> {
99 Peer(PeerEvent),
101 ActivePeerSession {
103 info: SessionInfo,
105 messages: PeerRequestSender<R>,
107 },
108}
109
110impl<R> Clone for NetworkEvent<R> {
111 fn clone(&self) -> Self {
112 match self {
113 Self::Peer(event) => Self::Peer(event.clone()),
114 Self::ActivePeerSession { info, messages } => {
115 Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
116 }
117 }
118 }
119}
120
121impl<R> From<NetworkEvent<R>> for PeerEvent {
122 fn from(event: NetworkEvent<R>) -> Self {
123 match event {
124 NetworkEvent::Peer(peer_event) => peer_event,
125 NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
126 }
127 }
128}
129
130#[auto_impl::auto_impl(&, Arc)]
132pub trait NetworkPeersEvents: Send + Sync {
133 fn peer_events(&self) -> PeerEventStream;
135}
136
137#[auto_impl::auto_impl(&, Arc)]
139pub trait NetworkEventListenerProvider: NetworkPeersEvents {
140 type Primitives: NetworkPrimitives;
142
143 fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
145 fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
149}
150
151#[derive(Debug, Clone, PartialEq, Eq)]
153pub enum DiscoveryEvent {
154 NewNode(DiscoveredEvent),
156 EnrForkId(NodeRecord, ForkId),
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
167pub enum DiscoveredEvent {
168 EventQueued {
180 peer_id: PeerId,
182 addr: PeerAddr,
184 fork_id: Option<ForkId>,
187 },
188}
189
190#[derive(Debug)]
192pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
193 GetBlockHeaders {
197 request: GetBlockHeaders,
199 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
201 },
202 GetBlockBodies {
206 request: GetBlockBodies,
208 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
210 },
211 GetPooledTransactions {
215 request: GetPooledTransactions,
217 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
219 },
220 GetNodeData {
224 request: GetNodeData,
226 response: oneshot::Sender<RequestResult<NodeData>>,
228 },
229 GetReceipts {
233 request: GetReceipts,
235 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
237 },
238 GetReceipts69 {
242 request: GetReceipts,
244 response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
246 },
247 GetReceipts70 {
251 request: GetReceipts70,
253 response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
255 },
256 GetBlockAccessLists {
260 request: GetBlockAccessLists,
262 response: oneshot::Sender<RequestResult<BlockAccessLists>>,
264 },
265}
266
267impl<N: NetworkPrimitives> PeerRequest<N> {
270 pub fn send_bad_response(self) {
272 self.send_err_response(RequestError::BadResponse)
273 }
274
275 pub fn send_err_response(self, err: RequestError) {
277 let _ = match self {
278 Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
279 Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
280 Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
281 Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
282 Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
283 Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
284 Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
285 Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
286 };
287 }
288
289 #[inline]
291 pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
292 match self {
293 Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
294 _ => true,
295 }
296 }
297
298 pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
300 match self {
301 Self::GetBlockHeaders { request, .. } => {
302 EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
303 }
304 Self::GetBlockBodies { request, .. } => {
305 EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
306 }
307 Self::GetPooledTransactions { request, .. } => {
308 EthMessage::GetPooledTransactions(RequestPair {
309 request_id,
310 message: request.clone(),
311 })
312 }
313 Self::GetNodeData { request, .. } => {
314 EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
315 }
316 Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
317 EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
318 }
319 Self::GetReceipts70 { request, .. } => {
320 EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
321 }
322 Self::GetBlockAccessLists { request, .. } => {
323 EthMessage::GetBlockAccessLists(RequestPair {
324 request_id,
325 message: request.clone(),
326 })
327 }
328 }
329 }
330
331 pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
333 match self {
334 Self::GetPooledTransactions { request, .. } => Some(request),
335 _ => None,
336 }
337 }
338}
339
340pub struct PeerRequestSender<R = PeerRequest> {
342 pub peer_id: PeerId,
344 pub to_session_tx: mpsc::Sender<R>,
346}
347
348impl<R> Clone for PeerRequestSender<R> {
349 fn clone(&self) -> Self {
350 Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
351 }
352}
353
354impl<R> PeerRequestSender<R> {
357 pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
359 Self { peer_id, to_session_tx }
360 }
361
362 pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
364 self.to_session_tx.try_send(req)
365 }
366
367 pub const fn peer_id(&self) -> &PeerId {
369 &self.peer_id
370 }
371}
372
373impl<R> fmt::Debug for PeerRequestSender<R> {
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use super::*;
382
383 #[test]
384 fn test_get_block_access_lists_version_support() {
385 let (tx, _rx) = oneshot::channel();
386 let req: PeerRequest<EthNetworkPrimitives> =
387 PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
388
389 assert!(!req.is_supported_by_eth_version(EthVersion::Eth70));
390 assert!(req.is_supported_by_eth_version(EthVersion::Eth71));
391 }
392}