reth_network_api/
events.rs

1//! API related to listening for network events.
2
3use reth_eth_wire_types::{
4    message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
5    EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
6    GetPooledTransactions, GetReceipts, NetworkPrimitives, NodeData, PooledTransactions, Receipts,
7    Status,
8};
9use reth_ethereum_forks::ForkId;
10use reth_network_p2p::error::{RequestError, RequestResult};
11use reth_network_peers::PeerId;
12use reth_network_types::{PeerAddr, PeerKind};
13use reth_tokio_util::EventStream;
14use std::{
15    fmt,
16    net::SocketAddr,
17    pin::Pin,
18    sync::Arc,
19    task::{Context, Poll},
20};
21use tokio::sync::{mpsc, oneshot};
22use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
23
24/// A boxed stream of network peer events that provides a type-erased interface.
25pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
26
27impl fmt::Debug for PeerEventStream {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("PeerEventStream").finish_non_exhaustive()
30    }
31}
32
33impl PeerEventStream {
34    /// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer
35    /// events [`PeerEvent`]
36    pub fn new<S, T>(stream: S) -> Self
37    where
38        S: Stream<Item = T> + Send + Sync + 'static,
39        T: Into<PeerEvent> + 'static,
40    {
41        let mapped_stream = stream.map(Into::into);
42        Self(Box::pin(mapped_stream))
43    }
44}
45
46impl Stream for PeerEventStream {
47    type Item = PeerEvent;
48
49    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        self.0.as_mut().poll_next(cx)
51    }
52}
53
54/// Represents information about an established peer session.
55#[derive(Debug, Clone)]
56pub struct SessionInfo {
57    /// The identifier of the peer to which a session was established.
58    pub peer_id: PeerId,
59    /// The remote addr of the peer to which a session was established.
60    pub remote_addr: SocketAddr,
61    /// The client version of the peer to which a session was established.
62    pub client_version: Arc<str>,
63    /// Capabilities the peer announced.
64    pub capabilities: Arc<Capabilities>,
65    /// The status of the peer to which a session was established.
66    pub status: Arc<Status>,
67    /// Negotiated eth version of the session.
68    pub version: EthVersion,
69    /// The kind of peer this session represents
70    pub peer_kind: PeerKind,
71}
72
73/// (Non-exhaustive) List of the different events emitted by the network that are of interest for
74/// subscribers.
75///
76/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
77/// etc.
78#[derive(Debug, Clone)]
79pub enum PeerEvent {
80    /// Closed the peer session.
81    SessionClosed {
82        /// The identifier of the peer to which a session was closed.
83        peer_id: PeerId,
84        /// Why the disconnect was triggered
85        reason: Option<DisconnectReason>,
86    },
87    /// Established a new session with the given peer.
88    SessionEstablished(SessionInfo),
89    /// Event emitted when a new peer is added
90    PeerAdded(PeerId),
91    /// Event emitted when a new peer is removed
92    PeerRemoved(PeerId),
93}
94
95/// (Non-exhaustive) Network events representing peer lifecycle events and session requests.
96#[derive(Debug)]
97pub enum NetworkEvent<R = PeerRequest> {
98    /// Basic peer lifecycle event.
99    Peer(PeerEvent),
100    /// Session established with requests.
101    ActivePeerSession {
102        /// Session information
103        info: SessionInfo,
104        /// A request channel to the session task.
105        messages: PeerRequestSender<R>,
106    },
107}
108
109impl<R> Clone for NetworkEvent<R> {
110    fn clone(&self) -> Self {
111        match self {
112            Self::Peer(event) => Self::Peer(event.clone()),
113            Self::ActivePeerSession { info, messages } => {
114                Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
115            }
116        }
117    }
118}
119
120impl<R> From<NetworkEvent<R>> for PeerEvent {
121    fn from(event: NetworkEvent<R>) -> Self {
122        match event {
123            NetworkEvent::Peer(peer_event) => peer_event,
124            NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
125        }
126    }
127}
128
129/// Provides peer event subscription for the network.
130#[auto_impl::auto_impl(&, Arc)]
131pub trait NetworkPeersEvents: Send + Sync {
132    /// Creates a new peer event listener stream.
133    fn peer_events(&self) -> PeerEventStream;
134}
135
136/// Provides event subscription for the network.
137#[auto_impl::auto_impl(&, Arc)]
138pub trait NetworkEventListenerProvider: NetworkPeersEvents {
139    /// The primitive types to use in the `PeerRequest` used in the stream.
140    type Primitives: NetworkPrimitives;
141
142    /// Creates a new [`NetworkEvent`] listener channel.
143    fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
144    /// Returns a new [`DiscoveryEvent`] stream.
145    ///
146    /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
147    fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
148}
149
150/// Events produced by the `Discovery` manager.
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum DiscoveryEvent {
153    /// Discovered a node
154    NewNode(DiscoveredEvent),
155    /// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
156    EnrForkId(PeerId, ForkId),
157}
158
159/// Represents events related to peer discovery in the network.
160#[derive(Debug, Clone, PartialEq, Eq)]
161pub enum DiscoveredEvent {
162    /// Indicates that a new peer has been discovered and queued for potential connection.
163    ///
164    /// This event is generated when the system becomes aware of a new peer
165    /// but hasn't yet established a connection.
166    ///
167    /// # Fields
168    ///
169    /// * `peer_id` - The unique identifier of the discovered peer.
170    /// * `addr` - The network address of the discovered peer.
171    /// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None`
172    ///   if the peer is not associated with a specific fork.
173    EventQueued {
174        /// The unique identifier of the discovered peer.
175        peer_id: PeerId,
176        /// The network address of the discovered peer.
177        addr: PeerAddr,
178        /// An optional identifier for the fork that this peer is associated with.
179        /// `None` if the peer is not associated with a specific fork.
180        fork_id: Option<ForkId>,
181    },
182}
183
184/// Protocol related request messages that expect a response
185#[derive(Debug)]
186pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
187    /// Requests block headers from the peer.
188    ///
189    /// The response should be sent through the channel.
190    GetBlockHeaders {
191        /// The request for block headers.
192        request: GetBlockHeaders,
193        /// The channel to send the response for block headers.
194        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
195    },
196    /// Requests block bodies from the peer.
197    ///
198    /// The response should be sent through the channel.
199    GetBlockBodies {
200        /// The request for block bodies.
201        request: GetBlockBodies,
202        /// The channel to send the response for block bodies.
203        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
204    },
205    /// Requests pooled transactions from the peer.
206    ///
207    /// The response should be sent through the channel.
208    GetPooledTransactions {
209        /// The request for pooled transactions.
210        request: GetPooledTransactions,
211        /// The channel to send the response for pooled transactions.
212        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
213    },
214    /// Requests `NodeData` from the peer.
215    ///
216    /// The response should be sent through the channel.
217    GetNodeData {
218        /// The request for `NodeData`.
219        request: GetNodeData,
220        /// The channel to send the response for `NodeData`.
221        response: oneshot::Sender<RequestResult<NodeData>>,
222    },
223    /// Requests receipts from the peer.
224    ///
225    /// The response should be sent through the channel.
226    GetReceipts {
227        /// The request for receipts.
228        request: GetReceipts,
229        /// The channel to send the response for receipts.
230        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
231    },
232}
233
234// === impl PeerRequest ===
235
236impl<N: NetworkPrimitives> PeerRequest<N> {
237    /// Invoked if we received a response which does not match the request
238    pub fn send_bad_response(self) {
239        self.send_err_response(RequestError::BadResponse)
240    }
241
242    /// Send an error back to the receiver.
243    pub fn send_err_response(self, err: RequestError) {
244        let _ = match self {
245            Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
246            Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
247            Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
248            Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
249            Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
250        };
251    }
252
253    /// Returns the [`EthMessage`] for this type
254    pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
255        match self {
256            Self::GetBlockHeaders { request, .. } => {
257                EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
258            }
259            Self::GetBlockBodies { request, .. } => {
260                EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
261            }
262            Self::GetPooledTransactions { request, .. } => {
263                EthMessage::GetPooledTransactions(RequestPair {
264                    request_id,
265                    message: request.clone(),
266                })
267            }
268            Self::GetNodeData { request, .. } => {
269                EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
270            }
271            Self::GetReceipts { request, .. } => {
272                EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
273            }
274        }
275    }
276
277    /// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
278    pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
279        match self {
280            Self::GetPooledTransactions { request, .. } => Some(request),
281            _ => None,
282        }
283    }
284}
285
286/// A Cloneable connection for sending _requests_ directly to the session of a peer.
287pub struct PeerRequestSender<R = PeerRequest> {
288    /// id of the remote node.
289    pub peer_id: PeerId,
290    /// The Sender half connected to a session.
291    pub to_session_tx: mpsc::Sender<R>,
292}
293
294impl<R> Clone for PeerRequestSender<R> {
295    fn clone(&self) -> Self {
296        Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
297    }
298}
299
300// === impl PeerRequestSender ===
301
302impl<R> PeerRequestSender<R> {
303    /// Constructs a new sender instance that's wired to a session
304    pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
305        Self { peer_id, to_session_tx }
306    }
307
308    /// Attempts to immediately send a message on this Sender
309    pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
310        self.to_session_tx.try_send(req)
311    }
312
313    /// Returns the peer id of the remote peer.
314    pub const fn peer_id(&self) -> &PeerId {
315        &self.peer_id
316    }
317}
318
319impl<R> fmt::Debug for PeerRequestSender<R> {
320    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
321        f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
322    }
323}