reth_network/session/handle.rs
1//! Session handles.
2
3use crate::{
4 message::PeerMessage,
5 session::{active::BroadcastItemCounter, conn::EthRlpxConnection, Direction, SessionId},
6 PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10 errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives,
11 UnifiedStatus,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18 mpsc::{self, error::SendError},
19 oneshot,
20};
21use tracing::trace;
22
23/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
24/// message which exchanges the `capabilities` of the peer.
25///
26/// This session needs to wait until it is authenticated.
27#[derive(Debug)]
28pub struct PendingSessionHandle {
29 /// Can be used to tell the session to disconnect the connection/abort the handshake process.
30 pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
31 /// The direction of the session
32 pub(crate) direction: Direction,
33}
34
35// === impl PendingSessionHandle ===
36
37impl PendingSessionHandle {
38 /// Sends a disconnect command to the pending session.
39 pub fn disconnect(&mut self) {
40 if let Some(tx) = self.disconnect_tx.take() {
41 let _ = tx.send(());
42 }
43 }
44
45 /// Returns the direction of the pending session (inbound or outbound).
46 pub const fn direction(&self) -> Direction {
47 self.direction
48 }
49}
50
51/// An established session with a remote peer.
52///
53/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
54/// be performed: chain synchronization, block propagation and transaction exchange.
55#[derive(Debug)]
56pub struct ActiveSessionHandle<N: NetworkPrimitives> {
57 /// The direction of the session
58 pub(crate) direction: Direction,
59 /// The assigned id for this session
60 pub(crate) session_id: SessionId,
61 /// negotiated eth version
62 pub(crate) version: EthVersion,
63 /// The identifier of the remote peer
64 pub(crate) remote_id: PeerId,
65 /// The timestamp when the session has been established.
66 pub(crate) established: Instant,
67 /// Announced capabilities of the peer.
68 pub(crate) capabilities: Arc<Capabilities>,
69 /// Sender for commands to the spawned session with broadcast-aware backpressure.
70 pub(crate) commands: SessionCommandSender<N>,
71 /// The client's name and version
72 pub(crate) client_version: Arc<str>,
73 /// The address we're connected to
74 pub(crate) remote_addr: SocketAddr,
75 /// The local address of the connection.
76 pub(crate) local_addr: Option<SocketAddr>,
77 /// The TCP listening port the peer announced in its `Hello` message, if non-zero.
78 ///
79 /// This is effectively deprecated, but we still keep it around if a peer announced it as it's
80 /// likely still more useful than the ephemeral source port.
81 pub(crate) peer_listen_port: Option<u16>,
82 /// The Status message the peer sent for the `eth` handshake
83 pub(crate) status: Arc<UnifiedStatus>,
84}
85
86// === impl ActiveSessionHandle ===
87
88impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
89 /// Sends a disconnect command to the session.
90 pub fn disconnect(&self, reason: Option<DisconnectReason>) {
91 self.commands.disconnect(reason);
92 }
93
94 /// Sends a disconnect command to the session via the unbounded channel.
95 pub fn try_disconnect(
96 &self,
97 reason: Option<DisconnectReason>,
98 ) -> Result<(), SendError<SessionCommand<N>>> {
99 self.commands.try_disconnect(reason)
100 }
101
102 /// Returns the direction of the active session (inbound or outbound).
103 pub const fn direction(&self) -> Direction {
104 self.direction
105 }
106
107 /// Returns the assigned session id for this session.
108 pub const fn session_id(&self) -> SessionId {
109 self.session_id
110 }
111
112 /// Returns the negotiated eth version for this session.
113 pub const fn version(&self) -> EthVersion {
114 self.version
115 }
116
117 /// Returns the identifier of the remote peer.
118 pub const fn remote_id(&self) -> PeerId {
119 self.remote_id
120 }
121
122 /// Returns the timestamp when the session has been established.
123 pub const fn established(&self) -> Instant {
124 self.established
125 }
126
127 /// Returns the announced capabilities of the peer.
128 pub fn capabilities(&self) -> Arc<Capabilities> {
129 self.capabilities.clone()
130 }
131
132 /// Returns the client's name and version.
133 pub fn client_version(&self) -> Arc<str> {
134 self.client_version.clone()
135 }
136
137 /// Returns the address we're connected to.
138 pub const fn remote_addr(&self) -> SocketAddr {
139 self.remote_addr
140 }
141
142 /// Returns the current number of in-flight broadcast items.
143 pub fn queued_broadcast_items(&self) -> usize {
144 self.commands.queued_broadcast_items()
145 }
146
147 /// Extracts the [`PeerInfo`] from the session handle.
148 pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
149 // For inbound connections, the `record` was built from the TCP socket address, which
150 // carries the peer's OS-assigned ephemeral source port (not dialable). If the peer
151 // announced a non-zero listening port in its `Hello` message, prefer that combined with
152 // the connection IP so the resulting enode is actually dialable.
153 let enode = match (self.direction, self.peer_listen_port) {
154 (Direction::Incoming, Some(port)) => NodeRecord::new_with_ports(
155 self.remote_addr.ip(),
156 port,
157 Some(record.udp_port),
158 record.id,
159 )
160 .to_string(),
161 _ => record.to_string(),
162 };
163 PeerInfo {
164 remote_id: self.remote_id,
165 direction: self.direction,
166 enode,
167 enr: None,
168 remote_addr: self.remote_addr,
169 local_addr: self.local_addr,
170 capabilities: self.capabilities.clone(),
171 client_version: self.client_version.clone(),
172 eth_version: self.version,
173 status: self.status.clone(),
174 session_established: self.established,
175 kind,
176 }
177 }
178}
179
180/// Sender half of the session command channel with broadcast-aware backpressure.
181///
182/// Commands are first sent through a bounded channel. If the bounded channel is full and the
183/// message is a broadcast with room under the broadcast item limit, it overflows to a dedicated
184/// unbounded channel that the session task drains alongside the bounded one.
185///
186/// The shared `broadcast_items` counter tracks items across **all** buffers (bounded channel,
187/// overflow channel, and the session's outgoing queue), so the
188/// [`SessionManager`](super::SessionManager) has an accurate view of total in-flight broadcast
189/// pressure.
190#[derive(Debug)]
191pub(crate) struct SessionCommandSender<N: NetworkPrimitives> {
192 /// Bounded channel for all commands (primary path).
193 tx: mpsc::Sender<SessionCommand<N>>,
194 /// Unbounded channel used for broadcasts that overflow the bounded channel, and for
195 /// disconnect commands (which must never be dropped due to backpressure).
196 unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
197 /// Shared counter of in-flight broadcast items (channels + outgoing queue).
198 broadcast_items: BroadcastItemCounter,
199}
200
201impl<N: NetworkPrimitives> SessionCommandSender<N> {
202 /// Creates a new sender with the given bounded channel, unbounded channel, and shared counter.
203 pub(crate) const fn new(
204 tx: mpsc::Sender<SessionCommand<N>>,
205 unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
206 broadcast_items: BroadcastItemCounter,
207 ) -> Self {
208 Self { tx, unbounded_tx, broadcast_items }
209 }
210
211 /// Sends a disconnect command via the unbounded channel so it is never dropped due to
212 /// backpressure.
213 pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
214 let _ = self.unbounded_tx.send(SessionCommand::Disconnect { reason });
215 }
216
217 /// Sends a disconnect command via the unbounded channel.
218 ///
219 /// This is infallible from a capacity standpoint (unbounded), but will fail if the
220 /// receiver has been dropped (session closed).
221 pub(crate) fn try_disconnect(
222 &self,
223 reason: Option<DisconnectReason>,
224 ) -> Result<(), SendError<SessionCommand<N>>> {
225 self.unbounded_tx.send(SessionCommand::Disconnect { reason }).map_err(|e| SendError(e.0))
226 }
227
228 /// Sends a message to the session with broadcast-aware backpressure.
229 ///
230 /// For broadcast messages, the broadcast item counter is incremented **before** the message
231 /// enters any channel, ensuring the counter always reflects the true in-flight count.
232 /// If the bounded channel is full, broadcasts overflow to the unbounded channel (up to the
233 /// item limit). Non-broadcast messages that cannot fit in the bounded channel are dropped.
234 ///
235 /// Returns `true` if the message was accepted, `false` if it was dropped.
236 pub(crate) fn send_message(&self, msg: PeerMessage<N>) -> bool {
237 if msg.is_broadcast() {
238 let items = msg.message_item_count();
239
240 // Check + increment atomically (optimistic)
241 if !self.broadcast_items.try_add(items) {
242 return false;
243 }
244
245 // Try bounded channel first
246 match self.tx.try_send(SessionCommand::Message(msg)) {
247 Ok(()) => true,
248 Err(mpsc::error::TrySendError::Full(cmd)) => {
249 // Overflow to unbounded channel (counter already incremented)
250 let _ = self.unbounded_tx.send(cmd);
251 true
252 }
253 Err(_) => {
254 // Channel closed, undo increment
255 self.broadcast_items.sub(items);
256 false
257 }
258 }
259 } else {
260 // Non-broadcast: bounded channel only
261 match self.tx.try_send(SessionCommand::Message(msg)) {
262 Ok(()) => true,
263 Err(mpsc::error::TrySendError::Full(SessionCommand::Message(msg))) => {
264 trace!(
265 target: "net::session",
266 msg_kind = msg.message_kind(),
267 "session command buffer full, dropping non-broadcast message"
268 );
269 false
270 }
271 Err(_) => false,
272 }
273 }
274 }
275
276 /// Returns the current number of in-flight broadcast items.
277 pub(crate) fn queued_broadcast_items(&self) -> usize {
278 self.broadcast_items.get()
279 }
280}
281
282/// Events a pending session can produce.
283///
284/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
285///
286/// A session starts with a `Handshake`, followed by a `Hello` message which
287#[derive(Debug)]
288pub enum PendingSessionEvent<N: NetworkPrimitives> {
289 /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
290 Established {
291 /// An internal identifier for the established session
292 session_id: SessionId,
293 /// The remote node's socket address
294 remote_addr: SocketAddr,
295 /// The local address of the connection
296 local_addr: Option<SocketAddr>,
297 /// The remote node's public key
298 peer_id: PeerId,
299 /// All capabilities the peer announced
300 capabilities: Arc<Capabilities>,
301 /// The Status message the peer sent for the `eth` handshake
302 status: Arc<UnifiedStatus>,
303 /// The actual connection stream which can be used to send and receive `eth` protocol
304 /// messages
305 conn: EthRlpxConnection<N>,
306 /// The direction of the session, either `Inbound` or `Outgoing`
307 direction: Direction,
308 /// The remote node's user agent, usually containing the client name and version
309 client_id: String,
310 /// The TCP listening port the peer announced in its `Hello` message, if non-zero.
311 ///
312 /// See `ActiveSessionHandle::peer_listen_port` for context.
313 peer_listen_port: Option<u16>,
314 },
315 /// Handshake unsuccessful, session was disconnected.
316 Disconnected {
317 /// The remote node's socket address
318 remote_addr: SocketAddr,
319 /// The internal identifier for the disconnected session
320 session_id: SessionId,
321 /// The direction of the session, either `Inbound` or `Outgoing`
322 direction: Direction,
323 /// The error that caused the disconnect
324 error: Option<PendingSessionHandshakeError>,
325 },
326 /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
327 OutgoingConnectionError {
328 /// The remote node's socket address
329 remote_addr: SocketAddr,
330 /// The internal identifier for the disconnected session
331 session_id: SessionId,
332 /// The remote node's public key
333 peer_id: PeerId,
334 /// The error that caused the outgoing connection failure
335 error: io::Error,
336 },
337 /// Thrown when authentication via ECIES failed.
338 EciesAuthError {
339 /// The remote node's socket address
340 remote_addr: SocketAddr,
341 /// The internal identifier for the disconnected session
342 session_id: SessionId,
343 /// The error that caused the ECIES session to fail
344 error: ECIESError,
345 /// The direction of the session, either `Inbound` or `Outgoing`
346 direction: Direction,
347 },
348}
349
350/// Commands that can be sent to the spawned session.
351#[derive(Debug)]
352pub enum SessionCommand<N: NetworkPrimitives> {
353 /// Disconnect the connection
354 Disconnect {
355 /// Why the disconnect was initiated
356 reason: Option<DisconnectReason>,
357 },
358 /// Sends a message to the peer
359 Message(PeerMessage<N>),
360}
361
362/// Message variants an active session can produce and send back to the
363/// [`SessionManager`](crate::session::SessionManager)
364#[derive(Debug)]
365pub enum ActiveSessionMessage<N: NetworkPrimitives> {
366 /// Session was gracefully disconnected.
367 Disconnected {
368 /// The remote node's public key
369 peer_id: PeerId,
370 /// The remote node's socket address
371 remote_addr: SocketAddr,
372 },
373 /// Session was closed due an error
374 ClosedOnConnectionError {
375 /// The remote node's public key
376 peer_id: PeerId,
377 /// The remote node's socket address
378 remote_addr: SocketAddr,
379 /// The error that caused the session to close
380 error: EthStreamError,
381 },
382 /// A session received a valid message via `RLPx`.
383 ValidMessage {
384 /// Identifier of the remote peer.
385 peer_id: PeerId,
386 /// Message received from the peer.
387 message: PeerMessage<N>,
388 },
389 /// Received a bad message from the peer.
390 BadMessage {
391 /// Identifier of the remote peer.
392 peer_id: PeerId,
393 },
394 /// Remote peer is considered in protocol violation
395 ProtocolBreach {
396 /// Identifier of the remote peer.
397 peer_id: PeerId,
398 },
399}