reth_network/session/handle.rs
1//! Session handles.
2
3use crate::{
4 message::PeerMessage,
5 session::{active::BroadcastItemCounter, conn::EthRlpxConnection, Direction, SessionId},
6 PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10 errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives,
11 UnifiedStatus,
12};
13use reth_network_api::PeerInfo;
14use reth_network_peers::{NodeRecord, PeerId};
15use reth_network_types::PeerKind;
16use std::{io, net::SocketAddr, sync::Arc, time::Instant};
17use tokio::sync::{
18 mpsc::{self, error::SendError},
19 oneshot,
20};
21use tracing::trace;
22
23/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
24/// message which exchanges the `capabilities` of the peer.
25///
26/// This session needs to wait until it is authenticated.
27#[derive(Debug)]
28pub struct PendingSessionHandle {
29 /// Can be used to tell the session to disconnect the connection/abort the handshake process.
30 pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
31 /// The direction of the session
32 pub(crate) direction: Direction,
33}
34
35// === impl PendingSessionHandle ===
36
37impl PendingSessionHandle {
38 /// Sends a disconnect command to the pending session.
39 pub fn disconnect(&mut self) {
40 if let Some(tx) = self.disconnect_tx.take() {
41 let _ = tx.send(());
42 }
43 }
44
45 /// Returns the direction of the pending session (inbound or outbound).
46 pub const fn direction(&self) -> Direction {
47 self.direction
48 }
49}
50
51/// An established session with a remote peer.
52///
53/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
54/// be performed: chain synchronization, block propagation and transaction exchange.
55#[derive(Debug)]
56pub struct ActiveSessionHandle<N: NetworkPrimitives> {
57 /// The direction of the session
58 pub(crate) direction: Direction,
59 /// The assigned id for this session
60 pub(crate) session_id: SessionId,
61 /// negotiated eth version
62 pub(crate) version: EthVersion,
63 /// The identifier of the remote peer
64 pub(crate) remote_id: PeerId,
65 /// The timestamp when the session has been established.
66 pub(crate) established: Instant,
67 /// Announced capabilities of the peer.
68 pub(crate) capabilities: Arc<Capabilities>,
69 /// Sender for commands to the spawned session with broadcast-aware backpressure.
70 pub(crate) commands: SessionCommandSender<N>,
71 /// The client's name and version
72 pub(crate) client_version: Arc<str>,
73 /// The address we're connected to
74 pub(crate) remote_addr: SocketAddr,
75 /// The local address of the connection.
76 pub(crate) local_addr: Option<SocketAddr>,
77 /// The Status message the peer sent for the `eth` handshake
78 pub(crate) status: Arc<UnifiedStatus>,
79}
80
81// === impl ActiveSessionHandle ===
82
83impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
84 /// Sends a disconnect command to the session.
85 pub fn disconnect(&self, reason: Option<DisconnectReason>) {
86 self.commands.disconnect(reason);
87 }
88
89 /// Sends a disconnect command to the session via the unbounded channel.
90 pub fn try_disconnect(
91 &self,
92 reason: Option<DisconnectReason>,
93 ) -> Result<(), SendError<SessionCommand<N>>> {
94 self.commands.try_disconnect(reason)
95 }
96
97 /// Returns the direction of the active session (inbound or outbound).
98 pub const fn direction(&self) -> Direction {
99 self.direction
100 }
101
102 /// Returns the assigned session id for this session.
103 pub const fn session_id(&self) -> SessionId {
104 self.session_id
105 }
106
107 /// Returns the negotiated eth version for this session.
108 pub const fn version(&self) -> EthVersion {
109 self.version
110 }
111
112 /// Returns the identifier of the remote peer.
113 pub const fn remote_id(&self) -> PeerId {
114 self.remote_id
115 }
116
117 /// Returns the timestamp when the session has been established.
118 pub const fn established(&self) -> Instant {
119 self.established
120 }
121
122 /// Returns the announced capabilities of the peer.
123 pub fn capabilities(&self) -> Arc<Capabilities> {
124 self.capabilities.clone()
125 }
126
127 /// Returns the client's name and version.
128 pub fn client_version(&self) -> Arc<str> {
129 self.client_version.clone()
130 }
131
132 /// Returns the address we're connected to.
133 pub const fn remote_addr(&self) -> SocketAddr {
134 self.remote_addr
135 }
136
137 /// Returns the current number of in-flight broadcast items.
138 pub fn queued_broadcast_items(&self) -> usize {
139 self.commands.queued_broadcast_items()
140 }
141
142 /// Extracts the [`PeerInfo`] from the session handle.
143 pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
144 PeerInfo {
145 remote_id: self.remote_id,
146 direction: self.direction,
147 enode: record.to_string(),
148 enr: None,
149 remote_addr: self.remote_addr,
150 local_addr: self.local_addr,
151 capabilities: self.capabilities.clone(),
152 client_version: self.client_version.clone(),
153 eth_version: self.version,
154 status: self.status.clone(),
155 session_established: self.established,
156 kind,
157 }
158 }
159}
160
161/// Sender half of the session command channel with broadcast-aware backpressure.
162///
163/// Commands are first sent through a bounded channel. If the bounded channel is full and the
164/// message is a broadcast with room under the broadcast item limit, it overflows to a dedicated
165/// unbounded channel that the session task drains alongside the bounded one.
166///
167/// The shared `broadcast_items` counter tracks items across **all** buffers (bounded channel,
168/// overflow channel, and the session's outgoing queue), so the
169/// [`SessionManager`](super::SessionManager) has an accurate view of total in-flight broadcast
170/// pressure.
171#[derive(Debug)]
172pub(crate) struct SessionCommandSender<N: NetworkPrimitives> {
173 /// Bounded channel for all commands (primary path).
174 tx: mpsc::Sender<SessionCommand<N>>,
175 /// Unbounded channel used for broadcasts that overflow the bounded channel, and for
176 /// disconnect commands (which must never be dropped due to backpressure).
177 unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
178 /// Shared counter of in-flight broadcast items (channels + outgoing queue).
179 broadcast_items: BroadcastItemCounter,
180}
181
182impl<N: NetworkPrimitives> SessionCommandSender<N> {
183 /// Creates a new sender with the given bounded channel, unbounded channel, and shared counter.
184 pub(crate) const fn new(
185 tx: mpsc::Sender<SessionCommand<N>>,
186 unbounded_tx: mpsc::UnboundedSender<SessionCommand<N>>,
187 broadcast_items: BroadcastItemCounter,
188 ) -> Self {
189 Self { tx, unbounded_tx, broadcast_items }
190 }
191
192 /// Sends a disconnect command via the unbounded channel so it is never dropped due to
193 /// backpressure.
194 pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
195 let _ = self.unbounded_tx.send(SessionCommand::Disconnect { reason });
196 }
197
198 /// Sends a disconnect command via the unbounded channel.
199 ///
200 /// This is infallible from a capacity standpoint (unbounded), but will fail if the
201 /// receiver has been dropped (session closed).
202 pub(crate) fn try_disconnect(
203 &self,
204 reason: Option<DisconnectReason>,
205 ) -> Result<(), SendError<SessionCommand<N>>> {
206 self.unbounded_tx.send(SessionCommand::Disconnect { reason }).map_err(|e| SendError(e.0))
207 }
208
209 /// Sends a message to the session with broadcast-aware backpressure.
210 ///
211 /// For broadcast messages, the broadcast item counter is incremented **before** the message
212 /// enters any channel, ensuring the counter always reflects the true in-flight count.
213 /// If the bounded channel is full, broadcasts overflow to the unbounded channel (up to the
214 /// item limit). Non-broadcast messages that cannot fit in the bounded channel are dropped.
215 ///
216 /// Returns `true` if the message was accepted, `false` if it was dropped.
217 pub(crate) fn send_message(&self, msg: PeerMessage<N>) -> bool {
218 if msg.is_broadcast() {
219 let items = msg.message_item_count();
220
221 // Check + increment atomically (optimistic)
222 if !self.broadcast_items.try_add(items) {
223 return false;
224 }
225
226 // Try bounded channel first
227 match self.tx.try_send(SessionCommand::Message(msg)) {
228 Ok(()) => true,
229 Err(mpsc::error::TrySendError::Full(cmd)) => {
230 // Overflow to unbounded channel (counter already incremented)
231 let _ = self.unbounded_tx.send(cmd);
232 true
233 }
234 Err(_) => {
235 // Channel closed, undo increment
236 self.broadcast_items.sub(items);
237 false
238 }
239 }
240 } else {
241 // Non-broadcast: bounded channel only
242 match self.tx.try_send(SessionCommand::Message(msg)) {
243 Ok(()) => true,
244 Err(mpsc::error::TrySendError::Full(SessionCommand::Message(msg))) => {
245 trace!(
246 target: "net::session",
247 msg_kind = msg.message_kind(),
248 "session command buffer full, dropping non-broadcast message"
249 );
250 false
251 }
252 Err(_) => false,
253 }
254 }
255 }
256
257 /// Returns the current number of in-flight broadcast items.
258 pub(crate) fn queued_broadcast_items(&self) -> usize {
259 self.broadcast_items.get()
260 }
261}
262
263/// Events a pending session can produce.
264///
265/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
266///
267/// A session starts with a `Handshake`, followed by a `Hello` message which
268#[derive(Debug)]
269pub enum PendingSessionEvent<N: NetworkPrimitives> {
270 /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
271 Established {
272 /// An internal identifier for the established session
273 session_id: SessionId,
274 /// The remote node's socket address
275 remote_addr: SocketAddr,
276 /// The local address of the connection
277 local_addr: Option<SocketAddr>,
278 /// The remote node's public key
279 peer_id: PeerId,
280 /// All capabilities the peer announced
281 capabilities: Arc<Capabilities>,
282 /// The Status message the peer sent for the `eth` handshake
283 status: Arc<UnifiedStatus>,
284 /// The actual connection stream which can be used to send and receive `eth` protocol
285 /// messages
286 conn: EthRlpxConnection<N>,
287 /// The direction of the session, either `Inbound` or `Outgoing`
288 direction: Direction,
289 /// The remote node's user agent, usually containing the client name and version
290 client_id: String,
291 },
292 /// Handshake unsuccessful, session was disconnected.
293 Disconnected {
294 /// The remote node's socket address
295 remote_addr: SocketAddr,
296 /// The internal identifier for the disconnected session
297 session_id: SessionId,
298 /// The direction of the session, either `Inbound` or `Outgoing`
299 direction: Direction,
300 /// The error that caused the disconnect
301 error: Option<PendingSessionHandshakeError>,
302 },
303 /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
304 OutgoingConnectionError {
305 /// The remote node's socket address
306 remote_addr: SocketAddr,
307 /// The internal identifier for the disconnected session
308 session_id: SessionId,
309 /// The remote node's public key
310 peer_id: PeerId,
311 /// The error that caused the outgoing connection failure
312 error: io::Error,
313 },
314 /// Thrown when authentication via ECIES failed.
315 EciesAuthError {
316 /// The remote node's socket address
317 remote_addr: SocketAddr,
318 /// The internal identifier for the disconnected session
319 session_id: SessionId,
320 /// The error that caused the ECIES session to fail
321 error: ECIESError,
322 /// The direction of the session, either `Inbound` or `Outgoing`
323 direction: Direction,
324 },
325}
326
327/// Commands that can be sent to the spawned session.
328#[derive(Debug)]
329pub enum SessionCommand<N: NetworkPrimitives> {
330 /// Disconnect the connection
331 Disconnect {
332 /// Why the disconnect was initiated
333 reason: Option<DisconnectReason>,
334 },
335 /// Sends a message to the peer
336 Message(PeerMessage<N>),
337}
338
339/// Message variants an active session can produce and send back to the
340/// [`SessionManager`](crate::session::SessionManager)
341#[derive(Debug)]
342pub enum ActiveSessionMessage<N: NetworkPrimitives> {
343 /// Session was gracefully disconnected.
344 Disconnected {
345 /// The remote node's public key
346 peer_id: PeerId,
347 /// The remote node's socket address
348 remote_addr: SocketAddr,
349 },
350 /// Session was closed due an error
351 ClosedOnConnectionError {
352 /// The remote node's public key
353 peer_id: PeerId,
354 /// The remote node's socket address
355 remote_addr: SocketAddr,
356 /// The error that caused the session to close
357 error: EthStreamError,
358 },
359 /// A session received a valid message via `RLPx`.
360 ValidMessage {
361 /// Identifier of the remote peer.
362 peer_id: PeerId,
363 /// Message received from the peer.
364 message: PeerMessage<N>,
365 },
366 /// Received a bad message from the peer.
367 BadMessage {
368 /// Identifier of the remote peer.
369 peer_id: PeerId,
370 },
371 /// Remote peer is considered in protocol violation
372 ProtocolBreach {
373 /// Identifier of the remote peer.
374 peer_id: PeerId,
375 },
376}