Skip to main content

reth_network/session/
handle.rs

//! Session handles.
2
3use crate::{
4    message::PeerMessage,
5    session::{active::BroadcastItemCounter, conn::EthRlpxConnection, Direction, SessionId},
6    PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10    errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives,
11    UnifiedStatus,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18    mpsc::{self, error::SendError},
19    oneshot,
20};
21use tracing::trace;
22
23/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
24/// message which exchanges the `capabilities` of the peer.
25///
26/// This session needs to wait until it is authenticated.
27#[derive(Debug)]
28pub struct PendingSessionHandle {
29    /// Can be used to tell the session to disconnect the connection/abort the handshake process.
30    pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
31    /// The direction of the session
32    pub(crate) direction: Direction,
33}
34
35// === impl PendingSessionHandle ===
36
37impl PendingSessionHandle {
38    /// Sends a disconnect command to the pending session.
39    pub fn disconnect(&mut self) {
40        if let Some(tx) = self.disconnect_tx.take() {
41            let _ = tx.send(());
42        }
43    }
44
45    /// Returns the direction of the pending session (inbound or outbound).
46    pub const fn direction(&self) -> Direction {
47        self.direction
48    }
49}
50
51/// An established session with a remote peer.
52///
53/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
54/// be performed: chain synchronization, block propagation and transaction exchange.
55#[derive(Debug)]
56pub struct ActiveSessionHandle<N: NetworkPrimitives> {
57    /// The direction of the session
58    pub(crate) direction: Direction,
59    /// The assigned id for this session
60    pub(crate) session_id: SessionId,
61    /// negotiated eth version
62    pub(crate) version: EthVersion,
63    /// The identifier of the remote peer
64    pub(crate) remote_id: PeerId,
65    /// The timestamp when the session has been established.
66    pub(crate) established: Instant,
67    /// Announced capabilities of the peer.
68    pub(crate) capabilities: Arc<Capabilities>,
69    /// Sender for commands to the spawned session with broadcast-aware backpressure.
70    pub(crate) commands: SessionCommandSender<N>,
71    /// The client's name and version
72    pub(crate) client_version: Arc<str>,
73    /// The address we're connected to
74    pub(crate) remote_addr: SocketAddr,
75    /// The local address of the connection.
76    pub(crate) local_addr: Option<SocketAddr>,
77    /// The Status message the peer sent for the `eth` handshake
78    pub(crate) status: Arc<UnifiedStatus>,
79}
80
81// === impl ActiveSessionHandle ===
82
83impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
84    /// Sends a disconnect command to the session.
85    pub fn disconnect(&self, reason: Option<DisconnectReason>) {
86        self.commands.disconnect(reason);
87    }
88
89    /// Sends a disconnect command to the session via the unbounded channel.
90    pub fn try_disconnect(
91        &self,
92        reason: Option<DisconnectReason>,
93    ) -> Result<(), SendError<SessionCommand<N>>> {
94        self.commands.try_disconnect(reason)
95    }
96
97    /// Returns the direction of the active session (inbound or outbound).
98    pub const fn direction(&self) -> Direction {
99        self.direction
100    }
101
102    /// Returns the assigned session id for this session.
103    pub const fn session_id(&self) -> SessionId {
104        self.session_id
105    }
106
107    /// Returns the negotiated eth version for this session.
108    pub const fn version(&self) -> EthVersion {
109        self.version
110    }
111
112    /// Returns the identifier of the remote peer.
113    pub const fn remote_id(&self) -> PeerId {
114        self.remote_id
115    }
116
117    /// Returns the timestamp when the session has been established.
118    pub const fn established(&self) -> Instant {
119        self.established
120    }
121
122    /// Returns the announced capabilities of the peer.
123    pub fn capabilities(&self) -> Arc<Capabilities> {
124        self.capabilities.clone()
125    }
126
127    /// Returns the client's name and version.
128    pub fn client_version(&self) -> Arc<str> {
129        self.client_version.clone()
130    }
131
132    /// Returns the address we're connected to.
133    pub const fn remote_addr(&self) -> SocketAddr {
134        self.remote_addr
135    }
136
137    /// Returns the current number of in-flight broadcast items.
138    pub fn queued_broadcast_items(&self) -> usize {
139        self.commands.queued_broadcast_items()
140    }
141
142    /// Extracts the [`PeerInfo`] from the session handle.
143    pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
144        PeerInfo {
145            remote_id: self.remote_id,
146            direction: self.direction,
147            enode: record.to_string(),
148            enr: None,
149            remote_addr: self.remote_addr,
150            local_addr: self.local_addr,
151            capabilities: self.capabilities.clone(),
152            client_version: self.client_version.clone(),
153            eth_version: self.version,
154            status: self.status.clone(),
155            session_established: self.established,
156            kind,
157        }
158    }
159}
160
161/// Sender half of the session command channel with broadcast-aware backpressure.
162///
163/// Commands are first sent through a bounded channel. If the bounded channel is full and the
164/// message is a broadcast with room under the broadcast item limit, it overflows to a dedicated
165/// unbounded channel that the session task drains alongside the bounded one.
166///
167/// The shared `broadcast_items` counter tracks items across **all** buffers (bounded channel,
168/// overflow channel, and the session's outgoing queue), so the
169/// [`SessionManager`](super::SessionManager) has an accurate view of total in-flight broadcast
170/// pressure.
171#[derive(Debug)]
172pub(crate) struct SessionCommandSender<N: NetworkPrimitives> {
173    /// Bounded channel for all commands (primary path).
174    tx: mpsc::Sender<SessionCommand<N>>,
175    /// Unbounded channel used for broadcasts that overflow the bounded channel, and for
176    /// disconnect commands (which must never be dropped due to backpressure).
177    unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
178    /// Shared counter of in-flight broadcast items (channels + outgoing queue).
179    broadcast_items: BroadcastItemCounter,
180}
181
182impl<N: NetworkPrimitives> SessionCommandSender<N> {
183    /// Creates a new sender with the given bounded channel, unbounded channel, and shared counter.
184    pub(crate) const fn new(
185        tx: mpsc::Sender<SessionCommand<N>>,
186        unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
187        broadcast_items: BroadcastItemCounter,
188    ) -> Self {
189        Self { tx, unbounded_tx, broadcast_items }
190    }
191
192    /// Sends a disconnect command via the unbounded channel so it is never dropped due to
193    /// backpressure.
194    pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
195        let _ = self.unbounded_tx.send(SessionCommand::Disconnect { reason });
196    }
197
198    /// Sends a disconnect command via the unbounded channel.
199    ///
200    /// This is infallible from a capacity standpoint (unbounded), but will fail if the
201    /// receiver has been dropped (session closed).
202    pub(crate) fn try_disconnect(
203        &self,
204        reason: Option<DisconnectReason>,
205    ) -> Result<(), SendError<SessionCommand<N>>> {
206        self.unbounded_tx.send(SessionCommand::Disconnect { reason }).map_err(|e| SendError(e.0))
207    }
208
209    /// Sends a message to the session with broadcast-aware backpressure.
210    ///
211    /// For broadcast messages, the broadcast item counter is incremented **before** the message
212    /// enters any channel, ensuring the counter always reflects the true in-flight count.
213    /// If the bounded channel is full, broadcasts overflow to the unbounded channel (up to the
214    /// item limit). Non-broadcast messages that cannot fit in the bounded channel are dropped.
215    ///
216    /// Returns `true` if the message was accepted, `false` if it was dropped.
217    pub(crate) fn send_message(&self, msg: PeerMessage<N>) -> bool {
218        if msg.is_broadcast() {
219            let items = msg.message_item_count();
220
221            // Check + increment atomically (optimistic)
222            if !self.broadcast_items.try_add(items) {
223                return false;
224            }
225
226            // Try bounded channel first
227            match self.tx.try_send(SessionCommand::Message(msg)) {
228                Ok(()) => true,
229                Err(mpsc::error::TrySendError::Full(cmd)) => {
230                    // Overflow to unbounded channel (counter already incremented)
231                    let _ = self.unbounded_tx.send(cmd);
232                    true
233                }
234                Err(_) => {
235                    // Channel closed, undo increment
236                    self.broadcast_items.sub(items);
237                    false
238                }
239            }
240        } else {
241            // Non-broadcast: bounded channel only
242            match self.tx.try_send(SessionCommand::Message(msg)) {
243                Ok(()) => true,
244                Err(mpsc::error::TrySendError::Full(SessionCommand::Message(msg))) => {
245                    trace!(
246                        target: "net::session",
247                        msg_kind = msg.message_kind(),
248                        "session command buffer full, dropping non-broadcast message"
249                    );
250                    false
251                }
252                Err(_) => false,
253            }
254        }
255    }
256
257    /// Returns the current number of in-flight broadcast items.
258    pub(crate) fn queued_broadcast_items(&self) -> usize {
259        self.broadcast_items.get()
260    }
261}
262
263/// Events a pending session can produce.
264///
265/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
266///
267/// A session starts with a `Handshake`, followed by a `Hello` message which
268#[derive(Debug)]
269pub enum PendingSessionEvent<N: NetworkPrimitives> {
270    /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
271    Established {
272        /// An internal identifier for the established session
273        session_id: SessionId,
274        /// The remote node's socket address
275        remote_addr: SocketAddr,
276        /// The local address of the connection
277        local_addr: Option<SocketAddr>,
278        /// The remote node's public key
279        peer_id: PeerId,
280        /// All capabilities the peer announced
281        capabilities: Arc<Capabilities>,
282        /// The Status message the peer sent for the `eth` handshake
283        status: Arc<UnifiedStatus>,
284        /// The actual connection stream which can be used to send and receive `eth` protocol
285        /// messages
286        conn: EthRlpxConnection<N>,
287        /// The direction of the session, either `Inbound` or `Outgoing`
288        direction: Direction,
289        /// The remote node's user agent, usually containing the client name and version
290        client_id: String,
291    },
292    /// Handshake unsuccessful, session was disconnected.
293    Disconnected {
294        /// The remote node's socket address
295        remote_addr: SocketAddr,
296        /// The internal identifier for the disconnected session
297        session_id: SessionId,
298        /// The direction of the session, either `Inbound` or `Outgoing`
299        direction: Direction,
300        /// The error that caused the disconnect
301        error: Option<PendingSessionHandshakeError>,
302    },
303    /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
304    OutgoingConnectionError {
305        /// The remote node's socket address
306        remote_addr: SocketAddr,
307        /// The internal identifier for the disconnected session
308        session_id: SessionId,
309        /// The remote node's public key
310        peer_id: PeerId,
311        /// The error that caused the outgoing connection failure
312        error: io::Error,
313    },
314    /// Thrown when authentication via ECIES failed.
315    EciesAuthError {
316        /// The remote node's socket address
317        remote_addr: SocketAddr,
318        /// The internal identifier for the disconnected session
319        session_id: SessionId,
320        /// The error that caused the ECIES session to fail
321        error: ECIESError,
322        /// The direction of the session, either `Inbound` or `Outgoing`
323        direction: Direction,
324    },
325}
326
327/// Commands that can be sent to the spawned session.
328#[derive(Debug)]
329pub enum SessionCommand<N: NetworkPrimitives> {
330    /// Disconnect the connection
331    Disconnect {
332        /// Why the disconnect was initiated
333        reason: Option<DisconnectReason>,
334    },
335    /// Sends a message to the peer
336    Message(PeerMessage<N>),
337}
338
339/// Message variants an active session can produce and send back to the
340/// [`SessionManager`](crate::session::SessionManager)
341#[derive(Debug)]
342pub enum ActiveSessionMessage<N: NetworkPrimitives> {
343    /// Session was gracefully disconnected.
344    Disconnected {
345        /// The remote node's public key
346        peer_id: PeerId,
347        /// The remote node's socket address
348        remote_addr: SocketAddr,
349    },
350    /// Session was closed due an error
351    ClosedOnConnectionError {
352        /// The remote node's public key
353        peer_id: PeerId,
354        /// The remote node's socket address
355        remote_addr: SocketAddr,
356        /// The error that caused the session to close
357        error: EthStreamError,
358    },
359    /// A session received a valid message via `RLPx`.
360    ValidMessage {
361        /// Identifier of the remote peer.
362        peer_id: PeerId,
363        /// Message received from the peer.
364        message: PeerMessage<N>,
365    },
366    /// Received a bad message from the peer.
367    BadMessage {
368        /// Identifier of the remote peer.
369        peer_id: PeerId,
370    },
371    /// Remote peer is considered in protocol violation
372    ProtocolBreach {
373        /// Identifier of the remote peer.
374        peer_id: PeerId,
375    },
376}