1use reth_eth_wire_types::{
4 message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities, Cells,
5 DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
6 GetBlockBodies, GetBlockHeaders, GetCells, GetNodeData, GetPooledTransactions, GetReceipts,
7 GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
8 Receipts70, UnifiedStatus,
9};
10use reth_ethereum_forks::ForkId;
11use reth_network_p2p::error::{RequestError, RequestResult};
12use reth_network_peers::{NodeRecord, PeerId};
13use reth_network_types::{PeerAddr, PeerKind};
14use reth_tokio_util::EventStream;
15use std::{
16 fmt,
17 net::SocketAddr,
18 pin::Pin,
19 sync::Arc,
20 task::{Context, Poll},
21};
22use tokio::sync::{mpsc, oneshot};
23use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
24
/// A type-erased stream of [`PeerEvent`]s.
///
/// Wraps a pinned, boxed `dyn Stream<Item = PeerEvent>` that is required to be `Send + Sync`,
/// so the concrete stream type does not appear in the public API.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
27
28impl fmt::Debug for PeerEventStream {
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.debug_struct("PeerEventStream").finish_non_exhaustive()
31 }
32}
33
34impl PeerEventStream {
35 pub fn new<S, T>(stream: S) -> Self
38 where
39 S: Stream<Item = T> + Send + Sync + 'static,
40 T: Into<PeerEvent> + 'static,
41 {
42 let mapped_stream = stream.map(Into::into);
43 Self(Box::pin(mapped_stream))
44 }
45}
46
47impl Stream for PeerEventStream {
48 type Item = PeerEvent;
49
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 self.0.as_mut().poll_next(cx)
52 }
53}
54
/// Information describing a peer session.
///
/// Carried by [`PeerEvent::SessionEstablished`] and [`NetworkEvent::ActivePeerSession`].
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// Identifier of the remote peer.
    pub peer_id: PeerId,
    /// Socket address of the remote peer.
    pub remote_addr: SocketAddr,
    /// Client version string of the peer (presumably as announced by the peer — confirm
    /// against the code that builds this struct).
    pub client_version: Arc<str>,
    /// Capabilities associated with the session.
    pub capabilities: Arc<Capabilities>,
    /// Status associated with the session.
    pub status: Arc<UnifiedStatus>,
    /// The `eth` protocol version of the session.
    pub version: EthVersion,
    /// The kind of the peer.
    pub peer_kind: PeerKind,
}
73
/// Peer-level events.
///
/// A [`NetworkEvent`] can be converted into a [`PeerEvent`] via `From`: its `Peer` variant is
/// unwrapped and its `ActivePeerSession` variant becomes [`PeerEvent::SessionEstablished`].
#[derive(Debug, Clone)]
pub enum PeerEvent {
    /// A session with a peer was closed.
    SessionClosed {
        /// Identifier of the peer whose session was closed.
        peer_id: PeerId,
        /// The disconnect reason, if there is one.
        reason: Option<DisconnectReason>,
    },
    /// A session with a peer was established; carries the session's [`SessionInfo`].
    SessionEstablished(SessionInfo),
    /// A peer was added (presumably to the set of known peers — confirm against the emitter).
    PeerAdded(PeerId),
    /// A peer was removed (presumably from the set of known peers — confirm against the
    /// emitter).
    PeerRemoved(PeerId),
}
95
/// Network events, generic over the request type `R` that can be sent to a peer session.
///
/// `R` defaults to [`PeerRequest`].
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
    /// A plain [`PeerEvent`].
    Peer(PeerEvent),
    /// A peer session together with a handle for sending requests to it.
    ActivePeerSession {
        /// Information about the session.
        info: SessionInfo,
        /// Sender half used to submit requests of type `R` to the session.
        messages: PeerRequestSender<R>,
    },
}
109
110impl<R> Clone for NetworkEvent<R> {
111 fn clone(&self) -> Self {
112 match self {
113 Self::Peer(event) => Self::Peer(event.clone()),
114 Self::ActivePeerSession { info, messages } => {
115 Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
116 }
117 }
118 }
119}
120
121impl<R> From<NetworkEvent<R>> for PeerEvent {
122 fn from(event: NetworkEvent<R>) -> Self {
123 match event {
124 NetworkEvent::Peer(peer_event) => peer_event,
125 NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
126 }
127 }
128}
129
/// Provides access to a stream of [`PeerEvent`]s.
///
/// Through `auto_impl`, the trait is also implemented for `&T` and `Arc<T>` where `T`
/// implements it.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
    /// Returns a [`PeerEventStream`] yielding peer events.
    fn peer_events(&self) -> PeerEventStream;
}
136
/// Provides listeners for network events and discovery events.
///
/// Extends [`NetworkPeersEvents`]. Through `auto_impl`, the trait is also implemented for `&T`
/// and `Arc<T>` where `T` implements it.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: NetworkPeersEvents {
    /// The network primitive types used to parameterize [`PeerRequest`].
    type Primitives: NetworkPrimitives;

    /// Returns an [`EventStream`] of [`NetworkEvent`]s whose request type is
    /// `PeerRequest<Self::Primitives>`.
    fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
    /// Returns a stream of [`DiscoveryEvent`]s backed by an unbounded receiver.
    fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}
150
/// Events yielded by the stream returned from
/// [`NetworkEventListenerProvider::discovery_listener`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
    /// A new node was discovered; details are carried in the [`DiscoveredEvent`].
    NewNode(DiscoveredEvent),
    /// A node record together with a [`ForkId`].
    ///
    /// NOTE(review): the variant name suggests the fork id was read from the node's ENR —
    /// confirm against the code that emits this event.
    EnrForkId(NodeRecord, ForkId),
}
164
/// Payload of [`DiscoveryEvent::NewNode`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
    /// A discovered peer was queued.
    ///
    /// NOTE(review): what the peer is queued for is not visible here — confirm against the
    /// code that emits this event.
    EventQueued {
        /// Identifier of the discovered peer.
        peer_id: PeerId,
        /// Address of the discovered peer.
        addr: PeerAddr,
        /// Fork id of the discovered peer, if one is known.
        fork_id: Option<ForkId>,
    },
}
189
/// Requests that can be issued to a peer.
///
/// Every variant pairs the request payload with a [`oneshot::Sender`] through which the
/// [`RequestResult`] for that request is delivered. `N` selects the network primitive types
/// used in the response payloads and defaults to [`EthNetworkPrimitives`].
#[derive(Debug)]
pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Requests block headers.
    GetBlockHeaders {
        /// The request payload.
        request: GetBlockHeaders,
        /// Channel on which the [`BlockHeaders`] result is delivered.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Requests block bodies.
    GetBlockBodies {
        /// The request payload.
        request: GetBlockBodies,
        /// Channel on which the [`BlockBodies`] result is delivered.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Requests pooled transactions.
    GetPooledTransactions {
        /// The request payload.
        request: GetPooledTransactions,
        /// Channel on which the [`PooledTransactions`] result is delivered.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Requests node data.
    GetNodeData {
        /// The request payload.
        request: GetNodeData,
        /// Channel on which the [`NodeData`] result is delivered.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Requests receipts, answered with [`Receipts`].
    GetReceipts {
        /// The request payload.
        request: GetReceipts,
        /// Channel on which the [`Receipts`] result is delivered.
        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Requests receipts using the same [`GetReceipts`] payload as
    /// [`PeerRequest::GetReceipts`], but answered with [`Receipts69`].
    GetReceipts69 {
        /// The request payload.
        request: GetReceipts,
        /// Channel on which the [`Receipts69`] result is delivered.
        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Requests receipts using the [`GetReceipts70`] payload, answered with [`Receipts70`].
    GetReceipts70 {
        /// The request payload.
        request: GetReceipts70,
        /// Channel on which the [`Receipts70`] result is delivered.
        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Requests block access lists.
    GetBlockAccessLists {
        /// The request payload.
        request: GetBlockAccessLists,
        /// Channel on which the [`BlockAccessLists`] result is delivered.
        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
    },
    /// Requests cells.
    GetCells {
        /// The request payload.
        request: GetCells,
        /// Channel on which the [`Cells`] result is delivered.
        response: oneshot::Sender<RequestResult<Cells>>,
    },
}
275
276impl<N: NetworkPrimitives> PeerRequest<N> {
279 pub fn send_bad_response(self) {
281 self.send_err_response(RequestError::BadResponse)
282 }
283
284 pub fn send_err_response(self, err: RequestError) {
286 let _ = match self {
287 Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
288 Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
289 Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
290 Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
291 Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
292 Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
293 Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
294 Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
295 Self::GetCells { response, .. } => response.send(Err(err)).ok(),
296 };
297 }
298
299 #[inline]
301 pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
302 match self {
303 Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
304 Self::GetCells { .. } => version >= EthVersion::Eth72,
305 _ => true,
306 }
307 }
308
309 pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
311 match self {
312 Self::GetBlockHeaders { request, .. } => {
313 EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
314 }
315 Self::GetBlockBodies { request, .. } => {
316 EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
317 }
318 Self::GetPooledTransactions { request, .. } => {
319 EthMessage::GetPooledTransactions(RequestPair {
320 request_id,
321 message: request.clone(),
322 })
323 }
324 Self::GetNodeData { request, .. } => {
325 EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
326 }
327 Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
328 EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
329 }
330 Self::GetReceipts70 { request, .. } => {
331 EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
332 }
333 Self::GetBlockAccessLists { request, .. } => {
334 EthMessage::GetBlockAccessLists(RequestPair {
335 request_id,
336 message: request.clone(),
337 })
338 }
339 Self::GetCells { request, .. } => {
340 EthMessage::GetCells(RequestPair { request_id, message: request.clone() })
341 }
342 }
343 }
344
345 pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
347 match self {
348 Self::GetPooledTransactions { request, .. } => Some(request),
349 _ => None,
350 }
351 }
352}
353
/// Handle for sending requests of type `R` to a peer's session.
///
/// `R` defaults to [`PeerRequest`].
pub struct PeerRequestSender<R = PeerRequest> {
    /// Identifier of the peer this sender belongs to.
    pub peer_id: PeerId,
    /// Bounded channel sender on which requests are submitted.
    pub to_session_tx: mpsc::Sender<R>,
}
361
362impl<R> Clone for PeerRequestSender<R> {
363 fn clone(&self) -> Self {
364 Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
365 }
366}
367
368impl<R> PeerRequestSender<R> {
371 pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
373 Self { peer_id, to_session_tx }
374 }
375
376 pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
378 self.to_session_tx.try_send(req)
379 }
380
381 pub const fn peer_id(&self) -> &PeerId {
383 &self.peer_id
384 }
385}
386
387impl<R> fmt::Debug for PeerRequestSender<R> {
388 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
389 f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Block access list requests are unsupported on eth/70 and supported on eth/71.
    #[test]
    fn test_get_block_access_lists_version_support() {
        // The receiver is kept alive for the duration of the test but never read.
        let (response, _receiver) = oneshot::channel();
        let request = PeerRequest::<EthNetworkPrimitives>::GetBlockAccessLists {
            request: GetBlockAccessLists(vec![]),
            response,
        };

        assert!(!request.is_supported_by_eth_version(EthVersion::Eth70));
        assert!(request.is_supported_by_eth_version(EthVersion::Eth71));
    }

    /// Cell requests are unsupported on eth/71 and supported on eth/72.
    #[test]
    fn test_get_cells_version_support() {
        // The receiver is kept alive for the duration of the test but never read.
        let (response, _receiver) = oneshot::channel();
        let request =
            PeerRequest::<EthNetworkPrimitives>::GetCells { request: GetCells::default(), response };

        assert!(!request.is_supported_by_eth_version(EthVersion::Eth71));
        assert!(request.is_supported_by_eth_version(EthVersion::Eth72));
    }
}