1use core::sync::atomic::Ordering;
4use std::{
5 collections::VecDeque,
6 future::Future,
7 net::SocketAddr,
8 pin::Pin,
9 sync::{
10 atomic::{AtomicU64, AtomicUsize},
11 Arc,
12 },
13 task::{ready, Context, Poll},
14 time::{Duration, Instant},
15};
16
17use crate::{
18 message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult},
19 session::{
20 conn::EthRlpxConnection,
21 handle::{ActiveSessionMessage, SessionCommand},
22 BlockRangeInfo, EthVersion, SessionId,
23 },
24};
25use alloy_eips::merge::EPOCH_SLOTS;
26use alloy_primitives::Sealable;
27use futures::{stream::Fuse, SinkExt, StreamExt};
28use metrics::{Counter, Gauge};
29use reth_eth_wire::{
30 errors::{EthHandshakeError, EthStreamError},
31 message::{EthBroadcastMessage, MessageError},
32 Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload,
33};
34use reth_eth_wire_types::{message::RequestPair, NewPooledTransactionHashes, RawCapabilityMessage};
35use reth_metrics::common::mpsc::MeteredPollSender;
36use reth_network_api::PeerRequest;
37use reth_network_p2p::error::RequestError;
38use reth_network_peers::PeerId;
39use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
40use reth_primitives_traits::Block;
41use rustc_hash::FxHashMap;
42use tokio::{
43 sync::{mpsc, mpsc::error::TrySendError, oneshot},
44 time::Interval,
45};
46use tokio_stream::wrappers::ReceiverStream;
47use tokio_util::sync::PollSender;
48use tracing::{debug, trace};
49
50pub(super) const RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(EPOCH_SLOTS * 12);
56
57const MINIMUM_TIMEOUT: Duration = Duration::from_secs(2);
61
62const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
64const SAMPLE_IMPACT: f64 = 0.1;
66const TIMEOUT_SCALING: u32 = 3;
68
69const MAX_QUEUED_OUTGOING_RESPONSES: usize = 4;
81
82const MIN_RECEIVED_REQUESTS_CAPACITY: usize = 1;
84
85const MAX_QUEUED_BROADCAST_ITEMS: usize = 4096;
91
92#[derive(Debug, Clone)]
98pub(crate) struct BroadcastItemCounter(Arc<AtomicUsize>);
99
100impl BroadcastItemCounter {
101 pub(crate) fn new() -> Self {
103 Self(Arc::new(AtomicUsize::new(0)))
104 }
105
106 pub(crate) fn get(&self) -> usize {
108 self.0.load(Ordering::Relaxed)
109 }
110
111 pub(crate) fn try_add(&self, n: usize) -> bool {
113 let prev = self.0.fetch_add(n, Ordering::Relaxed);
114 if prev >= MAX_QUEUED_BROADCAST_ITEMS {
115 self.0.fetch_sub(n, Ordering::Relaxed);
116 false
117 } else {
118 true
119 }
120 }
121
122 pub(crate) fn sub(&self, n: usize) {
124 self.0.fetch_sub(n, Ordering::Relaxed);
125 }
126}
127
128#[expect(dead_code)]
138pub(crate) struct ActiveSession<N: NetworkPrimitives> {
139 pub(crate) next_id: u64,
141 pub(crate) conn: EthRlpxConnection<N>,
143 pub(crate) remote_peer_id: PeerId,
145 pub(crate) remote_addr: SocketAddr,
147 pub(crate) remote_capabilities: Arc<Capabilities>,
149 pub(crate) session_id: SessionId,
151 pub(crate) commands_rx: ReceiverStream<SessionCommand<N>>,
153 pub(crate) unbounded_rx: mpsc::UnboundedReceiver<SessionCommand<N>>,
156 pub(crate) unbounded_broadcast_msgs: Counter,
158 pub(crate) to_session_manager: MeteredPollSender<ActiveSessionMessage<N>>,
160 pub(crate) pending_message_to_session: Option<ActiveSessionMessage<N>>,
162 pub(crate) internal_request_rx: Fuse<ReceiverStream<PeerRequest<N>>>,
164 pub(crate) inflight_requests: FxHashMap<u64, InflightRequest<PeerRequest<N>>>,
166 pub(crate) received_requests_from_remote: Vec<ReceivedRequest<N>>,
168 pub(crate) queued_outgoing: QueuedOutgoingMessages<N>,
170 pub(crate) internal_request_timeout: Arc<AtomicU64>,
172 pub(crate) internal_request_timeout_interval: Interval,
174 pub(crate) protocol_breach_request_timeout: Duration,
177 pub(crate) terminate_message:
179 Option<(PollSender<ActiveSessionMessage<N>>, ActiveSessionMessage<N>)>,
180 pub(crate) range_info: Option<BlockRangeInfo>,
183 pub(crate) local_range_info: BlockRangeInfo,
186 pub(crate) range_update_interval: Option<Interval>,
190 pub(crate) last_sent_latest_block: Option<u64>,
193}
194
195impl<N: NetworkPrimitives> ActiveSession<N> {
196 fn is_disconnecting(&self) -> bool {
198 self.conn.inner().is_disconnecting()
199 }
200
201 const fn next_id(&mut self) -> u64 {
203 let id = self.next_id;
204 self.next_id += 1;
205 id
206 }
207
208 pub fn shrink_to_fit(&mut self) {
210 self.received_requests_from_remote.shrink_to(MIN_RECEIVED_REQUESTS_CAPACITY);
211 self.queued_outgoing.shrink_to(MAX_QUEUED_OUTGOING_RESPONSES);
212 }
213
214 fn queued_response_count(&self) -> usize {
216 self.queued_outgoing.messages.iter().filter(|m| m.is_response()).count()
217 }
218
219 fn on_incoming_message(&mut self, msg: EthMessage<N>) -> OnIncomingMessageOutcome<N> {
223 macro_rules! on_request {
227 ($req:ident, $resp_item:ident, $req_item:ident) => {{
228 let RequestPair { request_id, message: request } = $req;
229 let (tx, response) = oneshot::channel();
230 let received = ReceivedRequest {
231 request_id,
232 rx: PeerResponse::$resp_item { response },
233 received: Instant::now(),
234 };
235 self.received_requests_from_remote.push(received);
236 self.try_emit_request(PeerMessage::EthRequest(PeerRequest::$req_item {
237 request,
238 response: tx,
239 }))
240 .into()
241 }};
242 }
243
244 macro_rules! on_response {
246 ($resp:ident, $item:ident) => {{
247 let RequestPair { request_id, message } = $resp;
248 if let Some(req) = self.inflight_requests.remove(&request_id) {
249 match req.request {
250 RequestState::Waiting(PeerRequest::$item { response, .. }) => {
251 trace!(peer_id=?self.remote_peer_id, ?request_id, "received response from peer");
252 let _ = response.send(Ok(message));
253 self.update_request_timeout(req.timestamp, Instant::now());
254 }
255 RequestState::Waiting(request) => {
256 request.send_bad_response();
257 }
258 RequestState::TimedOut => {
259 self.update_request_timeout(req.timestamp, Instant::now());
261 }
262 }
263 } else {
264 trace!(peer_id=?self.remote_peer_id, ?request_id, "received response to unknown request");
265 self.on_bad_message();
267 }
268
269 OnIncomingMessageOutcome::Ok
270 }};
271 }
272
273 match msg {
274 message @ EthMessage::Status(_) => OnIncomingMessageOutcome::BadMessage {
275 error: EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake),
276 message,
277 },
278 EthMessage::NewBlockHashes(msg) => {
279 self.try_emit_broadcast(PeerMessage::NewBlockHashes(msg)).into()
280 }
281 EthMessage::NewBlock(msg) => {
282 let block = NewBlockMessage {
283 hash: msg.block().header().hash_slow(),
284 block: Arc::new(*msg),
285 };
286 self.try_emit_broadcast(PeerMessage::NewBlock(block)).into()
287 }
288 EthMessage::Transactions(msg) => {
289 self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into()
290 }
291 EthMessage::NewPooledTransactionHashes66(msg) => {
292 self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
293 }
294 EthMessage::NewPooledTransactionHashes68(msg) => {
295 self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
296 }
297 EthMessage::NewPooledTransactionHashes72(msg) => {
298 self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
299 }
300 EthMessage::GetBlockHeaders(req) => {
301 on_request!(req, BlockHeaders, GetBlockHeaders)
302 }
303 EthMessage::BlockHeaders(resp) => {
304 on_response!(resp, GetBlockHeaders)
305 }
306 EthMessage::GetBlockBodies(req) => {
307 on_request!(req, BlockBodies, GetBlockBodies)
308 }
309 EthMessage::BlockBodies(resp) => {
310 on_response!(resp, GetBlockBodies)
311 }
312 EthMessage::GetPooledTransactions(req) => {
313 on_request!(req, PooledTransactions, GetPooledTransactions)
314 }
315 EthMessage::PooledTransactions(resp) => {
316 on_response!(resp, GetPooledTransactions)
317 }
318 EthMessage::GetNodeData(req) => {
319 on_request!(req, NodeData, GetNodeData)
320 }
321 EthMessage::NodeData(resp) => {
322 on_response!(resp, GetNodeData)
323 }
324 EthMessage::GetReceipts(req) => {
325 if self.conn.version() >= EthVersion::Eth69 {
326 on_request!(req, Receipts69, GetReceipts69)
327 } else {
328 on_request!(req, Receipts, GetReceipts)
329 }
330 }
331 EthMessage::GetReceipts70(req) => {
332 on_request!(req, Receipts70, GetReceipts70)
333 }
334 EthMessage::Receipts(resp) => {
335 on_response!(resp, GetReceipts)
336 }
337 EthMessage::Receipts69(resp) => {
338 on_response!(resp, GetReceipts69)
339 }
340 EthMessage::Receipts70(resp) => {
341 on_response!(resp, GetReceipts70)
342 }
343 EthMessage::GetBlockAccessLists(req) => {
344 on_request!(req, BlockAccessLists, GetBlockAccessLists)
345 }
346 EthMessage::BlockAccessLists(resp) => {
347 on_response!(resp, GetBlockAccessLists)
348 }
349 EthMessage::Cells(resp) => {
350 on_response!(resp, GetCells)
351 }
352 EthMessage::BlockRangeUpdate(msg) => {
353 if msg.earliest > msg.latest {
355 return OnIncomingMessageOutcome::BadMessage {
356 error: EthStreamError::InvalidMessage(MessageError::Other(format!(
357 "invalid block range: earliest ({}) > latest ({})",
358 msg.earliest, msg.latest
359 ))),
360 message: EthMessage::BlockRangeUpdate(msg),
361 };
362 }
363
364 if msg.latest_hash.is_zero() {
366 return OnIncomingMessageOutcome::BadMessage {
367 error: EthStreamError::InvalidMessage(MessageError::Other(
368 "invalid block range: latest_hash cannot be zero".to_string(),
369 )),
370 message: EthMessage::BlockRangeUpdate(msg),
371 };
372 }
373
374 if let Some(range_info) = self.range_info.as_ref() {
375 range_info.update(msg.earliest, msg.latest, msg.latest_hash);
376 }
377
378 OnIncomingMessageOutcome::Ok
379 }
380 EthMessage::GetCells(resp) => {
381 on_request!(resp, Cells, GetCells)
382 }
383 EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
384 }
385 }
386
387 fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
389 let version = self.conn.version();
390 if !Self::is_request_supported_for_version(&request, version) {
391 debug!(
392 target: "net",
393 ?request,
394 peer_id=?self.remote_peer_id,
395 ?version,
396 "Request not supported for negotiated eth version",
397 );
398 request.send_err_response(RequestError::UnsupportedCapability);
399 return;
400 }
401
402 let request_id = self.next_id();
403 trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
404 let msg = request.create_request_message(request_id).map_versioned(version);
405
406 self.queued_outgoing.push_back(msg.into());
407 let req = InflightRequest {
408 request: RequestState::Waiting(request),
409 timestamp: Instant::now(),
410 deadline,
411 };
412 self.inflight_requests.insert(request_id, req);
413 }
414
415 #[inline]
416 fn is_request_supported_for_version(request: &PeerRequest<N>, version: EthVersion) -> bool {
417 request.is_supported_by_eth_version(version)
418 }
419
420 fn on_internal_peer_message(&mut self, msg: PeerMessage<N>) {
422 match msg {
423 PeerMessage::NewBlockHashes(msg) => {
424 self.queued_outgoing.push_back(EthMessage::NewBlockHashes(msg).into());
425 }
426 PeerMessage::NewBlock(msg) => {
427 self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
428 }
429 PeerMessage::PooledTransactions(msg) => {
430 if msg.is_valid_for_version(self.conn.version()) {
431 self.queued_outgoing.push_pooled_hashes(msg);
432 } else {
433 self.queued_outgoing.broadcast_items.sub(msg.len());
434 debug!(target: "net", ?msg, version=?self.conn.version(), "Message is invalid for connection version, skipping");
435 }
436 }
437 PeerMessage::EthRequest(req) => {
438 let deadline = self.request_deadline();
439 self.on_internal_peer_request(req, deadline);
440 }
441 PeerMessage::SendTransactions(msg) => {
442 self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
443 }
444 PeerMessage::BlockRangeUpdated(_) => {}
445 PeerMessage::ReceivedTransaction(_) => {
446 unreachable!("Not emitted by network")
447 }
448 PeerMessage::Other(other) => {
449 self.queued_outgoing.push_back(OutgoingMessage::Raw(other));
450 }
451 }
452 }
453
454 fn request_deadline(&self) -> Instant {
456 Instant::now() +
457 Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed))
458 }
459
460 fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult<N>) {
464 match resp.try_into_message(id) {
465 Ok(msg) => {
466 self.queued_outgoing.push_back(msg.into());
467 }
468 Err(err) => {
469 debug!(target: "net", %err, "Failed to respond to received request");
470 }
471 }
472 }
473
474 #[expect(clippy::result_large_err)]
478 fn try_emit_broadcast(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
479 let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
480
481 match sender
482 .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
483 {
484 Ok(_) => Ok(()),
485 Err(err) => {
486 trace!(
487 target: "net",
488 %err,
489 "no capacity for incoming broadcast",
490 );
491 match err {
492 TrySendError::Full(msg) => Err(msg),
493 TrySendError::Closed(_) => Ok(()),
494 }
495 }
496 }
497 }
498
499 #[expect(clippy::result_large_err)]
504 fn try_emit_request(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
505 let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
506
507 match sender
508 .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
509 {
510 Ok(_) => Ok(()),
511 Err(err) => {
512 trace!(
513 target: "net",
514 %err,
515 "no capacity for incoming request",
516 );
517 match err {
518 TrySendError::Full(msg) => Err(msg),
519 TrySendError::Closed(_) => {
520 Ok(())
523 }
524 }
525 }
526 }
527 }
528
529 fn on_bad_message(&self) {
531 let Some(sender) = self.to_session_manager.inner().get_ref() else { return };
532 let _ = sender.try_send(ActiveSessionMessage::BadMessage { peer_id: self.remote_peer_id });
533 }
534
535 fn emit_disconnect(&mut self, cx: &mut Context<'_>) -> Poll<()> {
537 trace!(target: "net::session", remote_peer_id=?self.remote_peer_id, "emitting disconnect");
538 let msg = ActiveSessionMessage::Disconnected {
539 peer_id: self.remote_peer_id,
540 remote_addr: self.remote_addr,
541 };
542
543 self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
544 self.poll_terminate_message(cx).expect("message is set")
545 }
546
547 fn close_on_error(&mut self, error: EthStreamError, cx: &mut Context<'_>) -> Poll<()> {
549 let msg = ActiveSessionMessage::ClosedOnConnectionError {
550 peer_id: self.remote_peer_id,
551 remote_addr: self.remote_addr,
552 error,
553 };
554 self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
555 self.poll_terminate_message(cx).expect("message is set")
556 }
557
558 fn start_disconnect(&mut self, reason: DisconnectReason) -> Result<(), EthStreamError> {
560 Ok(self.conn.inner_mut().start_disconnect(reason)?)
561 }
562
563 fn poll_disconnect(&mut self, cx: &mut Context<'_>) -> Poll<()> {
565 debug_assert!(self.is_disconnecting(), "not disconnecting");
566
567 let _ = ready!(self.conn.poll_close_unpin(cx));
569 self.emit_disconnect(cx)
570 }
571
572 fn try_disconnect(&mut self, reason: DisconnectReason, cx: &mut Context<'_>) -> Poll<()> {
574 match self.start_disconnect(reason) {
575 Ok(()) => {
576 self.poll_disconnect(cx)
578 }
579 Err(err) => {
580 debug!(target: "net::session", %err, remote_peer_id=?self.remote_peer_id, "could not send disconnect");
581 self.close_on_error(err, cx)
582 }
583 }
584 }
585
586 #[must_use]
595 fn check_timed_out_requests(&mut self, now: Instant) -> bool {
596 for (id, req) in &mut self.inflight_requests {
597 if req.is_timed_out(now) {
598 if req.is_waiting() {
599 debug!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
600 req.timeout();
601 } else if now - req.timestamp > self.protocol_breach_request_timeout {
602 return true
603 }
604 }
605 }
606
607 false
608 }
609
610 fn update_request_timeout(&mut self, sent: Instant, received: Instant) {
612 let elapsed = received.saturating_duration_since(sent);
613
614 let current = Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed));
615 let request_timeout = calculate_new_timeout(current, elapsed);
616 self.internal_request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
617 self.internal_request_timeout_interval = tokio::time::interval(request_timeout);
618 }
619
620 fn poll_terminate_message(&mut self, cx: &mut Context<'_>) -> Option<Poll<()>> {
622 let (mut tx, msg) = self.terminate_message.take()?;
623 match tx.poll_reserve(cx) {
624 Poll::Pending => {
625 self.terminate_message = Some((tx, msg));
626 return Some(Poll::Pending)
627 }
628 Poll::Ready(Ok(())) => {
629 let _ = tx.send_item(msg);
630 }
631 Poll::Ready(Err(_)) => {
632 }
634 }
635 Some(Poll::Ready(()))
637 }
638}
639
640impl<N: NetworkPrimitives> Future for ActiveSession<N> {
641 type Output = ();
642
643 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
644 let this = self.get_mut();
645
646 if let Some(terminate) = this.poll_terminate_message(cx) {
648 return terminate
649 }
650
651 if this.is_disconnecting() {
652 return this.poll_disconnect(cx)
653 }
654
655 let mut budget = 4;
661
662 'main: loop {
664 let mut progress = false;
665
666 loop {
668 match this.commands_rx.poll_next_unpin(cx) {
669 Poll::Pending => break,
670 Poll::Ready(None) => {
671 return Poll::Ready(())
674 }
675 Poll::Ready(Some(cmd)) => {
676 progress = true;
677 match cmd {
678 SessionCommand::Disconnect { reason } => {
679 debug!(
680 target: "net::session",
681 ?reason,
682 remote_peer_id=?this.remote_peer_id,
683 "Received disconnect command for session"
684 );
685 let reason =
686 reason.unwrap_or(DisconnectReason::DisconnectRequested);
687
688 return this.try_disconnect(reason, cx)
689 }
690 SessionCommand::Message(msg) => {
691 this.on_internal_peer_message(msg);
692 }
693 }
694 }
695 }
696 }
697
698 while let Poll::Ready(Some(cmd)) = this.unbounded_rx.poll_recv(cx) {
700 progress = true;
701 match cmd {
702 SessionCommand::Message(msg) => {
703 this.unbounded_broadcast_msgs.increment(1);
704 this.on_internal_peer_message(msg);
705 }
706 SessionCommand::Disconnect { reason } => {
707 let reason = reason.unwrap_or(DisconnectReason::DisconnectRequested);
708 return this.try_disconnect(reason, cx)
709 }
710 }
711 }
712
713 let deadline = this.request_deadline();
714
715 while let Poll::Ready(Some(req)) = this.internal_request_rx.poll_next_unpin(cx) {
716 progress = true;
717 this.on_internal_peer_request(req, deadline);
718 }
719
720 for idx in (0..this.received_requests_from_remote.len()).rev() {
723 let mut req = this.received_requests_from_remote.swap_remove(idx);
724 match req.rx.poll(cx) {
725 Poll::Pending => {
726 this.received_requests_from_remote.push(req);
728 }
729 Poll::Ready(resp) => {
730 this.handle_outgoing_response(req.request_id, resp);
731 }
732 }
733 }
734
735 while this.conn.poll_ready_unpin(cx).is_ready() {
737 if let Some(msg) = this.queued_outgoing.pop_front() {
738 progress = true;
739 let res = match msg {
740 OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg),
741 OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg),
742 OutgoingMessage::Raw(msg) => this.conn.start_send_raw(msg),
743 };
744 if let Err(err) = res {
745 debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to send message");
746 return this.close_on_error(err, cx)
748 }
749 } else {
750 break
752 }
753 }
754
755 'receive: loop {
757 budget -= 1;
759 if budget == 0 {
760 cx.waker().wake_by_ref();
762 break 'main
763 }
764
765 if let Some(msg) = this.pending_message_to_session.take() {
769 match this.to_session_manager.poll_reserve(cx) {
770 Poll::Ready(Ok(_)) => {
771 let _ = this.to_session_manager.send_item(msg);
772 }
773 Poll::Ready(Err(_)) => return Poll::Ready(()),
774 Poll::Pending => {
775 this.pending_message_to_session = Some(msg);
776 break 'receive
777 }
778 };
779 }
780
781 if this.received_requests_from_remote.len() > MAX_QUEUED_OUTGOING_RESPONSES {
783 break 'receive
789 }
790
791 if this.queued_outgoing.messages.len() > MAX_QUEUED_OUTGOING_RESPONSES &&
793 this.queued_response_count() > MAX_QUEUED_OUTGOING_RESPONSES
794 {
795 break 'receive
802 }
803
804 match this.conn.poll_next_unpin(cx) {
805 Poll::Pending => break,
806 Poll::Ready(None) => {
807 if this.is_disconnecting() {
808 break
809 }
810 debug!(target: "net::session", remote_peer_id=?this.remote_peer_id, "eth stream completed");
811 return this.emit_disconnect(cx)
812 }
813 Poll::Ready(Some(res)) => {
814 match res {
815 Ok(msg) => {
816 trace!(target: "net::session", msg_id=?msg.message_id(), remote_peer_id=?this.remote_peer_id, "received eth message");
817 match this.on_incoming_message(msg) {
819 OnIncomingMessageOutcome::Ok => {
820 progress = true;
822 }
823 OnIncomingMessageOutcome::BadMessage { error, message } => {
824 debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
825 this.on_bad_message();
826 return this
827 .try_disconnect(DisconnectReason::ProtocolBreach, cx)
828 }
829 OnIncomingMessageOutcome::NoCapacity(msg) => {
830 this.pending_message_to_session = Some(msg);
832 }
833 }
834 }
835 Err(err) => {
836 debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
837 if err.is_protocol_breach() {
838 this.on_bad_message();
839 return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
840 }
841 return this.close_on_error(err, cx)
842 }
843 }
844 }
845 }
846 }
847
848 if !progress {
849 break 'main
850 }
851 }
852
853 if let Some(interval) = &mut this.range_update_interval {
854 while interval.poll_tick(cx).is_ready() {
856 let current_latest = this.local_range_info.latest();
857 let should_send = if let Some(last_sent) = this.last_sent_latest_block {
858 current_latest.saturating_sub(last_sent) >= EPOCH_SLOTS
860 } else {
861 true };
863
864 if should_send {
865 this.queued_outgoing.push_back(
866 EthMessage::BlockRangeUpdate(this.local_range_info.to_message()).into(),
867 );
868 this.last_sent_latest_block = Some(current_latest);
869 }
870 }
871 }
872
873 while this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
874 if this.check_timed_out_requests(Instant::now()) &&
876 let Poll::Ready(Ok(_)) = this.to_session_manager.poll_reserve(cx)
877 {
878 let msg = ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id };
879 this.pending_message_to_session = Some(msg);
880 }
881 }
882
883 this.shrink_to_fit();
884
885 Poll::Pending
886 }
887}
888
889pub(crate) struct ReceivedRequest<N: NetworkPrimitives> {
891 request_id: u64,
893 rx: PeerResponse<N>,
895 #[expect(dead_code)]
897 received: Instant,
898}
899
900pub(crate) struct InflightRequest<R> {
902 request: RequestState<R>,
904 timestamp: Instant,
906 deadline: Instant,
908}
909
910impl<N: NetworkPrimitives> InflightRequest<PeerRequest<N>> {
913 #[inline]
915 fn is_timed_out(&self, now: Instant) -> bool {
916 now > self.deadline
917 }
918
919 #[inline]
921 const fn is_waiting(&self) -> bool {
922 matches!(self.request, RequestState::Waiting(_))
923 }
924
925 fn timeout(&mut self) {
927 let mut req = RequestState::TimedOut;
928 std::mem::swap(&mut self.request, &mut req);
929
930 if let RequestState::Waiting(req) = req {
931 req.send_err_response(RequestError::Timeout);
932 }
933 }
934}
935
936enum OnIncomingMessageOutcome<N: NetworkPrimitives> {
938 Ok,
940 BadMessage { error: EthStreamError, message: EthMessage<N> },
942 NoCapacity(ActiveSessionMessage<N>),
944}
945
946impl<N: NetworkPrimitives> From<Result<(), ActiveSessionMessage<N>>>
947 for OnIncomingMessageOutcome<N>
948{
949 fn from(res: Result<(), ActiveSessionMessage<N>>) -> Self {
950 match res {
951 Ok(_) => Self::Ok,
952 Err(msg) => Self::NoCapacity(msg),
953 }
954 }
955}
956
957enum RequestState<R> {
958 Waiting(R),
960 TimedOut,
962}
963
964#[derive(Debug)]
966pub(crate) enum OutgoingMessage<N: NetworkPrimitives> {
967 Eth(EthMessage<N>),
969 Broadcast(EthBroadcastMessage<N>),
971 Raw(RawCapabilityMessage),
973}
974
975impl<N: NetworkPrimitives> OutgoingMessage<N> {
976 const fn is_response(&self) -> bool {
978 match self {
979 Self::Eth(msg) => msg.is_response(),
980 _ => false,
981 }
982 }
983
984 fn broadcast_item_count(&self) -> usize {
990 match self {
991 Self::Eth(msg) => match msg {
992 EthMessage::NewBlockHashes(h) => h.len(),
993 EthMessage::NewPooledTransactionHashes66(h) => h.len(),
994 EthMessage::NewPooledTransactionHashes68(h) => h.hashes.len(),
995 EthMessage::NewPooledTransactionHashes72(h) => h.hashes.len(),
996 _ => 0,
997 },
998 Self::Broadcast(msg) => match msg {
999 EthBroadcastMessage::NewBlock(_) => 1,
1000 EthBroadcastMessage::Transactions(txs) => txs.len(),
1001 },
1002 Self::Raw(_) => 0,
1003 }
1004 }
1005
1006 fn try_merge_hashes(
1009 &mut self,
1010 incoming: NewPooledTransactionHashes,
1011 ) -> Option<NewPooledTransactionHashes> {
1012 let Self::Eth(eth) = self else { return Some(incoming) };
1013 match (eth, incoming) {
1014 (
1015 EthMessage::NewPooledTransactionHashes66(existing),
1016 NewPooledTransactionHashes::Eth66(inc),
1017 ) => {
1018 existing.extend(inc);
1019 None
1020 }
1021 (
1022 EthMessage::NewPooledTransactionHashes68(existing),
1023 NewPooledTransactionHashes::Eth68(inc),
1024 ) => {
1025 existing.hashes.extend(inc.hashes);
1026 existing.sizes.extend(inc.sizes);
1027 existing.types.extend(inc.types);
1028 None
1029 }
1030 (
1031 EthMessage::NewPooledTransactionHashes72(existing),
1032 NewPooledTransactionHashes::Eth72(inc),
1033 ) => {
1034 existing.hashes.extend(inc.hashes);
1035 existing.sizes.extend(inc.sizes);
1036 existing.types.extend(inc.types);
1037 None
1038 }
1039 (_, incoming) => Some(incoming),
1040 }
1041 }
1042}
1043
1044impl<N: NetworkPrimitives> From<EthMessage<N>> for OutgoingMessage<N> {
1045 fn from(value: EthMessage<N>) -> Self {
1046 Self::Eth(value)
1047 }
1048}
1049
1050impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for OutgoingMessage<N> {
1051 fn from(value: EthBroadcastMessage<N>) -> Self {
1052 Self::Broadcast(value)
1053 }
1054}
1055
1056#[inline]
1058fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> Duration {
1059 let new_timeout = estimated_rtt.mul_f64(SAMPLE_IMPACT) * TIMEOUT_SCALING;
1060
1061 let smoothened_timeout = current_timeout.mul_f64(1.0 - SAMPLE_IMPACT) + new_timeout;
1063
1064 smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT)
1065}
1066
1067pub(crate) struct QueuedOutgoingMessages<N: NetworkPrimitives> {
1074 messages: VecDeque<OutgoingMessage<N>>,
1075 count: Gauge,
1076 broadcast_items: BroadcastItemCounter,
1078}
1079
1080impl<N: NetworkPrimitives> QueuedOutgoingMessages<N> {
1081 pub(crate) const fn new(metric: Gauge, broadcast_items: BroadcastItemCounter) -> Self {
1082 Self { messages: VecDeque::new(), count: metric, broadcast_items }
1083 }
1084
1085 pub(crate) fn push_back(&mut self, message: OutgoingMessage<N>) {
1086 self.messages.push_back(message);
1087 self.count.increment(1);
1088 }
1089
1090 pub(crate) fn pop_front(&mut self) -> Option<OutgoingMessage<N>> {
1091 self.messages.pop_front().inspect(|msg| {
1092 self.count.decrement(1);
1093 let items = msg.broadcast_item_count();
1094 if items > 0 {
1095 self.broadcast_items.sub(items);
1096 }
1097 })
1098 }
1099
1100 pub(crate) fn push_pooled_hashes(&mut self, msg: NewPooledTransactionHashes) {
1103 let msg = if let Some(last) = self.messages.back_mut() {
1104 match last.try_merge_hashes(msg) {
1105 None => return,
1106 Some(msg) => msg,
1107 }
1108 } else {
1109 msg
1110 };
1111 self.messages.push_back(EthMessage::from(msg).into());
1112 self.count.increment(1);
1113 }
1114
1115 pub(crate) fn shrink_to(&mut self, min_capacity: usize) {
1116 self.messages.shrink_to(min_capacity);
1117 }
1118}
1119
1120impl<N: NetworkPrimitives> Drop for QueuedOutgoingMessages<N> {
1121 fn drop(&mut self) {
1122 let remaining = self.messages.len();
1124 if remaining > 0 {
1125 self.count.decrement(remaining as f64);
1126 }
1127 }
1128}
1129
1130#[cfg(test)]
1131mod tests {
1132 use super::*;
1133 use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session};
1134 use alloy_eips::eip2124::ForkFilter;
1135 use reth_chainspec::MAINNET;
1136 use reth_ecies::stream::ECIESStream;
1137 use reth_eth_wire::{
1138 handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockAccessLists,
1139 GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
1140 UnauthedP2PStream, UnifiedStatus,
1141 };
1142 use reth_eth_wire_types::{
1143 message::MAX_MESSAGE_SIZE, EthMessageID, NewPooledTransactionHashes72, RawCapabilityMessage,
1144 };
1145 use reth_ethereum_forks::EthereumHardfork;
1146 use reth_network_peers::pk2id;
1147 use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
1148 use secp256k1::{SecretKey, SECP256K1};
1149 use tokio::{
1150 net::{TcpListener, TcpStream},
1151 sync::mpsc,
1152 };
1153
1154 fn eth_hello(server_key: &SecretKey) -> HelloMessageWithProtocols {
1156 HelloMessageWithProtocols::builder(pk2id(&server_key.public_key(SECP256K1))).build()
1157 }
1158
1159 struct SessionBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
1160 _remote_capabilities: Arc<Capabilities>,
1161 active_session_tx: mpsc::Sender<ActiveSessionMessage<N>>,
1162 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
1163 to_sessions: Vec<mpsc::Sender<SessionCommand<N>>>,
1164 secret_key: SecretKey,
1165 local_peer_id: PeerId,
1166 hello: HelloMessageWithProtocols,
1167 status: UnifiedStatus,
1168 fork_filter: ForkFilter,
1169 next_id: usize,
1170 }
1171
1172 impl<N: NetworkPrimitives> SessionBuilder<N> {
1173 fn next_id(&mut self) -> SessionId {
1174 let id = self.next_id;
1175 self.next_id += 1;
1176 SessionId(id)
1177 }
1178
1179 fn with_client_stream<F, O>(
1181 &self,
1182 local_addr: SocketAddr,
1183 f: F,
1184 ) -> Pin<Box<dyn Future<Output = ()> + Send>>
1185 where
1186 F: FnOnce(EthStream<P2PStream<ECIESStream<TcpStream>>, N>) -> O + Send + 'static,
1187 O: Future<Output = ()> + Send + Sync,
1188 {
1189 let mut status = self.status;
1190 let fork_filter = self.fork_filter.clone();
1191 let local_peer_id = self.local_peer_id;
1192 let mut hello = self.hello.clone();
1193 let key = SecretKey::new(&mut rand_08::thread_rng());
1194 hello.id = pk2id(&key.public_key(SECP256K1));
1195 Box::pin(async move {
1196 let outgoing = TcpStream::connect(local_addr).await.unwrap();
1197 let sink = ECIESStream::connect(outgoing, key, local_peer_id).await.unwrap();
1198
1199 let (p2p_stream, _) = UnauthedP2PStream::new(sink).handshake(hello).await.unwrap();
1200
1201 let eth_version = p2p_stream.shared_capabilities().eth_version().unwrap();
1202 status.set_eth_version(eth_version);
1203
1204 let (client_stream, _) = UnauthedEthStream::new(p2p_stream)
1205 .handshake(status, fork_filter)
1206 .await
1207 .unwrap();
1208 f(client_stream).await
1209 })
1210 }
1211
1212 async fn connect_incoming(&mut self, stream: TcpStream) -> ActiveSession<N> {
1213 let remote_addr = stream.local_addr().unwrap();
1214 let session_id = self.next_id();
1215 let (_disconnect_tx, disconnect_rx) = oneshot::channel();
1216 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(1);
1217
1218 tokio::task::spawn(start_pending_incoming_session(
1219 Arc::new(EthHandshake::default()),
1220 MAX_MESSAGE_SIZE,
1221 disconnect_rx,
1222 session_id,
1223 stream,
1224 pending_sessions_tx,
1225 remote_addr,
1226 self.secret_key,
1227 self.hello.clone(),
1228 self.status,
1229 self.fork_filter.clone(),
1230 Default::default(),
1231 ));
1232
1233 let mut stream = ReceiverStream::new(pending_sessions_rx);
1234
1235 match stream.next().await.unwrap() {
1236 PendingSessionEvent::Established {
1237 session_id,
1238 remote_addr,
1239 peer_id,
1240 capabilities,
1241 conn,
1242 ..
1243 } => {
1244 let (_to_session_tx, messages_rx) = mpsc::channel(10);
1245 let (commands_to_session, commands_rx) = mpsc::channel(10);
1246 let (_unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
1247 let poll_sender = PollSender::new(self.active_session_tx.clone());
1248
1249 self.to_sessions.push(commands_to_session);
1250
1251 ActiveSession {
1252 next_id: 0,
1253 remote_peer_id: peer_id,
1254 remote_addr,
1255 remote_capabilities: Arc::clone(&capabilities),
1256 session_id,
1257 commands_rx: ReceiverStream::new(commands_rx),
1258 unbounded_rx,
1259 unbounded_broadcast_msgs: Counter::noop(),
1260 to_session_manager: MeteredPollSender::new(
1261 poll_sender,
1262 "network_active_session",
1263 ),
1264 pending_message_to_session: None,
1265 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
1266 inflight_requests: Default::default(),
1267 conn,
1268 queued_outgoing: QueuedOutgoingMessages::new(
1269 Gauge::noop(),
1270 BroadcastItemCounter::new(),
1271 ),
1272 received_requests_from_remote: Default::default(),
1273 internal_request_timeout_interval: tokio::time::interval(
1274 INITIAL_REQUEST_TIMEOUT,
1275 ),
1276 internal_request_timeout: Arc::new(AtomicU64::new(
1277 INITIAL_REQUEST_TIMEOUT.as_millis() as u64,
1278 )),
1279 protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
1280 terminate_message: None,
1281 range_info: None,
1282 local_range_info: BlockRangeInfo::new(
1283 0,
1284 1000,
1285 alloy_primitives::B256::ZERO,
1286 ),
1287 range_update_interval: None,
1288 last_sent_latest_block: None,
1289 }
1290 }
1291 ev => {
1292 panic!("unexpected message {ev:?}")
1293 }
1294 }
1295 }
1296 }
1297
1298 impl Default for SessionBuilder {
1299 fn default() -> Self {
1300 let (active_session_tx, active_session_rx) = mpsc::channel(100);
1301
1302 let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
1303 let local_peer_id = pk2id(&pk);
1304
1305 Self {
1306 next_id: 0,
1307 _remote_capabilities: Arc::new(Capabilities::from(vec![])),
1308 active_session_tx,
1309 active_session_rx: ReceiverStream::new(active_session_rx),
1310 to_sessions: vec![],
1311 hello: eth_hello(&secret_key),
1312 secret_key,
1313 local_peer_id,
1314 status: StatusBuilder::default().build(),
1315 fork_filter: MAINNET
1316 .hardfork_fork_filter(EthereumHardfork::Frontier)
1317 .expect("The Frontier fork filter should exist on mainnet"),
1318 }
1319 }
1320 }
1321
1322 #[tokio::test(flavor = "multi_thread")]
1323 async fn test_disconnect() {
1324 let mut builder = SessionBuilder::default();
1325
1326 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1327 let local_addr = listener.local_addr().unwrap();
1328
1329 let expected_disconnect = DisconnectReason::UselessPeer;
1330
1331 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1332 let msg = client_stream.next().await.unwrap().unwrap_err();
1333 assert_eq!(msg.as_disconnected().unwrap(), expected_disconnect);
1334 });
1335
1336 tokio::task::spawn(async move {
1337 let (incoming, _) = listener.accept().await.unwrap();
1338 let mut session = builder.connect_incoming(incoming).await;
1339
1340 session.start_disconnect(expected_disconnect).unwrap();
1341 session.await
1342 });
1343
1344 fut.await;
1345 }
1346
1347 #[tokio::test(flavor = "multi_thread")]
1348 async fn test_invalid_message_disconnects_with_protocol_breach() {
1349 let mut builder = SessionBuilder::default();
1350
1351 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1352 let local_addr = listener.local_addr().unwrap();
1353
1354 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1355 client_stream
1356 .start_send_raw(RawCapabilityMessage::eth(
1357 EthMessageID::PooledTransactions,
1358 vec![0xc0].into(),
1359 ))
1360 .unwrap();
1361 client_stream.flush().await.unwrap();
1362
1363 let msg = client_stream.next().await.unwrap().unwrap_err();
1364 assert_eq!(msg.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
1365 });
1366
1367 let (tx, rx) = oneshot::channel();
1368
1369 tokio::task::spawn(async move {
1370 let (incoming, _) = listener.accept().await.unwrap();
1371 let session = builder.connect_incoming(incoming).await;
1372 session.await;
1373
1374 tx.send(()).unwrap();
1375 });
1376
1377 fut.await;
1378 rx.await.unwrap();
1379 }
1380
1381 #[tokio::test(flavor = "multi_thread")]
1382 async fn handle_dropped_stream() {
1383 let mut builder = SessionBuilder::default();
1384
1385 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1386 let local_addr = listener.local_addr().unwrap();
1387
1388 let fut = builder.with_client_stream(local_addr, async move |client_stream| {
1389 drop(client_stream);
1390 tokio::time::sleep(Duration::from_secs(1)).await
1391 });
1392
1393 let (tx, rx) = oneshot::channel();
1394
1395 tokio::task::spawn(async move {
1396 let (incoming, _) = listener.accept().await.unwrap();
1397 let session = builder.connect_incoming(incoming).await;
1398 session.await;
1399
1400 tx.send(()).unwrap();
1401 });
1402
1403 tokio::task::spawn(fut);
1404
1405 rx.await.unwrap();
1406 }
1407
1408 #[tokio::test(flavor = "multi_thread")]
1409 async fn test_send_many_messages() {
1410 reth_tracing::init_test_tracing();
1411 let mut builder = SessionBuilder::default();
1412
1413 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1414 let local_addr = listener.local_addr().unwrap();
1415
1416 let num_messages = 100;
1417
1418 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1419 for _ in 0..num_messages {
1420 client_stream
1421 .send(EthMessage::NewPooledTransactionHashes66(Vec::new().into()))
1422 .await
1423 .unwrap();
1424 }
1425 });
1426
1427 let (tx, rx) = oneshot::channel();
1428
1429 tokio::task::spawn(async move {
1430 let (incoming, _) = listener.accept().await.unwrap();
1431 let session = builder.connect_incoming(incoming).await;
1432 session.await;
1433
1434 tx.send(()).unwrap();
1435 });
1436
1437 tokio::task::spawn(fut);
1438
1439 rx.await.unwrap();
1440 }
1441
1442 #[tokio::test(flavor = "multi_thread")]
1443 async fn test_request_timeout() {
1444 reth_tracing::init_test_tracing();
1445
1446 let mut builder = SessionBuilder::default();
1447
1448 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1449 let local_addr = listener.local_addr().unwrap();
1450
1451 let request_timeout = Duration::from_millis(100);
1452 let drop_timeout = Duration::from_millis(1500);
1453
1454 let fut = builder.with_client_stream(local_addr, async move |client_stream| {
1455 let _client_stream = client_stream;
1456 tokio::time::sleep(drop_timeout * 60).await;
1457 });
1458 tokio::task::spawn(fut);
1459
1460 let (incoming, _) = listener.accept().await.unwrap();
1461 let mut session = builder.connect_incoming(incoming).await;
1462 session
1463 .internal_request_timeout
1464 .store(request_timeout.as_millis() as u64, Ordering::Relaxed);
1465 session.protocol_breach_request_timeout = drop_timeout;
1466 session.internal_request_timeout_interval =
1467 tokio::time::interval_at(tokio::time::Instant::now(), request_timeout);
1468 let (tx, rx) = oneshot::channel();
1469 let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx };
1470 session.on_internal_peer_request(req, Instant::now());
1471 tokio::spawn(session);
1472
1473 let err = rx.await.unwrap().unwrap_err();
1474 assert_eq!(err, RequestError::Timeout);
1475
1476 let msg = builder.active_session_rx.next().await.unwrap();
1478 match msg {
1479 ActiveSessionMessage::ProtocolBreach { .. } => {}
1480 ev => unreachable!("{ev:?}"),
1481 }
1482 }
1483
1484 #[test]
1485 fn eth72_pooled_hashes_count_broadcast_items() {
1486 let hashes =
1487 vec![alloy_primitives::B256::repeat_byte(1), alloy_primitives::B256::repeat_byte(2)];
1488 let msg: OutgoingMessage<EthNetworkPrimitives> =
1489 EthMessage::NewPooledTransactionHashes72(NewPooledTransactionHashes72 {
1490 types: vec![0; hashes.len()],
1491 sizes: vec![1; hashes.len()],
1492 hashes,
1493 cell_mask: None,
1494 })
1495 .into();
1496
1497 assert_eq!(2, msg.broadcast_item_count());
1498 }
1499
1500 #[test]
1501 fn test_reject_bal_request_for_eth70() {
1502 let (tx, _rx) = oneshot::channel();
1503 let request: PeerRequest<EthNetworkPrimitives> =
1504 PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
1505
1506 assert!(!ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
1507 &request,
1508 EthVersion::Eth70
1509 ));
1510 assert!(ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
1511 &request,
1512 EthVersion::Eth71
1513 ));
1514 }
1515
1516 #[tokio::test(flavor = "multi_thread")]
1517 async fn test_keep_alive() {
1518 let mut builder = SessionBuilder::default();
1519
1520 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1521 let local_addr = listener.local_addr().unwrap();
1522
1523 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1524 let _ = tokio::time::timeout(Duration::from_secs(5), client_stream.next()).await;
1525 client_stream.into_inner().disconnect(DisconnectReason::UselessPeer).await.unwrap();
1526 });
1527
1528 let (tx, rx) = oneshot::channel();
1529
1530 tokio::task::spawn(async move {
1531 let (incoming, _) = listener.accept().await.unwrap();
1532 let session = builder.connect_incoming(incoming).await;
1533 session.await;
1534
1535 tx.send(()).unwrap();
1536 });
1537
1538 tokio::task::spawn(fut);
1539
1540 rx.await.unwrap();
1541 }
1542
1543 #[tokio::test(flavor = "multi_thread")]
1545 async fn test_send_at_capacity() {
1546 let mut builder = SessionBuilder::default();
1547
1548 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1549 let local_addr = listener.local_addr().unwrap();
1550
1551 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1552 client_stream
1553 .send(EthMessage::NewPooledTransactionHashes68(Default::default()))
1554 .await
1555 .unwrap();
1556 let _ = tokio::time::timeout(Duration::from_secs(100), client_stream.next()).await;
1557 });
1558 tokio::task::spawn(fut);
1559
1560 let (incoming, _) = listener.accept().await.unwrap();
1561 let session = builder.connect_incoming(incoming).await;
1562
1563 let mut num_fill_messages = 0;
1565 loop {
1566 if builder
1567 .active_session_tx
1568 .try_send(ActiveSessionMessage::ProtocolBreach { peer_id: PeerId::random() })
1569 .is_err()
1570 {
1571 break
1572 }
1573 num_fill_messages += 1;
1574 }
1575
1576 tokio::task::spawn(async move {
1577 session.await;
1578 });
1579
1580 tokio::time::sleep(Duration::from_millis(100)).await;
1581
1582 for _ in 0..num_fill_messages {
1583 let message = builder.active_session_rx.next().await.unwrap();
1584 match message {
1585 ActiveSessionMessage::ProtocolBreach { .. } => {}
1586 ev => unreachable!("{ev:?}"),
1587 }
1588 }
1589
1590 let message = builder.active_session_rx.next().await.unwrap();
1591 match message {
1592 ActiveSessionMessage::ValidMessage {
1593 message: PeerMessage::PooledTransactions(_),
1594 ..
1595 } => {}
1596 _ => unreachable!(),
1597 }
1598 }
1599
1600 #[test]
1601 fn timeout_calculation_sanity_tests() {
1602 let rtt = Duration::from_secs(5);
1603 let timeout = rtt * TIMEOUT_SCALING;
1605
1606 assert_eq!(calculate_new_timeout(timeout, rtt), timeout);
1608
1609 assert!(calculate_new_timeout(timeout, rtt / 2) < timeout);
1611 assert!(calculate_new_timeout(timeout, rtt / 2) > timeout / 2);
1612 assert!(calculate_new_timeout(timeout, rtt * 2) > timeout);
1613 assert!(calculate_new_timeout(timeout, rtt * 2) < timeout * 2);
1614 }
1615}