Skip to main content

reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11    message::PeerMessage,
12    metrics::SessionManagerMetrics,
13    protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14    session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21    errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22    BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23    HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24    HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35    collections::HashMap,
36    future::Future,
37    net::SocketAddr,
38    sync::{atomic::AtomicU64, Arc},
39    task::{Context, Poll},
40    time::{Duration, Instant},
41};
42use tokio::{
43    io::{AsyncRead, AsyncWrite},
44    net::TcpStream,
45    sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56    SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
/// Opaque identifier the manager hands out to tell sessions apart internally.
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Hash)]
pub struct SessionId(usize);
63
/// Manages a set of sessions.
///
/// Sessions that are still authenticating are kept in `pending_sessions`; sessions that finished
/// the handshake are kept in `active_sessions`. Each group reports back over its own channel.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub struct SessionManager<N: NetworkPrimitives> {
    /// Tracks the identifier for the next session.
    next_id: usize,
    /// Keeps track of the number of pending and active sessions.
    counter: SessionCounter,
    /// The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
    /// responds to an _internal_ request with a `TimeoutError`
    initial_internal_request_timeout: Duration,
    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
    /// considered a protocol violation and the session will initiate a drop.
    protocol_breach_request_timeout: Duration,
    /// The timeout after which a pending session attempt is considered failed.
    pending_session_timeout: Duration,
    /// The secret key used for authenticating sessions.
    secret_key: SecretKey,
    /// The `Status` message to send to peers.
    status: UnifiedStatus,
    /// The `HelloMessage` message to send to peers.
    hello_message: HelloMessageWithProtocols,
    /// The [`ForkFilter`] used to validate the peer's `Status` message.
    fork_filter: ForkFilter,
    /// Size of the command buffer per session.
    session_command_buffer: usize,
    /// The executor for spawned tasks.
    executor: Runtime,
    /// All pending sessions that are currently handshaking, exchanging `Hello`s.
    ///
    /// Events produced during the authentication phase are reported to this manager. Once the
    /// session is authenticated, it can be moved to the `active_sessions` set.
    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
    /// All active sessions that are ready to exchange messages.
    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
    /// The original Sender half of the [`PendingSessionEvent`] channel.
    ///
    /// When a new (pending) session is created, the task driving it gets a clone of this sender
    /// half.
    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
    /// The original Sender half of the [`ActiveSessionMessage`] channel.
    ///
    /// When active session state is reached, the spawned [`ActiveSession`] will get a clone of
    /// this sender half.
    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
    /// Receiver half that listens for [`ActiveSessionMessage`] produced by active sessions.
    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
    /// Additional `RLPx` sub-protocols to be used by the session manager.
    extra_protocols: RlpxSubProtocols,
    /// Tracks the ongoing graceful disconnection attempts for incoming connections.
    disconnections_counter: DisconnectionsCounter,
    /// Metrics for the session manager.
    metrics: SessionManagerMetrics,
    /// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
    handshake: Arc<dyn EthRlpxHandshake>,
    /// Maximum allowed ETH message size for post-handshake ETH/Snap streams.
    eth_max_message_size: usize,
    /// Shared local range information that gets propagated to active sessions.
    /// This represents the range of blocks that this node can serve to other peers.
    local_range_info: BlockRangeInfo,
}
127
128// === impl SessionManager ===
129
130impl<N: NetworkPrimitives> SessionManager<N> {
131    /// Creates a new empty [`SessionManager`].
132    #[expect(clippy::too_many_arguments)]
133    pub fn new(
134        secret_key: SecretKey,
135        config: SessionsConfig,
136        executor: Runtime,
137        status: UnifiedStatus,
138        hello_message: HelloMessageWithProtocols,
139        fork_filter: ForkFilter,
140        extra_protocols: RlpxSubProtocols,
141        handshake: Arc<dyn EthRlpxHandshake>,
142        eth_max_message_size: usize,
143    ) -> Self {
144        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
145        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
146        let active_session_tx = PollSender::new(active_session_tx);
147
148        // Initialize local range info from the status
149        let local_range_info = BlockRangeInfo::new(
150            status.earliest_block.unwrap_or_default(),
151            status.latest_block.unwrap_or_default(),
152            status.blockhash,
153        );
154
155        Self {
156            next_id: 0,
157            counter: SessionCounter::new(config.limits),
158            initial_internal_request_timeout: config.initial_internal_request_timeout,
159            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
160            pending_session_timeout: config.pending_session_timeout,
161            secret_key,
162            status,
163            hello_message,
164            fork_filter,
165            session_command_buffer: config.session_command_buffer,
166            executor,
167            pending_sessions: Default::default(),
168            active_sessions: Default::default(),
169            pending_sessions_tx,
170            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
171            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
172            active_session_rx: ReceiverStream::new(active_session_rx),
173            extra_protocols,
174            disconnections_counter: Default::default(),
175            metrics: Default::default(),
176            handshake,
177            eth_max_message_size,
178            local_range_info,
179        }
180    }
181
182    /// Returns the currently tracked [`ForkId`].
183    pub(crate) const fn fork_id(&self) -> ForkId {
184        self.fork_filter.current()
185    }
186
187    /// Check whether the provided [`ForkId`] is compatible based on the validation rules in
188    /// `EIP-2124`.
189    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
190        self.fork_filter.validate(fork_id).is_ok()
191    }
192
193    /// Returns the next unique [`SessionId`].
194    const fn next_id(&mut self) -> SessionId {
195        let id = self.next_id;
196        self.next_id += 1;
197        SessionId(id)
198    }
199
200    /// Returns the current status of the session.
201    pub const fn status(&self) -> UnifiedStatus {
202        self.status
203    }
204
205    /// Returns the secret key used for authenticating sessions.
206    pub const fn secret_key(&self) -> SecretKey {
207        self.secret_key
208    }
209
210    /// Returns a borrowed reference to the active sessions.
211    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
212        &self.active_sessions
213    }
214
215    /// Returns the session hello message.
216    pub fn hello_message(&self) -> HelloMessageWithProtocols {
217        self.hello_message.clone()
218    }
219
220    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
221    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
222        self.extra_protocols.push(protocol)
223    }
224
225    /// Returns the number of currently pending connections.
226    #[inline]
227    pub(crate) fn num_pending_connections(&self) -> usize {
228        self.pending_sessions.len()
229    }
230
231    /// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
232    /// [`JoinSet`](tokio::task::JoinSet).
233    fn spawn<F>(&self, f: F)
234    where
235        F: Future<Output = ()> + Send + 'static,
236    {
237        self.executor.spawn_task(f);
238    }
239
240    /// Invoked on a received status update.
241    ///
242    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
243    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
244    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
245        self.status.blockhash = head.hash;
246        self.status.total_difficulty = Some(head.total_difficulty);
247        let transition = self.fork_filter.set_head(head);
248        self.status.forkid = self.fork_filter.current();
249        self.status.latest_block = Some(head.number);
250
251        transition
252    }
253
254    /// An incoming TCP connection was received. This starts the authentication process to turn this
255    /// stream into an active peer session.
256    ///
257    /// Returns an error if the configured limit has been reached.
258    pub(crate) fn on_incoming(
259        &mut self,
260        stream: TcpStream,
261        remote_addr: SocketAddr,
262    ) -> Result<SessionId, ExceedsSessionLimit> {
263        self.counter.ensure_pending_inbound()?;
264
265        let session_id = self.next_id();
266
267        trace!(
268            target: "net::session",
269            ?remote_addr,
270            ?session_id,
271            "new pending incoming session"
272        );
273
274        let (disconnect_tx, disconnect_rx) = oneshot::channel();
275        let pending_events = self.pending_sessions_tx.clone();
276        let secret_key = self.secret_key;
277        let hello_message = self.hello_message.clone();
278        let status = self.status;
279        let fork_filter = self.fork_filter.clone();
280        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
281        self.spawn(pending_session_with_timeout(
282            self.pending_session_timeout,
283            session_id,
284            remote_addr,
285            Direction::Incoming,
286            pending_events.clone(),
287            start_pending_incoming_session(
288                self.handshake.clone(),
289                self.eth_max_message_size,
290                disconnect_rx,
291                session_id,
292                stream,
293                pending_events,
294                remote_addr,
295                secret_key,
296                hello_message,
297                status,
298                fork_filter,
299                extra_handlers,
300            ),
301        ));
302
303        let handle = PendingSessionHandle {
304            disconnect_tx: Some(disconnect_tx),
305            direction: Direction::Incoming,
306        };
307        self.pending_sessions.insert(session_id, handle);
308        self.counter.inc_pending_inbound();
309        Ok(session_id)
310    }
311
312    /// Starts a new pending session from the local node to the given remote node.
313    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
314        // The error can be dropped because no dial will be made if it would exceed the limit
315        if self.counter.ensure_pending_outbound().is_ok() {
316            let session_id = self.next_id();
317            let (disconnect_tx, disconnect_rx) = oneshot::channel();
318            let pending_events = self.pending_sessions_tx.clone();
319            let secret_key = self.secret_key;
320            let hello_message = self.hello_message.clone();
321            let fork_filter = self.fork_filter.clone();
322            let status = self.status;
323            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
324            self.spawn(pending_session_with_timeout(
325                self.pending_session_timeout,
326                session_id,
327                remote_addr,
328                Direction::Outgoing(remote_peer_id),
329                pending_events.clone(),
330                start_pending_outbound_session(
331                    self.handshake.clone(),
332                    self.eth_max_message_size,
333                    disconnect_rx,
334                    pending_events,
335                    session_id,
336                    remote_addr,
337                    remote_peer_id,
338                    secret_key,
339                    hello_message,
340                    status,
341                    fork_filter,
342                    extra_handlers,
343                ),
344            ));
345
346            let handle = PendingSessionHandle {
347                disconnect_tx: Some(disconnect_tx),
348                direction: Direction::Outgoing(remote_peer_id),
349            };
350            self.pending_sessions.insert(session_id, handle);
351            self.counter.inc_pending_outbound();
352        }
353    }
354
355    /// Initiates a shutdown of the channel.
356    ///
357    /// This will trigger the disconnect on the session task to gracefully terminate. The result
358    /// will be picked up by the receiver.
359    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
360        if let Some(session) = self.active_sessions.get(&node) {
361            session.disconnect(reason);
362        }
363    }
364
365    /// Initiates a shutdown of all sessions.
366    ///
367    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
368    /// will be picked by the receiver.
369    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
370        for session in self.active_sessions.values() {
371            session.disconnect(reason);
372        }
373    }
374
375    /// Disconnects all pending sessions.
376    pub fn disconnect_all_pending(&mut self) {
377        for session in self.pending_sessions.values_mut() {
378            session.disconnect();
379        }
380    }
381
382    /// Sends a message to the peer's session.
383    ///
384    /// Broadcast messages use size-based backpressure: the total number of in-flight broadcast
385    /// items (across the command channel, overflow channel, and session outgoing queue) is tracked
386    /// by a shared atomic counter. If the bounded command channel is full but the broadcast limit
387    /// hasn't been reached, the message overflows to a dedicated unbounded channel.
388    pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
389        if let Some(session) = self.active_sessions.get(peer_id) &&
390            !session.commands.send_message(msg)
391        {
392            self.metrics.total_outgoing_peer_messages_dropped.increment(1);
393        }
394    }
395
396    /// Removes the [`PendingSessionHandle`] if it exists.
397    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
398        let session = self.pending_sessions.remove(id)?;
399        self.counter.dec_pending(&session.direction);
400        Some(session)
401    }
402
403    /// Removes the [`PendingSessionHandle`] if it exists.
404    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
405        let session = self.active_sessions.remove(id)?;
406        self.counter.dec_active(&session.direction);
407        Some(session)
408    }
409
410    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
411    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
412    /// simply drop the incoming connection.
413    pub(crate) fn try_disconnect_incoming_connection(
414        &self,
415        stream: TcpStream,
416        reason: DisconnectReason,
417    ) {
418        if !self.disconnections_counter.has_capacity() {
419            // drop the connection if we don't have capacity for gracefully disconnecting
420            return
421        }
422
423        let guard = self.disconnections_counter.clone();
424        let secret_key = self.secret_key;
425
426        self.spawn(async move {
427            trace!(
428                target: "net::session",
429                "gracefully disconnecting incoming connection"
430            );
431            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
432                let mut unauth = UnauthedP2PStream::new(stream);
433                let _ = unauth.send_disconnect(reason).await;
434                drop(guard);
435            }
436        });
437    }
438
439    /// This polls all the session handles and returns [`SessionEvent`].
440    ///
441    /// Active sessions are prioritized.
442    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
443        // Poll events from active sessions
444        match self.active_session_rx.poll_next_unpin(cx) {
445            Poll::Pending => {}
446            Poll::Ready(None) => {
447                unreachable!("Manager holds both channel halves.")
448            }
449            Poll::Ready(Some(event)) => {
450                return match event {
451                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
452                        trace!(
453                            target: "net::session",
454                            ?peer_id,
455                            "gracefully disconnected active session."
456                        );
457                        self.remove_active_session(&peer_id);
458                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
459                    }
460                    ActiveSessionMessage::ClosedOnConnectionError {
461                        peer_id,
462                        remote_addr,
463                        error,
464                    } => {
465                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
466                        self.remove_active_session(&peer_id);
467                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
468                            remote_addr,
469                            peer_id,
470                            error,
471                        })
472                    }
473                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
474                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
475                    }
476                    ActiveSessionMessage::BadMessage { peer_id } => {
477                        Poll::Ready(SessionEvent::BadMessage { peer_id })
478                    }
479                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
480                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
481                    }
482                }
483            }
484        }
485
486        // Poll the pending session event stream
487        let event = match self.pending_session_rx.poll_next_unpin(cx) {
488            Poll::Pending => return Poll::Pending,
489            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
490            Poll::Ready(Some(event)) => event,
491        };
492        match event {
493            PendingSessionEvent::Established {
494                session_id,
495                remote_addr,
496                local_addr,
497                peer_id,
498                capabilities,
499                conn,
500                status,
501                direction,
502                client_id,
503            } => {
504                // move from pending to established.
505                self.remove_pending_session(&session_id);
506
507                // If there's already a session to the peer then we disconnect right away
508                if self.active_sessions.contains_key(&peer_id) {
509                    trace!(
510                        target: "net::session",
511                        ?session_id,
512                        ?remote_addr,
513                        ?peer_id,
514                        ?direction,
515                        "already connected"
516                    );
517
518                    self.spawn(async move {
519                        // send a disconnect message
520                        let _ =
521                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
522                    });
523
524                    return Poll::Ready(SessionEvent::AlreadyConnected {
525                        peer_id,
526                        remote_addr,
527                        direction,
528                    })
529                }
530
531                let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
532                let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
533
534                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
535
536                let messages = PeerRequestSender::new(peer_id, to_session_tx);
537
538                let timeout = Arc::new(AtomicU64::new(
539                    self.initial_internal_request_timeout.as_millis() as u64,
540                ));
541
542                // negotiated version
543                let version = conn.version();
544
545                // Configure the interval at which the range information is updated, starting with
546                // ETH69. We use interval_at to delay the first tick, avoiding sending
547                // BlockRangeUpdate immediately after connection (which can cause issues with
548                // peers that don't properly handle the message).
549                let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
550                    let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
551                    let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
552                    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
553                    interval
554                });
555
556                // Shared counter of in-flight broadcast items. The session task must decrement
557                // this when it pops messages from the outgoing queue, and the
558                // `SessionCommandSender` increments it before enqueuing. This invariant ensures
559                // the `SessionManager` always has an accurate view of total buffered broadcast
560                // pressure for a peer.
561                let broadcast_items = BroadcastItemCounter::new();
562                let remote_range_info = status.block_range_update().map(|update| {
563                    BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
564                });
565
566                let session = ActiveSession {
567                    next_id: 0,
568                    remote_peer_id: peer_id,
569                    remote_addr,
570                    remote_capabilities: Arc::clone(&capabilities),
571                    session_id,
572                    commands_rx: ReceiverStream::new(commands_rx),
573                    unbounded_rx,
574                    unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
575                    to_session_manager: self.active_session_tx.clone(),
576                    pending_message_to_session: None,
577                    internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
578                    inflight_requests: Default::default(),
579                    conn,
580                    queued_outgoing: QueuedOutgoingMessages::new(
581                        self.metrics.queued_outgoing_messages.clone(),
582                        broadcast_items.clone(),
583                    ),
584                    received_requests_from_remote: Default::default(),
585                    internal_request_timeout_interval: tokio::time::interval(
586                        self.initial_internal_request_timeout,
587                    ),
588                    internal_request_timeout: Arc::clone(&timeout),
589                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
590                    terminate_message: None,
591                    range_info: remote_range_info.clone(),
592                    local_range_info: self.local_range_info.clone(),
593                    range_update_interval,
594                    last_sent_latest_block: None,
595                };
596
597                self.spawn(session);
598
599                let client_version = client_id.into();
600                let handle = ActiveSessionHandle {
601                    status: status.clone(),
602                    direction,
603                    session_id,
604                    remote_id: peer_id,
605                    version,
606                    established: Instant::now(),
607                    capabilities: Arc::clone(&capabilities),
608                    commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
609                    client_version: Arc::clone(&client_version),
610                    remote_addr,
611                    local_addr,
612                };
613
614                self.active_sessions.insert(peer_id, handle);
615                self.counter.inc_active(&direction);
616
617                if direction.is_outgoing() {
618                    self.metrics.total_dial_successes.increment(1);
619                }
620
621                Poll::Ready(SessionEvent::SessionEstablished {
622                    peer_id,
623                    remote_addr,
624                    client_version,
625                    version,
626                    capabilities,
627                    status,
628                    messages,
629                    direction,
630                    timeout,
631                    range_info: remote_range_info,
632                })
633            }
634            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
635                trace!(
636                    target: "net::session",
637                    ?session_id,
638                    ?remote_addr,
639                    ?error,
640                    "disconnected pending session"
641                );
642                self.remove_pending_session(&session_id);
643                match direction {
644                    Direction::Incoming => {
645                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
646                            remote_addr,
647                            error,
648                        })
649                    }
650                    Direction::Outgoing(peer_id) => {
651                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
652                            remote_addr,
653                            peer_id,
654                            error,
655                        })
656                    }
657                }
658            }
659            PendingSessionEvent::OutgoingConnectionError {
660                remote_addr,
661                session_id,
662                peer_id,
663                error,
664            } => {
665                trace!(
666                    target: "net::session",
667                    %error,
668                    ?session_id,
669                    ?remote_addr,
670                    ?peer_id,
671                    "connection refused"
672                );
673                self.remove_pending_session(&session_id);
674                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
675            }
676            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
677                trace!(
678                    target: "net::session",
679                    %error,
680                    ?session_id,
681                    ?remote_addr,
682                    "ecies auth failed"
683                );
684                self.remove_pending_session(&session_id);
685                match direction {
686                    Direction::Incoming => {
687                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
688                            remote_addr,
689                            error: Some(PendingSessionHandshakeError::Ecies(error)),
690                        })
691                    }
692                    Direction::Outgoing(peer_id) => {
693                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
694                            remote_addr,
695                            peer_id,
696                            error: Some(PendingSessionHandshakeError::Ecies(error)),
697                        })
698                    }
699                }
700            }
701        }
702    }
703
704    /// Updates the advertised block range that this node can serve to other peers starting with
705    /// Eth69.
706    ///
707    /// This method updates both the local status message that gets sent to peers during handshake
708    /// and the shared local range information that gets propagated to active sessions (Eth69).
709    /// The range information is used in ETH69 protocol where peers announce the range of blocks
710    /// they can serve to optimize data synchronization.
711    pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
712        self.status.earliest_block = Some(block_range_update.earliest);
713        self.status.latest_block = Some(block_range_update.latest);
714        self.status.blockhash = block_range_update.latest_hash;
715
716        // Update the shared local range info that gets propagated to active sessions
717        self.local_range_info.update(
718            block_range_update.earliest,
719            block_range_update.latest,
720            block_range_update.latest_hash,
721        );
722    }
723}
724
/// Tracks how many graceful disconnection attempts are currently in flight.
///
/// The count is derived from the strong reference count of the inner [`Arc`]: every clone of
/// the counter adds one reference and dropping a clone releases it again.
#[derive(Default, Debug, Clone)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    /// Highest strong count of the inner [`Arc`] at which [`Self::has_capacity`] still reports
    /// free capacity.
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Reports whether one more graceful disconnection may be started.
    ///
    /// Returns `true` as long as the number of live handles (this one included) does not exceed
    /// [`Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS`].
    fn has_capacity(&self) -> bool {
        let Self(shared) = self;
        let live_handles = Arc::strong_count(shared);
        live_handles <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
738
/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent<N: NetworkPrimitives> {
    /// A new session was successfully authenticated.
    ///
    /// This session is now able to exchange data.
    SessionEstablished {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The user agent of the remote node, usually containing the client name and version
        client_version: Arc<str>,
        /// The capabilities the remote node has announced
        capabilities: Arc<Capabilities>,
        /// The negotiated eth version
        version: EthVersion,
        /// The Status message the peer sent during the `eth` handshake
        status: Arc<UnifiedStatus>,
        /// The channel for sending messages to the peer with the session
        messages: PeerRequestSender<PeerRequest<N>>,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
        /// The maximum time that the session waits for a response from the peer before timing out
        /// the connection
        timeout: Arc<AtomicU64>,
        /// The range info for the peer, if any.
        range_info: Option<BlockRangeInfo>,
    },
    /// The peer was already connected with another session.
    AlreadyConnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
    },
    /// A session received a valid message via `RLPx`.
    ValidMessage {
        /// The remote node's public key
        peer_id: PeerId,
        /// Message received from the peer.
        message: PeerMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// The remote peer is considered to be in violation of the protocol.
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Closed an incoming pending session during handshaking.
    IncomingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The pending handshake session error that caused the session to close, if any
        error: Option<PendingSessionHandshakeError>,
    },
    /// Closed an outgoing pending session during handshaking.
    OutgoingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The pending handshake session error that caused the session to close, if any
        error: Option<PendingSessionHandshakeError>,
    },
    /// Failed to establish a tcp stream
    OutgoingConnectionError {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The error that caused the outgoing connection to fail
        error: io::Error,
    },
    /// Session was closed due to an error
    SessionClosedOnConnectionError {
        /// The id of the remote peer.
        peer_id: PeerId,
        /// The socket we were connected to.
        remote_addr: SocketAddr,
        /// The error that caused the session to close
        error: EthStreamError,
    },
    /// Active session was gracefully disconnected.
    Disconnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address that we were connected to
        remote_addr: SocketAddr,
    },
}
836
/// Errors that can occur during handshaking/authenticating the underlying streams.
///
/// The `Eth` and `Ecies` variants are transparent wrappers: their display output is that of the
/// wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum PendingSessionHandshakeError {
    /// The pending session failed due to an error while establishing the `eth` stream
    #[error(transparent)]
    Eth(EthStreamError),
    /// The pending session failed due to an error while establishing the ECIES stream
    #[error(transparent)]
    Ecies(ECIESError),
    /// Thrown when the authentication did not complete within the allowed time
    #[error("authentication timed out")]
    Timeout,
    /// Thrown when the remote lacks a capability that an extra protocol handler requires
    #[error("Mandatory extra capability unsupported")]
    UnsupportedExtraCapability,
}
853
854impl PendingSessionHandshakeError {
855    /// Returns the [`DisconnectReason`] if the error is a disconnect message
856    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
857        match self {
858            Self::Eth(eth_err) => eth_err.as_disconnected(),
859            _ => None,
860        }
861    }
862}
863
/// The error thrown when the max configured limit has been reached and no more connections are
/// accepted.
///
/// The wrapped `u32` is included in the error message; presumably it is the limit that was
/// reached (confirm at the construction site).
#[derive(Debug, Clone, thiserror::Error)]
#[error("session limit reached {0}")]
pub struct ExceedsSessionLimit(pub(crate) u32);
869
870/// Starts a pending session authentication with a timeout.
871pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
872    timeout: Duration,
873    session_id: SessionId,
874    remote_addr: SocketAddr,
875    direction: Direction,
876    events: mpsc::Sender<PendingSessionEvent<N>>,
877    f: F,
878) where
879    F: Future<Output = ()>,
880{
881    if tokio::time::timeout(timeout, f).await.is_err() {
882        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
883        let event = PendingSessionEvent::Disconnected {
884            remote_addr,
885            session_id,
886            direction,
887            error: Some(PendingSessionHandshakeError::Timeout),
888        };
889        let _ = events.send(event).await;
890    }
891}
892
893/// Starts the authentication process for a connection initiated by a remote peer.
894///
895/// This will wait for the _incoming_ handshake request and answer it.
896#[expect(clippy::too_many_arguments)]
897pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
898    handshake: Arc<dyn EthRlpxHandshake>,
899    eth_max_message_size: usize,
900    disconnect_rx: oneshot::Receiver<()>,
901    session_id: SessionId,
902    stream: TcpStream,
903    events: mpsc::Sender<PendingSessionEvent<N>>,
904    remote_addr: SocketAddr,
905    secret_key: SecretKey,
906    hello: HelloMessageWithProtocols,
907    status: UnifiedStatus,
908    fork_filter: ForkFilter,
909    extra_handlers: RlpxSubProtocolHandlers,
910) {
911    authenticate(
912        handshake,
913        eth_max_message_size,
914        disconnect_rx,
915        events,
916        stream,
917        session_id,
918        remote_addr,
919        secret_key,
920        Direction::Incoming,
921        hello,
922        status,
923        fork_filter,
924        extra_handlers,
925    )
926    .await
927}
928
929/// Starts the authentication process for a connection initiated by a remote peer.
930#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
931#[expect(clippy::too_many_arguments)]
932async fn start_pending_outbound_session<N: NetworkPrimitives>(
933    handshake: Arc<dyn EthRlpxHandshake>,
934    eth_max_message_size: usize,
935    disconnect_rx: oneshot::Receiver<()>,
936    events: mpsc::Sender<PendingSessionEvent<N>>,
937    session_id: SessionId,
938    remote_addr: SocketAddr,
939    remote_peer_id: PeerId,
940    secret_key: SecretKey,
941    hello: HelloMessageWithProtocols,
942    status: UnifiedStatus,
943    fork_filter: ForkFilter,
944    extra_handlers: RlpxSubProtocolHandlers,
945) {
946    let stream = match TcpStream::connect(remote_addr).await {
947        Ok(stream) => {
948            if let Err(err) = stream.set_nodelay(true) {
949                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
950            }
951            stream
952        }
953        Err(error) => {
954            let _ = events
955                .send(PendingSessionEvent::OutgoingConnectionError {
956                    remote_addr,
957                    session_id,
958                    peer_id: remote_peer_id,
959                    error,
960                })
961                .await;
962            return
963        }
964    };
965    authenticate(
966        handshake,
967        eth_max_message_size,
968        disconnect_rx,
969        events,
970        stream,
971        session_id,
972        remote_addr,
973        secret_key,
974        Direction::Outgoing(remote_peer_id),
975        hello,
976        status,
977        fork_filter,
978        extra_handlers,
979    )
980    .await
981}
982
983/// Authenticates a session
984#[expect(clippy::too_many_arguments)]
985async fn authenticate<N: NetworkPrimitives>(
986    handshake: Arc<dyn EthRlpxHandshake>,
987    eth_max_message_size: usize,
988    disconnect_rx: oneshot::Receiver<()>,
989    events: mpsc::Sender<PendingSessionEvent<N>>,
990    stream: TcpStream,
991    session_id: SessionId,
992    remote_addr: SocketAddr,
993    secret_key: SecretKey,
994    direction: Direction,
995    hello: HelloMessageWithProtocols,
996    status: UnifiedStatus,
997    fork_filter: ForkFilter,
998    extra_handlers: RlpxSubProtocolHandlers,
999) {
1000    let local_addr = stream.local_addr().ok();
1001    let stream = match get_ecies_stream(stream, secret_key, direction).await {
1002        Ok(stream) => stream,
1003        Err(error) => {
1004            let _ = events
1005                .send(PendingSessionEvent::EciesAuthError {
1006                    remote_addr,
1007                    session_id,
1008                    error,
1009                    direction,
1010                })
1011                .await;
1012            return
1013        }
1014    };
1015
1016    let unauthed = UnauthedP2PStream::new(stream);
1017
1018    let auth = authenticate_stream(
1019        handshake,
1020        eth_max_message_size,
1021        unauthed,
1022        session_id,
1023        remote_addr,
1024        local_addr,
1025        direction,
1026        hello,
1027        status,
1028        fork_filter,
1029        extra_handlers,
1030    )
1031    .boxed();
1032
1033    match futures::future::select(disconnect_rx, auth).await {
1034        Either::Left((_, _)) => {
1035            let _ = events
1036                .send(PendingSessionEvent::Disconnected {
1037                    remote_addr,
1038                    session_id,
1039                    direction,
1040                    error: None,
1041                })
1042                .await;
1043        }
1044        Either::Right((res, _)) => {
1045            let _ = events.send(res).await;
1046        }
1047    }
1048}
1049
1050/// Returns an [`ECIESStream`] if it can be built. If not, send a
1051/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
1052async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1053    stream: Io,
1054    secret_key: SecretKey,
1055    direction: Direction,
1056) -> Result<ECIESStream<Io>, ECIESError> {
1057    match direction {
1058        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1059        Direction::Outgoing(remote_peer_id) => {
1060            ECIESStream::connect(stream, secret_key, remote_peer_id).await
1061        }
1062    }
1063}
1064
/// Authenticate the stream via handshake
///
/// On success returns the authenticated connection as [`PendingSessionEvent::Established`].
/// Every failure is returned as [`PendingSessionEvent::Disconnected`] carrying the matching
/// [`PendingSessionHandshakeError`].
///
/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
/// also negotiate the additional protocols.
///
/// The steps are, in order:
/// 1. the p2p `RLPx` hello handshake,
/// 2. the check of extra handlers against the shared capabilities,
/// 3. the resolution of the shared `eth` version,
/// 4. the `eth` status handshake, either directly on the p2p stream or through the protocol
///    multiplexer when more than one capability is shared.
#[expect(clippy::too_many_arguments)]
async fn authenticate_stream<N: NetworkPrimitives>(
    handshake: Arc<dyn EthRlpxHandshake>,
    eth_max_message_size: usize,
    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
    session_id: SessionId,
    remote_addr: SocketAddr,
    local_addr: Option<SocketAddr>,
    direction: Direction,
    mut hello: HelloMessageWithProtocols,
    mut status: UnifiedStatus,
    fork_filter: ForkFilter,
    mut extra_handlers: RlpxSubProtocolHandlers,
) -> PendingSessionEvent<N> {
    // Add extra protocols to the hello message; handlers whose protocol could not be added are
    // dropped here.
    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());

    // conduct the p2p rlpx handshake and return the rlpx authenticated stream
    let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
        Ok(stream_res) => stream_res,
        Err(err) => {
            return PendingSessionEvent::Disconnected {
                remote_addr,
                session_id,
                direction,
                error: Some(PendingSessionHandshakeError::Eth(err.into())),
            }
        }
    };

    // if we have extra handlers, check if it must be supported by the remote
    if !extra_handlers.is_empty() {
        // Remove every extra handler whose capability is not among the shared capabilities and
        // let the handler decide whether that is fatal: if it answers with
        // `OnNotSupported::Disconnect`, the pending session is aborted.
        while let Some(pos) = extra_handlers.iter().position(|handler| {
            p2p_stream
                .shared_capabilities()
                .ensure_matching_capability(&handler.protocol().cap)
                .is_err()
        }) {
            let handler = extra_handlers.remove(pos);
            if handler.on_unsupported_by_peer(
                p2p_stream.shared_capabilities(),
                direction,
                their_hello.id,
            ) == OnNotSupported::Disconnect
            {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
                };
            }
        }
    }

    // Ensure we negotiated mandatory eth protocol
    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
        Ok(version) => version,
        Err(err) => {
            return PendingSessionEvent::Disconnected {
                remote_addr,
                session_id,
                direction,
                error: Some(PendingSessionHandshakeError::Eth(err.into())),
            }
        }
    };

    // Before trying status handshake, set up the version to negotiated shared version
    status.set_eth_version(eth_version);

    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
        // if the shared caps are 1, we know both support the eth version
        // if the hello handshake was successful we can try status handshake

        // perform the eth protocol handshake
        match handshake
            .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
            .await
        {
            Ok(their_status) => {
                // wrap the p2p stream into a plain eth stream, no multiplexing required
                let eth_stream =
                    EthStream::with_max_message_size(eth_version, p2p_stream, eth_max_message_size);
                (eth_stream.into(), their_status)
            }
            Err(err) => {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::Eth(err)),
                }
            }
        }
    } else {
        // Multiplex the stream with the extra protocols
        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);

        // install additional handlers; a failed installation is ignored (`.ok()`)
        for handler in extra_handlers.into_iter() {
            let cap = handler.protocol().cap;
            let remote_peer_id = their_hello.id;

            multiplex_stream
                .install_protocol(&cap, move |conn| {
                    handler.into_connection(direction, remote_peer_id, conn)
                })
                .ok();
        }

        // turn the multiplexer into the eth satellite stream; this yields the peer's status
        let (multiplex_stream, their_status) = match multiplex_stream
            .into_eth_satellite_stream(status, fork_filter, handshake, eth_max_message_size)
            .await
        {
            Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
            Err(err) => {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::Eth(err)),
                }
            }
        };

        (multiplex_stream.into(), their_status)
    };

    // All handshakes succeeded: hand the connection over together with the data the peer
    // announced in its hello and status messages.
    PendingSessionEvent::Established {
        session_id,
        remote_addr,
        local_addr,
        peer_id: their_hello.id,
        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
        status: Arc::new(their_status),
        conn,
        direction,
        client_id: their_hello.client_version,
    }
}