1mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11 message::PeerMessage,
12 metrics::SessionManagerMetrics,
13 protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14 session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21 errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22 BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23 HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24 HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35 collections::HashMap,
36 future::Future,
37 net::SocketAddr,
38 sync::{atomic::AtomicU64, Arc},
39 task::{Context, Poll},
40 time::{Duration, Instant},
41};
42use tokio::{
43 io::{AsyncRead, AsyncWrite},
44 net::TcpStream,
45 sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56 SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
/// Internal identifier for a session.
///
/// Values are handed out by `SessionManager::next_id`, which wraps the manager's `usize` counter
/// and increments it on every call, so each session created by one manager gets a distinct id.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
pub struct SessionId(usize);
63
/// Manages pending (still authenticating) and active peer sessions.
///
/// New connections are registered via [`SessionManager::on_incoming`] and
/// [`SessionManager::dial_outbound`], each of which spawns a handshake task. The results of those
/// tasks, as well as messages from established sessions, are surfaced as [`SessionEvent`]s by
/// [`SessionManager::poll`].
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub struct SessionManager<N: NetworkPrimitives> {
    /// Value that will be wrapped into the next [`SessionId`] handed out.
    next_id: usize,
    /// Keeps track of pending/active session counts; consulted before a new pending session is
    /// accepted or dialed.
    counter: SessionCounter,
    /// Initial request timeout (stored as milliseconds in the shared `AtomicU64`) given to every
    /// new active session; also the period of the session's timeout interval.
    initial_internal_request_timeout: Duration,
    /// Handed to every new active session as its `protocol_breach_request_timeout`.
    protocol_breach_request_timeout: Duration,
    /// Upper bound on how long a pending session may take before it is reported as timed out.
    pending_session_timeout: Duration,
    /// Secret key used for the ECIES handshake of every connection.
    secret_key: SecretKey,
    /// Local status that is passed to the `eth` handshake of new sessions.
    status: UnifiedStatus,
    /// Hello message that is passed to the p2p handshake of new sessions.
    hello_message: HelloMessageWithProtocols,
    /// Fork filter used to validate fork ids; a clone is given to every handshake.
    fork_filter: ForkFilter,
    /// Capacity of the bounded command and request channels created per active session.
    session_command_buffer: usize,
    /// Runtime on which all session tasks are spawned.
    executor: Runtime,
    /// Handles of sessions that are still authenticating, keyed by their [`SessionId`].
    pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
    /// Handles of established sessions, keyed by the remote [`PeerId`].
    active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
    /// Sender half of the pending-session event channel; cloned into every handshake task.
    pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
    /// Receiver half of the pending-session event channel, drained in `poll`.
    pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
    /// Sender half of the active-session message channel; cloned into every active session.
    active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
    /// Receiver half of the active-session message channel, drained first in `poll`.
    active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
    /// Additional `RLPx` sub-protocols; asked for connection handlers on every new connection.
    extra_protocols: RlpxSubProtocols,
    /// Limits how many graceful-disconnect tasks for rejected incoming connections run at once.
    disconnections_counter: DisconnectionsCounter,
    /// Metrics recorded by the manager and shared with its sessions.
    metrics: SessionManagerMetrics,
    /// The `eth` handshake implementation handed to every pending session.
    handshake: Arc<dyn EthRlpxHandshake>,
    /// Maximum `eth` message size handed to every pending session.
    eth_max_message_size: usize,
    /// Locally advertised block range; cloned into every new active session and refreshed by
    /// `update_advertised_block_range`.
    local_range_info: BlockRangeInfo,
}
127
128impl<N: NetworkPrimitives> SessionManager<N> {
131 #[expect(clippy::too_many_arguments)]
133 pub fn new(
134 secret_key: SecretKey,
135 config: SessionsConfig,
136 executor: Runtime,
137 status: UnifiedStatus,
138 hello_message: HelloMessageWithProtocols,
139 fork_filter: ForkFilter,
140 extra_protocols: RlpxSubProtocols,
141 handshake: Arc<dyn EthRlpxHandshake>,
142 eth_max_message_size: usize,
143 ) -> Self {
144 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
145 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
146 let active_session_tx = PollSender::new(active_session_tx);
147
148 let local_range_info = BlockRangeInfo::new(
150 status.earliest_block.unwrap_or_default(),
151 status.latest_block.unwrap_or_default(),
152 status.blockhash,
153 );
154
155 Self {
156 next_id: 0,
157 counter: SessionCounter::new(config.limits),
158 initial_internal_request_timeout: config.initial_internal_request_timeout,
159 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
160 pending_session_timeout: config.pending_session_timeout,
161 secret_key,
162 status,
163 hello_message,
164 fork_filter,
165 session_command_buffer: config.session_command_buffer,
166 executor,
167 pending_sessions: Default::default(),
168 active_sessions: Default::default(),
169 pending_sessions_tx,
170 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
171 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
172 active_session_rx: ReceiverStream::new(active_session_rx),
173 extra_protocols,
174 disconnections_counter: Default::default(),
175 metrics: Default::default(),
176 handshake,
177 eth_max_message_size,
178 local_range_info,
179 }
180 }
181
/// Returns the [`ForkId`] currently reported by the local fork filter.
pub(crate) const fn fork_id(&self) -> ForkId {
    self.fork_filter.current()
}
186
187 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
190 self.fork_filter.validate(fork_id).is_ok()
191 }
192
193 const fn next_id(&mut self) -> SessionId {
195 let id = self.next_id;
196 self.next_id += 1;
197 SessionId(id)
198 }
199
/// Returns a copy of the local status that is handed to new session handshakes.
pub const fn status(&self) -> UnifiedStatus {
    self.status
}
204
/// Returns a copy of the secret key used for the ECIES handshake.
pub const fn secret_key(&self) -> SecretKey {
    self.secret_key
}
209
/// Returns a reference to the handles of all active sessions, keyed by remote [`PeerId`].
pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
    &self.active_sessions
}
214
/// Returns a clone of the hello message that is handed to new session handshakes.
pub fn hello_message(&self) -> HelloMessageWithProtocols {
    self.hello_message.clone()
}
219
/// Registers an additional `RLPx` sub-protocol by pushing it onto the manager's list of extra
/// protocols.
pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
    self.extra_protocols.push(protocol)
}
224
/// Returns the number of sessions that are currently pending (still authenticating).
#[inline]
pub(crate) fn num_pending_connections(&self) -> usize {
    self.pending_sessions.len()
}
230
/// Spawns the given future as a task on the manager's runtime.
///
/// Whatever `spawn_task` returns is discarded, so the task is not tracked by the manager.
fn spawn<F>(&self, f: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    self.executor.spawn_task(f);
}
239
240 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
245 self.status.blockhash = head.hash;
246 self.status.total_difficulty = Some(head.total_difficulty);
247 let transition = self.fork_filter.set_head(head);
248 self.status.forkid = self.fork_filter.current();
249 self.status.latest_block = Some(head.number);
250
251 transition
252 }
253
254 pub(crate) fn on_incoming(
259 &mut self,
260 stream: TcpStream,
261 remote_addr: SocketAddr,
262 ) -> Result<SessionId, ExceedsSessionLimit> {
263 self.counter.ensure_pending_inbound()?;
264
265 let session_id = self.next_id();
266
267 trace!(
268 target: "net::session",
269 ?remote_addr,
270 ?session_id,
271 "new pending incoming session"
272 );
273
274 let (disconnect_tx, disconnect_rx) = oneshot::channel();
275 let pending_events = self.pending_sessions_tx.clone();
276 let secret_key = self.secret_key;
277 let hello_message = self.hello_message.clone();
278 let status = self.status;
279 let fork_filter = self.fork_filter.clone();
280 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
281 self.spawn(pending_session_with_timeout(
282 self.pending_session_timeout,
283 session_id,
284 remote_addr,
285 Direction::Incoming,
286 pending_events.clone(),
287 start_pending_incoming_session(
288 self.handshake.clone(),
289 self.eth_max_message_size,
290 disconnect_rx,
291 session_id,
292 stream,
293 pending_events,
294 remote_addr,
295 secret_key,
296 hello_message,
297 status,
298 fork_filter,
299 extra_handlers,
300 ),
301 ));
302
303 let handle = PendingSessionHandle {
304 disconnect_tx: Some(disconnect_tx),
305 direction: Direction::Incoming,
306 };
307 self.pending_sessions.insert(session_id, handle);
308 self.counter.inc_pending_inbound();
309 Ok(session_id)
310 }
311
312 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
314 if self.counter.ensure_pending_outbound().is_ok() {
316 let session_id = self.next_id();
317 let (disconnect_tx, disconnect_rx) = oneshot::channel();
318 let pending_events = self.pending_sessions_tx.clone();
319 let secret_key = self.secret_key;
320 let hello_message = self.hello_message.clone();
321 let fork_filter = self.fork_filter.clone();
322 let status = self.status;
323 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
324 self.spawn(pending_session_with_timeout(
325 self.pending_session_timeout,
326 session_id,
327 remote_addr,
328 Direction::Outgoing(remote_peer_id),
329 pending_events.clone(),
330 start_pending_outbound_session(
331 self.handshake.clone(),
332 self.eth_max_message_size,
333 disconnect_rx,
334 pending_events,
335 session_id,
336 remote_addr,
337 remote_peer_id,
338 secret_key,
339 hello_message,
340 status,
341 fork_filter,
342 extra_handlers,
343 ),
344 ));
345
346 let handle = PendingSessionHandle {
347 disconnect_tx: Some(disconnect_tx),
348 direction: Direction::Outgoing(remote_peer_id),
349 };
350 self.pending_sessions.insert(session_id, handle);
351 self.counter.inc_pending_outbound();
352 }
353 }
354
355 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
360 if let Some(session) = self.active_sessions.get(&node) {
361 session.disconnect(reason);
362 }
363 }
364
/// Asks every active session to disconnect, passing the same optional `reason` to each.
pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
    for session in self.active_sessions.values() {
        session.disconnect(reason);
    }
}
374
/// Calls `disconnect` on the handle of every pending session.
///
/// The handles stay in the pending map; they are only removed in `poll` once the corresponding
/// pending-session event arrives.
pub fn disconnect_all_pending(&mut self) {
    for session in self.pending_sessions.values_mut() {
        session.disconnect();
    }
}
381
382 pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
389 if let Some(session) = self.active_sessions.get(peer_id) &&
390 !session.commands.send_message(msg)
391 {
392 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
393 }
394 }
395
396 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
398 let session = self.pending_sessions.remove(id)?;
399 self.counter.dec_pending(&session.direction);
400 Some(session)
401 }
402
403 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
405 let session = self.active_sessions.remove(id)?;
406 self.counter.dec_active(&session.direction);
407 Some(session)
408 }
409
410 pub(crate) fn try_disconnect_incoming_connection(
414 &self,
415 stream: TcpStream,
416 reason: DisconnectReason,
417 ) {
418 if !self.disconnections_counter.has_capacity() {
419 return
421 }
422
423 let guard = self.disconnections_counter.clone();
424 let secret_key = self.secret_key;
425
426 self.spawn(async move {
427 trace!(
428 target: "net::session",
429 "gracefully disconnecting incoming connection"
430 );
431 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
432 let mut unauth = UnauthedP2PStream::new(stream);
433 let _ = unauth.send_disconnect(reason).await;
434 drop(guard);
435 }
436 });
437 }
438
/// Polls the manager for the next [`SessionEvent`].
///
/// Messages from active sessions are checked first. Only if that channel has nothing ready are
/// pending-session (handshake) events checked. At most one event is returned per call;
/// `Poll::Pending` is returned when neither channel has an event ready.
///
/// # Panics
///
/// Panics if either channel reports that it is closed, which the code treats as unreachable
/// because the manager itself holds a sender for both channels.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
    // Drain messages from established sessions first.
    match self.active_session_rx.poll_next_unpin(cx) {
        Poll::Pending => {}
        Poll::Ready(None) => {
            unreachable!("Manager holds both channel halves.")
        }
        Poll::Ready(Some(event)) => {
            return match event {
                ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
                    trace!(
                        target: "net::session",
                        ?peer_id,
                        "gracefully disconnected active session."
                    );
                    // The session is gone: drop its handle and free its counter slot.
                    self.remove_active_session(&peer_id);
                    Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
                }
                ActiveSessionMessage::ClosedOnConnectionError {
                    peer_id,
                    remote_addr,
                    error,
                } => {
                    trace!(target: "net::session", ?peer_id, %error,"closed session.");
                    self.remove_active_session(&peer_id);
                    Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
                        remote_addr,
                        peer_id,
                        error,
                    })
                }
                // The remaining variants are forwarded unchanged and leave the session in place.
                ActiveSessionMessage::ValidMessage { peer_id, message } => {
                    Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
                }
                ActiveSessionMessage::BadMessage { peer_id } => {
                    Poll::Ready(SessionEvent::BadMessage { peer_id })
                }
                ActiveSessionMessage::ProtocolBreach { peer_id } => {
                    Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
                }
            }
        }
    }

    // No active-session message was ready: look at results of pending sessions.
    let event = match self.pending_session_rx.poll_next_unpin(cx) {
        Poll::Pending => return Poll::Pending,
        Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
        Poll::Ready(Some(event)) => event,
    };
    match event {
        PendingSessionEvent::Established {
            session_id,
            remote_addr,
            local_addr,
            peer_id,
            capabilities,
            conn,
            status,
            direction,
            client_id,
        } => {
            // The session is no longer pending, whatever happens next.
            self.remove_pending_session(&session_id);

            // Only one active session per peer: if one already exists, close the new
            // connection in the background and report the duplicate.
            if self.active_sessions.contains_key(&peer_id) {
                trace!(
                    target: "net::session",
                    ?session_id,
                    ?remote_addr,
                    ?peer_id,
                    ?direction,
                    "already connected"
                );

                self.spawn(async move {
                    // Best effort: the result of the disconnect is ignored.
                    let _ =
                        conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
                });

                return Poll::Ready(SessionEvent::AlreadyConnected {
                    peer_id,
                    remote_addr,
                    direction,
                })
            }

            // Channels from the manager-side handle to the session: one bounded, one unbounded.
            let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
            let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();

            // Channel for requests addressed to this peer; the sender is returned to the caller
            // in the `SessionEstablished` event.
            let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);

            let messages = PeerRequestSender::new(peer_id, to_session_tx);

            // Request timeout in milliseconds, shared between the session and the event
            // consumer through the same `Arc`.
            let timeout = Arc::new(AtomicU64::new(
                self.initial_internal_request_timeout.as_millis() as u64,
            ));

            let version = conn.version();

            // Range updates are only scheduled for eth/69 and later. `interval_at` pushes the
            // first tick one full `RANGE_UPDATE_INTERVAL` into the future.
            let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
                let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
                let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
                interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                interval
            });

            // Counter shared by the session's outgoing queue and the handle's command sender.
            let broadcast_items = BroadcastItemCounter::new();
            // Block range taken from the peer's status, if the status provides one.
            let remote_range_info = status.block_range_update().map(|update| {
                BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
            });

            let session = ActiveSession {
                next_id: 0,
                remote_peer_id: peer_id,
                remote_addr,
                remote_capabilities: Arc::clone(&capabilities),
                session_id,
                commands_rx: ReceiverStream::new(commands_rx),
                unbounded_rx,
                unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
                to_session_manager: self.active_session_tx.clone(),
                pending_message_to_session: None,
                internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
                inflight_requests: Default::default(),
                conn,
                queued_outgoing: QueuedOutgoingMessages::new(
                    self.metrics.queued_outgoing_messages.clone(),
                    broadcast_items.clone(),
                ),
                received_requests_from_remote: Default::default(),
                internal_request_timeout_interval: tokio::time::interval(
                    self.initial_internal_request_timeout,
                ),
                internal_request_timeout: Arc::clone(&timeout),
                protocol_breach_request_timeout: self.protocol_breach_request_timeout,
                terminate_message: None,
                range_info: remote_range_info.clone(),
                local_range_info: self.local_range_info.clone(),
                range_update_interval,
                last_sent_latest_block: None,
            };

            // The session runs as its own task from here on.
            self.spawn(session);

            let client_version = client_id.into();
            let handle = ActiveSessionHandle {
                status: status.clone(),
                direction,
                session_id,
                remote_id: peer_id,
                version,
                established: Instant::now(),
                capabilities: Arc::clone(&capabilities),
                commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
                client_version: Arc::clone(&client_version),
                remote_addr,
                local_addr,
            };

            self.active_sessions.insert(peer_id, handle);
            self.counter.inc_active(&direction);

            // Only sessions we dialed count as dial successes.
            if direction.is_outgoing() {
                self.metrics.total_dial_successes.increment(1);
            }

            Poll::Ready(SessionEvent::SessionEstablished {
                peer_id,
                remote_addr,
                client_version,
                version,
                capabilities,
                status,
                messages,
                direction,
                timeout,
                range_info: remote_range_info,
            })
        }
        PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
            trace!(
                target: "net::session",
                ?session_id,
                ?remote_addr,
                ?error,
                "disconnected pending session"
            );
            self.remove_pending_session(&session_id);
            // The emitted event depends on who initiated the connection.
            match direction {
                Direction::Incoming => {
                    Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
                        remote_addr,
                        error,
                    })
                }
                Direction::Outgoing(peer_id) => {
                    Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
                        remote_addr,
                        peer_id,
                        error,
                    })
                }
            }
        }
        PendingSessionEvent::OutgoingConnectionError {
            remote_addr,
            session_id,
            peer_id,
            error,
        } => {
            trace!(
                target: "net::session",
                %error,
                ?session_id,
                ?remote_addr,
                ?peer_id,
                "connection refused"
            );
            self.remove_pending_session(&session_id);
            Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
        }
        PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
            trace!(
                target: "net::session",
                %error,
                ?session_id,
                ?remote_addr,
                "ecies auth failed"
            );
            self.remove_pending_session(&session_id);
            // ECIES failures are reported as a closed pending session with the error wrapped.
            match direction {
                Direction::Incoming => {
                    Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
                        remote_addr,
                        error: Some(PendingSessionHandshakeError::Ecies(error)),
                    })
                }
                Direction::Outgoing(peer_id) => {
                    Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
                        remote_addr,
                        peer_id,
                        error: Some(PendingSessionHandshakeError::Ecies(error)),
                    })
                }
            }
        }
    }
}
703
704 pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
712 self.status.earliest_block = Some(block_range_update.earliest);
713 self.status.latest_block = Some(block_range_update.latest);
714 self.status.blockhash = block_range_update.latest_hash;
715
716 self.local_range_info.update(
718 block_range_update.earliest,
719 block_range_update.latest,
720 block_range_update.latest_hash,
721 );
722 }
723}
724
/// Bounds the number of graceful-disconnect tasks that run concurrently.
///
/// The counter is the strong count of a shared `Arc<()>`: the owner holds one reference and
/// every clone handed to a task adds one more until that clone is dropped.
#[derive(Default, Debug, Clone)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    /// Highest strong count at which another graceful disconnection is still admitted.
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Returns `true` while the number of live references (this one included) does not exceed
    /// [`Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS`].
    fn has_capacity(&self) -> bool {
        let Self(shared) = self;
        let live_references = Arc::strong_count(shared);
        live_references <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
738
/// Events produced by [`SessionManager::poll`].
#[derive(Debug)]
pub enum SessionEvent<N: NetworkPrimitives> {
    /// A pending session finished its handshakes and is now an active session.
    SessionEstablished {
        /// Id of the remote peer, taken from its hello message.
        peer_id: PeerId,
        /// Remote address of the connection.
        remote_addr: SocketAddr,
        /// Client version string, taken from the peer's hello message.
        client_version: Arc<str>,
        /// Capabilities the peer announced in its hello message.
        capabilities: Arc<Capabilities>,
        /// `eth` version reported by the established connection.
        version: EthVersion,
        /// Status received from the peer during the `eth` handshake.
        status: Arc<UnifiedStatus>,
        /// Sender for requests to this peer; the session holds the receiving end.
        messages: PeerRequestSender<PeerRequest<N>>,
        /// Whether the session was initiated by the remote peer or by us.
        direction: Direction,
        /// Request timeout in milliseconds; the same `Arc` is held by the session.
        timeout: Arc<AtomicU64>,
        /// Block range derived from the peer's status, `None` if the status provided none.
        range_info: Option<BlockRangeInfo>,
    },
    /// A handshake completed for a peer that already has an active session; the new connection
    /// is being closed.
    AlreadyConnected {
        /// Id of the peer that is already connected.
        peer_id: PeerId,
        /// Remote address of the redundant connection.
        remote_addr: SocketAddr,
        /// Direction of the redundant connection.
        direction: Direction,
    },
    /// An active session forwarded a valid message from its peer.
    ValidMessage {
        /// Peer the message came from.
        peer_id: PeerId,
        /// The forwarded message.
        message: PeerMessage<N>,
    },
    /// An active session reported a bad message from its peer.
    BadMessage {
        /// Peer the report is about.
        peer_id: PeerId,
    },
    /// An active session reported a protocol breach by its peer.
    ProtocolBreach {
        /// Peer the report is about.
        peer_id: PeerId,
    },
    /// A pending incoming session ended before it was established.
    IncomingPendingSessionClosed {
        /// Remote address of the connection.
        remote_addr: SocketAddr,
        /// Why the session ended; `None` if no error was reported.
        error: Option<PendingSessionHandshakeError>,
    },
    /// A pending outgoing session ended before it was established.
    OutgoingPendingSessionClosed {
        /// Remote address that was dialed.
        remote_addr: SocketAddr,
        /// Peer that was dialed.
        peer_id: PeerId,
        /// Why the session ended; `None` if no error was reported.
        error: Option<PendingSessionHandshakeError>,
    },
    /// Establishing the TCP connection for an outgoing session failed.
    OutgoingConnectionError {
        /// Remote address that was dialed.
        remote_addr: SocketAddr,
        /// Peer that was dialed.
        peer_id: PeerId,
        /// The I/O error returned by the connection attempt.
        error: io::Error,
    },
    /// An active session was closed because of an error on the connection.
    SessionClosedOnConnectionError {
        /// Peer whose session was closed.
        peer_id: PeerId,
        /// Remote address of the connection.
        remote_addr: SocketAddr,
        /// The error reported by the session.
        error: EthStreamError,
    },
    /// An active session reported that it disconnected gracefully.
    Disconnected {
        /// Peer whose session was closed.
        peer_id: PeerId,
        /// Remote address of the connection.
        remote_addr: SocketAddr,
    },
}
836
/// Errors that can end a pending session before it is established.
#[derive(Debug, thiserror::Error)]
pub enum PendingSessionHandshakeError {
    /// The p2p hello handshake, the `eth` version negotiation or the `eth` status handshake
    /// failed.
    #[error(transparent)]
    Eth(EthStreamError),
    /// The ECIES handshake failed.
    #[error(transparent)]
    Ecies(ECIESError),
    /// The pending session did not finish within the pending-session timeout.
    #[error("authentication timed out")]
    Timeout,
    /// The peer lacks an extra capability whose handler asked for a disconnect in that case.
    #[error("Mandatory extra capability unsupported")]
    UnsupportedExtraCapability,
}
853
854impl PendingSessionHandshakeError {
855 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
857 match self {
858 Self::Eth(eth_err) => eth_err.as_disconnected(),
859 _ => None,
860 }
861 }
862}
863
/// Error returned when a new session would exceed a configured session limit.
///
/// The wrapped `u32` is included in the error message (presumably the limit that was hit —
/// confirm against `SessionCounter`).
#[derive(Debug, Clone, thiserror::Error)]
#[error("session limit reached {0}")]
pub struct ExceedsSessionLimit(pub(crate) u32);
869
870pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
872 timeout: Duration,
873 session_id: SessionId,
874 remote_addr: SocketAddr,
875 direction: Direction,
876 events: mpsc::Sender<PendingSessionEvent<N>>,
877 f: F,
878) where
879 F: Future<Output = ()>,
880{
881 if tokio::time::timeout(timeout, f).await.is_err() {
882 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
883 let event = PendingSessionEvent::Disconnected {
884 remote_addr,
885 session_id,
886 direction,
887 error: Some(PendingSessionHandshakeError::Timeout),
888 };
889 let _ = events.send(event).await;
890 }
891}
892
/// Authenticates an already accepted incoming TCP connection.
///
/// Forwards all arguments to `authenticate` with [`Direction::Incoming`]; the outcome is
/// reported on `events`.
#[expect(clippy::too_many_arguments)]
pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
    handshake: Arc<dyn EthRlpxHandshake>,
    eth_max_message_size: usize,
    disconnect_rx: oneshot::Receiver<()>,
    session_id: SessionId,
    stream: TcpStream,
    events: mpsc::Sender<PendingSessionEvent<N>>,
    remote_addr: SocketAddr,
    secret_key: SecretKey,
    hello: HelloMessageWithProtocols,
    status: UnifiedStatus,
    fork_filter: ForkFilter,
    extra_handlers: RlpxSubProtocolHandlers,
) {
    authenticate(
        handshake,
        eth_max_message_size,
        disconnect_rx,
        events,
        stream,
        session_id,
        remote_addr,
        secret_key,
        Direction::Incoming,
        hello,
        status,
        fork_filter,
        extra_handlers,
    )
    .await
}
928
929#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
931#[expect(clippy::too_many_arguments)]
932async fn start_pending_outbound_session<N: NetworkPrimitives>(
933 handshake: Arc<dyn EthRlpxHandshake>,
934 eth_max_message_size: usize,
935 disconnect_rx: oneshot::Receiver<()>,
936 events: mpsc::Sender<PendingSessionEvent<N>>,
937 session_id: SessionId,
938 remote_addr: SocketAddr,
939 remote_peer_id: PeerId,
940 secret_key: SecretKey,
941 hello: HelloMessageWithProtocols,
942 status: UnifiedStatus,
943 fork_filter: ForkFilter,
944 extra_handlers: RlpxSubProtocolHandlers,
945) {
946 let stream = match TcpStream::connect(remote_addr).await {
947 Ok(stream) => {
948 if let Err(err) = stream.set_nodelay(true) {
949 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
950 }
951 stream
952 }
953 Err(error) => {
954 let _ = events
955 .send(PendingSessionEvent::OutgoingConnectionError {
956 remote_addr,
957 session_id,
958 peer_id: remote_peer_id,
959 error,
960 })
961 .await;
962 return
963 }
964 };
965 authenticate(
966 handshake,
967 eth_max_message_size,
968 disconnect_rx,
969 events,
970 stream,
971 session_id,
972 remote_addr,
973 secret_key,
974 Direction::Outgoing(remote_peer_id),
975 hello,
976 status,
977 fork_filter,
978 extra_handlers,
979 )
980 .await
981}
982
983#[expect(clippy::too_many_arguments)]
985async fn authenticate<N: NetworkPrimitives>(
986 handshake: Arc<dyn EthRlpxHandshake>,
987 eth_max_message_size: usize,
988 disconnect_rx: oneshot::Receiver<()>,
989 events: mpsc::Sender<PendingSessionEvent<N>>,
990 stream: TcpStream,
991 session_id: SessionId,
992 remote_addr: SocketAddr,
993 secret_key: SecretKey,
994 direction: Direction,
995 hello: HelloMessageWithProtocols,
996 status: UnifiedStatus,
997 fork_filter: ForkFilter,
998 extra_handlers: RlpxSubProtocolHandlers,
999) {
1000 let local_addr = stream.local_addr().ok();
1001 let stream = match get_ecies_stream(stream, secret_key, direction).await {
1002 Ok(stream) => stream,
1003 Err(error) => {
1004 let _ = events
1005 .send(PendingSessionEvent::EciesAuthError {
1006 remote_addr,
1007 session_id,
1008 error,
1009 direction,
1010 })
1011 .await;
1012 return
1013 }
1014 };
1015
1016 let unauthed = UnauthedP2PStream::new(stream);
1017
1018 let auth = authenticate_stream(
1019 handshake,
1020 eth_max_message_size,
1021 unauthed,
1022 session_id,
1023 remote_addr,
1024 local_addr,
1025 direction,
1026 hello,
1027 status,
1028 fork_filter,
1029 extra_handlers,
1030 )
1031 .boxed();
1032
1033 match futures::future::select(disconnect_rx, auth).await {
1034 Either::Left((_, _)) => {
1035 let _ = events
1036 .send(PendingSessionEvent::Disconnected {
1037 remote_addr,
1038 session_id,
1039 direction,
1040 error: None,
1041 })
1042 .await;
1043 }
1044 Either::Right((res, _)) => {
1045 let _ = events.send(res).await;
1046 }
1047 }
1048}
1049
1050async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1053 stream: Io,
1054 secret_key: SecretKey,
1055 direction: Direction,
1056) -> Result<ECIESStream<Io>, ECIESError> {
1057 match direction {
1058 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1059 Direction::Outgoing(remote_peer_id) => {
1060 ECIESStream::connect(stream, secret_key, remote_peer_id).await
1061 }
1062 }
1063}
1064
1065#[expect(clippy::too_many_arguments)]
1072async fn authenticate_stream<N: NetworkPrimitives>(
1073 handshake: Arc<dyn EthRlpxHandshake>,
1074 eth_max_message_size: usize,
1075 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1076 session_id: SessionId,
1077 remote_addr: SocketAddr,
1078 local_addr: Option<SocketAddr>,
1079 direction: Direction,
1080 mut hello: HelloMessageWithProtocols,
1081 mut status: UnifiedStatus,
1082 fork_filter: ForkFilter,
1083 mut extra_handlers: RlpxSubProtocolHandlers,
1084) -> PendingSessionEvent<N> {
1085 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1087
1088 let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1090 Ok(stream_res) => stream_res,
1091 Err(err) => {
1092 return PendingSessionEvent::Disconnected {
1093 remote_addr,
1094 session_id,
1095 direction,
1096 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1097 }
1098 }
1099 };
1100
1101 if !extra_handlers.is_empty() {
1103 while let Some(pos) = extra_handlers.iter().position(|handler| {
1105 p2p_stream
1106 .shared_capabilities()
1107 .ensure_matching_capability(&handler.protocol().cap)
1108 .is_err()
1109 }) {
1110 let handler = extra_handlers.remove(pos);
1111 if handler.on_unsupported_by_peer(
1112 p2p_stream.shared_capabilities(),
1113 direction,
1114 their_hello.id,
1115 ) == OnNotSupported::Disconnect
1116 {
1117 return PendingSessionEvent::Disconnected {
1118 remote_addr,
1119 session_id,
1120 direction,
1121 error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1122 };
1123 }
1124 }
1125 }
1126
1127 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1129 Ok(version) => version,
1130 Err(err) => {
1131 return PendingSessionEvent::Disconnected {
1132 remote_addr,
1133 session_id,
1134 direction,
1135 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1136 }
1137 }
1138 };
1139
1140 status.set_eth_version(eth_version);
1142
1143 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1144 match handshake
1149 .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1150 .await
1151 {
1152 Ok(their_status) => {
1153 let eth_stream =
1154 EthStream::with_max_message_size(eth_version, p2p_stream, eth_max_message_size);
1155 (eth_stream.into(), their_status)
1156 }
1157 Err(err) => {
1158 return PendingSessionEvent::Disconnected {
1159 remote_addr,
1160 session_id,
1161 direction,
1162 error: Some(PendingSessionHandshakeError::Eth(err)),
1163 }
1164 }
1165 }
1166 } else {
1167 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1169
1170 for handler in extra_handlers.into_iter() {
1172 let cap = handler.protocol().cap;
1173 let remote_peer_id = their_hello.id;
1174
1175 multiplex_stream
1176 .install_protocol(&cap, move |conn| {
1177 handler.into_connection(direction, remote_peer_id, conn)
1178 })
1179 .ok();
1180 }
1181
1182 let (multiplex_stream, their_status) = match multiplex_stream
1183 .into_eth_satellite_stream(status, fork_filter, handshake, eth_max_message_size)
1184 .await
1185 {
1186 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1187 Err(err) => {
1188 return PendingSessionEvent::Disconnected {
1189 remote_addr,
1190 session_id,
1191 direction,
1192 error: Some(PendingSessionHandshakeError::Eth(err)),
1193 }
1194 }
1195 };
1196
1197 (multiplex_stream.into(), their_status)
1198 };
1199
1200 PendingSessionEvent::Established {
1201 session_id,
1202 remote_addr,
1203 local_addr,
1204 peer_id: their_hello.id,
1205 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1206 status: Arc::new(their_status),
1207 conn,
1208 direction,
1209 client_id: their_hello.client_version,
1210 }
1211}