Skip to main content

reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11    message::PeerMessage,
12    metrics::SessionManagerMetrics,
13    protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14    session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21    errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22    BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23    HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24    HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35    collections::HashMap,
36    future::Future,
37    net::SocketAddr,
38    sync::{atomic::AtomicU64, Arc},
39    task::{Context, Poll},
40    time::{Duration, Instant},
41};
42use tokio::{
43    io::{AsyncRead, AsyncWrite},
44    net::TcpStream,
45    sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56    SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
60/// Internal identifier for active sessions.
61#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
62pub struct SessionId(usize);
63
64/// Manages a set of sessions.
65#[must_use = "Session Manager must be polled to process session events."]
66#[derive(Debug)]
67pub struct SessionManager<N: NetworkPrimitives> {
68    /// Tracks the identifier for the next session.
69    next_id: usize,
70    /// Keeps track of all sessions
71    counter: SessionCounter,
72    ///  The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
73    /// responds to an _internal_ request with a `TimeoutError`
74    initial_internal_request_timeout: Duration,
75    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
76    /// considered a protocol violation and the session will initiate a drop.
77    protocol_breach_request_timeout: Duration,
78    /// The timeout after which a pending session attempt is considered failed.
79    pending_session_timeout: Duration,
80    /// The secret key used for authenticating sessions.
81    secret_key: SecretKey,
82    /// The `Status` message to send to peers.
83    status: UnifiedStatus,
84    /// The `HelloMessage` message to send to peers.
85    hello_message: HelloMessageWithProtocols,
86    /// The [`ForkFilter`] used to validate the peer's `Status` message.
87    fork_filter: ForkFilter,
88    /// Size of the command buffer per session.
89    session_command_buffer: usize,
90    /// The executor for spawned tasks.
91    executor: Runtime,
92    /// All pending session that are currently handshaking, exchanging `Hello`s.
93    ///
94    /// Events produced during the authentication phase are reported to this manager. Once the
95    /// session is authenticated, it can be moved to the `active_session` set.
96    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
97    /// All active sessions that are ready to exchange messages.
98    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
99    /// The original Sender half of the [`PendingSessionEvent`] channel.
100    ///
101    /// When a new (pending) session is created, the corresponding [`PendingSessionHandle`] will
102    /// get a clone of this sender half.
103    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
104    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
105    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
106    /// The original Sender half of the [`ActiveSessionMessage`] channel.
107    ///
108    /// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a
109    /// clone of this sender half.
110    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
111    /// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions.
112    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
113    /// Additional `RLPx` sub-protocols to be used by the session manager.
114    extra_protocols: RlpxSubProtocols,
115    /// Tracks the ongoing graceful disconnections attempts for incoming connections.
116    disconnections_counter: DisconnectionsCounter,
117    /// Metrics for the session manager.
118    metrics: SessionManagerMetrics,
119    /// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
120    handshake: Arc<dyn EthRlpxHandshake>,
121    /// Shared local range information that gets propagated to active sessions.
122    /// This represents the range of blocks that this node can serve to other peers.
123    local_range_info: BlockRangeInfo,
124}
125
126// === impl SessionManager ===
127
128impl<N: NetworkPrimitives> SessionManager<N> {
129    /// Creates a new empty [`SessionManager`].
130    #[expect(clippy::too_many_arguments)]
131    pub fn new(
132        secret_key: SecretKey,
133        config: SessionsConfig,
134        executor: Runtime,
135        status: UnifiedStatus,
136        hello_message: HelloMessageWithProtocols,
137        fork_filter: ForkFilter,
138        extra_protocols: RlpxSubProtocols,
139        handshake: Arc<dyn EthRlpxHandshake>,
140    ) -> Self {
141        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
142        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
143        let active_session_tx = PollSender::new(active_session_tx);
144
145        // Initialize local range info from the status
146        let local_range_info = BlockRangeInfo::new(
147            status.earliest_block.unwrap_or_default(),
148            status.latest_block.unwrap_or_default(),
149            status.blockhash,
150        );
151
152        Self {
153            next_id: 0,
154            counter: SessionCounter::new(config.limits),
155            initial_internal_request_timeout: config.initial_internal_request_timeout,
156            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
157            pending_session_timeout: config.pending_session_timeout,
158            secret_key,
159            status,
160            hello_message,
161            fork_filter,
162            session_command_buffer: config.session_command_buffer,
163            executor,
164            pending_sessions: Default::default(),
165            active_sessions: Default::default(),
166            pending_sessions_tx,
167            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
168            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
169            active_session_rx: ReceiverStream::new(active_session_rx),
170            extra_protocols,
171            disconnections_counter: Default::default(),
172            metrics: Default::default(),
173            handshake,
174            local_range_info,
175        }
176    }
177
178    /// Returns the currently tracked [`ForkId`].
179    pub(crate) const fn fork_id(&self) -> ForkId {
180        self.fork_filter.current()
181    }
182
183    /// Check whether the provided [`ForkId`] is compatible based on the validation rules in
184    /// `EIP-2124`.
185    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
186        self.fork_filter.validate(fork_id).is_ok()
187    }
188
189    /// Returns the next unique [`SessionId`].
190    const fn next_id(&mut self) -> SessionId {
191        let id = self.next_id;
192        self.next_id += 1;
193        SessionId(id)
194    }
195
196    /// Returns the current status of the session.
197    pub const fn status(&self) -> UnifiedStatus {
198        self.status
199    }
200
201    /// Returns the secret key used for authenticating sessions.
202    pub const fn secret_key(&self) -> SecretKey {
203        self.secret_key
204    }
205
206    /// Returns a borrowed reference to the active sessions.
207    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
208        &self.active_sessions
209    }
210
211    /// Returns the session hello message.
212    pub fn hello_message(&self) -> HelloMessageWithProtocols {
213        self.hello_message.clone()
214    }
215
216    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
217    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
218        self.extra_protocols.push(protocol)
219    }
220
221    /// Returns the number of currently pending connections.
222    #[inline]
223    pub(crate) fn num_pending_connections(&self) -> usize {
224        self.pending_sessions.len()
225    }
226
227    /// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
228    /// [`JoinSet`](tokio::task::JoinSet).
229    fn spawn<F>(&self, f: F)
230    where
231        F: Future<Output = ()> + Send + 'static,
232    {
233        self.executor.spawn_task(f);
234    }
235
236    /// Invoked on a received status update.
237    ///
238    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
239    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
240    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
241        self.status.blockhash = head.hash;
242        self.status.total_difficulty = Some(head.total_difficulty);
243        let transition = self.fork_filter.set_head(head);
244        self.status.forkid = self.fork_filter.current();
245        self.status.latest_block = Some(head.number);
246
247        transition
248    }
249
250    /// An incoming TCP connection was received. This starts the authentication process to turn this
251    /// stream into an active peer session.
252    ///
253    /// Returns an error if the configured limit has been reached.
254    pub(crate) fn on_incoming(
255        &mut self,
256        stream: TcpStream,
257        remote_addr: SocketAddr,
258    ) -> Result<SessionId, ExceedsSessionLimit> {
259        self.counter.ensure_pending_inbound()?;
260
261        let session_id = self.next_id();
262
263        trace!(
264            target: "net::session",
265            ?remote_addr,
266            ?session_id,
267            "new pending incoming session"
268        );
269
270        let (disconnect_tx, disconnect_rx) = oneshot::channel();
271        let pending_events = self.pending_sessions_tx.clone();
272        let secret_key = self.secret_key;
273        let hello_message = self.hello_message.clone();
274        let status = self.status;
275        let fork_filter = self.fork_filter.clone();
276        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
277        self.spawn(pending_session_with_timeout(
278            self.pending_session_timeout,
279            session_id,
280            remote_addr,
281            Direction::Incoming,
282            pending_events.clone(),
283            start_pending_incoming_session(
284                self.handshake.clone(),
285                disconnect_rx,
286                session_id,
287                stream,
288                pending_events,
289                remote_addr,
290                secret_key,
291                hello_message,
292                status,
293                fork_filter,
294                extra_handlers,
295            ),
296        ));
297
298        let handle = PendingSessionHandle {
299            disconnect_tx: Some(disconnect_tx),
300            direction: Direction::Incoming,
301        };
302        self.pending_sessions.insert(session_id, handle);
303        self.counter.inc_pending_inbound();
304        Ok(session_id)
305    }
306
307    /// Starts a new pending session from the local node to the given remote node.
308    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
309        // The error can be dropped because no dial will be made if it would exceed the limit
310        if self.counter.ensure_pending_outbound().is_ok() {
311            let session_id = self.next_id();
312            let (disconnect_tx, disconnect_rx) = oneshot::channel();
313            let pending_events = self.pending_sessions_tx.clone();
314            let secret_key = self.secret_key;
315            let hello_message = self.hello_message.clone();
316            let fork_filter = self.fork_filter.clone();
317            let status = self.status;
318            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
319            self.spawn(pending_session_with_timeout(
320                self.pending_session_timeout,
321                session_id,
322                remote_addr,
323                Direction::Outgoing(remote_peer_id),
324                pending_events.clone(),
325                start_pending_outbound_session(
326                    self.handshake.clone(),
327                    disconnect_rx,
328                    pending_events,
329                    session_id,
330                    remote_addr,
331                    remote_peer_id,
332                    secret_key,
333                    hello_message,
334                    status,
335                    fork_filter,
336                    extra_handlers,
337                ),
338            ));
339
340            let handle = PendingSessionHandle {
341                disconnect_tx: Some(disconnect_tx),
342                direction: Direction::Outgoing(remote_peer_id),
343            };
344            self.pending_sessions.insert(session_id, handle);
345            self.counter.inc_pending_outbound();
346        }
347    }
348
349    /// Initiates a shutdown of the channel.
350    ///
351    /// This will trigger the disconnect on the session task to gracefully terminate. The result
352    /// will be picked up by the receiver.
353    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
354        if let Some(session) = self.active_sessions.get(&node) {
355            session.disconnect(reason);
356        }
357    }
358
359    /// Initiates a shutdown of all sessions.
360    ///
361    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
362    /// will be picked by the receiver.
363    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
364        for session in self.active_sessions.values() {
365            session.disconnect(reason);
366        }
367    }
368
369    /// Disconnects all pending sessions.
370    pub fn disconnect_all_pending(&mut self) {
371        for session in self.pending_sessions.values_mut() {
372            session.disconnect();
373        }
374    }
375
376    /// Sends a message to the peer's session.
377    ///
378    /// Broadcast messages use size-based backpressure: the total number of in-flight broadcast
379    /// items (across the command channel, overflow channel, and session outgoing queue) is tracked
380    /// by a shared atomic counter. If the bounded command channel is full but the broadcast limit
381    /// hasn't been reached, the message overflows to a dedicated unbounded channel.
382    pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
383        if let Some(session) = self.active_sessions.get(peer_id) &&
384            !session.commands.send_message(msg)
385        {
386            self.metrics.total_outgoing_peer_messages_dropped.increment(1);
387        }
388    }
389
390    /// Removes the [`PendingSessionHandle`] if it exists.
391    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
392        let session = self.pending_sessions.remove(id)?;
393        self.counter.dec_pending(&session.direction);
394        Some(session)
395    }
396
397    /// Removes the [`PendingSessionHandle`] if it exists.
398    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
399        let session = self.active_sessions.remove(id)?;
400        self.counter.dec_active(&session.direction);
401        Some(session)
402    }
403
404    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
405    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
406    /// simply drop the incoming connection.
407    pub(crate) fn try_disconnect_incoming_connection(
408        &self,
409        stream: TcpStream,
410        reason: DisconnectReason,
411    ) {
412        if !self.disconnections_counter.has_capacity() {
413            // drop the connection if we don't have capacity for gracefully disconnecting
414            return
415        }
416
417        let guard = self.disconnections_counter.clone();
418        let secret_key = self.secret_key;
419
420        self.spawn(async move {
421            trace!(
422                target: "net::session",
423                "gracefully disconnecting incoming connection"
424            );
425            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
426                let mut unauth = UnauthedP2PStream::new(stream);
427                let _ = unauth.send_disconnect(reason).await;
428                drop(guard);
429            }
430        });
431    }
432
433    /// This polls all the session handles and returns [`SessionEvent`].
434    ///
435    /// Active sessions are prioritized.
436    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
437        // Poll events from active sessions
438        match self.active_session_rx.poll_next_unpin(cx) {
439            Poll::Pending => {}
440            Poll::Ready(None) => {
441                unreachable!("Manager holds both channel halves.")
442            }
443            Poll::Ready(Some(event)) => {
444                return match event {
445                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
446                        trace!(
447                            target: "net::session",
448                            ?peer_id,
449                            "gracefully disconnected active session."
450                        );
451                        self.remove_active_session(&peer_id);
452                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
453                    }
454                    ActiveSessionMessage::ClosedOnConnectionError {
455                        peer_id,
456                        remote_addr,
457                        error,
458                    } => {
459                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
460                        self.remove_active_session(&peer_id);
461                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
462                            remote_addr,
463                            peer_id,
464                            error,
465                        })
466                    }
467                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
468                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
469                    }
470                    ActiveSessionMessage::BadMessage { peer_id } => {
471                        Poll::Ready(SessionEvent::BadMessage { peer_id })
472                    }
473                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
474                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
475                    }
476                }
477            }
478        }
479
480        // Poll the pending session event stream
481        let event = match self.pending_session_rx.poll_next_unpin(cx) {
482            Poll::Pending => return Poll::Pending,
483            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
484            Poll::Ready(Some(event)) => event,
485        };
486        match event {
487            PendingSessionEvent::Established {
488                session_id,
489                remote_addr,
490                local_addr,
491                peer_id,
492                capabilities,
493                conn,
494                status,
495                direction,
496                client_id,
497            } => {
498                // move from pending to established.
499                self.remove_pending_session(&session_id);
500
501                // If there's already a session to the peer then we disconnect right away
502                if self.active_sessions.contains_key(&peer_id) {
503                    trace!(
504                        target: "net::session",
505                        ?session_id,
506                        ?remote_addr,
507                        ?peer_id,
508                        ?direction,
509                        "already connected"
510                    );
511
512                    self.spawn(async move {
513                        // send a disconnect message
514                        let _ =
515                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
516                    });
517
518                    return Poll::Ready(SessionEvent::AlreadyConnected {
519                        peer_id,
520                        remote_addr,
521                        direction,
522                    })
523                }
524
525                let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
526                let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
527
528                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
529
530                let messages = PeerRequestSender::new(peer_id, to_session_tx);
531
532                let timeout = Arc::new(AtomicU64::new(
533                    self.initial_internal_request_timeout.as_millis() as u64,
534                ));
535
536                // negotiated version
537                let version = conn.version();
538
539                // Configure the interval at which the range information is updated, starting with
540                // ETH69. We use interval_at to delay the first tick, avoiding sending
541                // BlockRangeUpdate immediately after connection (which can cause issues with
542                // peers that don't properly handle the message).
543                let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
544                    let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
545                    let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
546                    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
547                    interval
548                });
549
550                // Shared counter of in-flight broadcast items. The session task must decrement
551                // this when it pops messages from the outgoing queue, and the
552                // `SessionCommandSender` increments it before enqueuing. This invariant ensures
553                // the `SessionManager` always has an accurate view of total buffered broadcast
554                // pressure for a peer.
555                let broadcast_items = BroadcastItemCounter::new();
556
557                let session = ActiveSession {
558                    next_id: 0,
559                    remote_peer_id: peer_id,
560                    remote_addr,
561                    remote_capabilities: Arc::clone(&capabilities),
562                    session_id,
563                    commands_rx: ReceiverStream::new(commands_rx),
564                    unbounded_rx,
565                    unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
566                    to_session_manager: self.active_session_tx.clone(),
567                    pending_message_to_session: None,
568                    internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
569                    inflight_requests: Default::default(),
570                    conn,
571                    queued_outgoing: QueuedOutgoingMessages::new(
572                        self.metrics.queued_outgoing_messages.clone(),
573                        broadcast_items.clone(),
574                    ),
575                    received_requests_from_remote: Default::default(),
576                    internal_request_timeout_interval: tokio::time::interval(
577                        self.initial_internal_request_timeout,
578                    ),
579                    internal_request_timeout: Arc::clone(&timeout),
580                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
581                    terminate_message: None,
582                    range_info: None,
583                    local_range_info: self.local_range_info.clone(),
584                    range_update_interval,
585                    last_sent_latest_block: None,
586                };
587
588                self.spawn(session);
589
590                let client_version = client_id.into();
591                let handle = ActiveSessionHandle {
592                    status: status.clone(),
593                    direction,
594                    session_id,
595                    remote_id: peer_id,
596                    version,
597                    established: Instant::now(),
598                    capabilities: Arc::clone(&capabilities),
599                    commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
600                    client_version: Arc::clone(&client_version),
601                    remote_addr,
602                    local_addr,
603                };
604
605                self.active_sessions.insert(peer_id, handle);
606                self.counter.inc_active(&direction);
607
608                if direction.is_outgoing() {
609                    self.metrics.total_dial_successes.increment(1);
610                }
611
612                Poll::Ready(SessionEvent::SessionEstablished {
613                    peer_id,
614                    remote_addr,
615                    client_version,
616                    version,
617                    capabilities,
618                    status,
619                    messages,
620                    direction,
621                    timeout,
622                    range_info: None,
623                })
624            }
625            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
626                trace!(
627                    target: "net::session",
628                    ?session_id,
629                    ?remote_addr,
630                    ?error,
631                    "disconnected pending session"
632                );
633                self.remove_pending_session(&session_id);
634                match direction {
635                    Direction::Incoming => {
636                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
637                            remote_addr,
638                            error,
639                        })
640                    }
641                    Direction::Outgoing(peer_id) => {
642                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
643                            remote_addr,
644                            peer_id,
645                            error,
646                        })
647                    }
648                }
649            }
650            PendingSessionEvent::OutgoingConnectionError {
651                remote_addr,
652                session_id,
653                peer_id,
654                error,
655            } => {
656                trace!(
657                    target: "net::session",
658                    %error,
659                    ?session_id,
660                    ?remote_addr,
661                    ?peer_id,
662                    "connection refused"
663                );
664                self.remove_pending_session(&session_id);
665                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
666            }
667            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
668                trace!(
669                    target: "net::session",
670                    %error,
671                    ?session_id,
672                    ?remote_addr,
673                    "ecies auth failed"
674                );
675                self.remove_pending_session(&session_id);
676                match direction {
677                    Direction::Incoming => {
678                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
679                            remote_addr,
680                            error: Some(PendingSessionHandshakeError::Ecies(error)),
681                        })
682                    }
683                    Direction::Outgoing(peer_id) => {
684                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
685                            remote_addr,
686                            peer_id,
687                            error: Some(PendingSessionHandshakeError::Ecies(error)),
688                        })
689                    }
690                }
691            }
692        }
693    }
694
695    /// Updates the advertised block range that this node can serve to other peers starting with
696    /// Eth69.
697    ///
698    /// This method updates both the local status message that gets sent to peers during handshake
699    /// and the shared local range information that gets propagated to active sessions (Eth69).
700    /// The range information is used in ETH69 protocol where peers announce the range of blocks
701    /// they can serve to optimize data synchronization.
702    pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
703        self.status.earliest_block = Some(block_range_update.earliest);
704        self.status.latest_block = Some(block_range_update.latest);
705        self.status.blockhash = block_range_update.latest_hash;
706
707        // Update the shared local range info that gets propagated to active sessions
708        self.local_range_info.update(
709            block_range_update.earliest,
710            block_range_update.latest,
711            block_range_update.latest_hash,
712        );
713    }
714}
715
716/// A counter for ongoing graceful disconnections attempts.
717#[derive(Default, Debug, Clone)]
718struct DisconnectionsCounter(Arc<()>);
719
720impl DisconnectionsCounter {
721    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
722
723    /// Returns true if the [`DisconnectionsCounter`] still has capacity
724    /// for an additional graceful disconnection.
725    fn has_capacity(&self) -> bool {
726        Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
727    }
728}
729
730/// Events produced by the [`SessionManager`]
731#[derive(Debug)]
732pub enum SessionEvent<N: NetworkPrimitives> {
733    /// A new session was successfully authenticated.
734    ///
735    /// This session is now able to exchange data.
736    SessionEstablished {
737        /// The remote node's public key
738        peer_id: PeerId,
739        /// The remote node's socket address
740        remote_addr: SocketAddr,
741        /// The user agent of the remote node, usually containing the client name and version
742        client_version: Arc<str>,
743        /// The capabilities the remote node has announced
744        capabilities: Arc<Capabilities>,
745        /// negotiated eth version
746        version: EthVersion,
747        /// The Status message the peer sent during the `eth` handshake
748        status: Arc<UnifiedStatus>,
749        /// The channel for sending messages to the peer with the session
750        messages: PeerRequestSender<PeerRequest<N>>,
751        /// The direction of the session, either `Inbound` or `Outgoing`
752        direction: Direction,
753        /// The maximum time that the session waits for a response from the peer before timing out
754        /// the connection
755        timeout: Arc<AtomicU64>,
756        /// The range info for the peer.
757        range_info: Option<BlockRangeInfo>,
758    },
759    /// The peer was already connected with another session.
760    AlreadyConnected {
761        /// The remote node's public key
762        peer_id: PeerId,
763        /// The remote node's socket address
764        remote_addr: SocketAddr,
765        /// The direction of the session, either `Inbound` or `Outgoing`
766        direction: Direction,
767    },
768    /// A session received a valid message via `RLPx`.
769    ValidMessage {
770        /// The remote node's public key
771        peer_id: PeerId,
772        /// Message received from the peer.
773        message: PeerMessage<N>,
774    },
775    /// Received a bad message from the peer.
776    BadMessage {
777        /// Identifier of the remote peer.
778        peer_id: PeerId,
779    },
780    /// Remote peer is considered in protocol violation
781    ProtocolBreach {
782        /// Identifier of the remote peer.
783        peer_id: PeerId,
784    },
785    /// Closed an incoming pending session during handshaking.
786    IncomingPendingSessionClosed {
787        /// The remote node's socket address
788        remote_addr: SocketAddr,
789        /// The pending handshake session error that caused the session to close
790        error: Option<PendingSessionHandshakeError>,
791    },
792    /// Closed an outgoing pending session during handshaking.
793    OutgoingPendingSessionClosed {
794        /// The remote node's socket address
795        remote_addr: SocketAddr,
796        /// The remote node's public key
797        peer_id: PeerId,
798        /// The pending handshake session error that caused the session to close
799        error: Option<PendingSessionHandshakeError>,
800    },
801    /// Failed to establish a tcp stream
802    OutgoingConnectionError {
803        /// The remote node's socket address
804        remote_addr: SocketAddr,
805        /// The remote node's public key
806        peer_id: PeerId,
807        /// The error that caused the outgoing connection to fail
808        error: io::Error,
809    },
810    /// Session was closed due to an error
811    SessionClosedOnConnectionError {
812        /// The id of the remote peer.
813        peer_id: PeerId,
814        /// The socket we were connected to.
815        remote_addr: SocketAddr,
816        /// The error that caused the session to close
817        error: EthStreamError,
818    },
819    /// Active session was gracefully disconnected.
820    Disconnected {
821        /// The remote node's public key
822        peer_id: PeerId,
823        /// The remote node's socket address that we were connected to
824        remote_addr: SocketAddr,
825    },
826}
827
828/// Errors that can occur during handshaking/authenticating the underlying streams.
829#[derive(Debug, thiserror::Error)]
830pub enum PendingSessionHandshakeError {
831    /// The pending session failed due to an error while establishing the `eth` stream
832    #[error(transparent)]
833    Eth(EthStreamError),
834    /// The pending session failed due to an error while establishing the ECIES stream
835    #[error(transparent)]
836    Ecies(ECIESError),
837    /// Thrown when the authentication timed out
838    #[error("authentication timed out")]
839    Timeout,
840    /// Thrown when the remote lacks the required capability
841    #[error("Mandatory extra capability unsupported")]
842    UnsupportedExtraCapability,
843}
844
845impl PendingSessionHandshakeError {
846    /// Returns the [`DisconnectReason`] if the error is a disconnect message
847    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
848        match self {
849            Self::Eth(eth_err) => eth_err.as_disconnected(),
850            _ => None,
851        }
852    }
853}
854
855/// The error thrown when the max configured limit has been reached and no more connections are
856/// accepted.
857#[derive(Debug, Clone, thiserror::Error)]
858#[error("session limit reached {0}")]
859pub struct ExceedsSessionLimit(pub(crate) u32);
860
861/// Starts a pending session authentication with a timeout.
862pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
863    timeout: Duration,
864    session_id: SessionId,
865    remote_addr: SocketAddr,
866    direction: Direction,
867    events: mpsc::Sender<PendingSessionEvent<N>>,
868    f: F,
869) where
870    F: Future<Output = ()>,
871{
872    if tokio::time::timeout(timeout, f).await.is_err() {
873        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
874        let event = PendingSessionEvent::Disconnected {
875            remote_addr,
876            session_id,
877            direction,
878            error: Some(PendingSessionHandshakeError::Timeout),
879        };
880        let _ = events.send(event).await;
881    }
882}
883
884/// Starts the authentication process for a connection initiated by a remote peer.
885///
886/// This will wait for the _incoming_ handshake request and answer it.
887#[expect(clippy::too_many_arguments)]
888pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
889    handshake: Arc<dyn EthRlpxHandshake>,
890    disconnect_rx: oneshot::Receiver<()>,
891    session_id: SessionId,
892    stream: TcpStream,
893    events: mpsc::Sender<PendingSessionEvent<N>>,
894    remote_addr: SocketAddr,
895    secret_key: SecretKey,
896    hello: HelloMessageWithProtocols,
897    status: UnifiedStatus,
898    fork_filter: ForkFilter,
899    extra_handlers: RlpxSubProtocolHandlers,
900) {
901    authenticate(
902        handshake,
903        disconnect_rx,
904        events,
905        stream,
906        session_id,
907        remote_addr,
908        secret_key,
909        Direction::Incoming,
910        hello,
911        status,
912        fork_filter,
913        extra_handlers,
914    )
915    .await
916}
917
918/// Starts the authentication process for a connection initiated by a remote peer.
919#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
920#[expect(clippy::too_many_arguments)]
921async fn start_pending_outbound_session<N: NetworkPrimitives>(
922    handshake: Arc<dyn EthRlpxHandshake>,
923    disconnect_rx: oneshot::Receiver<()>,
924    events: mpsc::Sender<PendingSessionEvent<N>>,
925    session_id: SessionId,
926    remote_addr: SocketAddr,
927    remote_peer_id: PeerId,
928    secret_key: SecretKey,
929    hello: HelloMessageWithProtocols,
930    status: UnifiedStatus,
931    fork_filter: ForkFilter,
932    extra_handlers: RlpxSubProtocolHandlers,
933) {
934    let stream = match TcpStream::connect(remote_addr).await {
935        Ok(stream) => {
936            if let Err(err) = stream.set_nodelay(true) {
937                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
938            }
939            stream
940        }
941        Err(error) => {
942            let _ = events
943                .send(PendingSessionEvent::OutgoingConnectionError {
944                    remote_addr,
945                    session_id,
946                    peer_id: remote_peer_id,
947                    error,
948                })
949                .await;
950            return
951        }
952    };
953    authenticate(
954        handshake,
955        disconnect_rx,
956        events,
957        stream,
958        session_id,
959        remote_addr,
960        secret_key,
961        Direction::Outgoing(remote_peer_id),
962        hello,
963        status,
964        fork_filter,
965        extra_handlers,
966    )
967    .await
968}
969
970/// Authenticates a session
971#[expect(clippy::too_many_arguments)]
972async fn authenticate<N: NetworkPrimitives>(
973    handshake: Arc<dyn EthRlpxHandshake>,
974    disconnect_rx: oneshot::Receiver<()>,
975    events: mpsc::Sender<PendingSessionEvent<N>>,
976    stream: TcpStream,
977    session_id: SessionId,
978    remote_addr: SocketAddr,
979    secret_key: SecretKey,
980    direction: Direction,
981    hello: HelloMessageWithProtocols,
982    status: UnifiedStatus,
983    fork_filter: ForkFilter,
984    extra_handlers: RlpxSubProtocolHandlers,
985) {
986    let local_addr = stream.local_addr().ok();
987    let stream = match get_ecies_stream(stream, secret_key, direction).await {
988        Ok(stream) => stream,
989        Err(error) => {
990            let _ = events
991                .send(PendingSessionEvent::EciesAuthError {
992                    remote_addr,
993                    session_id,
994                    error,
995                    direction,
996                })
997                .await;
998            return
999        }
1000    };
1001
1002    let unauthed = UnauthedP2PStream::new(stream);
1003
1004    let auth = authenticate_stream(
1005        handshake,
1006        unauthed,
1007        session_id,
1008        remote_addr,
1009        local_addr,
1010        direction,
1011        hello,
1012        status,
1013        fork_filter,
1014        extra_handlers,
1015    )
1016    .boxed();
1017
1018    match futures::future::select(disconnect_rx, auth).await {
1019        Either::Left((_, _)) => {
1020            let _ = events
1021                .send(PendingSessionEvent::Disconnected {
1022                    remote_addr,
1023                    session_id,
1024                    direction,
1025                    error: None,
1026                })
1027                .await;
1028        }
1029        Either::Right((res, _)) => {
1030            let _ = events.send(res).await;
1031        }
1032    }
1033}
1034
1035/// Returns an [`ECIESStream`] if it can be built. If not, send a
1036/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
1037async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1038    stream: Io,
1039    secret_key: SecretKey,
1040    direction: Direction,
1041) -> Result<ECIESStream<Io>, ECIESError> {
1042    match direction {
1043        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1044        Direction::Outgoing(remote_peer_id) => {
1045            ECIESStream::connect(stream, secret_key, remote_peer_id).await
1046        }
1047    }
1048}
1049
1050/// Authenticate the stream via handshake
1051///
1052/// On Success return the authenticated stream as [`PendingSessionEvent`].
1053///
1054/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
1055/// also negotiate the additional protocols.
1056#[expect(clippy::too_many_arguments)]
1057async fn authenticate_stream<N: NetworkPrimitives>(
1058    handshake: Arc<dyn EthRlpxHandshake>,
1059    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1060    session_id: SessionId,
1061    remote_addr: SocketAddr,
1062    local_addr: Option<SocketAddr>,
1063    direction: Direction,
1064    mut hello: HelloMessageWithProtocols,
1065    mut status: UnifiedStatus,
1066    fork_filter: ForkFilter,
1067    mut extra_handlers: RlpxSubProtocolHandlers,
1068) -> PendingSessionEvent<N> {
1069    // Add extra protocols to the hello message
1070    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1071
1072    // conduct the p2p rlpx handshake and return the rlpx authenticated stream
1073    let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1074        Ok(stream_res) => stream_res,
1075        Err(err) => {
1076            return PendingSessionEvent::Disconnected {
1077                remote_addr,
1078                session_id,
1079                direction,
1080                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1081            }
1082        }
1083    };
1084
1085    // if we have extra handlers, check if it must be supported by the remote
1086    if !extra_handlers.is_empty() {
1087        // ensure that no extra handlers that aren't supported are not mandatory
1088        while let Some(pos) = extra_handlers.iter().position(|handler| {
1089            p2p_stream
1090                .shared_capabilities()
1091                .ensure_matching_capability(&handler.protocol().cap)
1092                .is_err()
1093        }) {
1094            let handler = extra_handlers.remove(pos);
1095            if handler.on_unsupported_by_peer(
1096                p2p_stream.shared_capabilities(),
1097                direction,
1098                their_hello.id,
1099            ) == OnNotSupported::Disconnect
1100            {
1101                return PendingSessionEvent::Disconnected {
1102                    remote_addr,
1103                    session_id,
1104                    direction,
1105                    error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1106                };
1107            }
1108        }
1109    }
1110
1111    // Ensure we negotiated mandatory eth protocol
1112    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1113        Ok(version) => version,
1114        Err(err) => {
1115            return PendingSessionEvent::Disconnected {
1116                remote_addr,
1117                session_id,
1118                direction,
1119                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1120            }
1121        }
1122    };
1123
1124    // Before trying status handshake, set up the version to negotiated shared version
1125    status.set_eth_version(eth_version);
1126
1127    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1128        // if the shared caps are 1, we know both support the eth version
1129        // if the hello handshake was successful we can try status handshake
1130
1131        // perform the eth protocol handshake
1132        match handshake
1133            .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1134            .await
1135        {
1136            Ok(their_status) => {
1137                let eth_stream = EthStream::new(eth_version, p2p_stream);
1138                (eth_stream.into(), their_status)
1139            }
1140            Err(err) => {
1141                return PendingSessionEvent::Disconnected {
1142                    remote_addr,
1143                    session_id,
1144                    direction,
1145                    error: Some(PendingSessionHandshakeError::Eth(err)),
1146                }
1147            }
1148        }
1149    } else {
1150        // Multiplex the stream with the extra protocols
1151        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1152
1153        // install additional handlers
1154        for handler in extra_handlers.into_iter() {
1155            let cap = handler.protocol().cap;
1156            let remote_peer_id = their_hello.id;
1157
1158            multiplex_stream
1159                .install_protocol(&cap, move |conn| {
1160                    handler.into_connection(direction, remote_peer_id, conn)
1161                })
1162                .ok();
1163        }
1164
1165        let (multiplex_stream, their_status) = match multiplex_stream
1166            .into_eth_satellite_stream(status, fork_filter, handshake)
1167            .await
1168        {
1169            Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1170            Err(err) => {
1171                return PendingSessionEvent::Disconnected {
1172                    remote_addr,
1173                    session_id,
1174                    direction,
1175                    error: Some(PendingSessionHandshakeError::Eth(err)),
1176                }
1177            }
1178        };
1179
1180        (multiplex_stream.into(), their_status)
1181    };
1182
1183    PendingSessionEvent::Established {
1184        session_id,
1185        remote_addr,
1186        local_addr,
1187        peer_id: their_hello.id,
1188        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1189        status: Arc::new(their_status),
1190        conn,
1191        direction,
1192        client_id: their_hello.client_version,
1193    }
1194}