reth_network/session/handle.rs
1//! Session handles.
2
3use crate::{
4 message::PeerMessage,
5 session::{conn::EthRlpxConnection, Direction, SessionId},
6 PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10 errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives, Status,
11};
12use reth_network_api::PeerInfo;
13use reth_network_peers::{NodeRecord, PeerId};
14use reth_network_types::PeerKind;
15use std::{io, net::SocketAddr, sync::Arc, time::Instant};
16use tokio::sync::{
17 mpsc::{self, error::SendError},
18 oneshot,
19};
20
21/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
22/// message which exchanges the `capabilities` of the peer.
23///
24/// This session needs to wait until it is authenticated.
25#[derive(Debug)]
26pub struct PendingSessionHandle {
27 /// Can be used to tell the session to disconnect the connection/abort the handshake process.
28 pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
29 /// The direction of the session
30 pub(crate) direction: Direction,
31}
32
33// === impl PendingSessionHandle ===
34
35impl PendingSessionHandle {
36 /// Sends a disconnect command to the pending session.
37 pub fn disconnect(&mut self) {
38 if let Some(tx) = self.disconnect_tx.take() {
39 let _ = tx.send(());
40 }
41 }
42
43 /// Returns the direction of the pending session (inbound or outbound).
44 pub const fn direction(&self) -> Direction {
45 self.direction
46 }
47}
48
49/// An established session with a remote peer.
50///
51/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
52/// be performed: chain synchronization, block propagation and transaction exchange.
53#[derive(Debug)]
54pub struct ActiveSessionHandle<N: NetworkPrimitives> {
55 /// The direction of the session
56 pub(crate) direction: Direction,
57 /// The assigned id for this session
58 pub(crate) session_id: SessionId,
59 /// negotiated eth version
60 pub(crate) version: EthVersion,
61 /// The identifier of the remote peer
62 pub(crate) remote_id: PeerId,
63 /// The timestamp when the session has been established.
64 pub(crate) established: Instant,
65 /// Announced capabilities of the peer.
66 pub(crate) capabilities: Arc<Capabilities>,
67 /// Sender half of the command channel used send commands _to_ the spawned session
68 pub(crate) commands_to_session: mpsc::Sender<SessionCommand<N>>,
69 /// The client's name and version
70 pub(crate) client_version: Arc<str>,
71 /// The address we're connected to
72 pub(crate) remote_addr: SocketAddr,
73 /// The local address of the connection.
74 pub(crate) local_addr: Option<SocketAddr>,
75 /// The Status message the peer sent for the `eth` handshake
76 pub(crate) status: Arc<Status>,
77}
78
79// === impl ActiveSessionHandle ===
80
81impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
82 /// Sends a disconnect command to the session.
83 pub fn disconnect(&self, reason: Option<DisconnectReason>) {
84 // Note: we clone the sender which ensures the channel has capacity to send the message
85 let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
86 }
87
88 /// Sends a disconnect command to the session, awaiting the command channel for available
89 /// capacity.
90 pub async fn try_disconnect(
91 &self,
92 reason: Option<DisconnectReason>,
93 ) -> Result<(), SendError<SessionCommand<N>>> {
94 self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
95 }
96
97 /// Returns the direction of the active session (inbound or outbound).
98 pub const fn direction(&self) -> Direction {
99 self.direction
100 }
101
102 /// Returns the assigned session id for this session.
103 pub const fn session_id(&self) -> SessionId {
104 self.session_id
105 }
106
107 /// Returns the negotiated eth version for this session.
108 pub const fn version(&self) -> EthVersion {
109 self.version
110 }
111
112 /// Returns the identifier of the remote peer.
113 pub const fn remote_id(&self) -> PeerId {
114 self.remote_id
115 }
116
117 /// Returns the timestamp when the session has been established.
118 pub const fn established(&self) -> Instant {
119 self.established
120 }
121
122 /// Returns the announced capabilities of the peer.
123 pub fn capabilities(&self) -> Arc<Capabilities> {
124 self.capabilities.clone()
125 }
126
127 /// Returns the client's name and version.
128 pub fn client_version(&self) -> Arc<str> {
129 self.client_version.clone()
130 }
131
132 /// Returns the address we're connected to.
133 pub const fn remote_addr(&self) -> SocketAddr {
134 self.remote_addr
135 }
136
137 /// Extracts the [`PeerInfo`] from the session handle.
138 pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
139 PeerInfo {
140 remote_id: self.remote_id,
141 direction: self.direction,
142 enode: record.to_string(),
143 enr: None,
144 remote_addr: self.remote_addr,
145 local_addr: self.local_addr,
146 capabilities: self.capabilities.clone(),
147 client_version: self.client_version.clone(),
148 eth_version: self.version,
149 status: self.status.clone(),
150 session_established: self.established,
151 kind,
152 }
153 }
154}
155
156/// Events a pending session can produce.
157///
158/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
159///
160/// A session starts with a `Handshake`, followed by a `Hello` message which
161#[derive(Debug)]
162pub enum PendingSessionEvent<N: NetworkPrimitives> {
163 /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
164 Established {
165 /// An internal identifier for the established session
166 session_id: SessionId,
167 /// The remote node's socket address
168 remote_addr: SocketAddr,
169 /// The local address of the connection
170 local_addr: Option<SocketAddr>,
171 /// The remote node's public key
172 peer_id: PeerId,
173 /// All capabilities the peer announced
174 capabilities: Arc<Capabilities>,
175 /// The Status message the peer sent for the `eth` handshake
176 status: Arc<Status>,
177 /// The actual connection stream which can be used to send and receive `eth` protocol
178 /// messages
179 conn: EthRlpxConnection<N>,
180 /// The direction of the session, either `Inbound` or `Outgoing`
181 direction: Direction,
182 /// The remote node's user agent, usually containing the client name and version
183 client_id: String,
184 },
185 /// Handshake unsuccessful, session was disconnected.
186 Disconnected {
187 /// The remote node's socket address
188 remote_addr: SocketAddr,
189 /// The internal identifier for the disconnected session
190 session_id: SessionId,
191 /// The direction of the session, either `Inbound` or `Outgoing`
192 direction: Direction,
193 /// The error that caused the disconnect
194 error: Option<PendingSessionHandshakeError>,
195 },
196 /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
197 OutgoingConnectionError {
198 /// The remote node's socket address
199 remote_addr: SocketAddr,
200 /// The internal identifier for the disconnected session
201 session_id: SessionId,
202 /// The remote node's public key
203 peer_id: PeerId,
204 /// The error that caused the outgoing connection failure
205 error: io::Error,
206 },
207 /// Thrown when authentication via ECIES failed.
208 EciesAuthError {
209 /// The remote node's socket address
210 remote_addr: SocketAddr,
211 /// The internal identifier for the disconnected session
212 session_id: SessionId,
213 /// The error that caused the ECIES session to fail
214 error: ECIESError,
215 /// The direction of the session, either `Inbound` or `Outgoing`
216 direction: Direction,
217 },
218}
219
220/// Commands that can be sent to the spawned session.
221#[derive(Debug)]
222pub enum SessionCommand<N: NetworkPrimitives> {
223 /// Disconnect the connection
224 Disconnect {
225 /// Why the disconnect was initiated
226 reason: Option<DisconnectReason>,
227 },
228 /// Sends a message to the peer
229 Message(PeerMessage<N>),
230}
231
232/// Message variants an active session can produce and send back to the
233/// [`SessionManager`](crate::session::SessionManager)
234#[derive(Debug)]
235pub enum ActiveSessionMessage<N: NetworkPrimitives> {
236 /// Session was gracefully disconnected.
237 Disconnected {
238 /// The remote node's public key
239 peer_id: PeerId,
240 /// The remote node's socket address
241 remote_addr: SocketAddr,
242 },
243 /// Session was closed due an error
244 ClosedOnConnectionError {
245 /// The remote node's public key
246 peer_id: PeerId,
247 /// The remote node's socket address
248 remote_addr: SocketAddr,
249 /// The error that caused the session to close
250 error: EthStreamError,
251 },
252 /// A session received a valid message via `RLPx`.
253 ValidMessage {
254 /// Identifier of the remote peer.
255 peer_id: PeerId,
256 /// Message received from the peer.
257 message: PeerMessage<N>,
258 },
259 /// Received a bad message from the peer.
260 BadMessage {
261 /// Identifier of the remote peer.
262 peer_id: PeerId,
263 },
264 /// Remote peer is considered in protocol violation
265 ProtocolBreach {
266 /// Identifier of the remote peer.
267 peer_id: PeerId,
268 },
269}