1mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11 message::PeerMessage,
12 metrics::SessionManagerMetrics,
13 protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14 session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21 errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22 BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23 HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24 HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35 collections::HashMap,
36 future::Future,
37 net::SocketAddr,
38 sync::{atomic::AtomicU64, Arc},
39 task::{Context, Poll},
40 time::{Duration, Instant},
41};
42use tokio::{
43 io::{AsyncRead, AsyncWrite},
44 net::TcpStream,
45 sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56 SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
60#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
62pub struct SessionId(usize);
63
64#[must_use = "Session Manager must be polled to process session events."]
66#[derive(Debug)]
67pub struct SessionManager<N: NetworkPrimitives> {
68 next_id: usize,
70 counter: SessionCounter,
72 initial_internal_request_timeout: Duration,
75 protocol_breach_request_timeout: Duration,
78 pending_session_timeout: Duration,
80 secret_key: SecretKey,
82 status: UnifiedStatus,
84 hello_message: HelloMessageWithProtocols,
86 fork_filter: ForkFilter,
88 session_command_buffer: usize,
90 executor: Runtime,
92 pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
97 active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
99 pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
104 pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
106 active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
111 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
113 extra_protocols: RlpxSubProtocols,
115 disconnections_counter: DisconnectionsCounter,
117 metrics: SessionManagerMetrics,
119 handshake: Arc<dyn EthRlpxHandshake>,
121 eth_max_message_size: usize,
123 local_range_info: BlockRangeInfo,
126 reject_block_announcements: bool,
129}
130
131impl<N: NetworkPrimitives> SessionManager<N> {
134 #[expect(clippy::too_many_arguments)]
136 pub fn new(
137 secret_key: SecretKey,
138 config: SessionsConfig,
139 executor: Runtime,
140 status: UnifiedStatus,
141 hello_message: HelloMessageWithProtocols,
142 fork_filter: ForkFilter,
143 extra_protocols: RlpxSubProtocols,
144 handshake: Arc<dyn EthRlpxHandshake>,
145 eth_max_message_size: usize,
146 reject_block_announcements: bool,
147 ) -> Self {
148 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
149 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
150 let active_session_tx = PollSender::new(active_session_tx);
151
152 let local_range_info = BlockRangeInfo::new(
154 status.earliest_block.unwrap_or_default(),
155 status.latest_block.unwrap_or_default(),
156 status.blockhash,
157 );
158
159 Self {
160 next_id: 0,
161 counter: SessionCounter::new(config.limits),
162 initial_internal_request_timeout: config.initial_internal_request_timeout,
163 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
164 pending_session_timeout: config.pending_session_timeout,
165 secret_key,
166 status,
167 hello_message,
168 fork_filter,
169 session_command_buffer: config.session_command_buffer,
170 executor,
171 pending_sessions: Default::default(),
172 active_sessions: Default::default(),
173 pending_sessions_tx,
174 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
175 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
176 active_session_rx: ReceiverStream::new(active_session_rx),
177 extra_protocols,
178 disconnections_counter: Default::default(),
179 metrics: Default::default(),
180 handshake,
181 eth_max_message_size,
182 local_range_info,
183 reject_block_announcements,
184 }
185 }
186
187 pub(crate) const fn fork_id(&self) -> ForkId {
189 self.fork_filter.current()
190 }
191
192 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
195 self.fork_filter.validate(fork_id).is_ok()
196 }
197
198 const fn next_id(&mut self) -> SessionId {
200 let id = self.next_id;
201 self.next_id += 1;
202 SessionId(id)
203 }
204
205 pub const fn status(&self) -> UnifiedStatus {
207 self.status
208 }
209
210 pub const fn secret_key(&self) -> SecretKey {
212 self.secret_key
213 }
214
215 pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
217 &self.active_sessions
218 }
219
220 pub fn hello_message(&self) -> HelloMessageWithProtocols {
222 self.hello_message.clone()
223 }
224
225 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
227 self.extra_protocols.push(protocol)
228 }
229
230 #[inline]
232 pub(crate) fn num_pending_connections(&self) -> usize {
233 self.pending_sessions.len()
234 }
235
236 fn spawn<F>(&self, f: F)
239 where
240 F: Future<Output = ()> + Send + 'static,
241 {
242 self.executor.spawn_task(f);
243 }
244
245 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
250 self.status.blockhash = head.hash;
251 self.status.total_difficulty = Some(head.total_difficulty);
252 let transition = self.fork_filter.set_head(head);
253 self.status.forkid = self.fork_filter.current();
254 self.status.latest_block = Some(head.number);
255
256 transition
257 }
258
259 pub(crate) fn on_incoming(
264 &mut self,
265 stream: TcpStream,
266 remote_addr: SocketAddr,
267 ) -> Result<SessionId, ExceedsSessionLimit> {
268 self.counter.ensure_pending_inbound()?;
269
270 let session_id = self.next_id();
271
272 trace!(
273 target: "net::session",
274 ?remote_addr,
275 ?session_id,
276 "new pending incoming session"
277 );
278
279 let (disconnect_tx, disconnect_rx) = oneshot::channel();
280 let pending_events = self.pending_sessions_tx.clone();
281 let secret_key = self.secret_key;
282 let hello_message = self.hello_message.clone();
283 let status = self.status;
284 let fork_filter = self.fork_filter.clone();
285 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
286 self.spawn(pending_session_with_timeout(
287 self.pending_session_timeout,
288 session_id,
289 remote_addr,
290 Direction::Incoming,
291 pending_events.clone(),
292 start_pending_incoming_session(
293 self.handshake.clone(),
294 self.eth_max_message_size,
295 disconnect_rx,
296 session_id,
297 stream,
298 pending_events,
299 remote_addr,
300 secret_key,
301 hello_message,
302 status,
303 fork_filter,
304 extra_handlers,
305 ),
306 ));
307
308 let handle = PendingSessionHandle {
309 disconnect_tx: Some(disconnect_tx),
310 direction: Direction::Incoming,
311 };
312 self.pending_sessions.insert(session_id, handle);
313 self.counter.inc_pending_inbound();
314 Ok(session_id)
315 }
316
317 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
319 if self.counter.ensure_pending_outbound().is_ok() {
321 let session_id = self.next_id();
322 let (disconnect_tx, disconnect_rx) = oneshot::channel();
323 let pending_events = self.pending_sessions_tx.clone();
324 let secret_key = self.secret_key;
325 let hello_message = self.hello_message.clone();
326 let fork_filter = self.fork_filter.clone();
327 let status = self.status;
328 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
329 self.spawn(pending_session_with_timeout(
330 self.pending_session_timeout,
331 session_id,
332 remote_addr,
333 Direction::Outgoing(remote_peer_id),
334 pending_events.clone(),
335 start_pending_outbound_session(
336 self.handshake.clone(),
337 self.eth_max_message_size,
338 disconnect_rx,
339 pending_events,
340 session_id,
341 remote_addr,
342 remote_peer_id,
343 secret_key,
344 hello_message,
345 status,
346 fork_filter,
347 extra_handlers,
348 ),
349 ));
350
351 let handle = PendingSessionHandle {
352 disconnect_tx: Some(disconnect_tx),
353 direction: Direction::Outgoing(remote_peer_id),
354 };
355 self.pending_sessions.insert(session_id, handle);
356 self.counter.inc_pending_outbound();
357 }
358 }
359
360 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
365 if let Some(session) = self.active_sessions.get(&node) {
366 session.disconnect(reason);
367 }
368 }
369
370 pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
375 for session in self.active_sessions.values() {
376 session.disconnect(reason);
377 }
378 }
379
380 pub fn disconnect_all_pending(&mut self) {
382 for session in self.pending_sessions.values_mut() {
383 session.disconnect();
384 }
385 }
386
387 pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
394 if let Some(session) = self.active_sessions.get(peer_id) &&
395 !session.commands.send_message(msg)
396 {
397 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
398 }
399 }
400
401 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
403 let session = self.pending_sessions.remove(id)?;
404 self.counter.dec_pending(&session.direction);
405 Some(session)
406 }
407
408 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
410 let session = self.active_sessions.remove(id)?;
411 self.counter.dec_active(&session.direction);
412 Some(session)
413 }
414
415 pub(crate) fn try_disconnect_incoming_connection(
419 &self,
420 stream: TcpStream,
421 reason: DisconnectReason,
422 ) {
423 if !self.disconnections_counter.has_capacity() {
424 return
426 }
427
428 let guard = self.disconnections_counter.clone();
429 let secret_key = self.secret_key;
430
431 self.spawn(async move {
432 trace!(
433 target: "net::session",
434 "gracefully disconnecting incoming connection"
435 );
436 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
437 let mut unauth = UnauthedP2PStream::new(stream);
438 let _ = unauth.send_disconnect(reason).await;
439 drop(guard);
440 }
441 });
442 }
443
444 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
448 match self.active_session_rx.poll_next_unpin(cx) {
450 Poll::Pending => {}
451 Poll::Ready(None) => {
452 unreachable!("Manager holds both channel halves.")
453 }
454 Poll::Ready(Some(event)) => {
455 return match event {
456 ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
457 trace!(
458 target: "net::session",
459 ?peer_id,
460 "gracefully disconnected active session."
461 );
462 self.remove_active_session(&peer_id);
463 Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
464 }
465 ActiveSessionMessage::ClosedOnConnectionError {
466 peer_id,
467 remote_addr,
468 error,
469 } => {
470 trace!(target: "net::session", ?peer_id, %error,"closed session.");
471 self.remove_active_session(&peer_id);
472 Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
473 remote_addr,
474 peer_id,
475 error,
476 })
477 }
478 ActiveSessionMessage::ValidMessage { peer_id, message } => {
479 Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
480 }
481 ActiveSessionMessage::BadMessage { peer_id } => {
482 Poll::Ready(SessionEvent::BadMessage { peer_id })
483 }
484 ActiveSessionMessage::ProtocolBreach { peer_id } => {
485 Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
486 }
487 }
488 }
489 }
490
491 let event = match self.pending_session_rx.poll_next_unpin(cx) {
493 Poll::Pending => return Poll::Pending,
494 Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
495 Poll::Ready(Some(event)) => event,
496 };
497 match event {
498 PendingSessionEvent::Established {
499 session_id,
500 remote_addr,
501 local_addr,
502 peer_id,
503 capabilities,
504 mut conn,
505 status,
506 direction,
507 client_id,
508 peer_listen_port,
509 } => {
510 self.remove_pending_session(&session_id);
512
513 if self.active_sessions.contains_key(&peer_id) {
515 trace!(
516 target: "net::session",
517 ?session_id,
518 ?remote_addr,
519 ?peer_id,
520 ?direction,
521 "already connected"
522 );
523
524 self.spawn(async move {
525 let _ =
527 conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
528 });
529
530 return Poll::Ready(SessionEvent::AlreadyConnected {
531 peer_id,
532 remote_addr,
533 direction,
534 })
535 }
536
537 let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
538 let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
539
540 let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
541
542 let messages = PeerRequestSender::new(peer_id, to_session_tx);
543
544 let timeout = Arc::new(AtomicU64::new(
545 self.initial_internal_request_timeout.as_millis() as u64,
546 ));
547
548 let version = conn.version();
550
551 let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
556 let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
557 let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
558 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
559 interval
560 });
561
562 let broadcast_items = BroadcastItemCounter::new();
568 let remote_range_info = status.block_range_update().map(|update| {
569 BlockRangeInfo::new(update.earliest, update.latest, update.latest_hash)
570 });
571
572 if self.reject_block_announcements {
573 conn.set_reject_block_announcements(true);
574 }
575
576 let session = ActiveSession {
577 next_id: 0,
578 remote_peer_id: peer_id,
579 remote_addr,
580 remote_capabilities: Arc::clone(&capabilities),
581 session_id,
582 commands_rx: ReceiverStream::new(commands_rx),
583 unbounded_rx,
584 unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
585 to_session_manager: self.active_session_tx.clone(),
586 pending_message_to_session: None,
587 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
588 inflight_requests: Default::default(),
589 conn,
590 queued_outgoing: QueuedOutgoingMessages::new(
591 self.metrics.queued_outgoing_messages.clone(),
592 broadcast_items.clone(),
593 ),
594 received_requests_from_remote: Default::default(),
595 internal_request_timeout_interval: tokio::time::interval(
596 self.initial_internal_request_timeout,
597 ),
598 internal_request_timeout: Arc::clone(&timeout),
599 protocol_breach_request_timeout: self.protocol_breach_request_timeout,
600 terminate_message: None,
601 range_info: remote_range_info.clone(),
602 local_range_info: self.local_range_info.clone(),
603 range_update_interval,
604 last_sent_latest_block: None,
605 };
606
607 self.spawn(session);
608
609 let client_version = client_id.into();
610 let handle = ActiveSessionHandle {
611 status: status.clone(),
612 direction,
613 session_id,
614 remote_id: peer_id,
615 version,
616 established: Instant::now(),
617 capabilities: Arc::clone(&capabilities),
618 commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
619 client_version: Arc::clone(&client_version),
620 remote_addr,
621 local_addr,
622 peer_listen_port,
623 };
624
625 self.active_sessions.insert(peer_id, handle);
626 self.counter.inc_active(&direction);
627
628 if direction.is_outgoing() {
629 self.metrics.total_dial_successes.increment(1);
630 }
631
632 Poll::Ready(SessionEvent::SessionEstablished {
633 peer_id,
634 remote_addr,
635 client_version,
636 version,
637 capabilities,
638 status,
639 messages,
640 direction,
641 timeout,
642 range_info: remote_range_info,
643 })
644 }
645 PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
646 trace!(
647 target: "net::session",
648 ?session_id,
649 ?remote_addr,
650 ?error,
651 "disconnected pending session"
652 );
653 self.remove_pending_session(&session_id);
654 match direction {
655 Direction::Incoming => {
656 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
657 remote_addr,
658 error,
659 })
660 }
661 Direction::Outgoing(peer_id) => {
662 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
663 remote_addr,
664 peer_id,
665 error,
666 })
667 }
668 }
669 }
670 PendingSessionEvent::OutgoingConnectionError {
671 remote_addr,
672 session_id,
673 peer_id,
674 error,
675 } => {
676 trace!(
677 target: "net::session",
678 %error,
679 ?session_id,
680 ?remote_addr,
681 ?peer_id,
682 "connection refused"
683 );
684 self.remove_pending_session(&session_id);
685 Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
686 }
687 PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
688 trace!(
689 target: "net::session",
690 %error,
691 ?session_id,
692 ?remote_addr,
693 "ecies auth failed"
694 );
695 self.remove_pending_session(&session_id);
696 match direction {
697 Direction::Incoming => {
698 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
699 remote_addr,
700 error: Some(PendingSessionHandshakeError::Ecies(error)),
701 })
702 }
703 Direction::Outgoing(peer_id) => {
704 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
705 remote_addr,
706 peer_id,
707 error: Some(PendingSessionHandshakeError::Ecies(error)),
708 })
709 }
710 }
711 }
712 }
713 }
714
715 pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
723 self.status.earliest_block = Some(block_range_update.earliest);
724 self.status.latest_block = Some(block_range_update.latest);
725 self.status.blockhash = block_range_update.latest_hash;
726
727 self.local_range_info.update(
729 block_range_update.earliest,
730 block_range_update.latest,
731 block_range_update.latest_hash,
732 );
733 }
734}
735
736#[derive(Default, Debug, Clone)]
738struct DisconnectionsCounter(Arc<()>);
739
740impl DisconnectionsCounter {
741 const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
742
743 fn has_capacity(&self) -> bool {
746 Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
747 }
748}
749
750#[derive(Debug)]
752pub enum SessionEvent<N: NetworkPrimitives> {
753 SessionEstablished {
757 peer_id: PeerId,
759 remote_addr: SocketAddr,
761 client_version: Arc<str>,
763 capabilities: Arc<Capabilities>,
765 version: EthVersion,
767 status: Arc<UnifiedStatus>,
769 messages: PeerRequestSender<PeerRequest<N>>,
771 direction: Direction,
773 timeout: Arc<AtomicU64>,
776 range_info: Option<BlockRangeInfo>,
778 },
779 AlreadyConnected {
781 peer_id: PeerId,
783 remote_addr: SocketAddr,
785 direction: Direction,
787 },
788 ValidMessage {
790 peer_id: PeerId,
792 message: PeerMessage<N>,
794 },
795 BadMessage {
797 peer_id: PeerId,
799 },
800 ProtocolBreach {
802 peer_id: PeerId,
804 },
805 IncomingPendingSessionClosed {
807 remote_addr: SocketAddr,
809 error: Option<PendingSessionHandshakeError>,
811 },
812 OutgoingPendingSessionClosed {
814 remote_addr: SocketAddr,
816 peer_id: PeerId,
818 error: Option<PendingSessionHandshakeError>,
820 },
821 OutgoingConnectionError {
823 remote_addr: SocketAddr,
825 peer_id: PeerId,
827 error: io::Error,
829 },
830 SessionClosedOnConnectionError {
832 peer_id: PeerId,
834 remote_addr: SocketAddr,
836 error: EthStreamError,
838 },
839 Disconnected {
841 peer_id: PeerId,
843 remote_addr: SocketAddr,
845 },
846}
847
848#[derive(Debug, thiserror::Error)]
850pub enum PendingSessionHandshakeError {
851 #[error(transparent)]
853 Eth(EthStreamError),
854 #[error(transparent)]
856 Ecies(ECIESError),
857 #[error("authentication timed out")]
859 Timeout,
860 #[error("Mandatory extra capability unsupported")]
862 UnsupportedExtraCapability,
863}
864
865impl PendingSessionHandshakeError {
866 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
868 match self {
869 Self::Eth(eth_err) => eth_err.as_disconnected(),
870 _ => None,
871 }
872 }
873}
874
875#[derive(Debug, Clone, thiserror::Error)]
878#[error("session limit reached {0}")]
879pub struct ExceedsSessionLimit(pub(crate) u32);
880
881pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
883 timeout: Duration,
884 session_id: SessionId,
885 remote_addr: SocketAddr,
886 direction: Direction,
887 events: mpsc::Sender<PendingSessionEvent<N>>,
888 f: F,
889) where
890 F: Future<Output = ()>,
891{
892 if tokio::time::timeout(timeout, f).await.is_err() {
893 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
894 let event = PendingSessionEvent::Disconnected {
895 remote_addr,
896 session_id,
897 direction,
898 error: Some(PendingSessionHandshakeError::Timeout),
899 };
900 let _ = events.send(event).await;
901 }
902}
903
904#[expect(clippy::too_many_arguments)]
908pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
909 handshake: Arc<dyn EthRlpxHandshake>,
910 eth_max_message_size: usize,
911 disconnect_rx: oneshot::Receiver<()>,
912 session_id: SessionId,
913 stream: TcpStream,
914 events: mpsc::Sender<PendingSessionEvent<N>>,
915 remote_addr: SocketAddr,
916 secret_key: SecretKey,
917 hello: HelloMessageWithProtocols,
918 status: UnifiedStatus,
919 fork_filter: ForkFilter,
920 extra_handlers: RlpxSubProtocolHandlers,
921) {
922 authenticate(
923 handshake,
924 eth_max_message_size,
925 disconnect_rx,
926 events,
927 stream,
928 session_id,
929 remote_addr,
930 secret_key,
931 Direction::Incoming,
932 hello,
933 status,
934 fork_filter,
935 extra_handlers,
936 )
937 .await
938}
939
940#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
942#[expect(clippy::too_many_arguments)]
943async fn start_pending_outbound_session<N: NetworkPrimitives>(
944 handshake: Arc<dyn EthRlpxHandshake>,
945 eth_max_message_size: usize,
946 disconnect_rx: oneshot::Receiver<()>,
947 events: mpsc::Sender<PendingSessionEvent<N>>,
948 session_id: SessionId,
949 remote_addr: SocketAddr,
950 remote_peer_id: PeerId,
951 secret_key: SecretKey,
952 hello: HelloMessageWithProtocols,
953 status: UnifiedStatus,
954 fork_filter: ForkFilter,
955 extra_handlers: RlpxSubProtocolHandlers,
956) {
957 let stream = match TcpStream::connect(remote_addr).await {
958 Ok(stream) => {
959 if let Err(err) = stream.set_nodelay(true) {
960 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
961 }
962 stream
963 }
964 Err(error) => {
965 let _ = events
966 .send(PendingSessionEvent::OutgoingConnectionError {
967 remote_addr,
968 session_id,
969 peer_id: remote_peer_id,
970 error,
971 })
972 .await;
973 return
974 }
975 };
976 authenticate(
977 handshake,
978 eth_max_message_size,
979 disconnect_rx,
980 events,
981 stream,
982 session_id,
983 remote_addr,
984 secret_key,
985 Direction::Outgoing(remote_peer_id),
986 hello,
987 status,
988 fork_filter,
989 extra_handlers,
990 )
991 .await
992}
993
994#[expect(clippy::too_many_arguments)]
996async fn authenticate<N: NetworkPrimitives>(
997 handshake: Arc<dyn EthRlpxHandshake>,
998 eth_max_message_size: usize,
999 disconnect_rx: oneshot::Receiver<()>,
1000 events: mpsc::Sender<PendingSessionEvent<N>>,
1001 stream: TcpStream,
1002 session_id: SessionId,
1003 remote_addr: SocketAddr,
1004 secret_key: SecretKey,
1005 direction: Direction,
1006 hello: HelloMessageWithProtocols,
1007 status: UnifiedStatus,
1008 fork_filter: ForkFilter,
1009 extra_handlers: RlpxSubProtocolHandlers,
1010) {
1011 let local_addr = stream.local_addr().ok();
1012 let stream = match get_ecies_stream(stream, secret_key, direction).await {
1013 Ok(stream) => stream,
1014 Err(error) => {
1015 let _ = events
1016 .send(PendingSessionEvent::EciesAuthError {
1017 remote_addr,
1018 session_id,
1019 error,
1020 direction,
1021 })
1022 .await;
1023 return
1024 }
1025 };
1026
1027 let unauthed = UnauthedP2PStream::new(stream);
1028
1029 let auth = authenticate_stream(
1030 handshake,
1031 eth_max_message_size,
1032 unauthed,
1033 session_id,
1034 remote_addr,
1035 local_addr,
1036 direction,
1037 hello,
1038 status,
1039 fork_filter,
1040 extra_handlers,
1041 )
1042 .boxed();
1043
1044 match futures::future::select(disconnect_rx, auth).await {
1045 Either::Left((_, _)) => {
1046 let _ = events
1047 .send(PendingSessionEvent::Disconnected {
1048 remote_addr,
1049 session_id,
1050 direction,
1051 error: None,
1052 })
1053 .await;
1054 }
1055 Either::Right((res, _)) => {
1056 let _ = events.send(res).await;
1057 }
1058 }
1059}
1060
1061async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1064 stream: Io,
1065 secret_key: SecretKey,
1066 direction: Direction,
1067) -> Result<ECIESStream<Io>, ECIESError> {
1068 match direction {
1069 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1070 Direction::Outgoing(remote_peer_id) => {
1071 ECIESStream::connect(stream, secret_key, remote_peer_id).await
1072 }
1073 }
1074}
1075
1076#[expect(clippy::too_many_arguments)]
1083async fn authenticate_stream<N: NetworkPrimitives>(
1084 handshake: Arc<dyn EthRlpxHandshake>,
1085 eth_max_message_size: usize,
1086 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1087 session_id: SessionId,
1088 remote_addr: SocketAddr,
1089 local_addr: Option<SocketAddr>,
1090 direction: Direction,
1091 mut hello: HelloMessageWithProtocols,
1092 mut status: UnifiedStatus,
1093 fork_filter: ForkFilter,
1094 mut extra_handlers: RlpxSubProtocolHandlers,
1095) -> PendingSessionEvent<N> {
1096 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1098
1099 let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1101 Ok(stream_res) => stream_res,
1102 Err(err) => {
1103 return PendingSessionEvent::Disconnected {
1104 remote_addr,
1105 session_id,
1106 direction,
1107 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1108 }
1109 }
1110 };
1111
1112 if !extra_handlers.is_empty() {
1114 while let Some(pos) = extra_handlers.iter().position(|handler| {
1116 p2p_stream
1117 .shared_capabilities()
1118 .ensure_matching_capability(&handler.protocol().cap)
1119 .is_err()
1120 }) {
1121 let handler = extra_handlers.remove(pos);
1122 if handler.on_unsupported_by_peer(
1123 p2p_stream.shared_capabilities(),
1124 direction,
1125 their_hello.id,
1126 ) == OnNotSupported::Disconnect
1127 {
1128 return PendingSessionEvent::Disconnected {
1129 remote_addr,
1130 session_id,
1131 direction,
1132 error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1133 };
1134 }
1135 }
1136 }
1137
1138 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1140 Ok(version) => version,
1141 Err(err) => {
1142 return PendingSessionEvent::Disconnected {
1143 remote_addr,
1144 session_id,
1145 direction,
1146 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1147 }
1148 }
1149 };
1150
1151 status.set_eth_version(eth_version);
1153
1154 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1155 match handshake
1160 .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1161 .await
1162 {
1163 Ok(their_status) => {
1164 let eth_stream =
1165 EthStream::with_max_message_size(eth_version, p2p_stream, eth_max_message_size);
1166 (eth_stream.into(), their_status)
1167 }
1168 Err(err) => {
1169 return PendingSessionEvent::Disconnected {
1170 remote_addr,
1171 session_id,
1172 direction,
1173 error: Some(PendingSessionHandshakeError::Eth(err)),
1174 }
1175 }
1176 }
1177 } else {
1178 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1180
1181 for handler in extra_handlers.into_iter() {
1183 let cap = handler.protocol().cap;
1184 let remote_peer_id = their_hello.id;
1185
1186 multiplex_stream
1187 .install_protocol(&cap, move |conn| {
1188 handler.into_connection(direction, remote_peer_id, conn)
1189 })
1190 .ok();
1191 }
1192
1193 let (multiplex_stream, their_status) = match multiplex_stream
1194 .into_eth_satellite_stream(status, fork_filter, handshake, eth_max_message_size)
1195 .await
1196 {
1197 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1198 Err(err) => {
1199 return PendingSessionEvent::Disconnected {
1200 remote_addr,
1201 session_id,
1202 direction,
1203 error: Some(PendingSessionHandshakeError::Eth(err)),
1204 }
1205 }
1206 };
1207
1208 (multiplex_stream.into(), their_status)
1209 };
1210
1211 let peer_listen_port = (their_hello.port != 0).then_some(their_hello.port);
1213
1214 PendingSessionEvent::Established {
1215 session_id,
1216 remote_addr,
1217 local_addr,
1218 peer_id: their_hello.id,
1219 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1220 status: Arc::new(their_status),
1221 conn,
1222 direction,
1223 client_id: their_hello.client_version,
1224 peer_listen_port,
1225 }
1226}