1use core::sync::atomic::Ordering;
4use std::{
5 collections::VecDeque,
6 future::Future,
7 net::SocketAddr,
8 pin::Pin,
9 sync::{
10 atomic::{AtomicU64, AtomicUsize},
11 Arc,
12 },
13 task::{ready, Context, Poll},
14 time::{Duration, Instant},
15};
16
17use crate::{
18 message::{NewBlockMessage, PeerMessage, PeerResponse, PeerResponseResult},
19 session::{
20 conn::EthRlpxConnection,
21 handle::{ActiveSessionMessage, SessionCommand},
22 BlockRangeInfo, EthVersion, SessionId,
23 },
24};
25use alloy_eips::merge::EPOCH_SLOTS;
26use alloy_primitives::Sealable;
27use futures::{stream::Fuse, SinkExt, StreamExt};
28use metrics::{Counter, Gauge};
29use reth_eth_wire::{
30 errors::{EthHandshakeError, EthStreamError},
31 message::{EthBroadcastMessage, MessageError},
32 Capabilities, DisconnectP2P, DisconnectReason, EthMessage, NetworkPrimitives, NewBlockPayload,
33};
34use reth_eth_wire_types::{message::RequestPair, NewPooledTransactionHashes, RawCapabilityMessage};
35use reth_metrics::common::mpsc::MeteredPollSender;
36use reth_network_api::PeerRequest;
37use reth_network_p2p::error::RequestError;
38use reth_network_peers::PeerId;
39use reth_network_types::session::config::INITIAL_REQUEST_TIMEOUT;
40use reth_primitives_traits::Block;
41use rustc_hash::FxHashMap;
42use tokio::{
43 sync::{mpsc, mpsc::error::TrySendError, oneshot},
44 time::Interval,
45};
46use tokio_stream::wrappers::ReceiverStream;
47use tokio_util::sync::PollSender;
48use tracing::{debug, trace};
49
50pub(super) const RANGE_UPDATE_INTERVAL: Duration = Duration::from_secs(EPOCH_SLOTS * 12);
56
57const MINIMUM_TIMEOUT: Duration = Duration::from_secs(2);
61
62const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
64const SAMPLE_IMPACT: f64 = 0.1;
66const TIMEOUT_SCALING: u32 = 3;
68
69const MAX_QUEUED_OUTGOING_RESPONSES: usize = 4;
81
82const MAX_QUEUED_BROADCAST_ITEMS: usize = 4096;
88
89#[derive(Debug, Clone)]
95pub(crate) struct BroadcastItemCounter(Arc<AtomicUsize>);
96
97impl BroadcastItemCounter {
98 pub(crate) fn new() -> Self {
100 Self(Arc::new(AtomicUsize::new(0)))
101 }
102
103 pub(crate) fn get(&self) -> usize {
105 self.0.load(Ordering::Relaxed)
106 }
107
108 pub(crate) fn try_add(&self, n: usize) -> bool {
110 let prev = self.0.fetch_add(n, Ordering::Relaxed);
111 if prev >= MAX_QUEUED_BROADCAST_ITEMS {
112 self.0.fetch_sub(n, Ordering::Relaxed);
113 false
114 } else {
115 true
116 }
117 }
118
119 pub(crate) fn sub(&self, n: usize) {
121 self.0.fetch_sub(n, Ordering::Relaxed);
122 }
123}
124
125#[expect(dead_code)]
135pub(crate) struct ActiveSession<N: NetworkPrimitives> {
136 pub(crate) next_id: u64,
138 pub(crate) conn: EthRlpxConnection<N>,
140 pub(crate) remote_peer_id: PeerId,
142 pub(crate) remote_addr: SocketAddr,
144 pub(crate) remote_capabilities: Arc<Capabilities>,
146 pub(crate) session_id: SessionId,
148 pub(crate) commands_rx: ReceiverStream<SessionCommand<N>>,
150 pub(crate) unbounded_rx: mpsc::UnboundedReceiver<SessionCommand<N>>,
153 pub(crate) unbounded_broadcast_msgs: Counter,
155 pub(crate) to_session_manager: MeteredPollSender<ActiveSessionMessage<N>>,
157 pub(crate) pending_message_to_session: Option<ActiveSessionMessage<N>>,
159 pub(crate) internal_request_rx: Fuse<ReceiverStream<PeerRequest<N>>>,
161 pub(crate) inflight_requests: FxHashMap<u64, InflightRequest<PeerRequest<N>>>,
163 pub(crate) received_requests_from_remote: Vec<ReceivedRequest<N>>,
165 pub(crate) queued_outgoing: QueuedOutgoingMessages<N>,
167 pub(crate) internal_request_timeout: Arc<AtomicU64>,
169 pub(crate) internal_request_timeout_interval: Interval,
171 pub(crate) protocol_breach_request_timeout: Duration,
174 pub(crate) terminate_message:
176 Option<(PollSender<ActiveSessionMessage<N>>, ActiveSessionMessage<N>)>,
177 pub(crate) range_info: Option<BlockRangeInfo>,
179 pub(crate) local_range_info: BlockRangeInfo,
182 pub(crate) range_update_interval: Option<Interval>,
186 pub(crate) last_sent_latest_block: Option<u64>,
189}
190
191impl<N: NetworkPrimitives> ActiveSession<N> {
192 fn is_disconnecting(&self) -> bool {
194 self.conn.inner().is_disconnecting()
195 }
196
197 const fn next_id(&mut self) -> u64 {
199 let id = self.next_id;
200 self.next_id += 1;
201 id
202 }
203
204 pub fn shrink_to_fit(&mut self) {
206 self.received_requests_from_remote.shrink_to_fit();
207 self.queued_outgoing.shrink_to_fit();
208 }
209
210 fn queued_response_count(&self) -> usize {
212 self.queued_outgoing.messages.iter().filter(|m| m.is_response()).count()
213 }
214
215 fn on_incoming_message(&mut self, msg: EthMessage<N>) -> OnIncomingMessageOutcome<N> {
219 macro_rules! on_request {
223 ($req:ident, $resp_item:ident, $req_item:ident) => {{
224 let RequestPair { request_id, message: request } = $req;
225 let (tx, response) = oneshot::channel();
226 let received = ReceivedRequest {
227 request_id,
228 rx: PeerResponse::$resp_item { response },
229 received: Instant::now(),
230 };
231 self.received_requests_from_remote.push(received);
232 self.try_emit_request(PeerMessage::EthRequest(PeerRequest::$req_item {
233 request,
234 response: tx,
235 }))
236 .into()
237 }};
238 }
239
240 macro_rules! on_response {
242 ($resp:ident, $item:ident) => {{
243 let RequestPair { request_id, message } = $resp;
244 if let Some(req) = self.inflight_requests.remove(&request_id) {
245 match req.request {
246 RequestState::Waiting(PeerRequest::$item { response, .. }) => {
247 trace!(peer_id=?self.remote_peer_id, ?request_id, "received response from peer");
248 let _ = response.send(Ok(message));
249 self.update_request_timeout(req.timestamp, Instant::now());
250 }
251 RequestState::Waiting(request) => {
252 request.send_bad_response();
253 }
254 RequestState::TimedOut => {
255 self.update_request_timeout(req.timestamp, Instant::now());
257 }
258 }
259 } else {
260 trace!(peer_id=?self.remote_peer_id, ?request_id, "received response to unknown request");
261 self.on_bad_message();
263 }
264
265 OnIncomingMessageOutcome::Ok
266 }};
267 }
268
269 match msg {
270 message @ EthMessage::Status(_) => OnIncomingMessageOutcome::BadMessage {
271 error: EthStreamError::EthHandshakeError(EthHandshakeError::StatusNotInHandshake),
272 message,
273 },
274 EthMessage::NewBlockHashes(msg) => {
275 self.try_emit_broadcast(PeerMessage::NewBlockHashes(msg)).into()
276 }
277 EthMessage::NewBlock(msg) => {
278 let block = NewBlockMessage {
279 hash: msg.block().header().hash_slow(),
280 block: Arc::new(*msg),
281 };
282 self.try_emit_broadcast(PeerMessage::NewBlock(block)).into()
283 }
284 EthMessage::Transactions(msg) => {
285 self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into()
286 }
287 EthMessage::NewPooledTransactionHashes66(msg) => {
288 self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
289 }
290 EthMessage::NewPooledTransactionHashes68(msg) => {
291 self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
292 }
293 EthMessage::GetBlockHeaders(req) => {
294 on_request!(req, BlockHeaders, GetBlockHeaders)
295 }
296 EthMessage::BlockHeaders(resp) => {
297 on_response!(resp, GetBlockHeaders)
298 }
299 EthMessage::GetBlockBodies(req) => {
300 on_request!(req, BlockBodies, GetBlockBodies)
301 }
302 EthMessage::BlockBodies(resp) => {
303 on_response!(resp, GetBlockBodies)
304 }
305 EthMessage::GetPooledTransactions(req) => {
306 on_request!(req, PooledTransactions, GetPooledTransactions)
307 }
308 EthMessage::PooledTransactions(resp) => {
309 on_response!(resp, GetPooledTransactions)
310 }
311 EthMessage::GetNodeData(req) => {
312 on_request!(req, NodeData, GetNodeData)
313 }
314 EthMessage::NodeData(resp) => {
315 on_response!(resp, GetNodeData)
316 }
317 EthMessage::GetReceipts(req) => {
318 if self.conn.version() >= EthVersion::Eth69 {
319 on_request!(req, Receipts69, GetReceipts69)
320 } else {
321 on_request!(req, Receipts, GetReceipts)
322 }
323 }
324 EthMessage::GetReceipts70(req) => {
325 on_request!(req, Receipts70, GetReceipts70)
326 }
327 EthMessage::Receipts(resp) => {
328 on_response!(resp, GetReceipts)
329 }
330 EthMessage::Receipts69(resp) => {
331 on_response!(resp, GetReceipts69)
332 }
333 EthMessage::Receipts70(resp) => {
334 on_response!(resp, GetReceipts70)
335 }
336 EthMessage::GetBlockAccessLists(req) => {
337 on_request!(req, BlockAccessLists, GetBlockAccessLists)
338 }
339 EthMessage::BlockAccessLists(resp) => {
340 on_response!(resp, GetBlockAccessLists)
341 }
342 EthMessage::BlockRangeUpdate(msg) => {
343 if msg.earliest > msg.latest {
345 return OnIncomingMessageOutcome::BadMessage {
346 error: EthStreamError::InvalidMessage(MessageError::Other(format!(
347 "invalid block range: earliest ({}) > latest ({})",
348 msg.earliest, msg.latest
349 ))),
350 message: EthMessage::BlockRangeUpdate(msg),
351 };
352 }
353
354 if msg.latest_hash.is_zero() {
356 return OnIncomingMessageOutcome::BadMessage {
357 error: EthStreamError::InvalidMessage(MessageError::Other(
358 "invalid block range: latest_hash cannot be zero".to_string(),
359 )),
360 message: EthMessage::BlockRangeUpdate(msg),
361 };
362 }
363
364 if let Some(range_info) = self.range_info.as_ref() {
365 range_info.update(msg.earliest, msg.latest, msg.latest_hash);
366 }
367
368 OnIncomingMessageOutcome::Ok
369 }
370 EthMessage::Other(bytes) => self.try_emit_broadcast(PeerMessage::Other(bytes)).into(),
371 }
372 }
373
374 fn on_internal_peer_request(&mut self, request: PeerRequest<N>, deadline: Instant) {
376 let version = self.conn.version();
377 if !Self::is_request_supported_for_version(&request, version) {
378 debug!(
379 target: "net",
380 ?request,
381 peer_id=?self.remote_peer_id,
382 ?version,
383 "Request not supported for negotiated eth version",
384 );
385 request.send_err_response(RequestError::UnsupportedCapability);
386 return;
387 }
388
389 let request_id = self.next_id();
390 trace!(?request, peer_id=?self.remote_peer_id, ?request_id, "sending request to peer");
391 let msg = request.create_request_message(request_id).map_versioned(version);
392
393 self.queued_outgoing.push_back(msg.into());
394 let req = InflightRequest {
395 request: RequestState::Waiting(request),
396 timestamp: Instant::now(),
397 deadline,
398 };
399 self.inflight_requests.insert(request_id, req);
400 }
401
402 #[inline]
403 fn is_request_supported_for_version(request: &PeerRequest<N>, version: EthVersion) -> bool {
404 request.is_supported_by_eth_version(version)
405 }
406
407 fn on_internal_peer_message(&mut self, msg: PeerMessage<N>) {
409 match msg {
410 PeerMessage::NewBlockHashes(msg) => {
411 self.queued_outgoing.push_back(EthMessage::NewBlockHashes(msg).into());
412 }
413 PeerMessage::NewBlock(msg) => {
414 self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
415 }
416 PeerMessage::PooledTransactions(msg) => {
417 if msg.is_valid_for_version(self.conn.version()) {
418 self.queued_outgoing.push_pooled_hashes(msg);
419 } else {
420 self.queued_outgoing.broadcast_items.sub(msg.len());
421 debug!(target: "net", ?msg, version=?self.conn.version(), "Message is invalid for connection version, skipping");
422 }
423 }
424 PeerMessage::EthRequest(req) => {
425 let deadline = self.request_deadline();
426 self.on_internal_peer_request(req, deadline);
427 }
428 PeerMessage::SendTransactions(msg) => {
429 self.queued_outgoing.push_back(EthBroadcastMessage::Transactions(msg).into());
430 }
431 PeerMessage::BlockRangeUpdated(_) => {}
432 PeerMessage::ReceivedTransaction(_) => {
433 unreachable!("Not emitted by network")
434 }
435 PeerMessage::Other(other) => {
436 self.queued_outgoing.push_back(OutgoingMessage::Raw(other));
437 }
438 }
439 }
440
441 fn request_deadline(&self) -> Instant {
443 Instant::now() +
444 Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed))
445 }
446
447 fn handle_outgoing_response(&mut self, id: u64, resp: PeerResponseResult<N>) {
451 match resp.try_into_message(id) {
452 Ok(msg) => {
453 self.queued_outgoing.push_back(msg.into());
454 }
455 Err(err) => {
456 debug!(target: "net", %err, "Failed to respond to received request");
457 }
458 }
459 }
460
461 #[expect(clippy::result_large_err)]
465 fn try_emit_broadcast(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
466 let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
467
468 match sender
469 .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
470 {
471 Ok(_) => Ok(()),
472 Err(err) => {
473 trace!(
474 target: "net",
475 %err,
476 "no capacity for incoming broadcast",
477 );
478 match err {
479 TrySendError::Full(msg) => Err(msg),
480 TrySendError::Closed(_) => Ok(()),
481 }
482 }
483 }
484 }
485
486 #[expect(clippy::result_large_err)]
491 fn try_emit_request(&self, message: PeerMessage<N>) -> Result<(), ActiveSessionMessage<N>> {
492 let Some(sender) = self.to_session_manager.inner().get_ref() else { return Ok(()) };
493
494 match sender
495 .try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
496 {
497 Ok(_) => Ok(()),
498 Err(err) => {
499 trace!(
500 target: "net",
501 %err,
502 "no capacity for incoming request",
503 );
504 match err {
505 TrySendError::Full(msg) => Err(msg),
506 TrySendError::Closed(_) => {
507 Ok(())
510 }
511 }
512 }
513 }
514 }
515
516 fn on_bad_message(&self) {
518 let Some(sender) = self.to_session_manager.inner().get_ref() else { return };
519 let _ = sender.try_send(ActiveSessionMessage::BadMessage { peer_id: self.remote_peer_id });
520 }
521
522 fn emit_disconnect(&mut self, cx: &mut Context<'_>) -> Poll<()> {
524 trace!(target: "net::session", remote_peer_id=?self.remote_peer_id, "emitting disconnect");
525 let msg = ActiveSessionMessage::Disconnected {
526 peer_id: self.remote_peer_id,
527 remote_addr: self.remote_addr,
528 };
529
530 self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
531 self.poll_terminate_message(cx).expect("message is set")
532 }
533
534 fn close_on_error(&mut self, error: EthStreamError, cx: &mut Context<'_>) -> Poll<()> {
536 let msg = ActiveSessionMessage::ClosedOnConnectionError {
537 peer_id: self.remote_peer_id,
538 remote_addr: self.remote_addr,
539 error,
540 };
541 self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
542 self.poll_terminate_message(cx).expect("message is set")
543 }
544
545 fn start_disconnect(&mut self, reason: DisconnectReason) -> Result<(), EthStreamError> {
547 Ok(self.conn.inner_mut().start_disconnect(reason)?)
548 }
549
550 fn poll_disconnect(&mut self, cx: &mut Context<'_>) -> Poll<()> {
552 debug_assert!(self.is_disconnecting(), "not disconnecting");
553
554 let _ = ready!(self.conn.poll_close_unpin(cx));
556 self.emit_disconnect(cx)
557 }
558
559 fn try_disconnect(&mut self, reason: DisconnectReason, cx: &mut Context<'_>) -> Poll<()> {
561 match self.start_disconnect(reason) {
562 Ok(()) => {
563 self.poll_disconnect(cx)
565 }
566 Err(err) => {
567 debug!(target: "net::session", %err, remote_peer_id=?self.remote_peer_id, "could not send disconnect");
568 self.close_on_error(err, cx)
569 }
570 }
571 }
572
573 #[must_use]
582 fn check_timed_out_requests(&mut self, now: Instant) -> bool {
583 for (id, req) in &mut self.inflight_requests {
584 if req.is_timed_out(now) {
585 if req.is_waiting() {
586 debug!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
587 req.timeout();
588 } else if now - req.timestamp > self.protocol_breach_request_timeout {
589 return true
590 }
591 }
592 }
593
594 false
595 }
596
597 fn update_request_timeout(&mut self, sent: Instant, received: Instant) {
599 let elapsed = received.saturating_duration_since(sent);
600
601 let current = Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed));
602 let request_timeout = calculate_new_timeout(current, elapsed);
603 self.internal_request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
604 self.internal_request_timeout_interval = tokio::time::interval(request_timeout);
605 }
606
607 fn poll_terminate_message(&mut self, cx: &mut Context<'_>) -> Option<Poll<()>> {
609 let (mut tx, msg) = self.terminate_message.take()?;
610 match tx.poll_reserve(cx) {
611 Poll::Pending => {
612 self.terminate_message = Some((tx, msg));
613 return Some(Poll::Pending)
614 }
615 Poll::Ready(Ok(())) => {
616 let _ = tx.send_item(msg);
617 }
618 Poll::Ready(Err(_)) => {
619 }
621 }
622 Some(Poll::Ready(()))
624 }
625}
626
627impl<N: NetworkPrimitives> Future for ActiveSession<N> {
628 type Output = ();
629
630 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
631 let this = self.get_mut();
632
633 if let Some(terminate) = this.poll_terminate_message(cx) {
635 return terminate
636 }
637
638 if this.is_disconnecting() {
639 return this.poll_disconnect(cx)
640 }
641
642 let mut budget = 4;
648
649 'main: loop {
651 let mut progress = false;
652
653 loop {
655 match this.commands_rx.poll_next_unpin(cx) {
656 Poll::Pending => break,
657 Poll::Ready(None) => {
658 return Poll::Ready(())
661 }
662 Poll::Ready(Some(cmd)) => {
663 progress = true;
664 match cmd {
665 SessionCommand::Disconnect { reason } => {
666 debug!(
667 target: "net::session",
668 ?reason,
669 remote_peer_id=?this.remote_peer_id,
670 "Received disconnect command for session"
671 );
672 let reason =
673 reason.unwrap_or(DisconnectReason::DisconnectRequested);
674
675 return this.try_disconnect(reason, cx)
676 }
677 SessionCommand::Message(msg) => {
678 this.on_internal_peer_message(msg);
679 }
680 }
681 }
682 }
683 }
684
685 while let Poll::Ready(Some(cmd)) = this.unbounded_rx.poll_recv(cx) {
687 progress = true;
688 match cmd {
689 SessionCommand::Message(msg) => {
690 this.unbounded_broadcast_msgs.increment(1);
691 this.on_internal_peer_message(msg);
692 }
693 SessionCommand::Disconnect { reason } => {
694 let reason = reason.unwrap_or(DisconnectReason::DisconnectRequested);
695 return this.try_disconnect(reason, cx)
696 }
697 }
698 }
699
700 let deadline = this.request_deadline();
701
702 while let Poll::Ready(Some(req)) = this.internal_request_rx.poll_next_unpin(cx) {
703 progress = true;
704 this.on_internal_peer_request(req, deadline);
705 }
706
707 for idx in (0..this.received_requests_from_remote.len()).rev() {
710 let mut req = this.received_requests_from_remote.swap_remove(idx);
711 match req.rx.poll(cx) {
712 Poll::Pending => {
713 this.received_requests_from_remote.push(req);
715 }
716 Poll::Ready(resp) => {
717 this.handle_outgoing_response(req.request_id, resp);
718 }
719 }
720 }
721
722 while this.conn.poll_ready_unpin(cx).is_ready() {
724 if let Some(msg) = this.queued_outgoing.pop_front() {
725 progress = true;
726 let res = match msg {
727 OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg),
728 OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg),
729 OutgoingMessage::Raw(msg) => this.conn.start_send_raw(msg),
730 };
731 if let Err(err) = res {
732 debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to send message");
733 return this.close_on_error(err, cx)
735 }
736 } else {
737 break
739 }
740 }
741
742 'receive: loop {
744 budget -= 1;
746 if budget == 0 {
747 cx.waker().wake_by_ref();
749 break 'main
750 }
751
752 if let Some(msg) = this.pending_message_to_session.take() {
756 match this.to_session_manager.poll_reserve(cx) {
757 Poll::Ready(Ok(_)) => {
758 let _ = this.to_session_manager.send_item(msg);
759 }
760 Poll::Ready(Err(_)) => return Poll::Ready(()),
761 Poll::Pending => {
762 this.pending_message_to_session = Some(msg);
763 break 'receive
764 }
765 };
766 }
767
768 if this.received_requests_from_remote.len() > MAX_QUEUED_OUTGOING_RESPONSES {
770 break 'receive
776 }
777
778 if this.queued_outgoing.messages.len() > MAX_QUEUED_OUTGOING_RESPONSES &&
780 this.queued_response_count() > MAX_QUEUED_OUTGOING_RESPONSES
781 {
782 break 'receive
789 }
790
791 match this.conn.poll_next_unpin(cx) {
792 Poll::Pending => break,
793 Poll::Ready(None) => {
794 if this.is_disconnecting() {
795 break
796 }
797 debug!(target: "net::session", remote_peer_id=?this.remote_peer_id, "eth stream completed");
798 return this.emit_disconnect(cx)
799 }
800 Poll::Ready(Some(res)) => {
801 match res {
802 Ok(msg) => {
803 trace!(target: "net::session", msg_id=?msg.message_id(), remote_peer_id=?this.remote_peer_id, "received eth message");
804 match this.on_incoming_message(msg) {
806 OnIncomingMessageOutcome::Ok => {
807 progress = true;
809 }
810 OnIncomingMessageOutcome::BadMessage { error, message } => {
811 debug!(target: "net::session", %error, msg=?message, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
812 this.on_bad_message();
813 return this
814 .try_disconnect(DisconnectReason::ProtocolBreach, cx)
815 }
816 OnIncomingMessageOutcome::NoCapacity(msg) => {
817 this.pending_message_to_session = Some(msg);
819 }
820 }
821 }
822 Err(err) => {
823 debug!(target: "net::session", %err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
824 if err.is_protocol_breach() {
825 this.on_bad_message();
826 return this.try_disconnect(DisconnectReason::ProtocolBreach, cx)
827 }
828 return this.close_on_error(err, cx)
829 }
830 }
831 }
832 }
833 }
834
835 if !progress {
836 break 'main
837 }
838 }
839
840 if let Some(interval) = &mut this.range_update_interval {
841 while interval.poll_tick(cx).is_ready() {
843 let current_latest = this.local_range_info.latest();
844 let should_send = if let Some(last_sent) = this.last_sent_latest_block {
845 current_latest.saturating_sub(last_sent) >= EPOCH_SLOTS
847 } else {
848 true };
850
851 if should_send {
852 this.queued_outgoing.push_back(
853 EthMessage::BlockRangeUpdate(this.local_range_info.to_message()).into(),
854 );
855 this.last_sent_latest_block = Some(current_latest);
856 }
857 }
858 }
859
860 while this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
861 if this.check_timed_out_requests(Instant::now()) &&
863 let Poll::Ready(Ok(_)) = this.to_session_manager.poll_reserve(cx)
864 {
865 let msg = ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id };
866 this.pending_message_to_session = Some(msg);
867 }
868 }
869
870 this.shrink_to_fit();
871
872 Poll::Pending
873 }
874}
875
876pub(crate) struct ReceivedRequest<N: NetworkPrimitives> {
878 request_id: u64,
880 rx: PeerResponse<N>,
882 #[expect(dead_code)]
884 received: Instant,
885}
886
887pub(crate) struct InflightRequest<R> {
889 request: RequestState<R>,
891 timestamp: Instant,
893 deadline: Instant,
895}
896
897impl<N: NetworkPrimitives> InflightRequest<PeerRequest<N>> {
900 #[inline]
902 fn is_timed_out(&self, now: Instant) -> bool {
903 now > self.deadline
904 }
905
906 #[inline]
908 const fn is_waiting(&self) -> bool {
909 matches!(self.request, RequestState::Waiting(_))
910 }
911
912 fn timeout(&mut self) {
914 let mut req = RequestState::TimedOut;
915 std::mem::swap(&mut self.request, &mut req);
916
917 if let RequestState::Waiting(req) = req {
918 req.send_err_response(RequestError::Timeout);
919 }
920 }
921}
922
923enum OnIncomingMessageOutcome<N: NetworkPrimitives> {
925 Ok,
927 BadMessage { error: EthStreamError, message: EthMessage<N> },
929 NoCapacity(ActiveSessionMessage<N>),
931}
932
933impl<N: NetworkPrimitives> From<Result<(), ActiveSessionMessage<N>>>
934 for OnIncomingMessageOutcome<N>
935{
936 fn from(res: Result<(), ActiveSessionMessage<N>>) -> Self {
937 match res {
938 Ok(_) => Self::Ok,
939 Err(msg) => Self::NoCapacity(msg),
940 }
941 }
942}
943
944enum RequestState<R> {
945 Waiting(R),
947 TimedOut,
949}
950
951#[derive(Debug)]
953pub(crate) enum OutgoingMessage<N: NetworkPrimitives> {
954 Eth(EthMessage<N>),
956 Broadcast(EthBroadcastMessage<N>),
958 Raw(RawCapabilityMessage),
960}
961
962impl<N: NetworkPrimitives> OutgoingMessage<N> {
963 const fn is_response(&self) -> bool {
965 match self {
966 Self::Eth(msg) => msg.is_response(),
967 _ => false,
968 }
969 }
970
971 fn broadcast_item_count(&self) -> usize {
977 match self {
978 Self::Eth(msg) => match msg {
979 EthMessage::NewBlockHashes(h) => h.len(),
980 EthMessage::NewPooledTransactionHashes66(h) => h.len(),
981 EthMessage::NewPooledTransactionHashes68(h) => h.hashes.len(),
982 _ => 0,
983 },
984 Self::Broadcast(msg) => match msg {
985 EthBroadcastMessage::NewBlock(_) => 1,
986 EthBroadcastMessage::Transactions(txs) => txs.len(),
987 },
988 Self::Raw(_) => 0,
989 }
990 }
991
992 fn try_merge_hashes(
995 &mut self,
996 incoming: NewPooledTransactionHashes,
997 ) -> Option<NewPooledTransactionHashes> {
998 let Self::Eth(eth) = self else { return Some(incoming) };
999 match (eth, incoming) {
1000 (
1001 EthMessage::NewPooledTransactionHashes66(existing),
1002 NewPooledTransactionHashes::Eth66(inc),
1003 ) => {
1004 existing.extend(inc);
1005 None
1006 }
1007 (
1008 EthMessage::NewPooledTransactionHashes68(existing),
1009 NewPooledTransactionHashes::Eth68(inc),
1010 ) => {
1011 existing.hashes.extend(inc.hashes);
1012 existing.sizes.extend(inc.sizes);
1013 existing.types.extend(inc.types);
1014 None
1015 }
1016 (_, incoming) => Some(incoming),
1017 }
1018 }
1019}
1020
1021impl<N: NetworkPrimitives> From<EthMessage<N>> for OutgoingMessage<N> {
1022 fn from(value: EthMessage<N>) -> Self {
1023 Self::Eth(value)
1024 }
1025}
1026
1027impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for OutgoingMessage<N> {
1028 fn from(value: EthBroadcastMessage<N>) -> Self {
1029 Self::Broadcast(value)
1030 }
1031}
1032
1033#[inline]
1035fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) -> Duration {
1036 let new_timeout = estimated_rtt.mul_f64(SAMPLE_IMPACT) * TIMEOUT_SCALING;
1037
1038 let smoothened_timeout = current_timeout.mul_f64(1.0 - SAMPLE_IMPACT) + new_timeout;
1040
1041 smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT)
1042}
1043
1044pub(crate) struct QueuedOutgoingMessages<N: NetworkPrimitives> {
1051 messages: VecDeque<OutgoingMessage<N>>,
1052 count: Gauge,
1053 broadcast_items: BroadcastItemCounter,
1055}
1056
1057impl<N: NetworkPrimitives> QueuedOutgoingMessages<N> {
1058 pub(crate) const fn new(metric: Gauge, broadcast_items: BroadcastItemCounter) -> Self {
1059 Self { messages: VecDeque::new(), count: metric, broadcast_items }
1060 }
1061
1062 pub(crate) fn push_back(&mut self, message: OutgoingMessage<N>) {
1063 self.messages.push_back(message);
1064 self.count.increment(1);
1065 }
1066
1067 pub(crate) fn pop_front(&mut self) -> Option<OutgoingMessage<N>> {
1068 self.messages.pop_front().inspect(|msg| {
1069 self.count.decrement(1);
1070 let items = msg.broadcast_item_count();
1071 if items > 0 {
1072 self.broadcast_items.sub(items);
1073 }
1074 })
1075 }
1076
1077 pub(crate) fn push_pooled_hashes(&mut self, msg: NewPooledTransactionHashes) {
1080 let msg = if let Some(last) = self.messages.back_mut() {
1081 match last.try_merge_hashes(msg) {
1082 None => return,
1083 Some(msg) => msg,
1084 }
1085 } else {
1086 msg
1087 };
1088 self.messages.push_back(EthMessage::from(msg).into());
1089 self.count.increment(1);
1090 }
1091
1092 pub(crate) fn shrink_to_fit(&mut self) {
1093 self.messages.shrink_to_fit();
1094 }
1095}
1096
1097impl<N: NetworkPrimitives> Drop for QueuedOutgoingMessages<N> {
1098 fn drop(&mut self) {
1099 let remaining = self.messages.len();
1101 if remaining > 0 {
1102 self.count.decrement(remaining as f64);
1103 }
1104 }
1105}
1106
1107#[cfg(test)]
1108mod tests {
1109 use super::*;
1110 use crate::session::{handle::PendingSessionEvent, start_pending_incoming_session};
1111 use alloy_eips::eip2124::ForkFilter;
1112 use reth_chainspec::MAINNET;
1113 use reth_ecies::stream::ECIESStream;
1114 use reth_eth_wire::{
1115 handshake::EthHandshake, EthNetworkPrimitives, EthStream, GetBlockAccessLists,
1116 GetBlockBodies, HelloMessageWithProtocols, P2PStream, StatusBuilder, UnauthedEthStream,
1117 UnauthedP2PStream, UnifiedStatus,
1118 };
1119 use reth_eth_wire_types::{EthMessageID, RawCapabilityMessage};
1120 use reth_ethereum_forks::EthereumHardfork;
1121 use reth_network_peers::pk2id;
1122 use reth_network_types::session::config::PROTOCOL_BREACH_REQUEST_TIMEOUT;
1123 use secp256k1::{SecretKey, SECP256K1};
1124 use tokio::{
1125 net::{TcpListener, TcpStream},
1126 sync::mpsc,
1127 };
1128
1129 fn eth_hello(server_key: &SecretKey) -> HelloMessageWithProtocols {
1131 HelloMessageWithProtocols::builder(pk2id(&server_key.public_key(SECP256K1))).build()
1132 }
1133
1134 struct SessionBuilder<N: NetworkPrimitives = EthNetworkPrimitives> {
1135 _remote_capabilities: Arc<Capabilities>,
1136 active_session_tx: mpsc::Sender<ActiveSessionMessage<N>>,
1137 active_session_rx: ReceiverStream<ActiveSessionMessage<N>>,
1138 to_sessions: Vec<mpsc::Sender<SessionCommand<N>>>,
1139 secret_key: SecretKey,
1140 local_peer_id: PeerId,
1141 hello: HelloMessageWithProtocols,
1142 status: UnifiedStatus,
1143 fork_filter: ForkFilter,
1144 next_id: usize,
1145 }
1146
1147 impl<N: NetworkPrimitives> SessionBuilder<N> {
1148 fn next_id(&mut self) -> SessionId {
1149 let id = self.next_id;
1150 self.next_id += 1;
1151 SessionId(id)
1152 }
1153
1154 fn with_client_stream<F, O>(
1156 &self,
1157 local_addr: SocketAddr,
1158 f: F,
1159 ) -> Pin<Box<dyn Future<Output = ()> + Send>>
1160 where
1161 F: FnOnce(EthStream<P2PStream<ECIESStream<TcpStream>>, N>) -> O + Send + 'static,
1162 O: Future<Output = ()> + Send + Sync,
1163 {
1164 let mut status = self.status;
1165 let fork_filter = self.fork_filter.clone();
1166 let local_peer_id = self.local_peer_id;
1167 let mut hello = self.hello.clone();
1168 let key = SecretKey::new(&mut rand_08::thread_rng());
1169 hello.id = pk2id(&key.public_key(SECP256K1));
1170 Box::pin(async move {
1171 let outgoing = TcpStream::connect(local_addr).await.unwrap();
1172 let sink = ECIESStream::connect(outgoing, key, local_peer_id).await.unwrap();
1173
1174 let (p2p_stream, _) = UnauthedP2PStream::new(sink).handshake(hello).await.unwrap();
1175
1176 let eth_version = p2p_stream.shared_capabilities().eth_version().unwrap();
1177 status.set_eth_version(eth_version);
1178
1179 let (client_stream, _) = UnauthedEthStream::new(p2p_stream)
1180 .handshake(status, fork_filter)
1181 .await
1182 .unwrap();
1183 f(client_stream).await
1184 })
1185 }
1186
1187 async fn connect_incoming(&mut self, stream: TcpStream) -> ActiveSession<N> {
1188 let remote_addr = stream.local_addr().unwrap();
1189 let session_id = self.next_id();
1190 let (_disconnect_tx, disconnect_rx) = oneshot::channel();
1191 let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(1);
1192
1193 tokio::task::spawn(start_pending_incoming_session(
1194 Arc::new(EthHandshake::default()),
1195 disconnect_rx,
1196 session_id,
1197 stream,
1198 pending_sessions_tx,
1199 remote_addr,
1200 self.secret_key,
1201 self.hello.clone(),
1202 self.status,
1203 self.fork_filter.clone(),
1204 Default::default(),
1205 ));
1206
1207 let mut stream = ReceiverStream::new(pending_sessions_rx);
1208
1209 match stream.next().await.unwrap() {
1210 PendingSessionEvent::Established {
1211 session_id,
1212 remote_addr,
1213 peer_id,
1214 capabilities,
1215 conn,
1216 ..
1217 } => {
1218 let (_to_session_tx, messages_rx) = mpsc::channel(10);
1219 let (commands_to_session, commands_rx) = mpsc::channel(10);
1220 let (_unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
1221 let poll_sender = PollSender::new(self.active_session_tx.clone());
1222
1223 self.to_sessions.push(commands_to_session);
1224
1225 ActiveSession {
1226 next_id: 0,
1227 remote_peer_id: peer_id,
1228 remote_addr,
1229 remote_capabilities: Arc::clone(&capabilities),
1230 session_id,
1231 commands_rx: ReceiverStream::new(commands_rx),
1232 unbounded_rx,
1233 unbounded_broadcast_msgs: Counter::noop(),
1234 to_session_manager: MeteredPollSender::new(
1235 poll_sender,
1236 "network_active_session",
1237 ),
1238 pending_message_to_session: None,
1239 internal_request_rx: ReceiverStream::new(messages_rx).fuse(),
1240 inflight_requests: Default::default(),
1241 conn,
1242 queued_outgoing: QueuedOutgoingMessages::new(
1243 Gauge::noop(),
1244 BroadcastItemCounter::new(),
1245 ),
1246 received_requests_from_remote: Default::default(),
1247 internal_request_timeout_interval: tokio::time::interval(
1248 INITIAL_REQUEST_TIMEOUT,
1249 ),
1250 internal_request_timeout: Arc::new(AtomicU64::new(
1251 INITIAL_REQUEST_TIMEOUT.as_millis() as u64,
1252 )),
1253 protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
1254 terminate_message: None,
1255 range_info: None,
1256 local_range_info: BlockRangeInfo::new(
1257 0,
1258 1000,
1259 alloy_primitives::B256::ZERO,
1260 ),
1261 range_update_interval: None,
1262 last_sent_latest_block: None,
1263 }
1264 }
1265 ev => {
1266 panic!("unexpected message {ev:?}")
1267 }
1268 }
1269 }
1270 }
1271
1272 impl Default for SessionBuilder {
1273 fn default() -> Self {
1274 let (active_session_tx, active_session_rx) = mpsc::channel(100);
1275
1276 let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
1277 let local_peer_id = pk2id(&pk);
1278
1279 Self {
1280 next_id: 0,
1281 _remote_capabilities: Arc::new(Capabilities::from(vec![])),
1282 active_session_tx,
1283 active_session_rx: ReceiverStream::new(active_session_rx),
1284 to_sessions: vec![],
1285 hello: eth_hello(&secret_key),
1286 secret_key,
1287 local_peer_id,
1288 status: StatusBuilder::default().build(),
1289 fork_filter: MAINNET
1290 .hardfork_fork_filter(EthereumHardfork::Frontier)
1291 .expect("The Frontier fork filter should exist on mainnet"),
1292 }
1293 }
1294 }
1295
1296 #[tokio::test(flavor = "multi_thread")]
1297 async fn test_disconnect() {
1298 let mut builder = SessionBuilder::default();
1299
1300 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1301 let local_addr = listener.local_addr().unwrap();
1302
1303 let expected_disconnect = DisconnectReason::UselessPeer;
1304
1305 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1306 let msg = client_stream.next().await.unwrap().unwrap_err();
1307 assert_eq!(msg.as_disconnected().unwrap(), expected_disconnect);
1308 });
1309
1310 tokio::task::spawn(async move {
1311 let (incoming, _) = listener.accept().await.unwrap();
1312 let mut session = builder.connect_incoming(incoming).await;
1313
1314 session.start_disconnect(expected_disconnect).unwrap();
1315 session.await
1316 });
1317
1318 fut.await;
1319 }
1320
1321 #[tokio::test(flavor = "multi_thread")]
1322 async fn test_invalid_message_disconnects_with_protocol_breach() {
1323 let mut builder = SessionBuilder::default();
1324
1325 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1326 let local_addr = listener.local_addr().unwrap();
1327
1328 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1329 client_stream
1330 .start_send_raw(RawCapabilityMessage::eth(
1331 EthMessageID::PooledTransactions,
1332 vec![0xc0].into(),
1333 ))
1334 .unwrap();
1335 client_stream.flush().await.unwrap();
1336
1337 let msg = client_stream.next().await.unwrap().unwrap_err();
1338 assert_eq!(msg.as_disconnected(), Some(DisconnectReason::ProtocolBreach));
1339 });
1340
1341 let (tx, rx) = oneshot::channel();
1342
1343 tokio::task::spawn(async move {
1344 let (incoming, _) = listener.accept().await.unwrap();
1345 let session = builder.connect_incoming(incoming).await;
1346 session.await;
1347
1348 tx.send(()).unwrap();
1349 });
1350
1351 fut.await;
1352 rx.await.unwrap();
1353 }
1354
1355 #[tokio::test(flavor = "multi_thread")]
1356 async fn handle_dropped_stream() {
1357 let mut builder = SessionBuilder::default();
1358
1359 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1360 let local_addr = listener.local_addr().unwrap();
1361
1362 let fut = builder.with_client_stream(local_addr, async move |client_stream| {
1363 drop(client_stream);
1364 tokio::time::sleep(Duration::from_secs(1)).await
1365 });
1366
1367 let (tx, rx) = oneshot::channel();
1368
1369 tokio::task::spawn(async move {
1370 let (incoming, _) = listener.accept().await.unwrap();
1371 let session = builder.connect_incoming(incoming).await;
1372 session.await;
1373
1374 tx.send(()).unwrap();
1375 });
1376
1377 tokio::task::spawn(fut);
1378
1379 rx.await.unwrap();
1380 }
1381
1382 #[tokio::test(flavor = "multi_thread")]
1383 async fn test_send_many_messages() {
1384 reth_tracing::init_test_tracing();
1385 let mut builder = SessionBuilder::default();
1386
1387 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1388 let local_addr = listener.local_addr().unwrap();
1389
1390 let num_messages = 100;
1391
1392 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1393 for _ in 0..num_messages {
1394 client_stream
1395 .send(EthMessage::NewPooledTransactionHashes66(Vec::new().into()))
1396 .await
1397 .unwrap();
1398 }
1399 });
1400
1401 let (tx, rx) = oneshot::channel();
1402
1403 tokio::task::spawn(async move {
1404 let (incoming, _) = listener.accept().await.unwrap();
1405 let session = builder.connect_incoming(incoming).await;
1406 session.await;
1407
1408 tx.send(()).unwrap();
1409 });
1410
1411 tokio::task::spawn(fut);
1412
1413 rx.await.unwrap();
1414 }
1415
1416 #[tokio::test(flavor = "multi_thread")]
1417 async fn test_request_timeout() {
1418 reth_tracing::init_test_tracing();
1419
1420 let mut builder = SessionBuilder::default();
1421
1422 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1423 let local_addr = listener.local_addr().unwrap();
1424
1425 let request_timeout = Duration::from_millis(100);
1426 let drop_timeout = Duration::from_millis(1500);
1427
1428 let fut = builder.with_client_stream(local_addr, async move |client_stream| {
1429 let _client_stream = client_stream;
1430 tokio::time::sleep(drop_timeout * 60).await;
1431 });
1432 tokio::task::spawn(fut);
1433
1434 let (incoming, _) = listener.accept().await.unwrap();
1435 let mut session = builder.connect_incoming(incoming).await;
1436 session
1437 .internal_request_timeout
1438 .store(request_timeout.as_millis() as u64, Ordering::Relaxed);
1439 session.protocol_breach_request_timeout = drop_timeout;
1440 session.internal_request_timeout_interval =
1441 tokio::time::interval_at(tokio::time::Instant::now(), request_timeout);
1442 let (tx, rx) = oneshot::channel();
1443 let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx };
1444 session.on_internal_peer_request(req, Instant::now());
1445 tokio::spawn(session);
1446
1447 let err = rx.await.unwrap().unwrap_err();
1448 assert_eq!(err, RequestError::Timeout);
1449
1450 let msg = builder.active_session_rx.next().await.unwrap();
1452 match msg {
1453 ActiveSessionMessage::ProtocolBreach { .. } => {}
1454 ev => unreachable!("{ev:?}"),
1455 }
1456 }
1457
1458 #[test]
1459 fn test_reject_bal_request_for_eth70() {
1460 let (tx, _rx) = oneshot::channel();
1461 let request: PeerRequest<EthNetworkPrimitives> =
1462 PeerRequest::GetBlockAccessLists { request: GetBlockAccessLists(vec![]), response: tx };
1463
1464 assert!(!ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
1465 &request,
1466 EthVersion::Eth70
1467 ));
1468 assert!(ActiveSession::<EthNetworkPrimitives>::is_request_supported_for_version(
1469 &request,
1470 EthVersion::Eth71
1471 ));
1472 }
1473
1474 #[tokio::test(flavor = "multi_thread")]
1475 async fn test_keep_alive() {
1476 let mut builder = SessionBuilder::default();
1477
1478 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1479 let local_addr = listener.local_addr().unwrap();
1480
1481 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1482 let _ = tokio::time::timeout(Duration::from_secs(5), client_stream.next()).await;
1483 client_stream.into_inner().disconnect(DisconnectReason::UselessPeer).await.unwrap();
1484 });
1485
1486 let (tx, rx) = oneshot::channel();
1487
1488 tokio::task::spawn(async move {
1489 let (incoming, _) = listener.accept().await.unwrap();
1490 let session = builder.connect_incoming(incoming).await;
1491 session.await;
1492
1493 tx.send(()).unwrap();
1494 });
1495
1496 tokio::task::spawn(fut);
1497
1498 rx.await.unwrap();
1499 }
1500
1501 #[tokio::test(flavor = "multi_thread")]
1503 async fn test_send_at_capacity() {
1504 let mut builder = SessionBuilder::default();
1505
1506 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1507 let local_addr = listener.local_addr().unwrap();
1508
1509 let fut = builder.with_client_stream(local_addr, async move |mut client_stream| {
1510 client_stream
1511 .send(EthMessage::NewPooledTransactionHashes68(Default::default()))
1512 .await
1513 .unwrap();
1514 let _ = tokio::time::timeout(Duration::from_secs(100), client_stream.next()).await;
1515 });
1516 tokio::task::spawn(fut);
1517
1518 let (incoming, _) = listener.accept().await.unwrap();
1519 let session = builder.connect_incoming(incoming).await;
1520
1521 let mut num_fill_messages = 0;
1523 loop {
1524 if builder
1525 .active_session_tx
1526 .try_send(ActiveSessionMessage::ProtocolBreach { peer_id: PeerId::random() })
1527 .is_err()
1528 {
1529 break
1530 }
1531 num_fill_messages += 1;
1532 }
1533
1534 tokio::task::spawn(async move {
1535 session.await;
1536 });
1537
1538 tokio::time::sleep(Duration::from_millis(100)).await;
1539
1540 for _ in 0..num_fill_messages {
1541 let message = builder.active_session_rx.next().await.unwrap();
1542 match message {
1543 ActiveSessionMessage::ProtocolBreach { .. } => {}
1544 ev => unreachable!("{ev:?}"),
1545 }
1546 }
1547
1548 let message = builder.active_session_rx.next().await.unwrap();
1549 match message {
1550 ActiveSessionMessage::ValidMessage {
1551 message: PeerMessage::PooledTransactions(_),
1552 ..
1553 } => {}
1554 _ => unreachable!(),
1555 }
1556 }
1557
1558 #[test]
1559 fn timeout_calculation_sanity_tests() {
1560 let rtt = Duration::from_secs(5);
1561 let timeout = rtt * TIMEOUT_SCALING;
1563
1564 assert_eq!(calculate_new_timeout(timeout, rtt), timeout);
1566
1567 assert!(calculate_new_timeout(timeout, rtt / 2) < timeout);
1569 assert!(calculate_new_timeout(timeout, rtt / 2) > timeout / 2);
1570 assert!(calculate_new_timeout(timeout, rtt * 2) > timeout);
1571 assert!(calculate_new_timeout(timeout, rtt * 2) < timeout * 2);
1572 }
1573}