Skip to main content

reth_network/session/
mod.rs

1//! Support for handling peer sessions.
2
3mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11    message::PeerMessage,
12    metrics::SessionManagerMetrics,
13    protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14    session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21    errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22    BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23    HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24    HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35    collections::HashMap,
36    future::Future,
37    net::SocketAddr,
38    sync::{atomic::AtomicU64, Arc},
39    task::{Context, Poll},
40    time::{Duration, Instant},
41};
42use tokio::{
43    io::{AsyncRead, AsyncWrite},
44    net::TcpStream,
45    sync::{mpsc, mpsc::error::TrySendError, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{debug, instrument, trace};
50
51use crate::session::active::RANGE_UPDATE_INTERVAL;
52pub use conn::EthRlpxConnection;
53pub use handle::{
54    ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
55    SessionCommand,
56};
57pub use reth_network_api::{Direction, PeerInfo};
58
/// Internal identifier for a session.
///
/// The id is assigned when a pending session is created and is carried over unchanged once that
/// session becomes active.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub struct SessionId(usize);
62
/// Manages a set of sessions.
///
/// The manager owns the handles of all pending (handshaking) and active sessions, spawns the
/// session tasks on its executor and receives their events through two bounded channels.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub struct SessionManager<N: NetworkPrimitives> {
    /// Tracks the identifier for the next session.
    next_id: usize,
    /// Counts pending and active sessions and enforces the configured session limits.
    counter: SessionCounter,
    /// The maximum initial time an [`ActiveSession`] waits for a response from the peer before it
    /// responds to an _internal_ request with a `TimeoutError`
    initial_internal_request_timeout: Duration,
    /// If an [`ActiveSession`] does not receive a response at all within this duration then it is
    /// considered a protocol violation and the session will initiate a drop.
    protocol_breach_request_timeout: Duration,
    /// The timeout after which a pending session attempt is considered failed.
    pending_session_timeout: Duration,
    /// The secret key used for authenticating sessions.
    secret_key: SecretKey,
    /// The `Status` message to send to peers.
    status: UnifiedStatus,
    /// The `HelloMessage` message to send to peers.
    hello_message: HelloMessageWithProtocols,
    /// The [`ForkFilter`] used to validate the peer's `Status` message.
    fork_filter: ForkFilter,
    /// Size of the command buffer per session.
    session_command_buffer: usize,
    /// The executor for spawned tasks.
    executor: Runtime,
    /// All pending sessions that are currently handshaking, exchanging `Hello`s.
    ///
    /// Events produced during the authentication phase are reported to this manager. Once the
    /// session is authenticated, it can be moved to the `active_sessions` set.
    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
    /// All active sessions that are ready to exchange messages.
    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
    /// The original Sender half of the [`PendingSessionEvent`] channel.
    ///
    /// When a new (pending) session is created, the task driving it will get a clone of this
    /// sender half.
    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
    /// Receiver half that listens for [`PendingSessionEvent`] produced by pending sessions.
    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
    /// The original Sender half of the [`ActiveSessionMessage`] channel.
    ///
    /// When active session state is reached, the corresponding [`ActiveSession`] will get a
    /// clone of this sender half.
    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
    /// Receiver half that listens for [`ActiveSessionMessage`] produced by active sessions.
    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
    /// Additional `RLPx` sub-protocols to be used by the session manager.
    extra_protocols: RlpxSubProtocols,
    /// Tracks the ongoing graceful disconnection attempts for incoming connections.
    disconnections_counter: DisconnectionsCounter,
    /// Metrics for the session manager.
    metrics: SessionManagerMetrics,
    /// The [`EthRlpxHandshake`] is used to perform the initial handshake with the peer.
    handshake: Arc<dyn EthRlpxHandshake>,
    /// Shared local range information that gets propagated to active sessions.
    /// This represents the range of blocks that this node can serve to other peers.
    local_range_info: BlockRangeInfo,
}
124
125// === impl SessionManager ===
126
127impl<N: NetworkPrimitives> SessionManager<N> {
128    /// Creates a new empty [`SessionManager`].
129    #[expect(clippy::too_many_arguments)]
130    pub fn new(
131        secret_key: SecretKey,
132        config: SessionsConfig,
133        executor: Runtime,
134        status: UnifiedStatus,
135        hello_message: HelloMessageWithProtocols,
136        fork_filter: ForkFilter,
137        extra_protocols: RlpxSubProtocols,
138        handshake: Arc<dyn EthRlpxHandshake>,
139    ) -> Self {
140        let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
141        let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
142        let active_session_tx = PollSender::new(active_session_tx);
143
144        // Initialize local range info from the status
145        let local_range_info = BlockRangeInfo::new(
146            status.earliest_block.unwrap_or_default(),
147            status.latest_block.unwrap_or_default(),
148            status.blockhash,
149        );
150
151        Self {
152            next_id: 0,
153            counter: SessionCounter::new(config.limits),
154            initial_internal_request_timeout: config.initial_internal_request_timeout,
155            protocol_breach_request_timeout: config.protocol_breach_request_timeout,
156            pending_session_timeout: config.pending_session_timeout,
157            secret_key,
158            status,
159            hello_message,
160            fork_filter,
161            session_command_buffer: config.session_command_buffer,
162            executor,
163            pending_sessions: Default::default(),
164            active_sessions: Default::default(),
165            pending_sessions_tx,
166            pending_session_rx: ReceiverStream::new(pending_sessions_rx),
167            active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
168            active_session_rx: ReceiverStream::new(active_session_rx),
169            extra_protocols,
170            disconnections_counter: Default::default(),
171            metrics: Default::default(),
172            handshake,
173            local_range_info,
174        }
175    }
176
    /// Returns the currently tracked [`ForkId`], as reported by the manager's [`ForkFilter`].
    pub(crate) const fn fork_id(&self) -> ForkId {
        self.fork_filter.current()
    }
181
    /// Checks whether the provided [`ForkId`] is compatible based on the validation rules in
    /// `EIP-2124`.
    ///
    /// Returns `true` if the manager's [`ForkFilter`] accepts `fork_id`; the concrete validation
    /// error is discarded.
    pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
        self.fork_filter.validate(fork_id).is_ok()
    }
187
188    /// Returns the next unique [`SessionId`].
189    const fn next_id(&mut self) -> SessionId {
190        let id = self.next_id;
191        self.next_id += 1;
192        SessionId(id)
193    }
194
    /// Returns a copy of the `Status` message this manager currently sends to peers.
    pub const fn status(&self) -> UnifiedStatus {
        self.status
    }
199
    /// Returns a copy of the secret key used for authenticating sessions.
    pub const fn secret_key(&self) -> SecretKey {
        self.secret_key
    }
204
    /// Returns a borrowed reference to the active sessions, keyed by the remote [`PeerId`].
    pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
        &self.active_sessions
    }
209
    /// Returns a clone of the `Hello` message this manager sends to peers.
    pub fn hello_message(&self) -> HelloMessageWithProtocols {
        self.hello_message.clone()
    }
214
    /// Adds an additional protocol handler to the `RLPx` sub-protocol list.
    ///
    /// The per-connection handlers for new incoming and outgoing sessions are derived from this
    /// list at the time the pending session is started.
    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
        self.extra_protocols.push(protocol)
    }
219
    /// Returns the number of currently pending connections, i.e. sessions (incoming and
    /// outgoing) that have been started but have not yet been established or closed.
    #[inline]
    pub(crate) fn num_pending_connections(&self) -> usize {
        self.pending_sessions.len()
    }
225
    /// Spawns the given future as a new task on the manager's executor.
    fn spawn<F>(&self, f: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.executor.spawn_task(f);
    }
234
235    /// Invoked on a received status update.
236    ///
237    /// If the updated activated another fork, this will return a [`ForkTransition`] and updates the
238    /// active [`ForkId`]. See also [`ForkFilter::set_head`].
239    pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
240        self.status.blockhash = head.hash;
241        self.status.total_difficulty = Some(head.total_difficulty);
242        let transition = self.fork_filter.set_head(head);
243        self.status.forkid = self.fork_filter.current();
244        self.status.latest_block = Some(head.number);
245
246        transition
247    }
248
249    /// An incoming TCP connection was received. This starts the authentication process to turn this
250    /// stream into an active peer session.
251    ///
252    /// Returns an error if the configured limit has been reached.
253    pub(crate) fn on_incoming(
254        &mut self,
255        stream: TcpStream,
256        remote_addr: SocketAddr,
257    ) -> Result<SessionId, ExceedsSessionLimit> {
258        self.counter.ensure_pending_inbound()?;
259
260        let session_id = self.next_id();
261
262        trace!(
263            target: "net::session",
264            ?remote_addr,
265            ?session_id,
266            "new pending incoming session"
267        );
268
269        let (disconnect_tx, disconnect_rx) = oneshot::channel();
270        let pending_events = self.pending_sessions_tx.clone();
271        let secret_key = self.secret_key;
272        let hello_message = self.hello_message.clone();
273        let status = self.status;
274        let fork_filter = self.fork_filter.clone();
275        let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
276        self.spawn(pending_session_with_timeout(
277            self.pending_session_timeout,
278            session_id,
279            remote_addr,
280            Direction::Incoming,
281            pending_events.clone(),
282            start_pending_incoming_session(
283                self.handshake.clone(),
284                disconnect_rx,
285                session_id,
286                stream,
287                pending_events,
288                remote_addr,
289                secret_key,
290                hello_message,
291                status,
292                fork_filter,
293                extra_handlers,
294            ),
295        ));
296
297        let handle = PendingSessionHandle {
298            disconnect_tx: Some(disconnect_tx),
299            direction: Direction::Incoming,
300        };
301        self.pending_sessions.insert(session_id, handle);
302        self.counter.inc_pending_inbound();
303        Ok(session_id)
304    }
305
306    /// Starts a new pending session from the local node to the given remote node.
307    pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
308        // The error can be dropped because no dial will be made if it would exceed the limit
309        if self.counter.ensure_pending_outbound().is_ok() {
310            let session_id = self.next_id();
311            let (disconnect_tx, disconnect_rx) = oneshot::channel();
312            let pending_events = self.pending_sessions_tx.clone();
313            let secret_key = self.secret_key;
314            let hello_message = self.hello_message.clone();
315            let fork_filter = self.fork_filter.clone();
316            let status = self.status;
317            let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
318            self.spawn(pending_session_with_timeout(
319                self.pending_session_timeout,
320                session_id,
321                remote_addr,
322                Direction::Outgoing(remote_peer_id),
323                pending_events.clone(),
324                start_pending_outbound_session(
325                    self.handshake.clone(),
326                    disconnect_rx,
327                    pending_events,
328                    session_id,
329                    remote_addr,
330                    remote_peer_id,
331                    secret_key,
332                    hello_message,
333                    status,
334                    fork_filter,
335                    extra_handlers,
336                ),
337            ));
338
339            let handle = PendingSessionHandle {
340                disconnect_tx: Some(disconnect_tx),
341                direction: Direction::Outgoing(remote_peer_id),
342            };
343            self.pending_sessions.insert(session_id, handle);
344            self.counter.inc_pending_outbound();
345        }
346    }
347
348    /// Initiates a shutdown of the channel.
349    ///
350    /// This will trigger the disconnect on the session task to gracefully terminate. The result
351    /// will be picked up by the receiver.
352    pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
353        if let Some(session) = self.active_sessions.get(&node) {
354            session.disconnect(reason);
355        }
356    }
357
358    /// Initiates a shutdown of all sessions.
359    ///
360    /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result
361    /// will be picked by the receiver.
362    pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
363        for session in self.active_sessions.values() {
364            session.disconnect(reason);
365        }
366    }
367
368    /// Disconnects all pending sessions.
369    pub fn disconnect_all_pending(&mut self) {
370        for session in self.pending_sessions.values_mut() {
371            session.disconnect();
372        }
373    }
374
375    /// Sends a message to the peer's session
376    pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
377        if let Some(session) = self.active_sessions.get(peer_id) {
378            let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
379                |e| {
380                    if let TrySendError::Full(SessionCommand::Message(msg)) = e {
381                        debug!(
382                            target: "net::session",
383                            ?peer_id,
384                            msg_kind = msg.message_kind(),
385                            items = msg.message_item_count(),
386                            "session command buffer full, dropping message"
387                        );
388                        self.metrics.total_outgoing_peer_messages_dropped.increment(1);
389                    }
390                },
391            );
392        }
393    }
394
395    /// Removes the [`PendingSessionHandle`] if it exists.
396    fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
397        let session = self.pending_sessions.remove(id)?;
398        self.counter.dec_pending(&session.direction);
399        Some(session)
400    }
401
402    /// Removes the [`PendingSessionHandle`] if it exists.
403    fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
404        let session = self.active_sessions.remove(id)?;
405        self.counter.dec_active(&session.direction);
406        Some(session)
407    }
408
409    /// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
410    /// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
411    /// simply drop the incoming connection.
412    pub(crate) fn try_disconnect_incoming_connection(
413        &self,
414        stream: TcpStream,
415        reason: DisconnectReason,
416    ) {
417        if !self.disconnections_counter.has_capacity() {
418            // drop the connection if we don't have capacity for gracefully disconnecting
419            return
420        }
421
422        let guard = self.disconnections_counter.clone();
423        let secret_key = self.secret_key;
424
425        self.spawn(async move {
426            trace!(
427                target: "net::session",
428                "gracefully disconnecting incoming connection"
429            );
430            if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
431                let mut unauth = UnauthedP2PStream::new(stream);
432                let _ = unauth.send_disconnect(reason).await;
433                drop(guard);
434            }
435        });
436    }
437
438    /// This polls all the session handles and returns [`SessionEvent`].
439    ///
440    /// Active sessions are prioritized.
441    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
442        // Poll events from active sessions
443        match self.active_session_rx.poll_next_unpin(cx) {
444            Poll::Pending => {}
445            Poll::Ready(None) => {
446                unreachable!("Manager holds both channel halves.")
447            }
448            Poll::Ready(Some(event)) => {
449                return match event {
450                    ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
451                        trace!(
452                            target: "net::session",
453                            ?peer_id,
454                            "gracefully disconnected active session."
455                        );
456                        self.remove_active_session(&peer_id);
457                        Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
458                    }
459                    ActiveSessionMessage::ClosedOnConnectionError {
460                        peer_id,
461                        remote_addr,
462                        error,
463                    } => {
464                        trace!(target: "net::session", ?peer_id, %error,"closed session.");
465                        self.remove_active_session(&peer_id);
466                        Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
467                            remote_addr,
468                            peer_id,
469                            error,
470                        })
471                    }
472                    ActiveSessionMessage::ValidMessage { peer_id, message } => {
473                        Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
474                    }
475                    ActiveSessionMessage::BadMessage { peer_id } => {
476                        Poll::Ready(SessionEvent::BadMessage { peer_id })
477                    }
478                    ActiveSessionMessage::ProtocolBreach { peer_id } => {
479                        Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
480                    }
481                }
482            }
483        }
484
485        // Poll the pending session event stream
486        let event = match self.pending_session_rx.poll_next_unpin(cx) {
487            Poll::Pending => return Poll::Pending,
488            Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
489            Poll::Ready(Some(event)) => event,
490        };
491        match event {
492            PendingSessionEvent::Established {
493                session_id,
494                remote_addr,
495                local_addr,
496                peer_id,
497                capabilities,
498                conn,
499                status,
500                direction,
501                client_id,
502            } => {
503                // move from pending to established.
504                self.remove_pending_session(&session_id);
505
506                // If there's already a session to the peer then we disconnect right away
507                if self.active_sessions.contains_key(&peer_id) {
508                    trace!(
509                        target: "net::session",
510                        ?session_id,
511                        ?remote_addr,
512                        ?peer_id,
513                        ?direction,
514                        "already connected"
515                    );
516
517                    self.spawn(async move {
518                        // send a disconnect message
519                        let _ =
520                            conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
521                    });
522
523                    return Poll::Ready(SessionEvent::AlreadyConnected {
524                        peer_id,
525                        remote_addr,
526                        direction,
527                    })
528                }
529
530                let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
531
532                let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
533
534                let messages = PeerRequestSender::new(peer_id, to_session_tx);
535
536                let timeout = Arc::new(AtomicU64::new(
537                    self.initial_internal_request_timeout.as_millis() as u64,
538                ));
539
540                // negotiated version
541                let version = conn.version();
542
543                // Configure the interval at which the range information is updated, starting with
544                // ETH69. We use interval_at to delay the first tick, avoiding sending
545                // BlockRangeUpdate immediately after connection (which can cause issues with
546                // peers that don't properly handle the message).
547                let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
548                    let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
549                    let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
550                    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
551                    interval
552                });
553
554                let session = ActiveSession {
555                    next_id: 0,
556                    remote_peer_id: peer_id,
557                    remote_addr,
558                    remote_capabilities: Arc::clone(&capabilities),
559                    session_id,
560                    commands_rx: ReceiverStream::new(commands_rx),
561                    to_session_manager: self.active_session_tx.clone(),
562                    pending_message_to_session: None,
563                    internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
564                    inflight_requests: Default::default(),
565                    conn,
566                    queued_outgoing: QueuedOutgoingMessages::new(
567                        self.metrics.queued_outgoing_messages.clone(),
568                    ),
569                    received_requests_from_remote: Default::default(),
570                    internal_request_timeout_interval: tokio::time::interval(
571                        self.initial_internal_request_timeout,
572                    ),
573                    internal_request_timeout: Arc::clone(&timeout),
574                    protocol_breach_request_timeout: self.protocol_breach_request_timeout,
575                    terminate_message: None,
576                    range_info: None,
577                    local_range_info: self.local_range_info.clone(),
578                    range_update_interval,
579                    last_sent_latest_block: None,
580                };
581
582                self.spawn(session);
583
584                let client_version = client_id.into();
585                let handle = ActiveSessionHandle {
586                    status: status.clone(),
587                    direction,
588                    session_id,
589                    remote_id: peer_id,
590                    version,
591                    established: Instant::now(),
592                    capabilities: Arc::clone(&capabilities),
593                    commands_to_session,
594                    client_version: Arc::clone(&client_version),
595                    remote_addr,
596                    local_addr,
597                };
598
599                self.active_sessions.insert(peer_id, handle);
600                self.counter.inc_active(&direction);
601
602                if direction.is_outgoing() {
603                    self.metrics.total_dial_successes.increment(1);
604                }
605
606                Poll::Ready(SessionEvent::SessionEstablished {
607                    peer_id,
608                    remote_addr,
609                    client_version,
610                    version,
611                    capabilities,
612                    status,
613                    messages,
614                    direction,
615                    timeout,
616                    range_info: None,
617                })
618            }
619            PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
620                trace!(
621                    target: "net::session",
622                    ?session_id,
623                    ?remote_addr,
624                    ?error,
625                    "disconnected pending session"
626                );
627                self.remove_pending_session(&session_id);
628                match direction {
629                    Direction::Incoming => {
630                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
631                            remote_addr,
632                            error,
633                        })
634                    }
635                    Direction::Outgoing(peer_id) => {
636                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
637                            remote_addr,
638                            peer_id,
639                            error,
640                        })
641                    }
642                }
643            }
644            PendingSessionEvent::OutgoingConnectionError {
645                remote_addr,
646                session_id,
647                peer_id,
648                error,
649            } => {
650                trace!(
651                    target: "net::session",
652                    %error,
653                    ?session_id,
654                    ?remote_addr,
655                    ?peer_id,
656                    "connection refused"
657                );
658                self.remove_pending_session(&session_id);
659                Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
660            }
661            PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
662                trace!(
663                    target: "net::session",
664                    %error,
665                    ?session_id,
666                    ?remote_addr,
667                    "ecies auth failed"
668                );
669                self.remove_pending_session(&session_id);
670                match direction {
671                    Direction::Incoming => {
672                        Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
673                            remote_addr,
674                            error: Some(PendingSessionHandshakeError::Ecies(error)),
675                        })
676                    }
677                    Direction::Outgoing(peer_id) => {
678                        Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
679                            remote_addr,
680                            peer_id,
681                            error: Some(PendingSessionHandshakeError::Ecies(error)),
682                        })
683                    }
684                }
685            }
686        }
687    }
688
689    /// Updates the advertised block range that this node can serve to other peers starting with
690    /// Eth69.
691    ///
692    /// This method updates both the local status message that gets sent to peers during handshake
693    /// and the shared local range information that gets propagated to active sessions (Eth69).
694    /// The range information is used in ETH69 protocol where peers announce the range of blocks
695    /// they can serve to optimize data synchronization.
696    pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
697        self.status.earliest_block = Some(block_range_update.earliest);
698        self.status.latest_block = Some(block_range_update.latest);
699        self.status.blockhash = block_range_update.latest_hash;
700
701        // Update the shared local range info that gets propagated to active sessions
702        self.local_range_info.update(
703            block_range_update.earliest,
704            block_range_update.latest,
705            block_range_update.latest_hash,
706        );
707    }
708}
709
/// Bounds the number of graceful disconnection attempts that may be in flight at once.
///
/// The count is carried by the strong reference count of the inner [`Arc`]: the owner keeps one
/// handle and every ongoing attempt holds a clone for as long as it runs.
#[derive(Clone, Debug, Default)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    /// Highest strong count at which one more graceful disconnection is still admitted.
    ///
    /// The owner's own handle is part of that count, so an attempt is admitted while at most 14
    /// others are running, which caps the number of concurrent attempts at 15.
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Returns true if the [`DisconnectionsCounter`] still has capacity
    /// for an additional graceful disconnection.
    fn has_capacity(&self) -> bool {
        let Self(handles) = self;
        let live_handles = Arc::strong_count(handles);
        live_handles <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
723
/// Events produced by the [`SessionManager`].
///
/// The type parameter `N` selects the network primitives used by the request and message types
/// carried in some of the variants.
#[derive(Debug)]
pub enum SessionEvent<N: NetworkPrimitives> {
    /// A new session was successfully authenticated.
    ///
    /// This session is now able to exchange data.
    SessionEstablished {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The user agent of the remote node, usually containing the client name and version
        client_version: Arc<str>,
        /// The capabilities the remote node has announced
        capabilities: Arc<Capabilities>,
        /// The negotiated `eth` protocol version
        version: EthVersion,
        /// The Status message the peer sent during the `eth` handshake
        status: Arc<UnifiedStatus>,
        /// The channel for sending messages to the peer with the session
        messages: PeerRequestSender<PeerRequest<N>>,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
        /// The maximum time that the session waits for a response from the peer before timing out
        /// the connection
        timeout: Arc<AtomicU64>,
        /// The block range info for the peer, if available.
        range_info: Option<BlockRangeInfo>,
    },
    /// The peer was already connected with another session.
    AlreadyConnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The direction of the session, either `Incoming` or `Outgoing`
        direction: Direction,
    },
    /// A session received a valid message via `RLPx`.
    ValidMessage {
        /// The remote node's public key
        peer_id: PeerId,
        /// Message received from the peer.
        message: PeerMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Remote peer is considered in protocol violation
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Closed an incoming pending session during handshaking.
    IncomingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The pending handshake session error that caused the session to close, if any
        error: Option<PendingSessionHandshakeError>,
    },
    /// Closed an outgoing pending session during handshaking.
    OutgoingPendingSessionClosed {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The pending handshake session error that caused the session to close, if any
        error: Option<PendingSessionHandshakeError>,
    },
    /// Failed to establish a tcp stream
    OutgoingConnectionError {
        /// The remote node's socket address
        remote_addr: SocketAddr,
        /// The remote node's public key
        peer_id: PeerId,
        /// The error that caused the outgoing connection to fail
        error: io::Error,
    },
    /// Session was closed due to an error
    SessionClosedOnConnectionError {
        /// The id of the remote peer.
        peer_id: PeerId,
        /// The socket we were connected to.
        remote_addr: SocketAddr,
        /// The error that caused the session to close
        error: EthStreamError,
    },
    /// Active session was gracefully disconnected.
    Disconnected {
        /// The remote node's public key
        peer_id: PeerId,
        /// The remote node's socket address that we were connected to
        remote_addr: SocketAddr,
    },
}
821
/// Errors that can occur during handshaking/authenticating the underlying streams.
///
/// The `Eth` and `Ecies` variants are transparent wrappers: their display output is that of the
/// wrapped error.
#[derive(Debug, thiserror::Error)]
pub enum PendingSessionHandshakeError {
    /// The pending session failed due to an error while establishing the `eth` stream
    #[error(transparent)]
    Eth(EthStreamError),
    /// The pending session failed due to an error while establishing the ECIES stream
    #[error(transparent)]
    Ecies(ECIESError),
    /// The authentication did not complete before the timeout elapsed
    #[error("authentication timed out")]
    Timeout,
    /// The remote does not support a mandatory extra capability
    #[error("Mandatory extra capability unsupported")]
    UnsupportedExtraCapability,
}
838
839impl PendingSessionHandshakeError {
840    /// Returns the [`DisconnectReason`] if the error is a disconnect message
841    pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
842        match self {
843            Self::Eth(eth_err) => eth_err.as_disconnected(),
844            _ => None,
845        }
846    }
847}
848
/// The error returned when the max configured session limit has been reached and no more
/// connections are accepted.
///
/// The wrapped value is rendered in the error message; presumably it is the configured limit —
/// confirm at the construction site.
#[derive(Debug, Clone, thiserror::Error)]
#[error("session limit reached {0}")]
pub struct ExceedsSessionLimit(pub(crate) u32);
854
855/// Starts a pending session authentication with a timeout.
856pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
857    timeout: Duration,
858    session_id: SessionId,
859    remote_addr: SocketAddr,
860    direction: Direction,
861    events: mpsc::Sender<PendingSessionEvent<N>>,
862    f: F,
863) where
864    F: Future<Output = ()>,
865{
866    if tokio::time::timeout(timeout, f).await.is_err() {
867        trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
868        let event = PendingSessionEvent::Disconnected {
869            remote_addr,
870            session_id,
871            direction,
872            error: Some(PendingSessionHandshakeError::Timeout),
873        };
874        let _ = events.send(event).await;
875    }
876}
877
/// Starts the authentication process for a connection initiated by a remote peer.
///
/// This will wait for the _incoming_ handshake request and answer it.
///
/// This is a thin wrapper: every argument is forwarded unchanged to [`authenticate`], with the
/// direction fixed to [`Direction::Incoming`].
#[expect(clippy::too_many_arguments)]
pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
    handshake: Arc<dyn EthRlpxHandshake>,
    disconnect_rx: oneshot::Receiver<()>,
    session_id: SessionId,
    stream: TcpStream,
    events: mpsc::Sender<PendingSessionEvent<N>>,
    remote_addr: SocketAddr,
    secret_key: SecretKey,
    hello: HelloMessageWithProtocols,
    status: UnifiedStatus,
    fork_filter: ForkFilter,
    extra_handlers: RlpxSubProtocolHandlers,
) {
    // Note: `authenticate` takes `events`, `stream`, `session_id` in a different order than this
    // function's own parameter list.
    authenticate(
        handshake,
        disconnect_rx,
        events,
        stream,
        session_id,
        remote_addr,
        secret_key,
        Direction::Incoming,
        hello,
        status,
        fork_filter,
        extra_handlers,
    )
    .await
}
911
912/// Starts the authentication process for a connection initiated by a remote peer.
913#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
914#[expect(clippy::too_many_arguments)]
915async fn start_pending_outbound_session<N: NetworkPrimitives>(
916    handshake: Arc<dyn EthRlpxHandshake>,
917    disconnect_rx: oneshot::Receiver<()>,
918    events: mpsc::Sender<PendingSessionEvent<N>>,
919    session_id: SessionId,
920    remote_addr: SocketAddr,
921    remote_peer_id: PeerId,
922    secret_key: SecretKey,
923    hello: HelloMessageWithProtocols,
924    status: UnifiedStatus,
925    fork_filter: ForkFilter,
926    extra_handlers: RlpxSubProtocolHandlers,
927) {
928    let stream = match TcpStream::connect(remote_addr).await {
929        Ok(stream) => {
930            if let Err(err) = stream.set_nodelay(true) {
931                tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
932            }
933            stream
934        }
935        Err(error) => {
936            let _ = events
937                .send(PendingSessionEvent::OutgoingConnectionError {
938                    remote_addr,
939                    session_id,
940                    peer_id: remote_peer_id,
941                    error,
942                })
943                .await;
944            return
945        }
946    };
947    authenticate(
948        handshake,
949        disconnect_rx,
950        events,
951        stream,
952        session_id,
953        remote_addr,
954        secret_key,
955        Direction::Outgoing(remote_peer_id),
956        hello,
957        status,
958        fork_filter,
959        extra_handlers,
960    )
961    .await
962}
963
964/// Authenticates a session
965#[expect(clippy::too_many_arguments)]
966async fn authenticate<N: NetworkPrimitives>(
967    handshake: Arc<dyn EthRlpxHandshake>,
968    disconnect_rx: oneshot::Receiver<()>,
969    events: mpsc::Sender<PendingSessionEvent<N>>,
970    stream: TcpStream,
971    session_id: SessionId,
972    remote_addr: SocketAddr,
973    secret_key: SecretKey,
974    direction: Direction,
975    hello: HelloMessageWithProtocols,
976    status: UnifiedStatus,
977    fork_filter: ForkFilter,
978    extra_handlers: RlpxSubProtocolHandlers,
979) {
980    let local_addr = stream.local_addr().ok();
981    let stream = match get_ecies_stream(stream, secret_key, direction).await {
982        Ok(stream) => stream,
983        Err(error) => {
984            let _ = events
985                .send(PendingSessionEvent::EciesAuthError {
986                    remote_addr,
987                    session_id,
988                    error,
989                    direction,
990                })
991                .await;
992            return
993        }
994    };
995
996    let unauthed = UnauthedP2PStream::new(stream);
997
998    let auth = authenticate_stream(
999        handshake,
1000        unauthed,
1001        session_id,
1002        remote_addr,
1003        local_addr,
1004        direction,
1005        hello,
1006        status,
1007        fork_filter,
1008        extra_handlers,
1009    )
1010    .boxed();
1011
1012    match futures::future::select(disconnect_rx, auth).await {
1013        Either::Left((_, _)) => {
1014            let _ = events
1015                .send(PendingSessionEvent::Disconnected {
1016                    remote_addr,
1017                    session_id,
1018                    direction,
1019                    error: None,
1020                })
1021                .await;
1022        }
1023        Either::Right((res, _)) => {
1024            let _ = events.send(res).await;
1025        }
1026    }
1027}
1028
1029/// Returns an [`ECIESStream`] if it can be built. If not, send a
1030/// [`PendingSessionEvent::EciesAuthError`] and returns `None`
1031async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1032    stream: Io,
1033    secret_key: SecretKey,
1034    direction: Direction,
1035) -> Result<ECIESStream<Io>, ECIESError> {
1036    match direction {
1037        Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1038        Direction::Outgoing(remote_peer_id) => {
1039            ECIESStream::connect(stream, secret_key, remote_peer_id).await
1040        }
1041    }
1042}
1043
/// Authenticate the stream via handshake
///
/// On Success return the authenticated stream as [`PendingSessionEvent`].
///
/// If additional [`RlpxSubProtocolHandlers`] are provided, the hello message will be updated to
/// also negotiate the additional protocols.
///
/// The steps are, in order:
/// 1. Add the extra protocols to `hello`, keeping only the handlers whose protocol could be added.
/// 2. Perform the p2p `RLPx` hello handshake.
/// 3. Remove extra handlers whose capability is not shared with the remote; abort if such a
///    handler answers with [`OnNotSupported::Disconnect`].
/// 4. Determine the shared `eth` version and set it on `status`.
/// 5. Perform the `eth` status handshake, either directly on the p2p stream (exactly one shared
///    capability) or through the protocol multiplexer (otherwise).
///
/// Every failure is reported as [`PendingSessionEvent::Disconnected`] with the corresponding
/// [`PendingSessionHandshakeError`]; success yields [`PendingSessionEvent::Established`].
#[expect(clippy::too_many_arguments)]
async fn authenticate_stream<N: NetworkPrimitives>(
    handshake: Arc<dyn EthRlpxHandshake>,
    stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
    session_id: SessionId,
    remote_addr: SocketAddr,
    local_addr: Option<SocketAddr>,
    direction: Direction,
    mut hello: HelloMessageWithProtocols,
    mut status: UnifiedStatus,
    fork_filter: ForkFilter,
    mut extra_handlers: RlpxSubProtocolHandlers,
) -> PendingSessionEvent<N> {
    // Add extra protocols to the hello message; handlers whose protocol could not be added are
    // dropped here
    extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());

    // conduct the p2p rlpx handshake and return the rlpx authenticated stream
    let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
        Ok(stream_res) => stream_res,
        Err(err) => {
            return PendingSessionEvent::Disconnected {
                remote_addr,
                session_id,
                direction,
                error: Some(PendingSessionHandshakeError::Eth(err.into())),
            }
        }
    };

    // if we have extra handlers, check whether each of them must be supported by the remote
    if !extra_handlers.is_empty() {
        // Repeatedly pick the first handler whose capability is not among the shared
        // capabilities, remove it, and let it decide whether that is fatal for the session.
        while let Some(pos) = extra_handlers.iter().position(|handler| {
            p2p_stream
                .shared_capabilities()
                .ensure_matching_capability(&handler.protocol().cap)
                .is_err()
        }) {
            let handler = extra_handlers.remove(pos);
            if handler.on_unsupported_by_peer(
                p2p_stream.shared_capabilities(),
                direction,
                their_hello.id,
            ) == OnNotSupported::Disconnect
            {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
                };
            }
        }
    }

    // Ensure we negotiated mandatory eth protocol
    let eth_version = match p2p_stream.shared_capabilities().eth_version() {
        Ok(version) => version,
        Err(err) => {
            return PendingSessionEvent::Disconnected {
                remote_addr,
                session_id,
                direction,
                error: Some(PendingSessionHandshakeError::Eth(err.into())),
            }
        }
    };

    // Before trying status handshake, set up the version to negotiated shared version
    status.set_eth_version(eth_version);

    let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
        // There is exactly one shared capability, and the eth version lookup above succeeded,
        // so that capability is the eth protocol.
        // if the hello handshake was successful we can try status handshake

        // perform the eth protocol handshake
        match handshake
            .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
            .await
        {
            Ok(their_status) => {
                let eth_stream = EthStream::new(eth_version, p2p_stream);
                (eth_stream.into(), their_status)
            }
            Err(err) => {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::Eth(err)),
                }
            }
        }
    } else {
        // Multiplex the stream with the extra protocols
        let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);

        // install additional handlers; the result of each installation is discarded via `.ok()`
        for handler in extra_handlers.into_iter() {
            let cap = handler.protocol().cap;
            let remote_peer_id = their_hello.id;

            multiplex_stream
                .install_protocol(&cap, move |conn| {
                    handler.into_connection(direction, remote_peer_id, conn)
                })
                .ok();
        }

        // perform the eth status handshake through the multiplexer
        let (multiplex_stream, their_status) = match multiplex_stream
            .into_eth_satellite_stream(status, fork_filter, handshake)
            .await
        {
            Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
            Err(err) => {
                return PendingSessionEvent::Disconnected {
                    remote_addr,
                    session_id,
                    direction,
                    error: Some(PendingSessionHandshakeError::Eth(err)),
                }
            }
        };

        (multiplex_stream.into(), their_status)
    };

    // Both handshakes succeeded: report the established session together with what the remote
    // announced in its hello and status messages.
    PendingSessionEvent::Established {
        session_id,
        remote_addr,
        local_addr,
        peer_id: their_hello.id,
        capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
        status: Arc::new(their_status),
        conn,
        direction,
        client_id: their_hello.client_version,
    }
}