Skip to main content

reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11    message::PeerMessage,
12    metrics::SessionManagerMetrics,
13    protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14    session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21    errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22    BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23    HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24    HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35    collections::HashMap,
36    future::Future,
37    net::SocketAddr,
38    sync::{atomic::AtomicU64, Arc},
39    task::{Context, Poll},
40    time::{Duration, Instant},
41};
42use tokio::{
43    io::{AsyncRead, AsyncWrite},
44    net::TcpStream,
45    sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56    SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
/// Opaque identifier the manager assigns to a session when it is first created.
///
/// Wraps a plain `usize` counter value; see [`SessionManager`] for how ids are handed out.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub struct SessionId(usize);
63
/// Manages a set of sessions.
///
/// Holds the handles of all pending (still handshaking) and active sessions together with the
/// channels through which those sessions report back to the manager.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub struct SessionManager<N: NetworkPrimitives> {
    /// Tracks the identifier for the next session.
    next_id: usize,
    /// Keeps track of all sessions
    counter: SessionCounter,
    /// The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
    /// responds to an _internal_ request with a `TimeoutError`
    initial_internal_request_timeout: Duration,
    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
    /// considered a protocol violation and the session will initiate a drop.
    protocol_breach_request_timeout: Duration,
    /// The timeout after which a pending session attempt is considered failed.
    pending_session_timeout: Duration,
    /// The secret key used for authenticating sessions.
    secret_key: SecretKey,
    /// The `Status` message to send to peers.
    status: UnifiedStatus,
    /// The `HelloMessage` message to send to peers.
    hello_message: HelloMessageWithProtocols,
    /// The [`ForkFilter`] used to validate the peer's `Status` message.
    fork_filter: ForkFilter,
    /// Size of the command buffer per session.
    session_command_buffer: usize,
    /// The executor for spawned tasks.
    executor: Runtime,
    /// All pending sessions that are currently handshaking, exchanging `Hello`s.
    ///
    /// Events produced during the authentication phase are reported to this manager. Once the
    /// session is authenticated, it can be moved to the `active_sessions` set.
    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
    /// All active sessions that are ready to exchange messages.
    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
    /// The original Sender half of the [`PendingSessionEvent`] channel.
    ///
    /// When a new (pending) session is created, the spawned session task gets a clone of this
    /// sender half so it can report its events back to the manager.
    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
    /// The original Sender half of the [`ActiveSessionMessage`] channel.
    ///
    /// When active session state is reached, the spawned [`ActiveSession`] gets a clone of this
    /// sender half.
    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
    /// Receiver half that listens for [`ActiveSessionMessage`] produced by active sessions.
    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
    /// Additional `RLPx` sub-protocols to be used by the session manager.
    extra_protocols: RlpxSubProtocols,
    /// Tracks the ongoing graceful disconnection attempts for incoming connections.
    disconnections_counter: DisconnectionsCounter,
    /// Metrics for the session manager.
    metrics: SessionManagerMetrics,
    /// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
    handshake: Arc<dyn EthRlpxHandshake>,
    /// Maximum allowed ETH message size for post-handshake ETH/Snap streams.
    eth_max_message_size: usize,
    /// Shared local range information that gets propagated to active sessions.
    /// This represents the range of blocks that this node can serve to other peers.
    local_range_info: BlockRangeInfo,
    /// When true, block announcement messages (`NewBlock`, `NewBlockHashes`) are rejected before
    /// RLP decoding on new sessions to avoid memory amplification.
    reject_block_announcements: bool,
}
130
131// === impl SessionManager ===
132
133impl<N: NetworkPrimitives> SessionManager<N> {
134    /// Creates a new empty [`SessionManager`].
135    #[expect(clippy::too_many_arguments)]
136    pub fn new(
137        secret_key: SecretKey,
138        config: SessionsConfig,
139        executor: Runtime,
140        status: UnifiedStatus,
141        hello_message: HelloMessageWithProtocols,
142        fork_filter: ForkFilter,
143        extra_protocols: RlpxSubProtocols,
144        handshake: Arc<dyn EthRlpxHandshake>,
145        eth_max_message_size: usize,
146        reject_block_announcements: bool,
147    ) -> Self {
148        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
149        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
150        let active_session_tx = PollSender::new(active_session_tx);
151
152        // Initialize local range info from the status
153        let local_range_info = BlockRangeInfo::new(
154            status.earliest_block.unwrap_or_default(),
155            status.latest_block.unwrap_or_default(),
156            status.blockhash,
157        );
158
159        Self {
160            next_id: 0,
161            counter: SessionCounter::new(config.limits),
162            initial_internal_request_timeout: config.initial_internal_request_timeout,
163            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
164            pending_session_timeout: config.pending_session_timeout,
165            secret_key,
166            status,
167            hello_message,
168            fork_filter,
169            session_command_buffer: config.session_command_buffer,
170            executor,
171            pending_sessions: Default::default(),
172            active_sessions: Default::default(),
173            pending_sessions_tx,
174            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
175            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
176            active_session_rx: ReceiverStream::new(active_session_rx),
177            extra_protocols,
178            disconnections_counter: Default::default(),
179            metrics: Default::default(),
180            handshake,
181            eth_max_message_size,
182            local_range_info,
183            reject_block_announcements,
184        }
185    }
186
187    /// Returns the currently tracked [`ForkId`].
188    pub(crate) const fn fork_id(&self) -> ForkId {
189        self.fork_filter.current()
190    }
191
192    /// Check whether the provided [`ForkId`] is compatible based on the validation rules in
193    /// `EIP-2124`.
194    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
195        self.fork_filter.validate(fork_id).is_ok()
196    }
197
198    /// Returns the next unique [`SessionId`].
199    const fn next_id(&mut self) -> SessionId {
200        let id = self.next_id;
201        self.next_id += 1;
202        SessionId(id)
203    }
204
205    /// Returns the current status of the session.
206    pub const fn status(&self) -> UnifiedStatus {
207        self.status
208    }
209
210    /// Returns the secret key used for authenticating sessions.
211    pub const fn secret_key(&self) -> SecretKey {
212        self.secret_key
213    }
214
215    /// Returns a borrowed reference to the active sessions.
216    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
217        &self.active_sessions
218    }
219
220    /// Returns the session hello message.
221    pub fn hello_message(&self) -> HelloMessageWithProtocols {
222        self.hello_message.clone()
223    }
224
225    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
226    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
227        self.extra_protocols.push(protocol)
228    }
229
230    /// Returns the number of currently pending connections.
231    #[inline]
232    pub(crate) fn num_pending_connections(&self) -> usize {
233        self.pending_sessions.len()
234    }
235
236    /// Spawns the given future onto a new task that is tracked in the `spawned_tasks`
237    /// [`JoinSet`](tokio::task::JoinSet).
238    fn spawn<F>(&self, f: F)
239    where
240        F: Future<Output = ()> + Send + 'static,
241    {
242        self.executor.spawn_task(f);
243    }
244
245    /// Invoked on a received status update.
246    ///
247    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
248    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
249    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
250        self.status.blockhash = head.hash;
251        self.status.total_difficulty = Some(head.total_difficulty);
252        let transition = self.fork_filter.set_head(head);
253        self.status.forkid = self.fork_filter.current();
254        self.status.latest_block = Some(head.number);
255
256        transition
257    }
258
259    /// An incoming TCP connection was received. This starts the authentication process to turn this
260    /// stream into an active peer session.
261    ///
262    /// Returns an error if the configured limit has been reached.
263    pub(crate) fn on_incoming(
264        &mut self,
265        stream: TcpStream,
266        remote_addr: SocketAddr,
267    ) -> Result<SessionId, ExceedsSessionLimit> {
268        self.counter.ensure_pending_inbound()?;
269
270        let session_id = self.next_id();
271
272        trace!(
273            target: "net::session",
274            ?remote_addr,
275            ?session_id,
276            "new pending incoming session"
277        );
278
279        let (disconnect_tx, disconnect_rx) = oneshot::channel();
280        let pending_events = self.pending_sessions_tx.clone();
281        let secret_key = self.secret_key;
282        let hello_message = self.hello_message.clone();
283        let status = self.status;
284        let fork_filter = self.fork_filter.clone();
285        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
286        self.spawn(pending_session_with_timeout(
287            self.pending_session_timeout,
288            session_id,
289            remote_addr,
290            Direction::Incoming,
291            pending_events.clone(),
292            start_pending_incoming_session(
293                self.handshake.clone(),
294                self.eth_max_message_size,
295                disconnect_rx,
296                session_id,
297                stream,
298                pending_events,
299                remote_addr,
300                secret_key,
301                hello_message,
302                status,
303                fork_filter,
304                extra_handlers,
305            ),
306        ));
307
308        let handle = PendingSessionHandle {
309            disconnect_tx: Some(disconnect_tx),
310            direction: Direction::Incoming,
311        };
312        self.pending_sessions.insert(session_id, handle);
313        self.counter.inc_pending_inbound();
314        Ok(session_id)
315    }
316
317    /// Starts a new pending session from the local node to the given remote node.
318    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
319        // The error can be dropped because no dial will be made if it would exceed the limit
320        if self.counter.ensure_pending_outbound().is_ok() {
321            let session_id = self.next_id();
322            let (disconnect_tx, disconnect_rx) = oneshot::channel();
323            let pending_events = self.pending_sessions_tx.clone();
324            let secret_key = self.secret_key;
325            let hello_message = self.hello_message.clone();
326            let fork_filter = self.fork_filter.clone();
327            let status = self.status;
328            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
329            self.spawn(pending_session_with_timeout(
330                self.pending_session_timeout,
331                session_id,
332                remote_addr,
333                Direction::Outgoing(remote_peer_id),
334                pending_events.clone(),
335                start_pending_outbound_session(
336                    self.handshake.clone(),
337                    self.eth_max_message_size,
338                    disconnect_rx,
339                    pending_events,
340                    session_id,
341                    remote_addr,
342                    remote_peer_id,
343                    secret_key,
344                    hello_message,
345                    status,
346                    fork_filter,
347                    extra_handlers,
348                ),
349            ));
350
351            let handle = PendingSessionHandle {
352                disconnect_tx: Some(disconnect_tx),
353                direction: Direction::Outgoing(remote_peer_id),
354            };
355            self.pending_sessions.insert(session_id, handle);
356            self.counter.inc_pending_outbound();
357        }
358    }
359
360    /// Initiates a shutdown of the channel.
361    ///
362    /// This will trigger the disconnect on the session task to gracefully terminate. The result
363    /// will be picked up by the receiver.
364    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
365        if let Some(session) = self.active_sessions.get(&node) {
366            session.disconnect(reason);
367        }
368    }
369
370    /// Initiates a shutdown of all sessions.
371    ///
372    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
373    /// will be picked by the receiver.
374    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
375        for session in self.active_sessions.values() {
376            session.disconnect(reason);
377        }
378    }
379
380    /// Disconnects all pending sessions.
381    pub fn disconnect_all_pending(&mut self) {
382        for session in self.pending_sessions.values_mut() {
383            session.disconnect();
384        }
385    }
386
387    /// Sends a message to the peer's session.
388    ///
389    /// Broadcast messages use size-based backpressure: the total number of in-flight broadcast
390    /// items (across the command channel, overflow channel, and session outgoing queue) is tracked
391    /// by a shared atomic counter. If the bounded command channel is full but the broadcast limit
392    /// hasn't been reached, the message overflows to a dedicated unbounded channel.
393    pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
394        if let Some(session) = self.active_sessions.get(peer_id) &&
395            !session.commands.send_message(msg)
396        {
397            self.metrics.total_outgoing_peer_messages_dropped.increment(1);
398        }
399    }
400
401    /// Removes the [`PendingSessionHandle`] if it exists.
402    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
403        let session = self.pending_sessions.remove(id)?;
404        self.counter.dec_pending(&session.direction);
405        Some(session)
406    }
407
408    /// Removes the [`PendingSessionHandle`] if it exists.
409    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
410        let session = self.active_sessions.remove(id)?;
411        self.counter.dec_active(&session.direction);
412        Some(session)
413    }
414
415    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
416    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
417    /// simply drop the incoming connection.
418    pub(crate) fn try_disconnect_incoming_connection(
419        &self,
420        stream: TcpStream,
421        reason: DisconnectReason,
422    ) {
423        if !self.disconnections_counter.has_capacity() {
424            // drop the connection if we don't have capacity for gracefully disconnecting
425            return
426        }
427
428        let guard = self.disconnections_counter.clone();
429        let secret_key = self.secret_key;
430
431        self.spawn(async move {
432            trace!(
433                target: "net::session",
434                "gracefully disconnecting incoming connection"
435            );
436            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
437                let mut unauth = UnauthedP2PStream::new(stream);
438                let _ = unauth.send_disconnect(reason).await;
439                drop(guard);
440            }
441        });
442    }
443
444    /// This polls all the session handles and returns [`SessionEvent`].
445    ///
446    /// Active sessions are prioritized.
447    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
448        // Poll events from active sessions
449        match self.active_session_rx.poll_next_unpin(cx) {
450            Poll::Pending => {}
451            Poll::Ready(None) => {
452                unreachable!("Manager holds both channel halves.")
453            }
454            Poll::Ready(Some(event)) => {
455                return match event {
456                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
457                        trace!(
458                            target: "net::session",
459                            ?peer_id,
460                            "gracefully disconnected active session."
461                        );
462                        self.remove_active_session(&peer_id);
463                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
464                    }
465                    ActiveSessionMessage::ClosedOnConnectionError {
466                        peer_id,
467                        remote_addr,
468                        error,
469                    } => {
470                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
471                        self.remove_active_session(&peer_id);
472                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
473                            remote_addr,
474                            peer_id,
475                            error,
476                        })
477                    }
478                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
479                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
480                    }
481                    ActiveSessionMessage::BadMessage { peer_id } => {
482                        Poll::Ready(SessionEvent::BadMessage { peer_id })
483                    }
484                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
485                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
486                    }
487                }
488            }
489        }
490
491        // Poll the pending session event stream
492        let event = match self.pending_session_rx.poll_next_unpin(cx) {
493            Poll::Pending => return Poll::Pending,
494            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
495            Poll::Ready(Some(event)) => event,
496        };
497        match event {
498            PendingSessionEvent::Established {
499                session_id,
500                remote_addr,
501                local_addr,
502                peer_id,
503                capabilities,
504                mut conn,
505                status,
506                direction,
507                client_id,
508                peer_listen_port,
509            } => {
510                // move from pending to established.
511                self.remove_pending_session(&session_id);
512
513                // If there's already a session to the peer then we disconnect right away
514                if self.active_sessions.contains_key(&peer_id) {
515                    trace!(
516                        target: "net::session",
517                        ?session_id,
518                        ?remote_addr,
519                        ?peer_id,
520                        ?direction,
521                        "already connected"
522                    );
523
524                    self.spawn(async move {
525                        // send a disconnect message
526                        let _ =
527                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
528                    });
529
530                    return Poll::Ready(SessionEvent::AlreadyConnected {
531                        peer_id,
532                        remote_addr,
533                        direction,
534                    })
535                }
536
537                let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
538                let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
539
540                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
541
542                let messages = PeerRequestSender::new(peer_id, to_session_tx);
543
544                let timeout = Arc::new(AtomicU64::new(
545                    self.initial_internal_request_timeout.as_millis() as u64,
546                ));
547
548                // negotiated version
549                let version = conn.version();
550
551                // Configure the interval at which the range information is updated, starting with
552                // ETH69. We use interval_at to delay the first tick, avoiding sending
553                // BlockRangeUpdate immediately after connection (which can cause issues with
554                // peers that don't properly handle the message).
555                let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
556                    let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
557                    let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
558                    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
559                    interval
560                });
561
562                // Shared counter of in-flight broadcast items. The session task must decrement
563                // this when it pops messages from the outgoing queue, and the
564                // `SessionCommandSender` increments it before enqueuing. This invariant ensures
565                // the `SessionManager` always has an accurate view of total buffered broadcast
566                // pressure for a peer.
567                let broadcast_items = BroadcastItemCounter::new();
568                let remote_range_info = status.block_range_update().map(|update| {
569                    BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
570                });
571
572                if self.reject_block_announcements {
573                    conn.set_reject_block_announcements(true);
574                }
575
576                let session = ActiveSession {
577                    next_id: 0,
578                    remote_peer_id: peer_id,
579                    remote_addr,
580                    remote_capabilities: Arc::clone(&capabilities),
581                    session_id,
582                    commands_rx: ReceiverStream::new(commands_rx),
583                    unbounded_rx,
584                    unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
585                    to_session_manager: self.active_session_tx.clone(),
586                    pending_message_to_session: None,
587                    internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
588                    inflight_requests: Default::default(),
589                    conn,
590                    queued_outgoing: QueuedOutgoingMessages::new(
591                        self.metrics.queued_outgoing_messages.clone(),
592                        broadcast_items.clone(),
593                    ),
594                    received_requests_from_remote: Default::default(),
595                    internal_request_timeout_interval: tokio::time::interval(
596                        self.initial_internal_request_timeout,
597                    ),
598                    internal_request_timeout: Arc::clone(&timeout),
599                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
600                    terminate_message: None,
601                    range_info: remote_range_info.clone(),
602                    local_range_info: self.local_range_info.clone(),
603                    range_update_interval,
604                    last_sent_latest_block: None,
605                };
606
607                self.spawn(session);
608
609                let client_version = client_id.into();
610                let handle = ActiveSessionHandle {
611                    status: status.clone(),
612                    direction,
613                    session_id,
614                    remote_id: peer_id,
615                    version,
616                    established: Instant::now(),
617                    capabilities: Arc::clone(&capabilities),
618                    commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
619                    client_version: Arc::clone(&client_version),
620                    remote_addr,
621                    local_addr,
622                    peer_listen_port,
623                };
624
625                self.active_sessions.insert(peer_id, handle);
626                self.counter.inc_active(&direction);
627
628                if direction.is_outgoing() {
629                    self.metrics.total_dial_successes.increment(1);
630                }
631
632                Poll::Ready(SessionEvent::SessionEstablished {
633                    peer_id,
634                    remote_addr,
635                    client_version,
636                    version,
637                    capabilities,
638                    status,
639                    messages,
640                    direction,
641                    timeout,
642                    range_info: remote_range_info,
643                })
644            }
645            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
646                trace!(
647                    target: "net::session",
648                    ?session_id,
649                    ?remote_addr,
650                    ?error,
651                    "disconnected pending session"
652                );
653                self.remove_pending_session(&session_id);
654                match direction {
655                    Direction::Incoming => {
656                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
657                            remote_addr,
658                            error,
659                        })
660                    }
661                    Direction::Outgoing(peer_id) => {
662                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
663                            remote_addr,
664                            peer_id,
665                            error,
666                        })
667                    }
668                }
669            }
670            PendingSessionEvent::OutgoingConnectionError {
671                remote_addr,
672                session_id,
673                peer_id,
674                error,
675            } => {
676                trace!(
677                    target: "net::session",
678                    %error,
679                    ?session_id,
680                    ?remote_addr,
681                    ?peer_id,
682                    "connection refused"
683                );
684                self.remove_pending_session(&session_id);
685                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
686            }
687            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
688                trace!(
689                    target: "net::session",
690                    %error,
691                    ?session_id,
692                    ?remote_addr,
693                    "ecies auth failed"
694                );
695                self.remove_pending_session(&session_id);
696                match direction {
697                    Direction::Incoming => {
698                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
699                            remote_addr,
700                            error: Some(PendingSessionHandshakeError::Ecies(error)),
701                        })
702                    }
703                    Direction::Outgoing(peer_id) => {
704                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
705                            remote_addr,
706                            peer_id,
707                            error: Some(PendingSessionHandshakeError::Ecies(error)),
708                        })
709                    }
710                }
711            }
712        }
713    }
714
715    /// Updates the advertised block range that this node can serve to other peers starting with
716    /// Eth69.
717    ///
718    /// This method updates both the local status message that gets sent to peers during handshake
719    /// and the shared local range information that gets propagated to active sessions (Eth69).
720    /// The range information is used in ETH69 protocol where peers announce the range of blocks
721    /// they can serve to optimize data synchronization.
722    pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
723        self.status.earliest_block = Some(block_range_update.earliest);
724        self.status.latest_block = Some(block_range_update.latest);
725        self.status.blockhash = block_range_update.latest_hash;
726
727        // Update the shared local range info that gets propagated to active sessions
728        self.local_range_info.update(
729            block_range_update.earliest,
730            block_range_update.latest,
731            block_range_update.latest_hash,
732        );
733    }
734}
735
/// A counter for ongoing graceful disconnections attempts.
///
/// The count is derived from the strong reference count of the wrapped [`Arc`]: the counter
/// itself accounts for one reference and every live clone of it for one more.
#[derive(Default, Debug, Clone)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Returns true if the [`DisconnectionsCounter`] still has capacity
    /// for an additional graceful disconnection.
    fn has_capacity(&self) -> bool {
        // `self` always owns one reference, anything beyond that is an outstanding clone.
        let outstanding = Arc::strong_count(&self.0).saturating_sub(1);
        outstanding < Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
749
/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent<N: NetworkPrimitives> {
    /// A new session was successfully authenticated.
    ///
    /// This session is now able to exchange data.
    SessionEstablished {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The user agent of the remote node, usually containing the client name and version
        client_version: Arc<str>,
        /// The capabilities the remote node has announced
        capabilities: Arc<Capabilities>,
        /// The negotiated `eth` protocol version
        version: EthVersion,
        /// The Status message the peer sent during the `eth` handshake
        status: Arc<UnifiedStatus>,
        /// The channel for sending messages to the peer with the session
        messages: PeerRequestSender<PeerRequest<N>>,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
        /// The maximum time that the session waits for a response from the peer before timing out
        /// the connection
        timeout: Arc<AtomicU64>,
        /// The range info for the peer.
        range_info: Option<BlockRangeInfo>,
    },
    /// The peer was already connected with another session.
    AlreadyConnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
    },
    /// A session received a valid message via `RLPx`.
    ValidMessage {
        /// The remote node's public key
        peer_id: PeerId,
        /// Message received from the peer.
        message: PeerMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Remote peer is considered in protocol violation
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Closed an incoming pending session during handshaking.
    IncomingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The pending handshake session error that caused the session to close
        error: Option<PendingSessionHandshakeError>,
    },
    /// Closed an outgoing pending session during handshaking.
    OutgoingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The pending handshake session error that caused the session to close
        error: Option<PendingSessionHandshakeError>,
    },
    /// Failed to establish the TCP stream for an outgoing connection
    OutgoingConnectionError {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The error that caused the outgoing connection to fail
        error: io::Error,
    },
    /// Session was closed due to an error
    SessionClosedOnConnectionError {
        /// The id of the remote peer.
        peer_id: PeerId,
        /// The socket we were connected to.
        remote_addr: SocketAddr,
        /// The error that caused the session to close
        error: EthStreamError,
    },
    /// Active session was gracefully disconnected.
    Disconnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address that we were connected to
        remote_addr: SocketAddr,
    },
}
847
/// Errors that can occur during handshaking/authenticating the underlying streams.
#[derive(Debug, thiserror::Error)]
pub enum PendingSessionHandshakeError {
    /// The pending session failed due to an error while establishing the `eth` stream
    ///
    /// This also covers failures of the preceding p2p hello handshake, which are converted into
    /// an [`EthStreamError`].
    #[error(transparent)]
    Eth(EthStreamError),
    /// The pending session failed due to an error while establishing the ECIES stream
    #[error(transparent)]
    Ecies(ECIESError),
    /// Thrown when the authentication timed out, i.e. the pending session did not complete
    /// within the configured timeout
    #[error("authentication timed out")]
    Timeout,
    /// Thrown when the remote lacks the required capability
    ///
    /// Emitted when an extra protocol handler answers [`OnNotSupported::Disconnect`] for a
    /// capability that is not shared with the peer.
    #[error("Mandatory extra capability unsupported")]
    UnsupportedExtraCapability,
}
864
865impl PendingSessionHandshakeError {
866    /// Returns the [`DisconnectReason`] if the error is a disconnect message
867    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
868        match self {
869            Self::Eth(eth_err) => eth_err.as_disconnected(),
870            _ => None,
871        }
872    }
873}
874
/// The error thrown when the max configured limit has been reached and no more connections are
/// accepted.
///
/// The wrapped number is rendered in the error message; presumably it is the configured session
/// limit that was hit (the construction site is not part of this section).
#[derive(Debug, Clone, thiserror::Error)]
#[error("session limit reached {0}")]
pub struct ExceedsSessionLimit(pub(crate) u32);
880
881/// Starts a pending session authentication with a timeout.
882pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
883    timeout: Duration,
884    session_id: SessionId,
885    remote_addr: SocketAddr,
886    direction: Direction,
887    events: mpsc::Sender<PendingSessionEvent<N>>,
888    f: F,
889) where
890    F: Future<Output = ()>,
891{
892    if tokio::time::timeout(timeout, f).await.is_err() {
893        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
894        let event = PendingSessionEvent::Disconnected {
895            remote_addr,
896            session_id,
897            direction,
898            error: Some(PendingSessionHandshakeError::Timeout),
899        };
900        let _ = events.send(event).await;
901    }
902}
903
904/// Starts the authentication process for a connection initiated by a remote peer.
905///
906/// This will wait for the _incoming_ handshake request and answer it.
907#[expect(clippy::too_many_arguments)]
908pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
909    handshake: Arc<dyn EthRlpxHandshake>,
910    eth_max_message_size: usize,
911    disconnect_rx: oneshot::Receiver<()>,
912    session_id: SessionId,
913    stream: TcpStream,
914    events: mpsc::Sender<PendingSessionEvent<N>>,
915    remote_addr: SocketAddr,
916    secret_key: SecretKey,
917    hello: HelloMessageWithProtocols,
918    status: UnifiedStatus,
919    fork_filter: ForkFilter,
920    extra_handlers: RlpxSubProtocolHandlers,
921) {
922    authenticate(
923        handshake,
924        eth_max_message_size,
925        disconnect_rx,
926        events,
927        stream,
928        session_id,
929        remote_addr,
930        secret_key,
931        Direction::Incoming,
932        hello,
933        status,
934        fork_filter,
935        extra_handlers,
936    )
937    .await
938}
939
940/// Starts the authentication process for a connection initiated by a remote peer.
941#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
942#[expect(clippy::too_many_arguments)]
943async fn start_pending_outbound_session<N: NetworkPrimitives>(
944    handshake: Arc<dyn EthRlpxHandshake>,
945    eth_max_message_size: usize,
946    disconnect_rx: oneshot::Receiver<()>,
947    events: mpsc::Sender<PendingSessionEvent<N>>,
948    session_id: SessionId,
949    remote_addr: SocketAddr,
950    remote_peer_id: PeerId,
951    secret_key: SecretKey,
952    hello: HelloMessageWithProtocols,
953    status: UnifiedStatus,
954    fork_filter: ForkFilter,
955    extra_handlers: RlpxSubProtocolHandlers,
956) {
957    let stream = match TcpStream::connect(remote_addr).await {
958        Ok(stream) => {
959            if let Err(err) = stream.set_nodelay(true) {
960                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
961            }
962            stream
963        }
964        Err(error) => {
965            let _ = events
966                .send(PendingSessionEvent::OutgoingConnectionError {
967                    remote_addr,
968                    session_id,
969                    peer_id: remote_peer_id,
970                    error,
971                })
972                .await;
973            return
974        }
975    };
976    authenticate(
977        handshake,
978        eth_max_message_size,
979        disconnect_rx,
980        events,
981        stream,
982        session_id,
983        remote_addr,
984        secret_key,
985        Direction::Outgoing(remote_peer_id),
986        hello,
987        status,
988        fork_filter,
989        extra_handlers,
990    )
991    .await
992}
993
994/// Authenticates a session
995#[expect(clippy::too_many_arguments)]
996async fn authenticate<N: NetworkPrimitives>(
997    handshake: Arc<dyn EthRlpxHandshake>,
998    eth_max_message_size: usize,
999    disconnect_rx: oneshot::Receiver<()>,
1000    events: mpsc::Sender<PendingSessionEvent<N>>,
1001    stream: TcpStream,
1002    session_id: SessionId,
1003    remote_addr: SocketAddr,
1004    secret_key: SecretKey,
1005    direction: Direction,
1006    hello: HelloMessageWithProtocols,
1007    status: UnifiedStatus,
1008    fork_filter: ForkFilter,
1009    extra_handlers: RlpxSubProtocolHandlers,
1010) {
1011    let local_addr = stream.local_addr().ok();
1012    let stream = match get_ecies_stream(stream, secret_key, direction).await {
1013        Ok(stream) => stream,
1014        Err(error) => {
1015            let _ = events
1016                .send(PendingSessionEvent::EciesAuthError {
1017                    remote_addr,
1018                    session_id,
1019                    error,
1020                    direction,
1021                })
1022                .await;
1023            return
1024        }
1025    };
1026
1027    let unauthed = UnauthedP2PStream::new(stream);
1028
1029    let auth = authenticate_stream(
1030        handshake,
1031        eth_max_message_size,
1032        unauthed,
1033        session_id,
1034        remote_addr,
1035        local_addr,
1036        direction,
1037        hello,
1038        status,
1039        fork_filter,
1040        extra_handlers,
1041    )
1042    .boxed();
1043
1044    match futures::future::select(disconnect_rx, auth).await {
1045        Either::Left((_, _)) => {
1046            let _ = events
1047                .send(PendingSessionEvent::Disconnected {
1048                    remote_addr,
1049                    session_id,
1050                    direction,
1051                    error: None,
1052                })
1053                .await;
1054        }
1055        Either::Right((res, _)) => {
1056            let _ = events.send(res).await;
1057        }
1058    }
1059}
1060
1061/// Returns an [`ECIESStream`] if it can be built. If not, send a
1062/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
1063async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1064    stream: Io,
1065    secret_key: SecretKey,
1066    direction: Direction,
1067) -> Result<ECIESStream<Io>, ECIESError> {
1068    match direction {
1069        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1070        Direction::Outgoing(remote_peer_id) => {
1071            ECIESStream::connect(stream, secret_key, remote_peer_id).await
1072        }
1073    }
1074}
1075
1076/// Authenticate the stream via handshake
1077///
1078/// On Success return the authenticated stream as [`PendingSessionEvent`].
1079///
1080/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
1081/// also negotiate the additional protocols.
1082#[expect(clippy::too_many_arguments)]
1083async fn authenticate_stream<N: NetworkPrimitives>(
1084    handshake: Arc<dyn EthRlpxHandshake>,
1085    eth_max_message_size: usize,
1086    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1087    session_id: SessionId,
1088    remote_addr: SocketAddr,
1089    local_addr: Option<SocketAddr>,
1090    direction: Direction,
1091    mut hello: HelloMessageWithProtocols,
1092    mut status: UnifiedStatus,
1093    fork_filter: ForkFilter,
1094    mut extra_handlers: RlpxSubProtocolHandlers,
1095) -> PendingSessionEvent<N> {
1096    // Add extra protocols to the hello message
1097    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1098
1099    // conduct the p2p rlpx handshake and return the rlpx authenticated stream
1100    let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1101        Ok(stream_res) => stream_res,
1102        Err(err) => {
1103            return PendingSessionEvent::Disconnected {
1104                remote_addr,
1105                session_id,
1106                direction,
1107                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1108            }
1109        }
1110    };
1111
1112    // if we have extra handlers, check if it must be supported by the remote
1113    if !extra_handlers.is_empty() {
1114        // ensure that no extra handlers that aren't supported are not mandatory
1115        while let Some(pos) = extra_handlers.iter().position(|handler| {
1116            p2p_stream
1117                .shared_capabilities()
1118                .ensure_matching_capability(&handler.protocol().cap)
1119                .is_err()
1120        }) {
1121            let handler = extra_handlers.remove(pos);
1122            if handler.on_unsupported_by_peer(
1123                p2p_stream.shared_capabilities(),
1124                direction,
1125                their_hello.id,
1126            ) == OnNotSupported::Disconnect
1127            {
1128                return PendingSessionEvent::Disconnected {
1129                    remote_addr,
1130                    session_id,
1131                    direction,
1132                    error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1133                };
1134            }
1135        }
1136    }
1137
1138    // Ensure we negotiated mandatory eth protocol
1139    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1140        Ok(version) => version,
1141        Err(err) => {
1142            return PendingSessionEvent::Disconnected {
1143                remote_addr,
1144                session_id,
1145                direction,
1146                error: Some(PendingSessionHandshakeError::Eth(err.into())),
1147            }
1148        }
1149    };
1150
1151    // Before trying status handshake, set up the version to negotiated shared version
1152    status.set_eth_version(eth_version);
1153
1154    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1155        // if the shared caps are 1, we know both support the eth version
1156        // if the hello handshake was successful we can try status handshake
1157
1158        // perform the eth protocol handshake
1159        match handshake
1160            .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1161            .await
1162        {
1163            Ok(their_status) => {
1164                let eth_stream =
1165                    EthStream::with_max_message_size(eth_version, p2p_stream, eth_max_message_size);
1166                (eth_stream.into(), their_status)
1167            }
1168            Err(err) => {
1169                return PendingSessionEvent::Disconnected {
1170                    remote_addr,
1171                    session_id,
1172                    direction,
1173                    error: Some(PendingSessionHandshakeError::Eth(err)),
1174                }
1175            }
1176        }
1177    } else {
1178        // Multiplex the stream with the extra protocols
1179        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1180
1181        // install additional handlers
1182        for handler in extra_handlers.into_iter() {
1183            let cap = handler.protocol().cap;
1184            let remote_peer_id = their_hello.id;
1185
1186            multiplex_stream
1187                .install_protocol(&cap, move |conn| {
1188                    handler.into_connection(direction, remote_peer_id, conn)
1189                })
1190                .ok();
1191        }
1192
1193        let (multiplex_stream, their_status) = match multiplex_stream
1194            .into_eth_satellite_stream(status, fork_filter, handshake, eth_max_message_size)
1195            .await
1196        {
1197            Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1198            Err(err) => {
1199                return PendingSessionEvent::Disconnected {
1200                    remote_addr,
1201                    session_id,
1202                    direction,
1203                    error: Some(PendingSessionHandshakeError::Eth(err)),
1204                }
1205            }
1206        };
1207
1208        (multiplex_stream.into(), their_status)
1209    };
1210
1211    // `port` field is effectively deprecated, so we treat 0 value as a missing port.
1212    let peer_listen_port = (their_hello.port != 0).then_some(their_hello.port);
1213
1214    PendingSessionEvent::Established {
1215        session_id,
1216        remote_addr,
1217        local_addr,
1218        peer_id: their_hello.id,
1219        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1220        status: Arc::new(their_status),
1221        conn,
1222        direction,
1223        client_id: their_hello.client_version,
1224        peer_listen_port,
1225    }
1226}