1mod active;
4mod conn;
5mod counter;
6mod handle;
7mod types;
8pub use types::BlockRangeInfo;
9
10use crate::{
11 message::PeerMessage,
12 metrics::SessionManagerMetrics,
13 protocol::{IntoRlpxSubProtocol, OnNotSupported, RlpxSubProtocolHandlers, RlpxSubProtocols},
14 session::active::ActiveSession,
15};
16use active::QueuedOutgoingMessages;
17use counter::SessionCounter;
18use futures::{future::Either, io, FutureExt, StreamExt};
19use reth_ecies::{stream::ECIESStream, ECIESError};
20use reth_eth_wire::{
21 errors::EthStreamError, handshake::EthRlpxHandshake, multiplex::RlpxProtocolMultiplexer,
22 BlockRangeUpdate, Capabilities, DisconnectReason, EthStream, EthVersion,
23 HelloMessageWithProtocols, NetworkPrimitives, UnauthedP2PStream, UnifiedStatus,
24 HANDSHAKE_TIMEOUT,
25};
26use reth_ethereum_forks::{ForkFilter, ForkId, ForkTransition, Head};
27use reth_metrics::common::mpsc::MeteredPollSender;
28use reth_network_api::{PeerRequest, PeerRequestSender};
29use reth_network_peers::PeerId;
30use reth_network_types::SessionsConfig;
31use reth_tasks::Runtime;
32use rustc_hash::FxHashMap;
33use secp256k1::SecretKey;
34use std::{
35 collections::HashMap,
36 future::Future,
37 net::SocketAddr,
38 sync::{atomic::AtomicU64, Arc},
39 task::{Context, Poll},
40 time::{Duration, Instant},
41};
42use tokio::{
43 io::{AsyncRead, AsyncWrite},
44 net::TcpStream,
45 sync::{mpsc, mpsc::error::TrySendError, oneshot},
46};
47use tokio_stream::wrappers::ReceiverStream;
48use tokio_util::sync::PollSender;
49use tracing::{debug, instrument, trace};
50
51use crate::session::active::RANGE_UPDATE_INTERVAL;
52pub use conn::EthRlpxConnection;
53pub use handle::{
54 ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
55 SessionCommand,
56};
57pub use reth_network_api::{Direction, PeerInfo};
58
/// Internal identifier of a session, unique within one [`SessionManager`].
///
/// Ids are handed out sequentially from the manager's private `next_id` counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Hash)]
pub struct SessionId(usize);
62
63#[must_use = "Session Manager must be polled to process session events."]
65#[derive(Debug)]
66pub struct SessionManager<N: NetworkPrimitives> {
67 next_id: usize,
69 counter: SessionCounter,
71 initial_internal_request_timeout: Duration,
74 protocol_breach_request_timeout: Duration,
77 pending_session_timeout: Duration,
79 secret_key: SecretKey,
81 status: UnifiedStatus,
83 hello_message: HelloMessageWithProtocols,
85 fork_filter: ForkFilter,
87 session_command_buffer: usize,
89 executor: Runtime,
91 pending_sessions: FxHashMap<SessionId, PendingSessionHandle>,
96 active_sessions: HashMap<PeerId, ActiveSessionHandle<N>>,
98 pending_sessions_tx: mpsc::Sender<PendingSessionEvent<N>>,
103 pending_session_rx: ReceiverStream<PendingSessionEvent<N>>,
105 active_session_tx: MeteredPollSender<ActiveSessionMessage<N>>,
110 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
112 extra_protocols: RlpxSubProtocols,
114 disconnections_counter: DisconnectionsCounter,
116 metrics: SessionManagerMetrics,
118 handshake: Arc<dyn EthRlpxHandshake>,
120 local_range_info: BlockRangeInfo,
123}
124
125impl<N: NetworkPrimitives> SessionManager<N> {
128 #[expect(clippy::too_many_arguments)]
130 pub fn new(
131 secret_key: SecretKey,
132 config: SessionsConfig,
133 executor: Runtime,
134 status: UnifiedStatus,
135 hello_message: HelloMessageWithProtocols,
136 fork_filter: ForkFilter,
137 extra_protocols: RlpxSubProtocols,
138 handshake: Arc<dyn EthRlpxHandshake>,
139 ) -> Self {
140 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
141 let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
142 let active_session_tx = PollSender::new(active_session_tx);
143
144 let local_range_info = BlockRangeInfo::new(
146 status.earliest_block.unwrap_or_default(),
147 status.latest_block.unwrap_or_default(),
148 status.blockhash,
149 );
150
151 Self {
152 next_id: 0,
153 counter: SessionCounter::new(config.limits),
154 initial_internal_request_timeout: config.initial_internal_request_timeout,
155 protocol_breach_request_timeout: config.protocol_breach_request_timeout,
156 pending_session_timeout: config.pending_session_timeout,
157 secret_key,
158 status,
159 hello_message,
160 fork_filter,
161 session_command_buffer: config.session_command_buffer,
162 executor,
163 pending_sessions: Default::default(),
164 active_sessions: Default::default(),
165 pending_sessions_tx,
166 pending_session_rx: ReceiverStream::new(pending_sessions_rx),
167 active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
168 active_session_rx: ReceiverStream::new(active_session_rx),
169 extra_protocols,
170 disconnections_counter: Default::default(),
171 metrics: Default::default(),
172 handshake,
173 local_range_info,
174 }
175 }
176
177 pub(crate) const fn fork_id(&self) -> ForkId {
179 self.fork_filter.current()
180 }
181
182 pub fn is_valid_fork_id(&self, fork_id: ForkId) -> bool {
185 self.fork_filter.validate(fork_id).is_ok()
186 }
187
188 const fn next_id(&mut self) -> SessionId {
190 let id = self.next_id;
191 self.next_id += 1;
192 SessionId(id)
193 }
194
195 pub const fn status(&self) -> UnifiedStatus {
197 self.status
198 }
199
200 pub const fn secret_key(&self) -> SecretKey {
202 self.secret_key
203 }
204
205 pub const fn active_sessions(&self) -> &HashMap<PeerId, ActiveSessionHandle<N>> {
207 &self.active_sessions
208 }
209
210 pub fn hello_message(&self) -> HelloMessageWithProtocols {
212 self.hello_message.clone()
213 }
214
215 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
217 self.extra_protocols.push(protocol)
218 }
219
220 #[inline]
222 pub(crate) fn num_pending_connections(&self) -> usize {
223 self.pending_sessions.len()
224 }
225
226 fn spawn<F>(&self, f: F)
229 where
230 F: Future<Output = ()> + Send + 'static,
231 {
232 self.executor.spawn_task(f);
233 }
234
235 pub(crate) fn on_status_update(&mut self, head: Head) -> Option<ForkTransition> {
240 self.status.blockhash = head.hash;
241 self.status.total_difficulty = Some(head.total_difficulty);
242 let transition = self.fork_filter.set_head(head);
243 self.status.forkid = self.fork_filter.current();
244 self.status.latest_block = Some(head.number);
245
246 transition
247 }
248
249 pub(crate) fn on_incoming(
254 &mut self,
255 stream: TcpStream,
256 remote_addr: SocketAddr,
257 ) -> Result<SessionId, ExceedsSessionLimit> {
258 self.counter.ensure_pending_inbound()?;
259
260 let session_id = self.next_id();
261
262 trace!(
263 target: "net::session",
264 ?remote_addr,
265 ?session_id,
266 "new pending incoming session"
267 );
268
269 let (disconnect_tx, disconnect_rx) = oneshot::channel();
270 let pending_events = self.pending_sessions_tx.clone();
271 let secret_key = self.secret_key;
272 let hello_message = self.hello_message.clone();
273 let status = self.status;
274 let fork_filter = self.fork_filter.clone();
275 let extra_handlers = self.extra_protocols.on_incoming(remote_addr);
276 self.spawn(pending_session_with_timeout(
277 self.pending_session_timeout,
278 session_id,
279 remote_addr,
280 Direction::Incoming,
281 pending_events.clone(),
282 start_pending_incoming_session(
283 self.handshake.clone(),
284 disconnect_rx,
285 session_id,
286 stream,
287 pending_events,
288 remote_addr,
289 secret_key,
290 hello_message,
291 status,
292 fork_filter,
293 extra_handlers,
294 ),
295 ));
296
297 let handle = PendingSessionHandle {
298 disconnect_tx: Some(disconnect_tx),
299 direction: Direction::Incoming,
300 };
301 self.pending_sessions.insert(session_id, handle);
302 self.counter.inc_pending_inbound();
303 Ok(session_id)
304 }
305
306 pub fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_peer_id: PeerId) {
308 if self.counter.ensure_pending_outbound().is_ok() {
310 let session_id = self.next_id();
311 let (disconnect_tx, disconnect_rx) = oneshot::channel();
312 let pending_events = self.pending_sessions_tx.clone();
313 let secret_key = self.secret_key;
314 let hello_message = self.hello_message.clone();
315 let fork_filter = self.fork_filter.clone();
316 let status = self.status;
317 let extra_handlers = self.extra_protocols.on_outgoing(remote_addr, remote_peer_id);
318 self.spawn(pending_session_with_timeout(
319 self.pending_session_timeout,
320 session_id,
321 remote_addr,
322 Direction::Outgoing(remote_peer_id),
323 pending_events.clone(),
324 start_pending_outbound_session(
325 self.handshake.clone(),
326 disconnect_rx,
327 pending_events,
328 session_id,
329 remote_addr,
330 remote_peer_id,
331 secret_key,
332 hello_message,
333 status,
334 fork_filter,
335 extra_handlers,
336 ),
337 ));
338
339 let handle = PendingSessionHandle {
340 disconnect_tx: Some(disconnect_tx),
341 direction: Direction::Outgoing(remote_peer_id),
342 };
343 self.pending_sessions.insert(session_id, handle);
344 self.counter.inc_pending_outbound();
345 }
346 }
347
348 pub fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
353 if let Some(session) = self.active_sessions.get(&node) {
354 session.disconnect(reason);
355 }
356 }
357
358 pub fn disconnect_all(&self, reason: Option<DisconnectReason>) {
363 for session in self.active_sessions.values() {
364 session.disconnect(reason);
365 }
366 }
367
368 pub fn disconnect_all_pending(&mut self) {
370 for session in self.pending_sessions.values_mut() {
371 session.disconnect();
372 }
373 }
374
375 pub fn send_message(&self, peer_id: &PeerId, msg: PeerMessage<N>) {
377 if let Some(session) = self.active_sessions.get(peer_id) {
378 let _ = session.commands_to_session.try_send(SessionCommand::Message(msg)).inspect_err(
379 |e| {
380 if let TrySendError::Full(SessionCommand::Message(msg)) = e {
381 debug!(
382 target: "net::session",
383 ?peer_id,
384 msg_kind = msg.message_kind(),
385 items = msg.message_item_count(),
386 "session command buffer full, dropping message"
387 );
388 self.metrics.total_outgoing_peer_messages_dropped.increment(1);
389 }
390 },
391 );
392 }
393 }
394
395 fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
397 let session = self.pending_sessions.remove(id)?;
398 self.counter.dec_pending(&session.direction);
399 Some(session)
400 }
401
402 fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle<N>> {
404 let session = self.active_sessions.remove(id)?;
405 self.counter.dec_active(&session.direction);
406 Some(session)
407 }
408
409 pub(crate) fn try_disconnect_incoming_connection(
413 &self,
414 stream: TcpStream,
415 reason: DisconnectReason,
416 ) {
417 if !self.disconnections_counter.has_capacity() {
418 return
420 }
421
422 let guard = self.disconnections_counter.clone();
423 let secret_key = self.secret_key;
424
425 self.spawn(async move {
426 trace!(
427 target: "net::session",
428 "gracefully disconnecting incoming connection"
429 );
430 if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
431 let mut unauth = UnauthedP2PStream::new(stream);
432 let _ = unauth.send_disconnect(reason).await;
433 drop(guard);
434 }
435 });
436 }
437
438 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SessionEvent<N>> {
442 match self.active_session_rx.poll_next_unpin(cx) {
444 Poll::Pending => {}
445 Poll::Ready(None) => {
446 unreachable!("Manager holds both channel halves.")
447 }
448 Poll::Ready(Some(event)) => {
449 return match event {
450 ActiveSessionMessage::Disconnected { peer_id, remote_addr } => {
451 trace!(
452 target: "net::session",
453 ?peer_id,
454 "gracefully disconnected active session."
455 );
456 self.remove_active_session(&peer_id);
457 Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
458 }
459 ActiveSessionMessage::ClosedOnConnectionError {
460 peer_id,
461 remote_addr,
462 error,
463 } => {
464 trace!(target: "net::session", ?peer_id, %error,"closed session.");
465 self.remove_active_session(&peer_id);
466 Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
467 remote_addr,
468 peer_id,
469 error,
470 })
471 }
472 ActiveSessionMessage::ValidMessage { peer_id, message } => {
473 Poll::Ready(SessionEvent::ValidMessage { peer_id, message })
474 }
475 ActiveSessionMessage::BadMessage { peer_id } => {
476 Poll::Ready(SessionEvent::BadMessage { peer_id })
477 }
478 ActiveSessionMessage::ProtocolBreach { peer_id } => {
479 Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
480 }
481 }
482 }
483 }
484
485 let event = match self.pending_session_rx.poll_next_unpin(cx) {
487 Poll::Pending => return Poll::Pending,
488 Poll::Ready(None) => unreachable!("Manager holds both channel halves."),
489 Poll::Ready(Some(event)) => event,
490 };
491 match event {
492 PendingSessionEvent::Established {
493 session_id,
494 remote_addr,
495 local_addr,
496 peer_id,
497 capabilities,
498 conn,
499 status,
500 direction,
501 client_id,
502 } => {
503 self.remove_pending_session(&session_id);
505
506 if self.active_sessions.contains_key(&peer_id) {
508 trace!(
509 target: "net::session",
510 ?session_id,
511 ?remote_addr,
512 ?peer_id,
513 ?direction,
514 "already connected"
515 );
516
517 self.spawn(async move {
518 let _ =
520 conn.into_inner().disconnect(DisconnectReason::AlreadyConnected).await;
521 });
522
523 return Poll::Ready(SessionEvent::AlreadyConnected {
524 peer_id,
525 remote_addr,
526 direction,
527 })
528 }
529
530 let (commands_to_session, commands_rx) = mpsc::channel(self.session_command_buffer);
531
532 let (to_session_tx, messages_rx) = mpsc::channel(self.session_command_buffer);
533
534 let messages = PeerRequestSender::new(peer_id, to_session_tx);
535
536 let timeout = Arc::new(AtomicU64::new(
537 self.initial_internal_request_timeout.as_millis() as u64,
538 ));
539
540 let version = conn.version();
542
543 let range_update_interval = (conn.version() >= EthVersion::Eth69).then(|| {
548 let start = tokio::time::Instant::now() + RANGE_UPDATE_INTERVAL;
549 let mut interval = tokio::time::interval_at(start, RANGE_UPDATE_INTERVAL);
550 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
551 interval
552 });
553
554 let session = ActiveSession {
555 next_id: 0,
556 remote_peer_id: peer_id,
557 remote_addr,
558 remote_capabilities: Arc::clone(&capabilities),
559 session_id,
560 commands_rx: ReceiverStream::new(commands_rx),
561 to_session_manager: self.active_session_tx.clone(),
562 pending_message_to_session: None,
563 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
564 inflight_requests: Default::default(),
565 conn,
566 queued_outgoing: QueuedOutgoingMessages::new(
567 self.metrics.queued_outgoing_messages.clone(),
568 ),
569 received_requests_from_remote: Default::default(),
570 internal_request_timeout_interval: tokio::time::interval(
571 self.initial_internal_request_timeout,
572 ),
573 internal_request_timeout: Arc::clone(&timeout),
574 protocol_breach_request_timeout: self.protocol_breach_request_timeout,
575 terminate_message: None,
576 range_info: None,
577 local_range_info: self.local_range_info.clone(),
578 range_update_interval,
579 last_sent_latest_block: None,
580 };
581
582 self.spawn(session);
583
584 let client_version = client_id.into();
585 let handle = ActiveSessionHandle {
586 status: status.clone(),
587 direction,
588 session_id,
589 remote_id: peer_id,
590 version,
591 established: Instant::now(),
592 capabilities: Arc::clone(&capabilities),
593 commands_to_session,
594 client_version: Arc::clone(&client_version),
595 remote_addr,
596 local_addr,
597 };
598
599 self.active_sessions.insert(peer_id, handle);
600 self.counter.inc_active(&direction);
601
602 if direction.is_outgoing() {
603 self.metrics.total_dial_successes.increment(1);
604 }
605
606 Poll::Ready(SessionEvent::SessionEstablished {
607 peer_id,
608 remote_addr,
609 client_version,
610 version,
611 capabilities,
612 status,
613 messages,
614 direction,
615 timeout,
616 range_info: None,
617 })
618 }
619 PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
620 trace!(
621 target: "net::session",
622 ?session_id,
623 ?remote_addr,
624 ?error,
625 "disconnected pending session"
626 );
627 self.remove_pending_session(&session_id);
628 match direction {
629 Direction::Incoming => {
630 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
631 remote_addr,
632 error,
633 })
634 }
635 Direction::Outgoing(peer_id) => {
636 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
637 remote_addr,
638 peer_id,
639 error,
640 })
641 }
642 }
643 }
644 PendingSessionEvent::OutgoingConnectionError {
645 remote_addr,
646 session_id,
647 peer_id,
648 error,
649 } => {
650 trace!(
651 target: "net::session",
652 %error,
653 ?session_id,
654 ?remote_addr,
655 ?peer_id,
656 "connection refused"
657 );
658 self.remove_pending_session(&session_id);
659 Poll::Ready(SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error })
660 }
661 PendingSessionEvent::EciesAuthError { remote_addr, session_id, error, direction } => {
662 trace!(
663 target: "net::session",
664 %error,
665 ?session_id,
666 ?remote_addr,
667 "ecies auth failed"
668 );
669 self.remove_pending_session(&session_id);
670 match direction {
671 Direction::Incoming => {
672 Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
673 remote_addr,
674 error: Some(PendingSessionHandshakeError::Ecies(error)),
675 })
676 }
677 Direction::Outgoing(peer_id) => {
678 Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
679 remote_addr,
680 peer_id,
681 error: Some(PendingSessionHandshakeError::Ecies(error)),
682 })
683 }
684 }
685 }
686 }
687 }
688
689 pub(crate) fn update_advertised_block_range(&mut self, block_range_update: BlockRangeUpdate) {
697 self.status.earliest_block = Some(block_range_update.earliest);
698 self.status.latest_block = Some(block_range_update.latest);
699 self.status.blockhash = block_range_update.latest_hash;
700
701 self.local_range_info.update(
703 block_range_update.earliest,
704 block_range_update.latest,
705 block_range_update.latest_hash,
706 );
707 }
708}
709
/// Counts graceful disconnects that are currently in flight.
///
/// Every clone counts as one in-flight disconnect for as long as it is alive; the count is
/// read from the strong count of the shared [`Arc`].
#[derive(Clone, Debug, Default)]
struct DisconnectionsCounter(Arc<()>);

impl DisconnectionsCounter {
    /// Maximum number of graceful disconnects allowed to run at the same time.
    const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;

    /// Returns `true` if one more graceful disconnect may be started.
    fn has_capacity(&self) -> bool {
        // `self` always holds one reference; every other reference is a live guard.
        let in_flight = Arc::strong_count(&self.0).saturating_sub(1);
        in_flight < Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
    }
}
723
724#[derive(Debug)]
726pub enum SessionEvent<N: NetworkPrimitives> {
727 SessionEstablished {
731 peer_id: PeerId,
733 remote_addr: SocketAddr,
735 client_version: Arc<str>,
737 capabilities: Arc<Capabilities>,
739 version: EthVersion,
741 status: Arc<UnifiedStatus>,
743 messages: PeerRequestSender<PeerRequest<N>>,
745 direction: Direction,
747 timeout: Arc<AtomicU64>,
750 range_info: Option<BlockRangeInfo>,
752 },
753 AlreadyConnected {
755 peer_id: PeerId,
757 remote_addr: SocketAddr,
759 direction: Direction,
761 },
762 ValidMessage {
764 peer_id: PeerId,
766 message: PeerMessage<N>,
768 },
769 BadMessage {
771 peer_id: PeerId,
773 },
774 ProtocolBreach {
776 peer_id: PeerId,
778 },
779 IncomingPendingSessionClosed {
781 remote_addr: SocketAddr,
783 error: Option<PendingSessionHandshakeError>,
785 },
786 OutgoingPendingSessionClosed {
788 remote_addr: SocketAddr,
790 peer_id: PeerId,
792 error: Option<PendingSessionHandshakeError>,
794 },
795 OutgoingConnectionError {
797 remote_addr: SocketAddr,
799 peer_id: PeerId,
801 error: io::Error,
803 },
804 SessionClosedOnConnectionError {
806 peer_id: PeerId,
808 remote_addr: SocketAddr,
810 error: EthStreamError,
812 },
813 Disconnected {
815 peer_id: PeerId,
817 remote_addr: SocketAddr,
819 },
820}
821
822#[derive(Debug, thiserror::Error)]
824pub enum PendingSessionHandshakeError {
825 #[error(transparent)]
827 Eth(EthStreamError),
828 #[error(transparent)]
830 Ecies(ECIESError),
831 #[error("authentication timed out")]
833 Timeout,
834 #[error("Mandatory extra capability unsupported")]
836 UnsupportedExtraCapability,
837}
838
839impl PendingSessionHandshakeError {
840 pub const fn as_disconnected(&self) -> Option<DisconnectReason> {
842 match self {
843 Self::Eth(eth_err) => eth_err.as_disconnected(),
844 _ => None,
845 }
846 }
847}
848
849#[derive(Debug, Clone, thiserror::Error)]
852#[error("session limit reached {0}")]
853pub struct ExceedsSessionLimit(pub(crate) u32);
854
855pub(crate) async fn pending_session_with_timeout<F, N: NetworkPrimitives>(
857 timeout: Duration,
858 session_id: SessionId,
859 remote_addr: SocketAddr,
860 direction: Direction,
861 events: mpsc::Sender<PendingSessionEvent<N>>,
862 f: F,
863) where
864 F: Future<Output = ()>,
865{
866 if tokio::time::timeout(timeout, f).await.is_err() {
867 trace!(target: "net::session", ?remote_addr, ?direction, "pending session timed out");
868 let event = PendingSessionEvent::Disconnected {
869 remote_addr,
870 session_id,
871 direction,
872 error: Some(PendingSessionHandshakeError::Timeout),
873 };
874 let _ = events.send(event).await;
875 }
876}
877
878#[expect(clippy::too_many_arguments)]
882pub(crate) async fn start_pending_incoming_session<N: NetworkPrimitives>(
883 handshake: Arc<dyn EthRlpxHandshake>,
884 disconnect_rx: oneshot::Receiver<()>,
885 session_id: SessionId,
886 stream: TcpStream,
887 events: mpsc::Sender<PendingSessionEvent<N>>,
888 remote_addr: SocketAddr,
889 secret_key: SecretKey,
890 hello: HelloMessageWithProtocols,
891 status: UnifiedStatus,
892 fork_filter: ForkFilter,
893 extra_handlers: RlpxSubProtocolHandlers,
894) {
895 authenticate(
896 handshake,
897 disconnect_rx,
898 events,
899 stream,
900 session_id,
901 remote_addr,
902 secret_key,
903 Direction::Incoming,
904 hello,
905 status,
906 fork_filter,
907 extra_handlers,
908 )
909 .await
910}
911
912#[instrument(level = "trace", target = "net::network", skip_all, fields(%remote_addr, peer_id = ?remote_peer_id))]
914#[expect(clippy::too_many_arguments)]
915async fn start_pending_outbound_session<N: NetworkPrimitives>(
916 handshake: Arc<dyn EthRlpxHandshake>,
917 disconnect_rx: oneshot::Receiver<()>,
918 events: mpsc::Sender<PendingSessionEvent<N>>,
919 session_id: SessionId,
920 remote_addr: SocketAddr,
921 remote_peer_id: PeerId,
922 secret_key: SecretKey,
923 hello: HelloMessageWithProtocols,
924 status: UnifiedStatus,
925 fork_filter: ForkFilter,
926 extra_handlers: RlpxSubProtocolHandlers,
927) {
928 let stream = match TcpStream::connect(remote_addr).await {
929 Ok(stream) => {
930 if let Err(err) = stream.set_nodelay(true) {
931 tracing::warn!(target: "net::session", "set nodelay failed: {:?}", err);
932 }
933 stream
934 }
935 Err(error) => {
936 let _ = events
937 .send(PendingSessionEvent::OutgoingConnectionError {
938 remote_addr,
939 session_id,
940 peer_id: remote_peer_id,
941 error,
942 })
943 .await;
944 return
945 }
946 };
947 authenticate(
948 handshake,
949 disconnect_rx,
950 events,
951 stream,
952 session_id,
953 remote_addr,
954 secret_key,
955 Direction::Outgoing(remote_peer_id),
956 hello,
957 status,
958 fork_filter,
959 extra_handlers,
960 )
961 .await
962}
963
964#[expect(clippy::too_many_arguments)]
966async fn authenticate<N: NetworkPrimitives>(
967 handshake: Arc<dyn EthRlpxHandshake>,
968 disconnect_rx: oneshot::Receiver<()>,
969 events: mpsc::Sender<PendingSessionEvent<N>>,
970 stream: TcpStream,
971 session_id: SessionId,
972 remote_addr: SocketAddr,
973 secret_key: SecretKey,
974 direction: Direction,
975 hello: HelloMessageWithProtocols,
976 status: UnifiedStatus,
977 fork_filter: ForkFilter,
978 extra_handlers: RlpxSubProtocolHandlers,
979) {
980 let local_addr = stream.local_addr().ok();
981 let stream = match get_ecies_stream(stream, secret_key, direction).await {
982 Ok(stream) => stream,
983 Err(error) => {
984 let _ = events
985 .send(PendingSessionEvent::EciesAuthError {
986 remote_addr,
987 session_id,
988 error,
989 direction,
990 })
991 .await;
992 return
993 }
994 };
995
996 let unauthed = UnauthedP2PStream::new(stream);
997
998 let auth = authenticate_stream(
999 handshake,
1000 unauthed,
1001 session_id,
1002 remote_addr,
1003 local_addr,
1004 direction,
1005 hello,
1006 status,
1007 fork_filter,
1008 extra_handlers,
1009 )
1010 .boxed();
1011
1012 match futures::future::select(disconnect_rx, auth).await {
1013 Either::Left((_, _)) => {
1014 let _ = events
1015 .send(PendingSessionEvent::Disconnected {
1016 remote_addr,
1017 session_id,
1018 direction,
1019 error: None,
1020 })
1021 .await;
1022 }
1023 Either::Right((res, _)) => {
1024 let _ = events.send(res).await;
1025 }
1026 }
1027}
1028
1029async fn get_ecies_stream<Io: AsyncRead + AsyncWrite + Unpin>(
1032 stream: Io,
1033 secret_key: SecretKey,
1034 direction: Direction,
1035) -> Result<ECIESStream<Io>, ECIESError> {
1036 match direction {
1037 Direction::Incoming => ECIESStream::incoming(stream, secret_key).await,
1038 Direction::Outgoing(remote_peer_id) => {
1039 ECIESStream::connect(stream, secret_key, remote_peer_id).await
1040 }
1041 }
1042}
1043
1044#[expect(clippy::too_many_arguments)]
1051async fn authenticate_stream<N: NetworkPrimitives>(
1052 handshake: Arc<dyn EthRlpxHandshake>,
1053 stream: UnauthedP2PStream<ECIESStream<TcpStream>>,
1054 session_id: SessionId,
1055 remote_addr: SocketAddr,
1056 local_addr: Option<SocketAddr>,
1057 direction: Direction,
1058 mut hello: HelloMessageWithProtocols,
1059 mut status: UnifiedStatus,
1060 fork_filter: ForkFilter,
1061 mut extra_handlers: RlpxSubProtocolHandlers,
1062) -> PendingSessionEvent<N> {
1063 extra_handlers.retain(|handler| hello.try_add_protocol(handler.protocol()).is_ok());
1065
1066 let (mut p2p_stream, their_hello) = match stream.handshake(hello).await {
1068 Ok(stream_res) => stream_res,
1069 Err(err) => {
1070 return PendingSessionEvent::Disconnected {
1071 remote_addr,
1072 session_id,
1073 direction,
1074 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1075 }
1076 }
1077 };
1078
1079 if !extra_handlers.is_empty() {
1081 while let Some(pos) = extra_handlers.iter().position(|handler| {
1083 p2p_stream
1084 .shared_capabilities()
1085 .ensure_matching_capability(&handler.protocol().cap)
1086 .is_err()
1087 }) {
1088 let handler = extra_handlers.remove(pos);
1089 if handler.on_unsupported_by_peer(
1090 p2p_stream.shared_capabilities(),
1091 direction,
1092 their_hello.id,
1093 ) == OnNotSupported::Disconnect
1094 {
1095 return PendingSessionEvent::Disconnected {
1096 remote_addr,
1097 session_id,
1098 direction,
1099 error: Some(PendingSessionHandshakeError::UnsupportedExtraCapability),
1100 };
1101 }
1102 }
1103 }
1104
1105 let eth_version = match p2p_stream.shared_capabilities().eth_version() {
1107 Ok(version) => version,
1108 Err(err) => {
1109 return PendingSessionEvent::Disconnected {
1110 remote_addr,
1111 session_id,
1112 direction,
1113 error: Some(PendingSessionHandshakeError::Eth(err.into())),
1114 }
1115 }
1116 };
1117
1118 status.set_eth_version(eth_version);
1120
1121 let (conn, their_status) = if p2p_stream.shared_capabilities().len() == 1 {
1122 match handshake
1127 .handshake(&mut p2p_stream, status, fork_filter.clone(), HANDSHAKE_TIMEOUT)
1128 .await
1129 {
1130 Ok(their_status) => {
1131 let eth_stream = EthStream::new(eth_version, p2p_stream);
1132 (eth_stream.into(), their_status)
1133 }
1134 Err(err) => {
1135 return PendingSessionEvent::Disconnected {
1136 remote_addr,
1137 session_id,
1138 direction,
1139 error: Some(PendingSessionHandshakeError::Eth(err)),
1140 }
1141 }
1142 }
1143 } else {
1144 let mut multiplex_stream = RlpxProtocolMultiplexer::new(p2p_stream);
1146
1147 for handler in extra_handlers.into_iter() {
1149 let cap = handler.protocol().cap;
1150 let remote_peer_id = their_hello.id;
1151
1152 multiplex_stream
1153 .install_protocol(&cap, move |conn| {
1154 handler.into_connection(direction, remote_peer_id, conn)
1155 })
1156 .ok();
1157 }
1158
1159 let (multiplex_stream, their_status) = match multiplex_stream
1160 .into_eth_satellite_stream(status, fork_filter, handshake)
1161 .await
1162 {
1163 Ok((multiplex_stream, their_status)) => (multiplex_stream, their_status),
1164 Err(err) => {
1165 return PendingSessionEvent::Disconnected {
1166 remote_addr,
1167 session_id,
1168 direction,
1169 error: Some(PendingSessionHandshakeError::Eth(err)),
1170 }
1171 }
1172 };
1173
1174 (multiplex_stream.into(), their_status)
1175 };
1176
1177 PendingSessionEvent::Established {
1178 session_id,
1179 remote_addr,
1180 local_addr,
1181 peer_id: their_hello.id,
1182 capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
1183 status: Arc::new(their_status),
1184 conn,
1185 direction,
1186 client_id: their_hello.client_version,
1187 }
1188}