1mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11 message::PeerMessage,
12 metrics::SessionManagerMetrics,
13 protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14 session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21 errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22 BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23 HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24 HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35 collections::HashMap,
36 future::Future,
37 net::SocketAddr,
38 sync::{atomic::AtomicU64, Arc},
39 task::{Context, Poll},
40 time::{Duration, Instant},
41};
42use tokio::{
43 io::{AsyncRead, AsyncWrite},
44 net::TcpStream,
45 sync::{mpsc, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{instrument, trace};
50
51use crate::session::active::{BroadcastItemCounter, RANGE_UPDATE_INTERVAL};
52pub use conn::EthRlpxConnection;
53use handle::SessionCommandSender;
54pub use handle::{
55 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
56 SessionCommand,
57};
58pub use reth_network_api::{Direction, PeerInfo};
59
60#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
62pub struct SessionId(usize);
63
64#[must_use = "Session Manager must be polled to process session events."]
66#[derive(Debug)]
67pub struct SessionManager<N: NetworkPrimitives> {
68 next_id: usize,
70 counter: SessionCounter,
72 initial_internal_request_timeout: Duration,
75 protocol_breach_request_timeout: Duration,
78 pending_session_timeout: Duration,
80 secret_key: SecretKey,
82 status: UnifiedStatus,
84 hello_message: HelloMessageWithProtocols,
86 fork_filter: ForkFilter,
88 session_command_buffer: usize,
90 executor: Runtime,
92 pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
97 active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
99 pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
104 pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
106 active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
111 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
113 extra_protocols: RlpxSubProtocols,
115 disconnections_counter: DisconnectionsCounter,
117 metrics: SessionManagerMetrics,
119 handshake: Arc<dyn EthRlpxHandshake>,
121 local_range_info: BlockRangeInfo,
124}
125
126impl<N: NetworkPrimitives> SessionManager<N> {
129 #[expect(clippy::too_many_arguments)]
131 pub fn new(
132 secret_key: SecretKey,
133 config: SessionsConfig,
134 executor: Runtime,
135 status: UnifiedStatus,
136 hello_message: HelloMessageWithProtocols,
137 fork_filter: ForkFilter,
138 extra_protocols: RlpxSubProtocols,
139 handshake: Arc<dyn EthRlpxHandshake>,
140 ) -> Self {
141 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
142 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
143 let active_session_tx = PollSender::new(active_session_tx);
144
145 let local_range_info = BlockRangeInfo::new(
147 status.earliest_block.unwrap_or_default(),
148 status.latest_block.unwrap_or_default(),
149 status.blockhash,
150 );
151
152 Self {
153 next_id: 0,
154 counter: SessionCounter::new(config.limits),
155 initial_internal_request_timeout: config.initial_internal_request_timeout,
156 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
157 pending_session_timeout: config.pending_session_timeout,
158 secret_key,
159 status,
160 hello_message,
161 fork_filter,
162 session_command_buffer: config.session_command_buffer,
163 executor,
164 pending_sessions: Default::default(),
165 active_sessions: Default::default(),
166 pending_sessions_tx,
167 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
168 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
169 active_session_rx: ReceiverStream::new(active_session_rx),
170 extra_protocols,
171 disconnections_counter: Default::default(),
172 metrics: Default::default(),
173 handshake,
174 local_range_info,
175 }
176 }
177
178 pub(crate) const fn fork_id(&self) -> ForkId {
180 self.fork_filter.current()
181 }
182
183 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
186 self.fork_filter.validate(fork_id).is_ok()
187 }
188
189 const fn next_id(&mut self) -> SessionId {
191 let id = self.next_id;
192 self.next_id += 1;
193 SessionId(id)
194 }
195
196 pub const fn status(&self) -> UnifiedStatus {
198 self.status
199 }
200
201 pub const fn secret_key(&self) -> SecretKey {
203 self.secret_key
204 }
205
206 pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
208 &self.active_sessions
209 }
210
211 pub fn hello_message(&self) -> HelloMessageWithProtocols {
213 self.hello_message.clone()
214 }
215
216 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
218 self.extra_protocols.push(protocol)
219 }
220
221 #[inline]
223 pub(crate) fn num_pending_connections(&self) -> usize {
224 self.pending_sessions.len()
225 }
226
227 fn spawn<F>(&self, f: F)
230 where
231 F: Future<Output = ()> + Send + 'static,
232 {
233 self.executor.spawn_task(f);
234 }
235
236 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
241 self.status.blockhash = head.hash;
242 self.status.total_difficulty = Some(head.total_difficulty);
243 let transition = self.fork_filter.set_head(head);
244 self.status.forkid = self.fork_filter.current();
245 self.status.latest_block = Some(head.number);
246
247 transition
248 }
249
250 pub(crate) fn on_incoming(
255 &mut self,
256 stream: TcpStream,
257 remote_addr: SocketAddr,
258 ) -> Result<SessionId, ExceedsSessionLimit> {
259 self.counter.ensure_pending_inbound()?;
260
261 let session_id = self.next_id();
262
263 trace!(
264 target: "net::session",
265 ?remote_addr,
266 ?session_id,
267 "new pending incoming session"
268 );
269
270 let (disconnect_tx, disconnect_rx) = oneshot::channel();
271 let pending_events = self.pending_sessions_tx.clone();
272 let secret_key = self.secret_key;
273 let hello_message = self.hello_message.clone();
274 let status = self.status;
275 let fork_filter = self.fork_filter.clone();
276 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
277 self.spawn(pending_session_with_timeout(
278 self.pending_session_timeout,
279 session_id,
280 remote_addr,
281 Direction::Incoming,
282 pending_events.clone(),
283 start_pending_incoming_session(
284 self.handshake.clone(),
285 disconnect_rx,
286 session_id,
287 stream,
288 pending_events,
289 remote_addr,
290 secret_key,
291 hello_message,
292 status,
293 fork_filter,
294 extra_handlers,
295 ),
296 ));
297
298 let handle = PendingSessionHandle {
299 disconnect_tx: Some(disconnect_tx),
300 direction: Direction::Incoming,
301 };
302 self.pending_sessions.insert(session_id, handle);
303 self.counter.inc_pending_inbound();
304 Ok(session_id)
305 }
306
307 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
309 if self.counter.ensure_pending_outbound().is_ok() {
311 let session_id = self.next_id();
312 let (disconnect_tx, disconnect_rx) = oneshot::channel();
313 let pending_events = self.pending_sessions_tx.clone();
314 let secret_key = self.secret_key;
315 let hello_message = self.hello_message.clone();
316 let fork_filter = self.fork_filter.clone();
317 let status = self.status;
318 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
319 self.spawn(pending_session_with_timeout(
320 self.pending_session_timeout,
321 session_id,
322 remote_addr,
323 Direction::Outgoing(remote_peer_id),
324 pending_events.clone(),
325 start_pending_outbound_session(
326 self.handshake.clone(),
327 disconnect_rx,
328 pending_events,
329 session_id,
330 remote_addr,
331 remote_peer_id,
332 secret_key,
333 hello_message,
334 status,
335 fork_filter,
336 extra_handlers,
337 ),
338 ));
339
340 let handle = PendingSessionHandle {
341 disconnect_tx: Some(disconnect_tx),
342 direction: Direction::Outgoing(remote_peer_id),
343 };
344 self.pending_sessions.insert(session_id, handle);
345 self.counter.inc_pending_outbound();
346 }
347 }
348
349 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
354 if let Some(session) = self.active_sessions.get(&node) {
355 session.disconnect(reason);
356 }
357 }
358
359 pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
364 for session in self.active_sessions.values() {
365 session.disconnect(reason);
366 }
367 }
368
369 pub fn disconnect_all_pending(&mut self) {
371 for session in self.pending_sessions.values_mut() {
372 session.disconnect();
373 }
374 }
375
376 pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
383 if let Some(session) = self.active_sessions.get(peer_id) &&
384 !session.commands.send_message(msg)
385 {
386 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
387 }
388 }
389
390 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
392 let session = self.pending_sessions.remove(id)?;
393 self.counter.dec_pending(&session.direction);
394 Some(session)
395 }
396
397 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
399 let session = self.active_sessions.remove(id)?;
400 self.counter.dec_active(&session.direction);
401 Some(session)
402 }
403
404 pub(crate) fn try_disconnect_incoming_connection(
408 &self,
409 stream: TcpStream,
410 reason: DisconnectReason,
411 ) {
412 if !self.disconnections_counter.has_capacity() {
413 return
415 }
416
417 let guard = self.disconnections_counter.clone();
418 let secret_key = self.secret_key;
419
420 self.spawn(async move {
421 trace!(
422 target: "net::session",
423 "gracefully disconnecting incoming connection"
424 );
425 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
426 let mut unauth = UnauthedP2PStream::new(stream);
427 let _ = unauth.send_disconnect(reason).await;
428 drop(guard);
429 }
430 });
431 }
432
433 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
437 match self.active_session_rx.poll_next_unpin(cx) {
439 Poll::Pending => {}
440 Poll::Ready(None) => {
441 unreachable!("Manager holds both channel halves.")
442 }
443 Poll::Ready(Some(event)) => {
444 return match event {
445 ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
446 trace!(
447 target: "net::session",
448 ?peer_id,
449 "gracefully disconnected active session."
450 );
451 self.remove_active_session(&peer_id);
452 Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
453 }
454 ActiveSessionMessage::ClosedOnConnectionError {
455 peer_id,
456 remote_addr,
457 error,
458 } => {
459 trace!(target: "net::session", ?peer_id, %error,"closed session.");
460 self.remove_active_session(&peer_id);
461 Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
462 remote_addr,
463 peer_id,
464 error,
465 })
466 }
467 ActiveSessionMessage::ValidMessage { peer_id, message } => {
468 Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
469 }
470 ActiveSessionMessage::BadMessage { peer_id } => {
471 Poll::Ready(SessionEvent::BadMessage { peer_id })
472 }
473 ActiveSessionMessage::ProtocolBreach { peer_id } => {
474 Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
475 }
476 }
477 }
478 }
479
480 let event = match self.pending_session_rx.poll_next_unpin(cx) {
482 Poll::Pending => return Poll::Pending,
483 Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
484 Poll::Ready(Some(event)) => event,
485 };
486 match event {
487 PendingSessionEvent::Established {
488 session_id,
489 remote_addr,
490 local_addr,
491 peer_id,
492 capabilities,
493 conn,
494 status,
495 direction,
496 client_id,
497 } => {
498 self.remove_pending_session(&session_id);
500
501 if self.active_sessions.contains_key(&peer_id) {
503 trace!(
504 target: "net::session",
505 ?session_id,
506 ?remote_addr,
507 ?peer_id,
508 ?direction,
509 "already connected"
510 );
511
512 self.spawn(async move {
513 let _ =
515 conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
516 });
517
518 return Poll::Ready(SessionEvent::AlreadyConnected {
519 peer_id,
520 remote_addr,
521 direction,
522 })
523 }
524
525 let (commands_tx, commands_rx) = mpsc::channel(self.session_command_buffer);
526 let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
527
528 let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
529
530 let messages = PeerRequestSender::new(peer_id, to_session_tx);
531
532 let timeout = Arc::new(AtomicU64::new(
533 self.initial_internal_request_timeout.as_millis() as u64,
534 ));
535
536 let version = conn.version();
538
539 let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
544 let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
545 let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
546 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
547 interval
548 });
549
550 let broadcast_items = BroadcastItemCounter::new();
556
557 let session = ActiveSession {
558 next_id: 0,
559 remote_peer_id: peer_id,
560 remote_addr,
561 remote_capabilities: Arc::clone(&capabilities),
562 session_id,
563 commands_rx: ReceiverStream::new(commands_rx),
564 unbounded_rx,
565 unbounded_broadcast_msgs: self.metrics.total_unbounded_broadcast_msgs.clone(),
566 to_session_manager: self.active_session_tx.clone(),
567 pending_message_to_session: None,
568 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
569 inflight_requests: Default::default(),
570 conn,
571 queued_outgoing: QueuedOutgoingMessages::new(
572 self.metrics.queued_outgoing_messages.clone(),
573 broadcast_items.clone(),
574 ),
575 received_requests_from_remote: Default::default(),
576 internal_request_timeout_interval: tokio::time::interval(
577 self.initial_internal_request_timeout,
578 ),
579 internal_request_timeout: Arc::clone(&timeout),
580 protocol_breach_request_timeout: self.protocol_breach_request_timeout,
581 terminate_message: None,
582 range_info: None,
583 local_range_info: self.local_range_info.clone(),
584 range_update_interval,
585 last_sent_latest_block: None,
586 };
587
588 self.spawn(session);
589
590 let client_version = client_id.into();
591 let handle = ActiveSessionHandle {
592 status: status.clone(),
593 direction,
594 session_id,
595 remote_id: peer_id,
596 version,
597 established: Instant::now(),
598 capabilities: Arc::clone(&capabilities),
599 commands: SessionCommandSender::new(commands_tx, unbounded_tx, broadcast_items),
600 client_version: Arc::clone(&client_version),
601 remote_addr,
602 local_addr,
603 };
604
605 self.active_sessions.insert(peer_id, handle);
606 self.counter.inc_active(&direction);
607
608 if direction.is_outgoing() {
609 self.metrics.total_dial_successes.increment(1);
610 }
611
612 Poll::Ready(SessionEvent::SessionEstablished {
613 peer_id,
614 remote_addr,
615 client_version,
616 version,
617 capabilities,
618 status,
619 messages,
620 direction,
621 timeout,
622 range_info: None,
623 })
624 }
625 PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
626 trace!(
627 target: "net::session",
628 ?session_id,
629 ?remote_addr,
630 ?error,
631 "disconnected pending session"
632 );
633 self.remove_pending_session(&session_id);
634 match direction {
635 Direction::Incoming => {
636 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
637 remote_addr,
638 error,
639 })
640 }
641 Direction::Outgoing(peer_id) => {
642 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
643 remote_addr,
644 peer_id,
645 error,
646 })
647 }
648 }
649 }
650 PendingSessionEvent::OutgoingConnectionError {
651 remote_addr,
652 session_id,
653 peer_id,
654 error,
655 } => {
656 trace!(
657 target: "net::session",
658 %error,
659 ?session_id,
660 ?remote_addr,
661 ?peer_id,
662 "connection refused"
663 );
664 self.remove_pending_session(&session_id);
665 Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
666 }
667 PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
668 trace!(
669 target: "net::session",
670 %error,
671 ?session_id,
672 ?remote_addr,
673 "ecies auth failed"
674 );
675 self.remove_pending_session(&session_id);
676 match direction {
677 Direction::Incoming => {
678 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
679 remote_addr,
680 error: Some(PendingSessionHandshakeError::Ecies(error)),
681 })
682 }
683 Direction::Outgoing(peer_id) => {
684 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
685 remote_addr,
686 peer_id,
687 error: Some(PendingSessionHandshakeError::Ecies(error)),
688 })
689 }
690 }
691 }
692 }
693 }
694
695 pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
703 self.status.earliest_block = Some(block_range_update.earliest);
704 self.status.latest_block = Some(block_range_update.latest);
705 self.status.blockhash = block_range_update.latest_hash;
706
707 self.local_range_info.update(
709 block_range_update.earliest,
710 block_range_update.latest,
711 block_range_update.latest_hash,
712 );
713 }
714}
715
716#[derive(Default, Debug, Clone)]
718struct DisconnectionsCounter(Arc<()>);
719
720impl DisconnectionsCounter {
721 const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
722
723 fn has_capacity(&self) -> bool {
726 Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
727 }
728}
729
730#[derive(Debug)]
732pub enum SessionEvent<N: NetworkPrimitives> {
733 SessionEstablished {
737 peer_id: PeerId,
739 remote_addr: SocketAddr,
741 client_version: Arc<str>,
743 capabilities: Arc<Capabilities>,
745 version: EthVersion,
747 status: Arc<UnifiedStatus>,
749 messages: PeerRequestSender<PeerRequest<N>>,
751 direction: Direction,
753 timeout: Arc<AtomicU64>,
756 range_info: Option<BlockRangeInfo>,
758 },
759 AlreadyConnected {
761 peer_id: PeerId,
763 remote_addr: SocketAddr,
765 direction: Direction,
767 },
768 ValidMessage {
770 peer_id: PeerId,
772 message: PeerMessage<N>,
774 },
775 BadMessage {
777 peer_id: PeerId,
779 },
780 ProtocolBreach {
782 peer_id: PeerId,
784 },
785 IncomingPendingSessionClosed {
787 remote_addr: SocketAddr,
789 error: Option<PendingSessionHandshakeError>,
791 },
792 OutgoingPendingSessionClosed {
794 remote_addr: SocketAddr,
796 peer_id: PeerId,
798 error: Option<PendingSessionHandshakeError>,
800 },
801 OutgoingConnectionError {
803 remote_addr: SocketAddr,
805 peer_id: PeerId,
807 error: io::Error,
809 },
810 SessionClosedOnConnectionError {
812 peer_id: PeerId,
814 remote_addr: SocketAddr,
816 error: EthStreamError,
818 },
819 Disconnected {
821 peer_id: PeerId,
823 remote_addr: SocketAddr,
825 },
826}
827
828#[derive(Debug, thiserror::Error)]
830pub enum PendingSessionHandshakeError {
831 #[error(transparent)]
833 Eth(EthStreamError),
834 #[error(transparent)]
836 Ecies(ECIESError),
837 #[error("authentication timed out")]
839 Timeout,
840 #[error("Mandatory extra capability unsupported")]
842 UnsupportedExtraCapability,
843}
844
845impl PendingSessionHandshakeError {
846 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
848 match self {
849 Self::Eth(eth_err) => eth_err.as_disconnected(),
850 _ => None,
851 }
852 }
853}
854
855#[derive(Debug, Clone, thiserror::Error)]
858#[error("session limit reached {0}")]
859pub struct ExceedsSessionLimit(pub(crate) u32);
860
861pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
863 timeout: Duration,
864 session_id: SessionId,
865 remote_addr: SocketAddr,
866 direction: Direction,
867 events: mpsc::Sender<PendingSessionEvent<N>>,
868 f: F,
869) where
870 F: Future<Output = ()>,
871{
872 if tokio::time::timeout(timeout, f).await.is_err() {
873 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
874 let event = PendingSessionEvent::Disconnected {
875 remote_addr,
876 session_id,
877 direction,
878 error: Some(PendingSessionHandshakeError::Timeout),
879 };
880 let _ = events.send(event).await;
881 }
882}
883
884#[expect(clippy::too_many_arguments)]
888pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
889 handshake: Arc<dyn EthRlpxHandshake>,
890 disconnect_rx: oneshot::Receiver<()>,
891 session_id: SessionId,
892 stream: TcpStream,
893 events: mpsc::Sender<PendingSessionEvent<N>>,
894 remote_addr: SocketAddr,
895 secret_key: SecretKey,
896 hello: HelloMessageWithProtocols,
897 status: UnifiedStatus,
898 fork_filter: ForkFilter,
899 extra_handlers: RlpxSubProtocolHandlers,
900) {
901 authenticate(
902 handshake,
903 disconnect_rx,
904 events,
905 stream,
906 session_id,
907 remote_addr,
908 secret_key,
909 Direction::Incoming,
910 hello,
911 status,
912 fork_filter,
913 extra_handlers,
914 )
915 .await
916}
917
918#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
920#[expect(clippy::too_many_arguments)]
921async fn start_pending_outbound_session<N: NetworkPrimitives>(
922 handshake: Arc<dyn EthRlpxHandshake>,
923 disconnect_rx: oneshot::Receiver<()>,
924 events: mpsc::Sender<PendingSessionEvent<N>>,
925 session_id: SessionId,
926 remote_addr: SocketAddr,
927 remote_peer_id: PeerId,
928 secret_key: SecretKey,
929 hello: HelloMessageWithProtocols,
930 status: UnifiedStatus,
931 fork_filter: ForkFilter,
932 extra_handlers: RlpxSubProtocolHandlers,
933) {
934 let stream = match TcpStream::connect(remote_addr).await {
935 Ok(stream) => {
936 if let Err(err) = stream.set_nodelay(true) {
937 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
938 }
939 stream
940 }
941 Err(error) => {
942 let _ = events
943 .send(PendingSessionEvent::OutgoingConnectionError {
944 remote_addr,
945 session_id,
946 peer_id: remote_peer_id,
947 error,
948 })
949 .await;
950 return
951 }
952 };
953 authenticate(
954 handshake,
955 disconnect_rx,
956 events,
957 stream,
958 session_id,
959 remote_addr,
960 secret_key,
961 Direction::Outgoing(remote_peer_id),
962 hello,
963 status,
964 fork_filter,
965 extra_handlers,
966 )
967 .await
968}
969
970#[expect(clippy::too_many_arguments)]
972async fn authenticate<N: NetworkPrimitives>(
973 handshake: Arc<dyn EthRlpxHandshake>,
974 disconnect_rx: oneshot::Receiver<()>,
975 events: mpsc::Sender<PendingSessionEvent<N>>,
976 stream: TcpStream,
977 session_id: SessionId,
978 remote_addr: SocketAddr,
979 secret_key: SecretKey,
980 direction: Direction,
981 hello: HelloMessageWithProtocols,
982 status: UnifiedStatus,
983 fork_filter: ForkFilter,
984 extra_handlers: RlpxSubProtocolHandlers,
985) {
986 let local_addr = stream.local_addr().ok();
987 let stream = match get_ecies_stream(stream, secret_key, direction).await {
988 Ok(stream) => stream,
989 Err(error) => {
990 let _ = events
991 .send(PendingSessionEvent::EciesAuthError {
992 remote_addr,
993 session_id,
994 error,
995 direction,
996 })
997 .await;
998 return
999 }
1000 };
1001
1002 let unauthed = UnauthedP2PStream::new(stream);
1003
1004 let auth = authenticate_stream(
1005 handshake,
1006 unauthed,
1007 session_id,
1008 remote_addr,
1009 local_addr,
1010 direction,
1011 hello,
1012 status,
1013 fork_filter,
1014 extra_handlers,
1015 )
1016 .boxed();
1017
1018 match futures::future::select(disconnect_rx, auth).await {
1019 Either::Left((_, _)) => {
1020 let _ = events
1021 .send(PendingSessionEvent::Disconnected {
1022 remote_addr,
1023 session_id,
1024 direction,
1025 error: None,
1026 })
1027 .await;
1028 }
1029 Either::Right((res, _)) => {
1030 let _ = events.send(res).await;
1031 }
1032 }
1033}
1034
1035async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1038 stream: Io,
1039 secret_key: SecretKey,
1040 direction: Direction,
1041) -> Result<ECIESStream<Io>, ECIESError> {
1042 match direction {
1043 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1044 Direction::Outgoing(remote_peer_id) => {
1045 ECIESStream::connect(stream, secret_key, remote_peer_id).await
1046 }
1047 }
1048}
1049
1050#[expect(clippy::too_many_arguments)]
1057async fn authenticate_stream<N: NetworkPrimitives>(
1058 handshake: Arc<dyn EthRlpxHandshake>,
1059 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1060 session_id: SessionId,
1061 remote_addr: SocketAddr,
1062 local_addr: Option<SocketAddr>,
1063 direction: Direction,
1064 mut hello: HelloMessageWithProtocols,
1065 mut status: UnifiedStatus,
1066 fork_filter: ForkFilter,
1067 mut extra_handlers: RlpxSubProtocolHandlers,
1068) -> PendingSessionEvent<N> {
1069 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1071
1072 let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1074 Ok(stream_res) => stream_res,
1075 Err(err) => {
1076 return PendingSessionEvent::Disconnected {
1077 remote_addr,
1078 session_id,
1079 direction,
1080 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1081 }
1082 }
1083 };
1084
1085 if !extra_handlers.is_empty() {
1087 while let Some(pos) = extra_handlers.iter().position(|handler| {
1089 p2p_stream
1090 .shared_capabilities()
1091 .ensure_matching_capability(&handler.protocol().cap)
1092 .is_err()
1093 }) {
1094 let handler = extra_handlers.remove(pos);
1095 if handler.on_unsupported_by_peer(
1096 p2p_stream.shared_capabilities(),
1097 direction,
1098 their_hello.id,
1099 ) == OnNotSupported::Disconnect
1100 {
1101 return PendingSessionEvent::Disconnected {
1102 remote_addr,
1103 session_id,
1104 direction,
1105 error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1106 };
1107 }
1108 }
1109 }
1110
1111 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1113 Ok(version) => version,
1114 Err(err) => {
1115 return PendingSessionEvent::Disconnected {
1116 remote_addr,
1117 session_id,
1118 direction,
1119 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1120 }
1121 }
1122 };
1123
1124 status.set_eth_version(eth_version);
1126
1127 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1128 match handshake
1133 .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1134 .await
1135 {
1136 Ok(their_status) => {
1137 let eth_stream = EthStream::new(eth_version, p2p_stream);
1138 (eth_stream.into(), their_status)
1139 }
1140 Err(err) => {
1141 return PendingSessionEvent::Disconnected {
1142 remote_addr,
1143 session_id,
1144 direction,
1145 error: Some(PendingSessionHandshakeError::Eth(err)),
1146 }
1147 }
1148 }
1149 } else {
1150 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1152
1153 for handler in extra_handlers.into_iter() {
1155 let cap = handler.protocol().cap;
1156 let remote_peer_id = their_hello.id;
1157
1158 multiplex_stream
1159 .install_protocol(&cap, move |conn| {
1160 handler.into_connection(direction, remote_peer_id, conn)
1161 })
1162 .ok();
1163 }
1164
1165 let (multiplex_stream, their_status) = match multiplex_stream
1166 .into_eth_satellite_stream(status, fork_filter, handshake)
1167 .await
1168 {
1169 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1170 Err(err) => {
1171 return PendingSessionEvent::Disconnected {
1172 remote_addr,
1173 session_id,
1174 direction,
1175 error: Some(PendingSessionHandshakeError::Eth(err)),
1176 }
1177 }
1178 };
1179
1180 (multiplex_stream.into(), their_status)
1181 };
1182
1183 PendingSessionEvent::Established {
1184 session_id,
1185 remote_addr,
1186 local_addr,
1187 peer_id: their_hello.id,
1188 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1189 status: Arc::new(their_status),
1190 conn,
1191 direction,
1192 client_id: their_hello.client_version,
1193 }
1194}