Skip to main content

reth_network/session/
handle.rs

1//! Session handles.
2
3use crate::{
4    message::PeerMessage,
5    session::{active::BroadcastItemCounter, conn::EthRlpxConnection, Direction, SessionId},
6    PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10    errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives,
11    UnifiedStatus,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18    mpsc::{self, error::SendError},
19    oneshot,
20};
21use tracing::trace;
22
23/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
24/// message which exchanges the `capabilities` of the peer.
25///
26/// This session needs to wait until it is authenticated.
27#[derive(Debug)]
28pub struct PendingSessionHandle {
29    /// Can be used to tell the session to disconnect the connection/abort the handshake process.
30    pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
31    /// The direction of the session
32    pub(crate) direction: Direction,
33}
34
35// === impl PendingSessionHandle ===
36
37impl PendingSessionHandle {
38    /// Sends a disconnect command to the pending session.
39    pub fn disconnect(&mut self) {
40        if let Some(tx) = self.disconnect_tx.take() {
41            let _ = tx.send(());
42        }
43    }
44
45    /// Returns the direction of the pending session (inbound or outbound).
46    pub const fn direction(&self) -> Direction {
47        self.direction
48    }
49}
50
51/// An established session with a remote peer.
52///
53/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
54/// be performed: chain synchronization, block propagation and transaction exchange.
55#[derive(Debug)]
56pub struct ActiveSessionHandle<N: NetworkPrimitives> {
57    /// The direction of the session
58    pub(crate) direction: Direction,
59    /// The assigned id for this session
60    pub(crate) session_id: SessionId,
61    /// negotiated eth version
62    pub(crate) version: EthVersion,
63    /// The identifier of the remote peer
64    pub(crate) remote_id: PeerId,
65    /// The timestamp when the session has been established.
66    pub(crate) established: Instant,
67    /// Announced capabilities of the peer.
68    pub(crate) capabilities: Arc<Capabilities>,
69    /// Sender for commands to the spawned session with broadcast-aware backpressure.
70    pub(crate) commands: SessionCommandSender<N>,
71    /// The client's name and version
72    pub(crate) client_version: Arc<str>,
73    /// The address we're connected to
74    pub(crate) remote_addr: SocketAddr,
75    /// The local address of the connection.
76    pub(crate) local_addr: Option<SocketAddr>,
77    /// The TCP listening port the peer announced in its `Hello` message, if non-zero.
78    ///
79    /// This is effectively deprecated, but we still keep it around if a peer announced it as it's
80    /// likely still more useful than the ephemeral source port.
81    pub(crate) peer_listen_port: Option<u16>,
82    /// The Status message the peer sent for the `eth` handshake
83    pub(crate) status: Arc<UnifiedStatus>,
84}
85
86// === impl ActiveSessionHandle ===
87
88impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
89    /// Sends a disconnect command to the session.
90    pub fn disconnect(&self, reason: Option<DisconnectReason>) {
91        self.commands.disconnect(reason);
92    }
93
94    /// Sends a disconnect command to the session via the unbounded channel.
95    pub fn try_disconnect(
96        &self,
97        reason: Option<DisconnectReason>,
98    ) -> Result<(), SendError<SessionCommand<N>>> {
99        self.commands.try_disconnect(reason)
100    }
101
102    /// Returns the direction of the active session (inbound or outbound).
103    pub const fn direction(&self) -> Direction {
104        self.direction
105    }
106
107    /// Returns the assigned session id for this session.
108    pub const fn session_id(&self) -> SessionId {
109        self.session_id
110    }
111
112    /// Returns the negotiated eth version for this session.
113    pub const fn version(&self) -> EthVersion {
114        self.version
115    }
116
117    /// Returns the identifier of the remote peer.
118    pub const fn remote_id(&self) -> PeerId {
119        self.remote_id
120    }
121
122    /// Returns the timestamp when the session has been established.
123    pub const fn established(&self) -> Instant {
124        self.established
125    }
126
127    /// Returns the announced capabilities of the peer.
128    pub fn capabilities(&self) -> Arc<Capabilities> {
129        self.capabilities.clone()
130    }
131
132    /// Returns the client's name and version.
133    pub fn client_version(&self) -> Arc<str> {
134        self.client_version.clone()
135    }
136
137    /// Returns the address we're connected to.
138    pub const fn remote_addr(&self) -> SocketAddr {
139        self.remote_addr
140    }
141
142    /// Returns the current number of in-flight broadcast items.
143    pub fn queued_broadcast_items(&self) -> usize {
144        self.commands.queued_broadcast_items()
145    }
146
147    /// Extracts the [`PeerInfo`] from the session handle.
148    pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
149        // For inbound connections, the `record` was built from the TCP socket address, which
150        // carries the peer's OS-assigned ephemeral source port (not dialable). If the peer
151        // announced a non-zero listening port in its `Hello` message, prefer that combined with
152        // the connection IP so the resulting enode is actually dialable.
153        let enode = match (self.direction, self.peer_listen_port) {
154            (Direction::Incoming, Some(port)) => NodeRecord::new_with_ports(
155                self.remote_addr.ip(),
156                port,
157                Some(record.udp_port),
158                record.id,
159            )
160            .to_string(),
161            _ => record.to_string(),
162        };
163        PeerInfo {
164            remote_id: self.remote_id,
165            direction: self.direction,
166            enode,
167            enr: None,
168            remote_addr: self.remote_addr,
169            local_addr: self.local_addr,
170            capabilities: self.capabilities.clone(),
171            client_version: self.client_version.clone(),
172            eth_version: self.version,
173            status: self.status.clone(),
174            session_established: self.established,
175            kind,
176        }
177    }
178}
179
180/// Sender half of the session command channel with broadcast-aware backpressure.
181///
182/// Commands are first sent through a bounded channel. If the bounded channel is full and the
183/// message is a broadcast with room under the broadcast item limit, it overflows to a dedicated
184/// unbounded channel that the session task drains alongside the bounded one.
185///
186/// The shared `broadcast_items` counter tracks items across **all** buffers (bounded channel,
187/// overflow channel, and the session's outgoing queue), so the
188/// [`SessionManager`](super::SessionManager) has an accurate view of total in-flight broadcast
189/// pressure.
190#[derive(Debug)]
191pub(crate) struct SessionCommandSender<N: NetworkPrimitives> {
192    /// Bounded channel for all commands (primary path).
193    tx: mpsc::Sender<SessionCommand<N>>,
194    /// Unbounded channel used for broadcasts that overflow the bounded channel, and for
195    /// disconnect commands (which must never be dropped due to backpressure).
196    unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
197    /// Shared counter of in-flight broadcast items (channels + outgoing queue).
198    broadcast_items: BroadcastItemCounter,
199}
200
201impl<N: NetworkPrimitives> SessionCommandSender<N> {
202    /// Creates a new sender with the given bounded channel, unbounded channel, and shared counter.
203    pub(crate) const fn new(
204        tx: mpsc::Sender<SessionCommand<N>>,
205        unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
206        broadcast_items: BroadcastItemCounter,
207    ) -> Self {
208        Self { tx, unbounded_tx, broadcast_items }
209    }
210
211    /// Sends a disconnect command via the unbounded channel so it is never dropped due to
212    /// backpressure.
213    pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
214        let _ = self.unbounded_tx.send(SessionCommand::Disconnect { reason });
215    }
216
217    /// Sends a disconnect command via the unbounded channel.
218    ///
219    /// This is infallible from a capacity standpoint (unbounded), but will fail if the
220    /// receiver has been dropped (session closed).
221    pub(crate) fn try_disconnect(
222        &self,
223        reason: Option<DisconnectReason>,
224    ) -> Result<(), SendError<SessionCommand<N>>> {
225        self.unbounded_tx.send(SessionCommand::Disconnect { reason }).map_err(|e| SendError(e.0))
226    }
227
228    /// Sends a message to the session with broadcast-aware backpressure.
229    ///
230    /// For broadcast messages, the broadcast item counter is incremented **before** the message
231    /// enters any channel, ensuring the counter always reflects the true in-flight count.
232    /// If the bounded channel is full, broadcasts overflow to the unbounded channel (up to the
233    /// item limit). Non-broadcast messages that cannot fit in the bounded channel are dropped.
234    ///
235    /// Returns `true` if the message was accepted, `false` if it was dropped.
236    pub(crate) fn send_message(&self, msg: PeerMessage<N>) -> bool {
237        if msg.is_broadcast() {
238            let items = msg.message_item_count();
239
240            // Check + increment atomically (optimistic)
241            if !self.broadcast_items.try_add(items) {
242                return false;
243            }
244
245            // Try bounded channel first
246            match self.tx.try_send(SessionCommand::Message(msg)) {
247                Ok(()) => true,
248                Err(mpsc::error::TrySendError::Full(cmd)) => {
249                    // Overflow to unbounded channel (counter already incremented)
250                    let _ = self.unbounded_tx.send(cmd);
251                    true
252                }
253                Err(_) => {
254                    // Channel closed, undo increment
255                    self.broadcast_items.sub(items);
256                    false
257                }
258            }
259        } else {
260            // Non-broadcast: bounded channel only
261            match self.tx.try_send(SessionCommand::Message(msg)) {
262                Ok(()) => true,
263                Err(mpsc::error::TrySendError::Full(SessionCommand::Message(msg))) => {
264                    trace!(
265                        target: "net::session",
266                        msg_kind = msg.message_kind(),
267                        "session command buffer full, dropping non-broadcast message"
268                    );
269                    false
270                }
271                Err(_) => false,
272            }
273        }
274    }
275
276    /// Returns the current number of in-flight broadcast items.
277    pub(crate) fn queued_broadcast_items(&self) -> usize {
278        self.broadcast_items.get()
279    }
280}
281
282/// Events a pending session can produce.
283///
284/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
285///
286/// A session starts with a `Handshake`, followed by a `Hello` message which
287#[derive(Debug)]
288pub enum PendingSessionEvent<N: NetworkPrimitives> {
289    /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
290    Established {
291        /// An internal identifier for the established session
292        session_id: SessionId,
293        /// The remote node's socket address
294        remote_addr: SocketAddr,
295        /// The local address of the connection
296        local_addr: Option<SocketAddr>,
297        /// The remote node's public key
298        peer_id: PeerId,
299        /// All capabilities the peer announced
300        capabilities: Arc<Capabilities>,
301        /// The Status message the peer sent for the `eth` handshake
302        status: Arc<UnifiedStatus>,
303        /// The actual connection stream which can be used to send and receive `eth` protocol
304        /// messages
305        conn: EthRlpxConnection<N>,
306        /// The direction of the session, either `Inbound` or `Outgoing`
307        direction: Direction,
308        /// The remote node's user agent, usually containing the client name and version
309        client_id: String,
310        /// The TCP listening port the peer announced in its `Hello` message, if non-zero.
311        ///
312        /// See `ActiveSessionHandle::peer_listen_port` for context.
313        peer_listen_port: Option<u16>,
314    },
315    /// Handshake unsuccessful, session was disconnected.
316    Disconnected {
317        /// The remote node's socket address
318        remote_addr: SocketAddr,
319        /// The internal identifier for the disconnected session
320        session_id: SessionId,
321        /// The direction of the session, either `Inbound` or `Outgoing`
322        direction: Direction,
323        /// The error that caused the disconnect
324        error: Option<PendingSessionHandshakeError>,
325    },
326    /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
327    OutgoingConnectionError {
328        /// The remote node's socket address
329        remote_addr: SocketAddr,
330        /// The internal identifier for the disconnected session
331        session_id: SessionId,
332        /// The remote node's public key
333        peer_id: PeerId,
334        /// The error that caused the outgoing connection failure
335        error: io::Error,
336    },
337    /// Thrown when authentication via ECIES failed.
338    EciesAuthError {
339        /// The remote node's socket address
340        remote_addr: SocketAddr,
341        /// The internal identifier for the disconnected session
342        session_id: SessionId,
343        /// The error that caused the ECIES session to fail
344        error: ECIESError,
345        /// The direction of the session, either `Inbound` or `Outgoing`
346        direction: Direction,
347    },
348}
349
350/// Commands that can be sent to the spawned session.
351#[derive(Debug)]
352pub enum SessionCommand<N: NetworkPrimitives> {
353    /// Disconnect the connection
354    Disconnect {
355        /// Why the disconnect was initiated
356        reason: Option<DisconnectReason>,
357    },
358    /// Sends a message to the peer
359    Message(PeerMessage<N>),
360}
361
362/// Message variants an active session can produce and send back to the
363/// [`SessionManager`](crate::session::SessionManager)
364#[derive(Debug)]
365pub enum ActiveSessionMessage<N: NetworkPrimitives> {
366    /// Session was gracefully disconnected.
367    Disconnected {
368        /// The remote node's public key
369        peer_id: PeerId,
370        /// The remote node's socket address
371        remote_addr: SocketAddr,
372    },
373    /// Session was closed due an error
374    ClosedOnConnectionError {
375        /// The remote node's public key
376        peer_id: PeerId,
377        /// The remote node's socket address
378        remote_addr: SocketAddr,
379        /// The error that caused the session to close
380        error: EthStreamError,
381    },
382    /// A session received a valid message via `RLPx`.
383    ValidMessage {
384        /// Identifier of the remote peer.
385        peer_id: PeerId,
386        /// Message received from the peer.
387        message: PeerMessage<N>,
388    },
389    /// Received a bad message from the peer.
390    BadMessage {
391        /// Identifier of the remote peer.
392        peer_id: PeerId,
393    },
394    /// Remote peer is considered in protocol violation
395    ProtocolBreach {
396        /// Identifier of the remote peer.
397        peer_id: PeerId,
398    },
399}