reth_network/session/
handle.rs

1//! Session handles.
2
3use crate::{
4    message::PeerMessage,
5    session::{conn::EthRlpxConnection, Direction, SessionId},
6    PendingSessionHandshakeError,
7};
8use reth_ecies::ECIESError;
9use reth_eth_wire::{
10    errors::EthStreamError, Capabilities, DisconnectReason, EthVersion, NetworkPrimitives, Status,
11};
12use reth_network_api::PeerInfo;
13use reth_network_peers::{NodeRecord, PeerId};
14use reth_network_types::PeerKind;
15use std::{io, net::SocketAddr, sync::Arc, time::Instant};
16use tokio::sync::{
17    mpsc::{self, error::SendError},
18    oneshot,
19};
20
21/// A handler attached to a peer session that's not authenticated yet, pending Handshake and hello
22/// message which exchanges the `capabilities` of the peer.
23///
24/// This session needs to wait until it is authenticated.
25#[derive(Debug)]
26pub struct PendingSessionHandle {
27    /// Can be used to tell the session to disconnect the connection/abort the handshake process.
28    pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
29    /// The direction of the session
30    pub(crate) direction: Direction,
31}
32
33// === impl PendingSessionHandle ===
34
35impl PendingSessionHandle {
36    /// Sends a disconnect command to the pending session.
37    pub fn disconnect(&mut self) {
38        if let Some(tx) = self.disconnect_tx.take() {
39            let _ = tx.send(());
40        }
41    }
42
43    /// Returns the direction of the pending session (inbound or outbound).
44    pub const fn direction(&self) -> Direction {
45        self.direction
46    }
47}
48
49/// An established session with a remote peer.
50///
51/// Within an active session that supports the `Ethereum Wire Protocol`, three high-level tasks can
52/// be performed: chain synchronization, block propagation and transaction exchange.
53#[derive(Debug)]
54pub struct ActiveSessionHandle<N: NetworkPrimitives> {
55    /// The direction of the session
56    pub(crate) direction: Direction,
57    /// The assigned id for this session
58    pub(crate) session_id: SessionId,
59    /// negotiated eth version
60    pub(crate) version: EthVersion,
61    /// The identifier of the remote peer
62    pub(crate) remote_id: PeerId,
63    /// The timestamp when the session has been established.
64    pub(crate) established: Instant,
65    /// Announced capabilities of the peer.
66    pub(crate) capabilities: Arc<Capabilities>,
67    /// Sender half of the command channel used send commands _to_ the spawned session
68    pub(crate) commands_to_session: mpsc::Sender<SessionCommand<N>>,
69    /// The client's name and version
70    pub(crate) client_version: Arc<str>,
71    /// The address we're connected to
72    pub(crate) remote_addr: SocketAddr,
73    /// The local address of the connection.
74    pub(crate) local_addr: Option<SocketAddr>,
75    /// The Status message the peer sent for the `eth` handshake
76    pub(crate) status: Arc<Status>,
77}
78
79// === impl ActiveSessionHandle ===
80
81impl<N: NetworkPrimitives> ActiveSessionHandle<N> {
82    /// Sends a disconnect command to the session.
83    pub fn disconnect(&self, reason: Option<DisconnectReason>) {
84        // Note: we clone the sender which ensures the channel has capacity to send the message
85        let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
86    }
87
88    /// Sends a disconnect command to the session, awaiting the command channel for available
89    /// capacity.
90    pub async fn try_disconnect(
91        &self,
92        reason: Option<DisconnectReason>,
93    ) -> Result<(), SendError<SessionCommand<N>>> {
94        self.commands_to_session.clone().send(SessionCommand::Disconnect { reason }).await
95    }
96
97    /// Returns the direction of the active session (inbound or outbound).
98    pub const fn direction(&self) -> Direction {
99        self.direction
100    }
101
102    /// Returns the assigned session id for this session.
103    pub const fn session_id(&self) -> SessionId {
104        self.session_id
105    }
106
107    /// Returns the negotiated eth version for this session.
108    pub const fn version(&self) -> EthVersion {
109        self.version
110    }
111
112    /// Returns the identifier of the remote peer.
113    pub const fn remote_id(&self) -> PeerId {
114        self.remote_id
115    }
116
117    /// Returns the timestamp when the session has been established.
118    pub const fn established(&self) -> Instant {
119        self.established
120    }
121
122    /// Returns the announced capabilities of the peer.
123    pub fn capabilities(&self) -> Arc<Capabilities> {
124        self.capabilities.clone()
125    }
126
127    /// Returns the client's name and version.
128    pub fn client_version(&self) -> Arc<str> {
129        self.client_version.clone()
130    }
131
132    /// Returns the address we're connected to.
133    pub const fn remote_addr(&self) -> SocketAddr {
134        self.remote_addr
135    }
136
137    /// Extracts the [`PeerInfo`] from the session handle.
138    pub(crate) fn peer_info(&self, record: &NodeRecord, kind: PeerKind) -> PeerInfo {
139        PeerInfo {
140            remote_id: self.remote_id,
141            direction: self.direction,
142            enode: record.to_string(),
143            enr: None,
144            remote_addr: self.remote_addr,
145            local_addr: self.local_addr,
146            capabilities: self.capabilities.clone(),
147            client_version: self.client_version.clone(),
148            eth_version: self.version,
149            status: self.status.clone(),
150            session_established: self.established,
151            kind,
152        }
153    }
154}
155
156/// Events a pending session can produce.
157///
158/// This represents the state changes a session can undergo until it is ready to send capability messages <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md>.
159///
160/// A session starts with a `Handshake`, followed by a `Hello` message which
161#[derive(Debug)]
162pub enum PendingSessionEvent<N: NetworkPrimitives> {
163    /// Represents a successful `Hello` and `Status` exchange: <https://github.com/ethereum/devp2p/blob/6b0abc3d956a626c28dce1307ee9f546db17b6bd/rlpx.md#hello-0x00>
164    Established {
165        /// An internal identifier for the established session
166        session_id: SessionId,
167        /// The remote node's socket address
168        remote_addr: SocketAddr,
169        /// The local address of the connection
170        local_addr: Option<SocketAddr>,
171        /// The remote node's public key
172        peer_id: PeerId,
173        /// All capabilities the peer announced
174        capabilities: Arc<Capabilities>,
175        /// The Status message the peer sent for the `eth` handshake
176        status: Arc<Status>,
177        /// The actual connection stream which can be used to send and receive `eth` protocol
178        /// messages
179        conn: EthRlpxConnection<N>,
180        /// The direction of the session, either `Inbound` or `Outgoing`
181        direction: Direction,
182        /// The remote node's user agent, usually containing the client name and version
183        client_id: String,
184    },
185    /// Handshake unsuccessful, session was disconnected.
186    Disconnected {
187        /// The remote node's socket address
188        remote_addr: SocketAddr,
189        /// The internal identifier for the disconnected session
190        session_id: SessionId,
191        /// The direction of the session, either `Inbound` or `Outgoing`
192        direction: Direction,
193        /// The error that caused the disconnect
194        error: Option<PendingSessionHandshakeError>,
195    },
196    /// Thrown when unable to establish a [`TcpStream`](tokio::net::TcpStream).
197    OutgoingConnectionError {
198        /// The remote node's socket address
199        remote_addr: SocketAddr,
200        /// The internal identifier for the disconnected session
201        session_id: SessionId,
202        /// The remote node's public key
203        peer_id: PeerId,
204        /// The error that caused the outgoing connection failure
205        error: io::Error,
206    },
207    /// Thrown when authentication via ECIES failed.
208    EciesAuthError {
209        /// The remote node's socket address
210        remote_addr: SocketAddr,
211        /// The internal identifier for the disconnected session
212        session_id: SessionId,
213        /// The error that caused the ECIES session to fail
214        error: ECIESError,
215        /// The direction of the session, either `Inbound` or `Outgoing`
216        direction: Direction,
217    },
218}
219
220/// Commands that can be sent to the spawned session.
221#[derive(Debug)]
222pub enum SessionCommand<N: NetworkPrimitives> {
223    /// Disconnect the connection
224    Disconnect {
225        /// Why the disconnect was initiated
226        reason: Option<DisconnectReason>,
227    },
228    /// Sends a message to the peer
229    Message(PeerMessage<N>),
230}
231
232/// Message variants an active session can produce and send back to the
233/// [`SessionManager`](crate::session::SessionManager)
234#[derive(Debug)]
235pub enum ActiveSessionMessage<N: NetworkPrimitives> {
236    /// Session was gracefully disconnected.
237    Disconnected {
238        /// The remote node's public key
239        peer_id: PeerId,
240        /// The remote node's socket address
241        remote_addr: SocketAddr,
242    },
243    /// Session was closed due an error
244    ClosedOnConnectionError {
245        /// The remote node's public key
246        peer_id: PeerId,
247        /// The remote node's socket address
248        remote_addr: SocketAddr,
249        /// The error that caused the session to close
250        error: EthStreamError,
251    },
252    /// A session received a valid message via `RLPx`.
253    ValidMessage {
254        /// Identifier of the remote peer.
255        peer_id: PeerId,
256        /// Message received from the peer.
257        message: PeerMessage<N>,
258    },
259    /// Received a bad message from the peer.
260    BadMessage {
261        /// Identifier of the remote peer.
262        peer_id: PeerId,
263    },
264    /// Remote peer is considered in protocol violation
265    ProtocolBreach {
266        /// Identifier of the remote peer.
267        peer_id: PeerId,
268    },
269}