Skip to main content

reth_network_api/
events.rs

1//! API related to listening for network events.
2
3use reth_eth_wire_types::{
4    message::RequestPair, BlockAccessLists, BlockBodies, BlockHeaders, Capabilities,
5    DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockAccessLists,
6    GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
7    GetReceipts70, NetworkPrimitives, NodeData, PooledTransactions, Receipts, Receipts69,
8    Receipts70, UnifiedStatus,
9};
10use reth_ethereum_forks::ForkId;
11use reth_network_p2p::error::{RequestError, RequestResult};
12use reth_network_peers::{NodeRecord, PeerId};
13use reth_network_types::{PeerAddr, PeerKind};
14use reth_tokio_util::EventStream;
15use std::{
16    fmt,
17    net::SocketAddr,
18    pin::Pin,
19    sync::Arc,
20    task::{Context, Poll},
21};
22use tokio::sync::{mpsc, oneshot};
23use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};
24
/// A boxed stream of network peer events that provides a type-erased interface.
///
/// The wrapped value is a pinned, heap-allocated `dyn Stream<Item = PeerEvent> + Send + Sync`,
/// so the concrete type of the underlying stream is hidden from consumers.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);
27
28impl fmt::Debug for PeerEventStream {
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        f.debug_struct("PeerEventStream").finish_non_exhaustive()
31    }
32}
33
34impl PeerEventStream {
35    /// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer
36    /// events [`PeerEvent`]
37    pub fn new<S, T>(stream: S) -> Self
38    where
39        S: Stream<Item = T> + Send + Sync + 'static,
40        T: Into<PeerEvent> + 'static,
41    {
42        let mapped_stream = stream.map(Into::into);
43        Self(Box::pin(mapped_stream))
44    }
45}
46
47impl Stream for PeerEventStream {
48    type Item = PeerEvent;
49
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        self.0.as_mut().poll_next(cx)
52    }
53}
54
/// Represents information about an established peer session.
///
/// The client version, capabilities and status are held behind [`Arc`]s, so cloning a
/// `SessionInfo` only bumps reference counts for those fields.
#[derive(Debug, Clone)]
pub struct SessionInfo {
    /// The identifier of the peer to which a session was established.
    pub peer_id: PeerId,
    /// The remote address of the peer to which a session was established.
    pub remote_addr: SocketAddr,
    /// The client version of the peer to which a session was established.
    pub client_version: Arc<str>,
    /// Capabilities the peer announced.
    pub capabilities: Arc<Capabilities>,
    /// The status of the peer to which a session was established.
    pub status: Arc<UnifiedStatus>,
    /// Negotiated eth version of the session.
    pub version: EthVersion,
    /// The kind of peer this session represents.
    pub peer_kind: PeerKind,
}
73
/// (Non-exhaustive) List of the different events emitted by the network that are of interest for
/// subscribers.
///
/// This includes any event types that may be relevant to tasks, e.g. for collecting metrics or
/// keeping track of peers.
#[derive(Debug, Clone)]
pub enum PeerEvent {
    /// Closed the peer session.
    SessionClosed {
        /// The identifier of the peer whose session was closed.
        peer_id: PeerId,
        /// Why the disconnect was triggered, if a reason is available.
        reason: Option<DisconnectReason>,
    },
    /// Established a new session with the given peer.
    SessionEstablished(SessionInfo),
    /// Event emitted when a new peer is added.
    PeerAdded(PeerId),
    /// Event emitted when a peer is removed.
    PeerRemoved(PeerId),
}
95
/// (Non-exhaustive) Network events representing peer lifecycle events and session requests.
///
/// `R` is the request type carried by the session's request channel; it defaults to
/// [`PeerRequest`].
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
    /// Basic peer lifecycle event.
    Peer(PeerEvent),
    /// Session established with requests.
    ActivePeerSession {
        /// Session information.
        info: SessionInfo,
        /// A request channel to the session task.
        messages: PeerRequestSender<R>,
    },
}
109
110impl<R> Clone for NetworkEvent<R> {
111    fn clone(&self) -> Self {
112        match self {
113            Self::Peer(event) => Self::Peer(event.clone()),
114            Self::ActivePeerSession { info, messages } => {
115                Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
116            }
117        }
118    }
119}
120
121impl<R> From<NetworkEvent<R>> for PeerEvent {
122    fn from(event: NetworkEvent<R>) -> Self {
123        match event {
124            NetworkEvent::Peer(peer_event) => peer_event,
125            NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
126        }
127    }
128}
129
/// Provides peer event subscription for the network.
///
/// The `auto_impl` attribute additionally implements this trait for `&T` and `Arc<T>`.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
    /// Creates a new peer event listener stream.
    fn peer_events(&self) -> PeerEventStream;
}
136
/// Provides event subscription for the network.
///
/// Extends [`NetworkPeersEvents`] with access to the full [`NetworkEvent`] stream and to
/// discovery events. The `auto_impl` attribute additionally implements this trait for `&T` and
/// `Arc<T>`.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: NetworkPeersEvents {
    /// The primitive types to use in the `PeerRequest` used in the stream.
    type Primitives: NetworkPrimitives;

    /// Creates a new [`NetworkEvent`] listener channel.
    fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<Self::Primitives>>>;
    /// Returns a new [`DiscoveryEvent`] stream.
    ///
    /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
    fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}
150
/// Events produced by the `Discovery` manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
    /// Discovered a node; details are carried by the wrapped [`DiscoveredEvent`].
    NewNode(DiscoveredEvent),
    /// Retrieved a [`ForkId`] from the peer via ENR request.
    ///
    /// Contains the full [`NodeRecord`] (peer ID + address) and the reported [`ForkId`].
    /// Used to verify fork compatibility before admitting the peer.
    ///
    /// See also <https://eips.ethereum.org/EIPS/eip-868>
    EnrForkId(NodeRecord, ForkId),
}
164
/// Represents events related to peer discovery in the network.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveredEvent {
    /// Indicates that a new peer has been discovered and queued for potential connection.
    ///
    /// This event is generated when the system becomes aware of a new peer
    /// but hasn't yet established a connection.
    ///
    /// # Fields
    ///
    /// * `peer_id` - The unique identifier of the discovered peer.
    /// * `addr` - The network address of the discovered peer.
    /// * `fork_id` - An optional identifier for the fork that this peer is associated with. `None`
    ///   if the peer is not associated with a specific fork.
    EventQueued {
        /// The unique identifier of the discovered peer.
        peer_id: PeerId,
        /// The network address of the discovered peer.
        addr: PeerAddr,
        /// An optional identifier for the fork that this peer is associated with.
        /// `None` if the peer is not associated with a specific fork.
        fork_id: Option<ForkId>,
    },
}
189
/// Protocol related request messages that expect a response.
///
/// Every variant pairs the request payload with the [`oneshot::Sender`] on which the
/// [`RequestResult`] for that request is delivered. `N` selects the network primitive types
/// used in the responses and defaults to [`EthNetworkPrimitives`].
#[derive(Debug)]
pub enum PeerRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Requests block headers from the peer.
    ///
    /// The response should be sent through the channel.
    GetBlockHeaders {
        /// The request for block headers.
        request: GetBlockHeaders,
        /// The channel to send the response for block headers.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Requests block bodies from the peer.
    ///
    /// The response should be sent through the channel.
    GetBlockBodies {
        /// The request for block bodies.
        request: GetBlockBodies,
        /// The channel to send the response for block bodies.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Requests pooled transactions from the peer.
    ///
    /// The response should be sent through the channel.
    GetPooledTransactions {
        /// The request for pooled transactions.
        request: GetPooledTransactions,
        /// The channel to send the response for pooled transactions.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Requests `NodeData` from the peer.
    ///
    /// The response should be sent through the channel.
    GetNodeData {
        /// The request for `NodeData`.
        request: GetNodeData,
        /// The channel to send the response for `NodeData`.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Requests receipts from the peer.
    ///
    /// The response should be sent through the channel.
    GetReceipts {
        /// The request for receipts.
        request: GetReceipts,
        /// The channel to send the response for receipts.
        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Requests receipts from the peer without bloom filter.
    ///
    /// Uses the same [`GetReceipts`] request payload as [`Self::GetReceipts`]; only the response
    /// type differs.
    ///
    /// The response should be sent through the channel.
    GetReceipts69 {
        /// The request for receipts.
        request: GetReceipts,
        /// The channel to send the response for receipts.
        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Requests receipts from the peer using eth/70 (supports `firstBlockReceiptIndex`).
    ///
    /// The response should be sent through the channel.
    GetReceipts70 {
        /// The request for receipts.
        request: GetReceipts70,
        /// The channel to send the response for receipts.
        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Requests block access lists from the peer.
    ///
    /// The response should be sent through the channel.
    GetBlockAccessLists {
        /// The request for block access lists.
        request: GetBlockAccessLists,
        /// The channel to send the response for block access lists.
        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
    },
}
266
267// === impl PeerRequest ===
268
269impl<N: NetworkPrimitives> PeerRequest<N> {
270    /// Invoked if we received a response which does not match the request
271    pub fn send_bad_response(self) {
272        self.send_err_response(RequestError::BadResponse)
273    }
274
275    /// Send an error back to the receiver.
276    pub fn send_err_response(self, err: RequestError) {
277        let _ = match self {
278            Self::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
279            Self::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
280            Self::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
281            Self::GetNodeData { response, .. } => response.send(Err(err)).ok(),
282            Self::GetReceipts { response, .. } => response.send(Err(err)).ok(),
283            Self::GetReceipts69 { response, .. } => response.send(Err(err)).ok(),
284            Self::GetReceipts70 { response, .. } => response.send(Err(err)).ok(),
285            Self::GetBlockAccessLists { response, .. } => response.send(Err(err)).ok(),
286        };
287    }
288
289    /// Returns true if this request is supported for the negotiated eth protocol version.
290    #[inline]
291    pub fn is_supported_by_eth_version(&self, version: EthVersion) -> bool {
292        match self {
293            Self::GetBlockAccessLists { .. } => version >= EthVersion::Eth71,
294            _ => true,
295        }
296    }
297
298    /// Returns the [`EthMessage`] for this type
299    pub fn create_request_message(&self, request_id: u64) -> EthMessage<N> {
300        match self {
301            Self::GetBlockHeaders { request, .. } => {
302                EthMessage::GetBlockHeaders(RequestPair { request_id, message: *request })
303            }
304            Self::GetBlockBodies { request, .. } => {
305                EthMessage::GetBlockBodies(RequestPair { request_id, message: request.clone() })
306            }
307            Self::GetPooledTransactions { request, .. } => {
308                EthMessage::GetPooledTransactions(RequestPair {
309                    request_id,
310                    message: request.clone(),
311                })
312            }
313            Self::GetNodeData { request, .. } => {
314                EthMessage::GetNodeData(RequestPair { request_id, message: request.clone() })
315            }
316            Self::GetReceipts { request, .. } | Self::GetReceipts69 { request, .. } => {
317                EthMessage::GetReceipts(RequestPair { request_id, message: request.clone() })
318            }
319            Self::GetReceipts70 { request, .. } => {
320                EthMessage::GetReceipts70(RequestPair { request_id, message: request.clone() })
321            }
322            Self::GetBlockAccessLists { request, .. } => {
323                EthMessage::GetBlockAccessLists(RequestPair {
324                    request_id,
325                    message: request.clone(),
326                })
327            }
328        }
329    }
330
331    /// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
332    pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
333        match self {
334            Self::GetPooledTransactions { request, .. } => Some(request),
335            _ => None,
336        }
337    }
338}
339
/// A cloneable connection for sending _requests_ directly to the session of a peer.
///
/// `R` is the request type accepted by the session and defaults to [`PeerRequest`].
pub struct PeerRequestSender<R = PeerRequest> {
    /// Id of the remote node.
    pub peer_id: PeerId,
    /// The sender half of the channel connected to a session.
    pub to_session_tx: mpsc::Sender<R>,
}
347
348impl<R> Clone for PeerRequestSender<R> {
349    fn clone(&self) -> Self {
350        Self { peer_id: self.peer_id, to_session_tx: self.to_session_tx.clone() }
351    }
352}
353
354// === impl PeerRequestSender ===
355
356impl<R> PeerRequestSender<R> {
357    /// Constructs a new sender instance that's wired to a session
358    pub const fn new(peer_id: PeerId, to_session_tx: mpsc::Sender<R>) -> Self {
359        Self { peer_id, to_session_tx }
360    }
361
362    /// Attempts to immediately send a message on this Sender
363    pub fn try_send(&self, req: R) -> Result<(), mpsc::error::TrySendError<R>> {
364        self.to_session_tx.try_send(req)
365    }
366
367    /// Returns the peer id of the remote peer.
368    pub const fn peer_id(&self) -> &PeerId {
369        &self.peer_id
370    }
371}
372
373impl<R> fmt::Debug for PeerRequestSender<R> {
374    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375        f.debug_struct("PeerRequestSender").field("peer_id", &self.peer_id).finish_non_exhaustive()
376    }
377}
378
#[cfg(test)]
mod tests {
    use super::*;

    /// Block access list requests must be rejected below eth/71 and accepted from eth/71 on.
    #[test]
    fn test_get_block_access_lists_version_support() {
        let (response, _receiver) = oneshot::channel();
        let request = GetBlockAccessLists(vec![]);
        let bal_request: PeerRequest<EthNetworkPrimitives> =
            PeerRequest::GetBlockAccessLists { request, response };

        let supported_on_eth70 = bal_request.is_supported_by_eth_version(EthVersion::Eth70);
        let supported_on_eth71 = bal_request.is_supported_by_eth_version(EthVersion::Eth71);

        assert!(!supported_on_eth70);
        assert!(supported_on_eth71);
    }
}