1use alloy_consensus::transaction::TxHashRef;
4use itertools::Itertools;
5use rayon::iter::{IntoParallelIterator, ParallelIterator};
6
7pub mod config;
9pub mod constants;
11pub mod fetcher;
13pub mod policy;
15
16pub use self::constants::{
17 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31 budget::{
32 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34 },
35 cache::LruCache,
36 duration_metered_exec, metered_poll_nested_stream_with_budget,
37 metrics::{
38 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
39 },
40 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
41 NetworkHandle, TxTypesCounter,
42};
43use alloy_primitives::{TxHash, B256};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
50 RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
54use reth_network_api::{
55 events::{PeerEvent, SessionInfo},
56 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59 error::{RequestError, RequestResult},
60 sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::SignedTransaction;
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67 error::{PoolError, PoolResult},
68 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72 collections::{hash_map::Entry, HashMap, HashSet},
73 pin::Pin,
74 sync::{
75 atomic::{AtomicUsize, Ordering},
76 Arc,
77 },
78 task::{Context, Poll},
79 time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
85pub type PoolImportFuture =
89 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
91#[derive(Debug, Clone)]
99pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
100 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
102}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105 fn send(&self, cmd: TransactionsCommand<N>) {
106 let _ = self.manager_tx.send(cmd);
107 }
108
109 async fn peer_handle(
111 &self,
112 peer_id: PeerId,
113 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114 let (tx, rx) = oneshot::channel();
115 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116 rx.await
117 }
118
119 pub fn propagate(&self, hash: TxHash) {
121 self.send(TransactionsCommand::PropagateHash(hash))
122 }
123
124 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128 self.propagate_hashes_to(Some(hash), peer)
129 }
130
131 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135 let hashes = hash.into_iter().collect::<Vec<_>>();
136 if hashes.is_empty() {
137 return
138 }
139 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140 }
141
142 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144 let (tx, rx) = oneshot::channel();
145 self.send(TransactionsCommand::GetActivePeers(tx));
146 rx.await
147 }
148
149 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153 if transactions.is_empty() {
154 return
155 }
156 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157 }
158
159 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164 if transactions.is_empty() {
165 return
166 }
167 self.send(TransactionsCommand::PropagateTransactions(transactions))
168 }
169
170 pub fn broadcast_transactions(
175 &self,
176 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177 ) {
178 let transactions =
179 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180 if transactions.is_empty() {
181 return
182 }
183 self.send(TransactionsCommand::BroadcastTransactions(transactions))
184 }
185
186 pub async fn get_transaction_hashes(
188 &self,
189 peers: Vec<PeerId>,
190 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191 if peers.is_empty() {
192 return Ok(Default::default())
193 }
194 let (tx, rx) = oneshot::channel();
195 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196 rx.await
197 }
198
199 pub async fn get_peer_transaction_hashes(
201 &self,
202 peer: PeerId,
203 ) -> Result<HashSet<TxHash>, RecvError> {
204 let res = self.get_transaction_hashes(vec![peer]).await?;
205 Ok(res.into_values().next().unwrap_or_default())
206 }
207
208 pub async fn get_pooled_transactions_from(
214 &self,
215 peer_id: PeerId,
216 hashes: Vec<B256>,
217 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220 let (tx, rx) = oneshot::channel();
221 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222 peer.try_send(request).ok();
223
224 rx.await?.map(|res| Some(res.0))
225 }
226}
227
228#[derive(Debug)]
283#[must_use = "Manager does nothing unless polled."]
284pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
285 pool: Pool,
287 network: NetworkHandle<N>,
289 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
293 transaction_fetcher: TransactionFetcher<N>,
295 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
300 pool_imports: FuturesUnordered<PoolImportFuture>,
312 pending_pool_imports_info: PendingPoolImportsInfo,
314 bad_imports: LruCache<TxHash>,
316 peers: HashMap<PeerId, PeerMetadata<N>>,
318 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
322 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
327 pending_transactions: mpsc::Receiver<TxHash>,
336 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
338 config: TransactionsManagerConfig,
340 policies: NetworkPolicies<N>,
342 metrics: TransactionsManagerMetrics,
344 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
346}
347
348impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
349 pub fn new(
353 network: NetworkHandle<N>,
354 pool: Pool,
355 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
356 transactions_manager_config: TransactionsManagerConfig,
357 ) -> Self {
358 Self::with_policy(
359 network,
360 pool,
361 from_network,
362 transactions_manager_config,
363 NetworkPolicies::new(
364 TransactionPropagationKind::default(),
365 StrictEthAnnouncementFilter::default(),
366 ),
367 )
368 }
369}
370
371impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
372 pub fn with_policy(
376 network: NetworkHandle<N>,
377 pool: Pool,
378 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
379 transactions_manager_config: TransactionsManagerConfig,
380 policies: NetworkPolicies<N>,
381 ) -> Self {
382 let network_events = network.event_listener();
383
384 let (command_tx, command_rx) = mpsc::unbounded_channel();
385
386 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
387 &transactions_manager_config.transaction_fetcher_config,
388 );
389
390 let pending = pool.pending_transactions_listener();
393 let pending_pool_imports_info =
394 PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
395 let metrics = TransactionsManagerMetrics::default();
396 metrics
397 .capacity_pending_pool_imports
398 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
399
400 Self {
401 pool,
402 network,
403 network_events,
404 transaction_fetcher,
405 transactions_by_peers: Default::default(),
406 pool_imports: Default::default(),
407 pending_pool_imports_info,
408 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409 peers: Default::default(),
410 command_tx,
411 command_rx: UnboundedReceiverStream::new(command_rx),
412 pending_transactions: pending,
413 transaction_events: UnboundedMeteredReceiver::new(
414 from_network,
415 NETWORK_POOL_TRANSACTIONS_SCOPE,
416 ),
417 config: transactions_manager_config,
418 policies,
419 metrics,
420 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421 }
422 }
423
424 pub fn handle(&self) -> TransactionsHandle<N> {
426 TransactionsHandle { manager_tx: self.command_tx.clone() }
427 }
428
429 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432 self.has_capacity_for_pending_pool_imports() &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn has_capacity_for_pending_pool_imports(&self) -> bool {
438 self.remaining_pool_import_capacity() > 0
439 }
440
441 fn remaining_pool_import_capacity(&self) -> usize {
443 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445 )
446 }
447
448 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450 self.metrics.reported_bad_transactions.increment(1);
451 }
452
453 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455 self.network.reputation_change(peer_id, kind);
456 }
457
458 fn report_already_seen(&self, peer_id: PeerId) {
459 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461 }
462
463 fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
465 if let Some(mut peer) = self.peers.remove(peer_id) {
466 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
467 }
468 self.transaction_fetcher.remove_peer(peer_id);
469 }
470
471 fn on_good_import(&mut self, hash: TxHash) {
473 self.transactions_by_peers.remove(&hash);
474 }
475
476 fn on_bad_import(&mut self, err: PoolError) {
500 let peers = self.transactions_by_peers.remove(&err.hash);
501
502 if !err.is_bad_transaction() || self.network.is_syncing() {
504 return
505 }
506 if let Some(peers) = peers {
509 for peer_id in peers {
510 self.report_peer_bad_transactions(peer_id);
511 }
512 }
513 self.metrics.bad_imports.increment(1);
514 self.bad_imports.insert(err.hash);
515 }
516
517 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
521 let info = &self.pending_pool_imports_info;
523 let max_pending_pool_imports = info.max_pending_pool_imports;
524 let has_capacity_wrt_pending_pool_imports =
525 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
526
527 self.transaction_fetcher
528 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
529 }
530
531 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
532 let kind = match req_err {
533 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
534 RequestError::Timeout => ReputationChangeKind::Timeout,
535 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
536 return
538 }
539 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
540 };
541 self.report_peer(peer_id, kind);
542 }
543
544 #[inline]
545 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
546 let metrics = &self.metrics;
547
548 let TxManagerPollDurations {
549 acc_network_events,
550 acc_pending_imports,
551 acc_tx_events,
552 acc_imported_txns,
553 acc_fetch_events,
554 acc_pending_fetch,
555 acc_cmds,
556 } = poll_durations;
557
558 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
560 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
562 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
563 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
564 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
565 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
566 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
567 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
568 }
569}
570
571impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
572 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
574 for res in batch_results {
575 match res {
576 Ok(AddedTransactionOutcome { hash, .. }) => {
577 self.on_good_import(hash);
578 }
579 Err(err) => {
580 self.on_bad_import(err);
581 }
582 }
583 }
584 }
585
586 fn on_new_pooled_transaction_hashes(
588 &mut self,
589 peer_id: PeerId,
590 msg: NewPooledTransactionHashes,
591 ) {
592 if self.network.is_initially_syncing() {
594 return
595 }
596 if self.network.tx_gossip_disabled() {
597 return
598 }
599
600 let Some(peer) = self.peers.get_mut(&peer_id) else {
602 trace!(
603 peer_id = format!("{peer_id:#}"),
604 ?msg,
605 "discarding announcement from inactive peer"
606 );
607
608 return
609 };
610 let client = peer.client_version.clone();
611
612 let mut count_txns_already_seen_by_peer = 0;
614 for tx in msg.iter_hashes().copied() {
615 if !peer.seen_transactions.insert(tx) {
616 count_txns_already_seen_by_peer += 1;
617 }
618 }
619 if count_txns_already_seen_by_peer > 0 {
620 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
625 self.metrics
626 .occurrences_hash_already_seen_by_peer
627 .increment(count_txns_already_seen_by_peer);
628
629 trace!(target: "net::tx",
630 %count_txns_already_seen_by_peer,
631 peer_id=format!("{peer_id:#}"),
632 ?client,
633 "Peer sent hashes that have already been marked as seen by peer"
634 );
635
636 self.report_already_seen(peer_id);
637 }
638
639 if msg.is_empty() {
641 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
642 return;
643 }
644
645 let original_len = msg.len();
646 let mut partially_valid_msg = msg.dedup();
647
648 if partially_valid_msg.len() != original_len {
649 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
650 }
651
652 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
654
655 let mut should_report_peer = false;
662 let mut tx_types_counter = TxTypesCounter::default();
663
664 let is_eth68_message = partially_valid_msg
665 .msg_version()
666 .expect("partially valid announcement should have a version")
667 .is_eth68();
668
669 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
670 let (ty_byte, size_val) = match *metadata_ref_mut {
671 Some((ty, size)) => {
672 if !is_eth68_message {
673 should_report_peer = true;
674 }
675 (ty, size)
676 }
677 None => {
678 if is_eth68_message {
679 should_report_peer = true;
680 return false;
681 }
682 (0u8, 0)
683 }
684 };
685
686 if is_eth68_message &&
687 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
688 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
689 {
690 tx_types_counter.increase_by_tx_type(parsed_tx_type);
691 }
692
693 let decision = self
694 .policies
695 .announcement_filter()
696 .decide_on_announcement(ty_byte, tx_hash, size_val);
697
698 match decision {
699 AnnouncementAcceptance::Accept => true,
700 AnnouncementAcceptance::Ignore => false,
701 AnnouncementAcceptance::Reject { penalize_peer } => {
702 if penalize_peer {
703 should_report_peer = true;
704 }
705 false
706 }
707 }
708 });
709
710 if is_eth68_message {
711 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
712 }
713
714 if should_report_peer {
715 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
716 }
717
718 let hashes_count_pre_pool_filter = partially_valid_msg.len();
726 self.pool.retain_unknown(&mut partially_valid_msg);
727 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
728 let already_known_hashes_count =
729 hashes_count_pre_pool_filter - partially_valid_msg.len();
730 self.metrics
731 .occurrences_hashes_already_in_pool
732 .increment(already_known_hashes_count as u64);
733 }
734
735 if partially_valid_msg.is_empty() {
736 return
738 }
739
740 let mut valid_announcement_data =
741 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
742
743 if valid_announcement_data.is_empty() {
744 return
746 }
747
748 let bad_imports = &self.bad_imports;
755 self.transaction_fetcher.filter_unseen_and_pending_hashes(
756 &mut valid_announcement_data,
757 |hash| bad_imports.contains(hash),
758 &peer_id,
759 &client,
760 );
761
762 if valid_announcement_data.is_empty() {
763 return
765 }
766
767 trace!(target: "net::tx::propagation",
768 peer_id=format!("{peer_id:#}"),
769 hashes_len=valid_announcement_data.len(),
770 hashes=%valid_announcement_data.keys().format(", "),
771 msg_version=%valid_announcement_data.msg_version(),
772 client_version=%client,
773 "received previously unseen and pending hashes in announcement from peer"
774 );
775
776 if !self.transaction_fetcher.is_idle(&peer_id) {
779 let msg_version = valid_announcement_data.msg_version();
781 let (hashes, _version) = valid_announcement_data.into_request_hashes();
782
783 trace!(target: "net::tx",
784 peer_id=format!("{peer_id:#}"),
785 hashes=?*hashes,
786 %msg_version,
787 %client,
788 "buffering hashes announced by busy peer"
789 );
790
791 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
792
793 return
794 }
795
796 let mut hashes_to_request =
797 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
798 let surplus_hashes =
799 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
800
801 if !surplus_hashes.is_empty() {
802 trace!(target: "net::tx",
803 peer_id=format!("{peer_id:#}"),
804 surplus_hashes=?*surplus_hashes,
805 %client,
806 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
807 );
808
809 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
810 }
811
812 trace!(target: "net::tx",
813 peer_id=format!("{peer_id:#}"),
814 hashes=?*hashes_to_request,
815 %client,
816 "sending hashes in `GetPooledTransactions` request to peer's session"
817 );
818
819 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
823 if let Some(failed_to_request_hashes) =
824 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
825 {
826 let conn_eth_version = peer.version;
827
828 trace!(target: "net::tx",
829 peer_id=format!("{peer_id:#}"),
830 failed_to_request_hashes=?*failed_to_request_hashes,
831 %conn_eth_version,
832 %client,
833 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
834 );
835 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
836 }
837 }
838}
839
840impl<Pool, N> TransactionsManager<Pool, N>
841where
842 Pool: TransactionPool + Unpin + 'static,
843 N: NetworkPrimitives<
844 BroadcastedTransaction: SignedTransaction,
845 PooledTransaction: SignedTransaction,
846 > + Unpin,
847 Pool::Transaction:
848 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
849{
850 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
862 if self.network.tx_gossip_disabled() {
866 return
867 }
868
869 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
870
871 self.propagate_all(hashes);
872 }
873
874 fn propagate_full_transactions_to_peer(
878 &mut self,
879 txs: Vec<TxHash>,
880 peer_id: PeerId,
881 propagation_mode: PropagationMode,
882 ) -> Option<PropagatedTransactions> {
883 let peer = self.peers.get_mut(&peer_id)?;
884 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
885 let mut propagated = PropagatedTransactions::default();
886
887 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
889
890 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
891
892 if propagation_mode.is_forced() {
893 full_transactions.extend(to_propagate);
895 } else {
896 for tx in to_propagate {
899 if !peer.seen_transactions.contains(tx.tx_hash()) {
900 full_transactions.push(&tx);
902 }
903 }
904 }
905
906 if full_transactions.is_empty() {
907 return None
909 }
910
911 let PropagateTransactions { pooled, full } = full_transactions.build();
912
913 if let Some(new_pooled_hashes) = pooled {
915 for hash in new_pooled_hashes.iter_hashes().copied() {
916 propagated.record(hash, PropagateKind::Hash(peer_id));
917 peer.seen_transactions.insert(hash);
919 }
920
921 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
923 }
924
925 if let Some(new_full_transactions) = full {
927 for tx in &new_full_transactions {
928 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
929 peer.seen_transactions.insert(*tx.tx_hash());
931 }
932
933 self.network.send_transactions(peer_id, new_full_transactions);
935 }
936
937 self.metrics.propagated_transactions.increment(propagated.len() as u64);
939
940 Some(propagated)
941 }
942
943 fn propagate_hashes_to(
947 &mut self,
948 hashes: Vec<TxHash>,
949 peer_id: PeerId,
950 propagation_mode: PropagationMode,
951 ) {
952 trace!(target: "net::tx", "Start propagating transactions as hashes");
953
954 let propagated = {
957 let Some(peer) = self.peers.get_mut(&peer_id) else {
958 return
960 };
961
962 let to_propagate =
963 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
964
965 let mut propagated = PropagatedTransactions::default();
966
967 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
969
970 if propagation_mode.is_forced() {
971 hashes.extend(to_propagate)
972 } else {
973 for tx in to_propagate {
974 if !peer.seen_transactions.contains(tx.tx_hash()) {
975 hashes.push(&tx);
977 }
978 }
979 }
980
981 let new_pooled_hashes = hashes.build();
982
983 if new_pooled_hashes.is_empty() {
984 return
986 }
987
988 if let Some(peer) = self.peers.get_mut(&peer_id) {
989 for hash in new_pooled_hashes.iter_hashes().copied() {
990 propagated.record(hash, PropagateKind::Hash(peer_id));
991 peer.seen_transactions.insert(hash);
992 }
993 }
994
995 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
996
997 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
999
1000 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1002
1003 propagated
1004 };
1005
1006 self.pool.on_propagated(propagated);
1008 }
1009
1010 fn propagate_transactions(
1017 &mut self,
1018 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1019 propagation_mode: PropagationMode,
1020 ) -> PropagatedTransactions {
1021 let mut propagated = PropagatedTransactions::default();
1022 if self.network.tx_gossip_disabled() {
1023 return propagated
1024 }
1025
1026 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1028
1029 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1031 if !self.policies.propagation_policy().can_propagate(peer) {
1032 continue
1034 }
1035 let mut builder = if peer_idx > max_num_full {
1037 PropagateTransactionsBuilder::pooled(peer.version)
1038 } else {
1039 PropagateTransactionsBuilder::full(peer.version)
1040 };
1041
1042 if propagation_mode.is_forced() {
1043 builder.extend(to_propagate.iter());
1044 } else {
1045 for tx in &to_propagate {
1049 if !peer.seen_transactions.contains(tx.tx_hash()) {
1052 builder.push(tx);
1053 }
1054 }
1055 }
1056
1057 if builder.is_empty() {
1058 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1059 continue
1060 }
1061
1062 let PropagateTransactions { pooled, full } = builder.build();
1063
1064 if let Some(mut new_pooled_hashes) = pooled {
1066 new_pooled_hashes
1069 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1070
1071 for hash in new_pooled_hashes.iter_hashes().copied() {
1072 propagated.record(hash, PropagateKind::Hash(*peer_id));
1073 peer.seen_transactions.insert(hash);
1075 }
1076
1077 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1078
1079 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1081 }
1082
1083 if let Some(new_full_transactions) = full {
1085 for tx in &new_full_transactions {
1086 propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1087 peer.seen_transactions.insert(*tx.tx_hash());
1089 }
1090
1091 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1092
1093 self.network.send_transactions(*peer_id, new_full_transactions);
1095 }
1096 }
1097
1098 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1100
1101 propagated
1102 }
1103
1104 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1109 if self.peers.is_empty() {
1110 return
1112 }
1113 let propagated = self.propagate_transactions(
1114 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1115 PropagationMode::Basic,
1116 );
1117
1118 self.pool.on_propagated(propagated);
1120 }
1121
1122 fn on_get_pooled_transactions(
1124 &mut self,
1125 peer_id: PeerId,
1126 request: GetPooledTransactions,
1127 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1128 ) {
1129 if self.network.tx_gossip_disabled() {
1131 let _ = response.send(Ok(PooledTransactions::default()));
1132 return
1133 }
1134 if let Some(peer) = self.peers.get_mut(&peer_id) {
1135 let transactions = self.pool.get_pooled_transaction_elements(
1136 request.0,
1137 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1138 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1139 ),
1140 );
1141 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1142
1143 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1146
1147 let resp = PooledTransactions(transactions);
1148 let _ = response.send(Ok(resp));
1149 }
1150 }
1151
1152 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1154 match cmd {
1155 TransactionsCommand::PropagateHash(hash) => {
1156 self.on_new_pending_transactions(vec![hash])
1157 }
1158 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1159 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1160 }
1161 TransactionsCommand::GetActivePeers(tx) => {
1162 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1163 tx.send(peers).ok();
1164 }
1165 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1166 if let Some(propagated) =
1167 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1168 {
1169 self.pool.on_propagated(propagated);
1170 }
1171 }
1172 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1173 TransactionsCommand::BroadcastTransactions(txs) => {
1174 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1175 self.pool.on_propagated(propagated);
1176 }
1177 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1178 let mut res = HashMap::with_capacity(peers.len());
1179 for peer_id in peers {
1180 let hashes = self
1181 .peers
1182 .get(&peer_id)
1183 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1184 .unwrap_or_default();
1185 res.insert(peer_id, hashes);
1186 }
1187 tx.send(res).ok();
1188 }
1189 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1190 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1191 peer_request_sender.send(sender).ok();
1192 }
1193 }
1194 }
1195
1196 fn handle_peer_session(
1200 &mut self,
1201 info: SessionInfo,
1202 messages: PeerRequestSender<PeerRequest<N>>,
1203 ) {
1204 let SessionInfo { peer_id, client_version, version, .. } = info;
1205
1206 let peer = PeerMetadata::<N>::new(
1208 messages,
1209 version,
1210 client_version,
1211 self.config.max_transactions_seen_by_peer_history,
1212 info.peer_kind,
1213 );
1214 let peer = match self.peers.entry(peer_id) {
1215 Entry::Occupied(mut entry) => {
1216 entry.insert(peer);
1217 entry.into_mut()
1218 }
1219 Entry::Vacant(entry) => entry.insert(peer),
1220 };
1221
1222 self.policies.propagation_policy_mut().on_session_established(peer);
1223
1224 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1228 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1229 return
1230 }
1231
1232 let pooled_txs = self.pool.pooled_transactions_max(
1234 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1235 );
1236 if pooled_txs.is_empty() {
1237 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1238 return;
1239 }
1240
1241 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1243 for pooled_tx in pooled_txs {
1244 peer.seen_transactions.insert(*pooled_tx.hash());
1245 msg_builder.push_pooled(pooled_tx);
1246 }
1247
1248 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1249 let msg = msg_builder.build();
1250 self.network.send_transactions_hashes(peer_id, msg);
1251 }
1252
1253 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1255 match event_result {
1256 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1257 self.on_peer_session_closed(&peer_id);
1258 }
1259 NetworkEvent::ActivePeerSession { info, messages } => {
1260 self.handle_peer_session(info, messages);
1262 }
1263 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1264 let peer_id = info.peer_id;
1265 let messages = match self.peers.get(&peer_id) {
1267 Some(p) => p.request_tx.clone(),
1268 None => {
1269 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1270 return;
1271 }
1272 };
1273 self.handle_peer_session(info, messages);
1274 }
1275 _ => {}
1276 }
1277 }
1278
1279 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1281 if self.config.ingress_policy.allows_all() {
1282 return true;
1283 }
1284 let Some(peer) = self.peers.get(peer_id) else {
1285 return false;
1286 };
1287 self.config.ingress_policy.allows(peer.peer_kind())
1288 }
1289
1290 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1292 match event {
1293 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1294 if !self.accepts_incoming_from(&peer_id) {
1295 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1296 return;
1297 }
1298
1299 let has_blob_txs = msg.has_eip4844();
1303
1304 let non_blob_txs = msg
1305 .into_iter()
1306 .map(N::PooledTransaction::try_from)
1307 .filter_map(Result::ok)
1308 .collect();
1309
1310 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1311
1312 if has_blob_txs {
1313 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1314 self.report_peer_bad_transactions(peer_id);
1315 }
1316 }
1317 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1318 if !self.accepts_incoming_from(&peer_id) {
1319 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1320 return;
1321 }
1322 self.on_new_pooled_transaction_hashes(peer_id, msg)
1323 }
1324 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1325 self.on_get_pooled_transactions(peer_id, request, response)
1326 }
1327 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1328 let _ = response.send(Some(self.handle()));
1329 }
1330 }
1331 }
1332
    /// Filters the transactions received from `peer_id` and queues the remaining ones for import
    /// into the pool.
    ///
    /// `source` tells whether the transactions arrived as an unsolicited broadcast or as a
    /// response to one of our requests; only broadcasts are recorded in the peer's
    /// `seen_transactions` cache here.
    ///
    /// Nothing is imported while the node is initially syncing, while transaction gossip is
    /// disabled, when there is no capacity left for pending pool imports, or when the peer is not
    /// tracked in `self.peers`.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // Ignore transactions while the node is initially syncing.
        if self.network.is_initially_syncing() {
            return
        }
        // Ignore transactions if gossip is turned off.
        if self.network.tx_gossip_disabled() {
            return
        }

        // Bail out early if no further pool imports can be queued.
        if !self.has_capacity_for_pending_pool_imports() {
            return
        }

        let mut transactions = transactions.0;

        // Cut the batch down to the remaining import capacity before doing any further work on
        // it, and account for the dropped tail in the metrics.
        let capacity = self.remaining_pool_import_capacity();
        if transactions.len() > capacity {
            let skipped = transactions.len() - capacity;
            transactions.truncate(capacity);
            self.metrics
                .skipped_transactions_pending_pool_imports_at_capacity
                .increment(skipped as u64);
            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let client_version = peer.client_version.clone();

        // Measures the time spent preparing the import; recorded at the end of this function.
        let start = Instant::now();

        // These hashes have now been received, so the fetcher no longer needs to track them.
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));

        // For broadcasts, record each hash as seen by the peer and count the ones the cache
        // reported as already present (`insert` returning `false`).
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // Set when the batch contains a known bad import or a transaction failing recovery.
        let mut has_bad_transactions = false;

        transactions.retain(|tx| {
            // Already tracked from another peer: just remember that this peer has it too.
            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
                entry.get_mut().insert(peer_id);
                return false
            }
            // Previously failed import: drop it and flag the batch.
            if self.bad_imports.contains(tx.tx_hash()) {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    hash=%tx.tx_hash(),
                    %client_version,
                    "received a known bad transaction from peer"
                );
                has_bad_transactions = true;
                return false;
            }
            true
        });

        // Let the pool remove the transactions it already knows, and count them.
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        let txs_len = transactions.len();

        // Recover the remaining transactions in parallel; the ones that fail recovery are logged
        // and dropped.
        let new_txs = transactions
            .into_par_iter()
            .filter_map(|tx| match tx.try_into_recovered() {
                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%client_version,
                        "failed ecrecovery for transaction"
                    );
                    None
                }
            })
            .collect::<Vec<_>>();

        // Any transaction dropped during recovery counts as a bad transaction.
        has_bad_transactions |= new_txs.len() != txs_len;

        // Start tracking which peer delivered each new transaction.
        for tx in &new_txs {
            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
        }

        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            // Gauge of in-flight imports; decremented again once the import future finishes.
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            // Shared counter of in-flight imports; decremented inside the import future as well.
            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            // Hand the whole batch to the pool in a single call.
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                metric_pending_pool_imports.decrement(added as f64);
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            // The future is driven when `pool_imports` is polled.
            self.pool_imports.push(import);
        }

        if num_already_seen_by_peer > 0 {
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
        }

        if has_bad_transactions {
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }

        self.metrics.pool_import_prepare_duration.record(start.elapsed());
    }
1496
1497 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1499 match fetch_event {
1500 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1501 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1502 if report_peer {
1503 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1504 }
1505 }
1506 FetchEvent::FetchError { peer_id, error } => {
1507 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1508 self.on_request_error(peer_id, error);
1509 }
1510 FetchEvent::EmptyResponse { peer_id } => {
1511 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1512 }
1513 }
1514 }
1515}
1516
/// Drives the transactions manager.
///
/// The future never resolves: every call to `poll` returns [`Poll::Pending`]. Each call works
/// through the manager's event sources in a fixed order, each one through a budgeted/metered
/// macro or a bounded receive.
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    /// Performs one round of work and returns [`Poll::Pending`].
    ///
    /// If any source reports that more work may be immediately available, the task wakes itself
    /// so that it is polled again right away; in that case the poll metrics are not updated for
    /// this round.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // Session events from the network.
        // NOTE(review): the macro presumably evaluates to `true` when its budget ran out while
        // the stream may still have items; confirm against the macro definition.
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Collect new pending transaction hashes, at most
        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` per poll.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // The buffer was filled completely, so more items may be waiting.
                    true
                } else {
                    // Receive once more with the remaining room; if this is ready too, the
                    // manager is polled again immediately.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Transaction events forwarded by the network.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Events produced by the transaction fetcher.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Results of batched pool imports queued by `import_transactions`.
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Try to request hashes that are pending fetch. If that reports `true`, the fetcher is
        // treated as having more events so that the manager is polled again.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Commands sent to the manager.
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // Schedule an immediate re-poll since at least one source may have more work.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1702
/// Selects how transactions are propagated.
// NOTE(review): the exact difference between the two modes is decided by the propagation code
// outside this block; the names suggest `Forced` bypasses a check that `Basic` applies — confirm.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The regular propagation mode.
    Basic,
    /// The forced propagation mode; see [`PropagationMode::is_forced`].
    Forced,
}

impl PropagationMode {
    /// Returns `true` only for [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1725
1726#[derive(Debug, Clone)]
1728struct PropagateTransaction<T = TransactionSigned> {
1729 size: usize,
1730 transaction: Arc<T>,
1731}
1732
1733impl<T: SignedTransaction> PropagateTransaction<T> {
1734 pub fn new(transaction: T) -> Self {
1736 let size = transaction.length();
1737 Self { size, transaction: Arc::new(transaction) }
1738 }
1739
1740 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1742 where
1743 P: PoolTransaction<Consensus = T>,
1744 {
1745 let size = tx.encoded_length();
1746 let transaction = tx.transaction.clone_into_consensus();
1747 let transaction = Arc::new(transaction.into_inner());
1748 Self { size, transaction }
1749 }
1750
1751 fn tx_hash(&self) -> &TxHash {
1752 self.transaction.tx_hash()
1753 }
1754}
1755
1756#[derive(Debug, Clone)]
1759enum PropagateTransactionsBuilder<T> {
1760 Pooled(PooledTransactionsHashesBuilder),
1761 Full(FullTransactionsBuilder<T>),
1762}
1763
1764impl<T> PropagateTransactionsBuilder<T> {
1765 fn pooled(version: EthVersion) -> Self {
1767 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1768 }
1769
1770 fn full(version: EthVersion) -> Self {
1772 Self::Full(FullTransactionsBuilder::new(version))
1773 }
1774
1775 fn is_empty(&self) -> bool {
1777 match self {
1778 Self::Pooled(builder) => builder.is_empty(),
1779 Self::Full(builder) => builder.is_empty(),
1780 }
1781 }
1782
1783 fn build(self) -> PropagateTransactions<T> {
1785 match self {
1786 Self::Pooled(pooled) => {
1787 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1788 }
1789 Self::Full(full) => full.build(),
1790 }
1791 }
1792}
1793
1794impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1795 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1797 for tx in txs {
1798 self.push(tx);
1799 }
1800 }
1801
1802 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1804 match self {
1805 Self::Pooled(builder) => builder.push(transaction),
1806 Self::Full(builder) => builder.push(transaction),
1807 }
1808 }
1809}
1810
/// The outcome of a [`PropagateTransactionsBuilder`]: what should be sent to a peer, if anything.
struct PropagateTransactions<T> {
    /// The hashes announcement to send, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// The full transactions to send, if any.
    full: Option<Vec<Arc<T>>>,
}
1818
1819#[derive(Debug, Clone)]
1824struct FullTransactionsBuilder<T> {
1825 total_size: usize,
1827 transactions: Vec<Arc<T>>,
1829 pooled: PooledTransactionsHashesBuilder,
1831}
1832
1833impl<T> FullTransactionsBuilder<T> {
1834 fn new(version: EthVersion) -> Self {
1836 Self {
1837 total_size: 0,
1838 pooled: PooledTransactionsHashesBuilder::new(version),
1839 transactions: vec![],
1840 }
1841 }
1842
1843 fn is_empty(&self) -> bool {
1845 self.transactions.is_empty() && self.pooled.is_empty()
1846 }
1847
1848 fn build(self) -> PropagateTransactions<T> {
1850 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1851 let full = Some(self.transactions).filter(|full| !full.is_empty());
1852 PropagateTransactions { pooled, full }
1853 }
1854}
1855
1856impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1857 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1859 for tx in txs {
1860 self.push(&tx)
1861 }
1862 }
1863
1864 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1874 if !transaction.transaction.is_broadcastable_in_full() {
1883 self.pooled.push(transaction);
1884 return
1885 }
1886
1887 let new_size = self.total_size + transaction.size;
1888 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1889 self.total_size > 0
1890 {
1891 self.pooled.push(transaction);
1893 return
1894 }
1895
1896 self.total_size = new_size;
1897 self.transactions.push(Arc::clone(&transaction.transaction));
1898 }
1899}
1900
1901#[derive(Debug, Clone)]
1904enum PooledTransactionsHashesBuilder {
1905 Eth66(NewPooledTransactionHashes66),
1906 Eth68(NewPooledTransactionHashes68),
1907}
1908
1909impl PooledTransactionsHashesBuilder {
1912 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1914 match self {
1915 Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1916 Self::Eth68(msg) => {
1917 msg.hashes.push(*pooled_tx.hash());
1918 msg.sizes.push(pooled_tx.encoded_length());
1919 msg.types.push(pooled_tx.transaction.ty());
1920 }
1921 }
1922 }
1923
1924 fn is_empty(&self) -> bool {
1926 match self {
1927 Self::Eth66(hashes) => hashes.is_empty(),
1928 Self::Eth68(hashes) => hashes.is_empty(),
1929 }
1930 }
1931
1932 fn len(&self) -> usize {
1934 match self {
1935 Self::Eth66(hashes) => hashes.len(),
1936 Self::Eth68(hashes) => hashes.len(),
1937 }
1938 }
1939
1940 fn extend<T: SignedTransaction>(
1942 &mut self,
1943 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1944 ) {
1945 for tx in txs {
1946 self.push(&tx);
1947 }
1948 }
1949
1950 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1951 match self {
1952 Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1953 Self::Eth68(msg) => {
1954 msg.hashes.push(*tx.tx_hash());
1955 msg.sizes.push(tx.size);
1956 msg.types.push(tx.transaction.ty());
1957 }
1958 }
1959 }
1960
1961 fn new(version: EthVersion) -> Self {
1963 match version {
1964 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1965 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1966 Self::Eth68(Default::default())
1967 }
1968 }
1969 }
1970
1971 fn build(self) -> NewPooledTransactionHashes {
1972 match self {
1973 Self::Eth66(mut msg) => {
1974 msg.shrink_to_fit();
1975 msg.into()
1976 }
1977 Self::Eth68(mut msg) => {
1978 msg.shrink_to_fit();
1979 msg.into()
1980 }
1981 }
1982 }
1983}
1984
/// How a batch of transactions reached us.
enum TransactionSource {
    /// The transactions were broadcast to us.
    Broadcast,
    /// The transactions came back as the response to a request.
    Response,
}

impl TransactionSource {
    /// Returns `true` only for [`TransactionSource::Broadcast`].
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2001
2002#[derive(Debug)]
2004pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2005 seen_transactions: LruCache<TxHash>,
2009 request_tx: PeerRequestSender<PeerRequest<N>>,
2011 version: EthVersion,
2013 client_version: Arc<str>,
2015 peer_kind: PeerKind,
2017}
2018
2019impl<N: NetworkPrimitives> PeerMetadata<N> {
2020 pub fn new(
2022 request_tx: PeerRequestSender<PeerRequest<N>>,
2023 version: EthVersion,
2024 client_version: Arc<str>,
2025 max_transactions_seen_by_peer: u32,
2026 peer_kind: PeerKind,
2027 ) -> Self {
2028 Self {
2029 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2030 request_tx,
2031 version,
2032 client_version,
2033 peer_kind,
2034 }
2035 }
2036
2037 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2039 &self.request_tx
2040 }
2041
2042 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2044 &mut self.seen_transactions
2045 }
2046
2047 pub const fn version(&self) -> EthVersion {
2049 self.version
2050 }
2051
2052 pub fn client_version(&self) -> &str {
2054 &self.client_version
2055 }
2056
2057 pub const fn peer_kind(&self) -> PeerKind {
2059 self.peer_kind
2060 }
2061}
2062
/// Commands for the transactions manager.
// NOTE(review): the handler (`on_command`) is not part of this section, so the per-variant notes
// below are derived from the variant names and payload types only — confirm against the handler.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Carries a single hash; presumably announces it to the network.
    PropagateHash(B256),
    /// Carries hashes and a target peer; presumably announces the hashes to that peer only.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Carries a oneshot sender that receives a set of peer ids.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Carries transaction hashes and a target peer; presumably propagates them to that peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Carries transaction hashes; presumably propagates them to the network.
    PropagateTransactions(Vec<TxHash>),
    /// Carries transactions already prepared for propagation; presumably broadcasts them.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Requests transaction hashes per peer.
    GetTransactionHashes {
        /// The peers the request is about.
        peers: Vec<PeerId>,
        /// Receives a map from peer id to a set of transaction hashes.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests the request sender of a peer.
    GetPeerSender {
        /// The peer the request is about.
        peer_id: PeerId,
        /// Receives the peer's request sender, or `None`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2089
/// Transaction-related events that the transactions manager handles in `on_network_tx_event`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Full transactions received from a peer.
    ///
    /// The manager imports them as a broadcast, subject to the ingress policy.
    IncomingTransactions {
        /// The peer that sent the transactions.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Transaction hashes announced by a peer.
    ///
    /// The manager processes them subject to the ingress policy.
    IncomingPooledTransactionHashes {
        /// The peer that sent the announcement.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer's request for pooled transactions.
    GetPooledTransactions {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// The request itself.
        request: GetPooledTransactions,
        /// Channel on which the result of the request is returned.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// A request for the manager's handle; the manager answers with `Some(handle)`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2121
/// Tracks the number of transactions whose import into the pool is still in flight.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of in-flight imports.
    pending_pool_imports: Arc<AtomicUsize>,
    /// The limit this instance was created with.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the tracker with a counter starting at zero and the given limit.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of in-flight imports is strictly below the given
    /// `max_pending_pool_imports`.
    ///
    /// Note that the limit is taken from the argument, not from the value stored at
    /// construction.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2142
impl Default for PendingPoolImportsInfo {
    /// Creates the tracker with `DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS` as the limit.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2148
/// Durations accumulated during a single call to the manager's `poll`.
///
/// A fresh value is created at the start of every poll and handed to `update_poll_metrics` at
/// the end of a poll that does not reschedule itself.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time accounted to the network events stream.
    acc_network_events: Duration,
    /// Time accounted to the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time accounted to the network transaction events stream.
    acc_tx_events: Duration,
    // NOTE(review): not written by the `poll` body in this section; presumably the time spent
    // on newly imported pending transactions — confirm where it is updated.
    acc_imported_txns: Duration,
    /// Time accounted to the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time accounted to requesting hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time accounted to the commands channel.
    acc_cmds: Duration,
}
2159
2160#[cfg(test)]
2161mod tests {
2162 use super::*;
2163 use crate::{
2164 test_utils::{
2165 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2166 Testnet,
2167 },
2168 transactions::config::RelaxedEthAnnouncementFilter,
2169 NetworkConfigBuilder, NetworkManager,
2170 };
2171 use alloy_consensus::{TxEip1559, TxLegacy};
2172 use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2173 use alloy_rlp::Decodable;
2174 use futures::FutureExt;
2175 use reth_chainspec::MIN_TRANSACTION_GAS;
2176 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2177 use reth_network_api::{NetworkInfo, PeerKind};
2178 use reth_network_p2p::{
2179 error::{RequestError, RequestResult},
2180 sync::{NetworkSyncUpdater, SyncState},
2181 };
2182 use reth_storage_api::noop::NoopProvider;
2183 use reth_tasks::Runtime;
2184 use reth_transaction_pool::test_utils::{
2185 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2186 };
2187 use secp256k1::SecretKey;
2188 use std::{
2189 future::poll_fn,
2190 net::{IpAddr, Ipv4Addr, SocketAddr},
2191 str::FromStr,
2192 };
2193 use tracing::error;
2194
    /// A full-transaction broadcast received while the node reports that it is initially syncing
    /// must not end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Build a separate network + transactions manager that is driven by hand below.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Put the node into its first sync.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // Feed the first two events observed by peer 0 into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager once so that any queued import would be driven.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2264
    /// Once the node has completed a first sync, a later sync no longer counts as the initial
    /// one, and a broadcast received during it is imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Build a separate network + transactions manager that is driven by hand below.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Syncing -> Idle -> Syncing: the second sync is not the initial one anymore.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events observed by peer 0 into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager once to drive the queued import.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2339
    /// An announcement of transaction hashes from a peer triggers a `GetPooledTransactions`
    /// request to that peer, and the transactions returned in the response are imported into the
    /// pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions_hashes() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
        let client = NoopProvider::default();

        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .listener_port(0)
            .disable_discovery()
            .build(client);

        let pool = testing_pool();

        let transactions_manager_config = config.transactions_manager_config.clone();
        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        let peer_id_1 = PeerId::new([1; 64]);
        let eth_version = EthVersion::Eth66;

        // A single signed legacy transaction that the mocked peer announces and later serves.
        let txs = vec![TransactionSigned::new_unhashed(
            Transaction::Legacy(TxLegacy {
                chain_id: Some(4),
                nonce: 15u64,
                gas_price: 2200000000,
                gas_limit: 34811,
                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
                value: U256::from(1234u64),
                input: Default::default(),
            }),
            Signature::new(
                U256::from_str(
                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
                )
                .unwrap(),
                U256::from_str(
                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
                )
                .unwrap(),
                true,
            ),
        )];

        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

        // Register a mocked session for the peer directly in the manager.
        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        tx_manager.peers.insert(peer_id_1, peer_1);

        assert!(pool.is_empty());

        // The peer announces the hashes.
        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
            peer_id: peer_id_1,
            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
                txs_hashes.clone(),
            )),
        });

        // The manager must ask the peer for exactly the announced hashes.
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

        let message: Vec<PooledTransactionVariant> = txs
            .into_iter()
            .map(|tx| {
                PooledTransactionVariant::try_from(tx)
                    .expect("Failed to convert MockTransaction to PooledTransaction")
            })
            .collect();

        // Answer the request on behalf of the peer.
        response
            .send(Ok(PooledTransactions(message)))
            .expect("should send peer_1 response to tx manager");

        // Poll the manager once so that it processes the response and drives the import.
        poll_fn(|cx| {
            let _ = tx_manager.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
    }
2437
    /// A full-transaction broadcast received while the node is idle is tracked for the sending
    /// peer and imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Build a separate network + transactions manager that is driven by hand below.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events observed by peer 0 into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // The sender must be recorded for the transaction right after the event is handled.
        assert!(transactions
            .transactions_by_peers
            .get(signed_tx.tx_hash())
            .unwrap()
            .contains(handle1.peer_id()));

        // Poll the manager once to drive the queued import.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert!(!pool.is_empty());
        assert!(pool.get(signed_tx.tx_hash()).is_some());
        handle.terminate().await;
    }
2515
    /// Closing a peer's session removes the peer from the manager and from the fetcher's active
    /// peers, while another peer that announced the same hash stays available as a fallback.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_session_closed_cleans_transaction_peer_state() {
        let (mut tx_manager, _network) = new_tx_manager().await;
        let peer_id = PeerId::new([1; 64]);
        let fallback_peer = PeerId::new([2; 64]);
        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
        let hash_shared = B256::from_slice(&[1; 32]);

        // Both peers are registered in the fetcher for the same hash; only `peer_id` has a
        // session in the manager and an entry in the fetcher's active peers.
        tx_manager.peers.insert(peer_id, peer);
        buffer_hash_to_tx_fetcher(
            &mut tx_manager.transaction_fetcher,
            hash_shared,
            peer_id,
            0,
            None,
        );
        buffer_hash_to_tx_fetcher(
            &mut tx_manager.transaction_fetcher,
            hash_shared,
            fallback_peer,
            0,
            None,
        );
        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);

        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
            peer_id,
            reason: None,
        }));

        assert!(!tx_manager.peers.contains_key(&peer_id));
        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
        // The remaining peer is the one offered for the shared hash.
        assert_eq!(
            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
            Some(&fallback_peer)
        );
    }
2555
2556 #[tokio::test(flavor = "multi_thread")]
2557 async fn test_on_get_pooled_transactions_network() {
2558 reth_tracing::init_test_tracing();
2559 let net = Testnet::create(2).await;
2560
2561 let mut handles = net.handles();
2562 let handle0 = handles.next().unwrap();
2563 let handle1 = handles.next().unwrap();
2564
2565 drop(handles);
2566 let handle = net.spawn();
2567
2568 let listener0 = handle0.event_listener();
2569
2570 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2571 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2572
2573 let client = NoopProvider::default();
2574 let pool = testing_pool();
2575 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2576 .disable_discovery()
2577 .listener_port(0)
2578 .build(client);
2579 let transactions_manager_config = config.transactions_manager_config.clone();
2580 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2581 .await
2582 .unwrap()
2583 .into_builder()
2584 .transactions(pool.clone(), transactions_manager_config)
2585 .split_with_handle();
2586 tokio::task::spawn(network);
2587
2588 network_handle.update_sync_state(SyncState::Idle);
2589
2590 assert!(!NetworkInfo::is_syncing(&network_handle));
2591
2592 let mut established = listener0.take(2);
2594 while let Some(ev) = established.next().await {
2595 match ev {
2596 NetworkEvent::ActivePeerSession { .. } |
2597 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2598 transactions.on_network_event(ev);
2599 }
2600 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2601 ev => {
2602 error!("unexpected event {ev:?}")
2603 }
2604 }
2605 }
2606 handle.terminate().await;
2607
2608 let tx = MockTransaction::eip1559();
2609 let _ = transactions
2610 .pool
2611 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2612 .await;
2613
2614 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2615
2616 let (send, receive) =
2617 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2618
2619 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2620 peer_id: *handle1.peer_id(),
2621 request,
2622 response: send,
2623 });
2624
2625 match receive.await.unwrap() {
2626 Ok(PooledTransactions(transactions)) => {
2627 assert_eq!(transactions.len(), 1);
2628 }
2629 Err(e) => {
2630 panic!("error: {e:?}");
2631 }
2632 }
2633 }
2634
2635 #[tokio::test]
2639 async fn test_partially_tx_response() {
2640 reth_tracing::init_test_tracing();
2641
2642 let mut tx_manager = new_tx_manager().await.0;
2643 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2644
2645 let peer_id_1 = PeerId::new([1; 64]);
2646 let eth_version = EthVersion::Eth66;
2647
2648 let txs = vec![
2649 TransactionSigned::new_unhashed(
2650 Transaction::Legacy(TxLegacy {
2651 chain_id: Some(4),
2652 nonce: 15u64,
2653 gas_price: 2200000000,
2654 gas_limit: 34811,
2655 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2656 value: U256::from(1234u64),
2657 input: Default::default(),
2658 }),
2659 Signature::new(
2660 U256::from_str(
2661 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2662 )
2663 .unwrap(),
2664 U256::from_str(
2665 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2666 )
2667 .unwrap(),
2668 true,
2669 ),
2670 ),
2671 TransactionSigned::new_unhashed(
2672 Transaction::Eip1559(TxEip1559 {
2673 chain_id: 4,
2674 nonce: 26u64,
2675 max_priority_fee_per_gas: 1500000000,
2676 max_fee_per_gas: 1500000013,
2677 gas_limit: MIN_TRANSACTION_GAS,
2678 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2679 value: U256::from(3000000000000000000u64),
2680 input: Default::default(),
2681 access_list: Default::default(),
2682 }),
2683 Signature::new(
2684 U256::from_str(
2685 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2686 )
2687 .unwrap(),
2688 U256::from_str(
2689 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2690 )
2691 .unwrap(),
2692 true,
2693 ),
2694 ),
2695 ];
2696
2697 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2698
2699 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2700 peer_1.seen_transactions.insert(txs_hashes[0]);
2703 peer_1.seen_transactions.insert(txs_hashes[1]);
2704 tx_manager.peers.insert(peer_id_1, peer_1);
2705
2706 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2707 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2708
2709 assert!(tx_fetcher.is_idle(&peer_id_1));
2711 assert_eq!(tx_fetcher.active_peers.len(), 0);
2712
2713 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2715
2716 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2717 assert!(!tx_fetcher.is_idle(&peer_id_1));
2719 assert_eq!(tx_fetcher.active_peers.len(), 1);
2720
2721 let req = to_mock_session_rx
2723 .recv()
2724 .await
2725 .expect("peer_1 session should receive request with buffered hashes");
2726 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2727
2728 let message: Vec<PooledTransactionVariant> = txs
2729 .into_iter()
2730 .take(1)
2731 .map(|tx| {
2732 PooledTransactionVariant::try_from(tx)
2733 .expect("Failed to convert MockTransaction to PooledTransaction")
2734 })
2735 .collect();
2736 response
2738 .send(Ok(PooledTransactions(message)))
2739 .expect("should send peer_1 response to tx manager");
2740 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2741 unreachable!()
2742 };
2743
2744 assert!(tx_fetcher.is_idle(&peer_id));
2746 assert_eq!(tx_fetcher.active_peers.len(), 0);
2747 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2749 }
2750
2751 #[tokio::test]
2752 async fn test_max_retries_tx_request() {
2753 reth_tracing::init_test_tracing();
2754
2755 let mut tx_manager = new_tx_manager().await.0;
2756 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2757
2758 let peer_id_1 = PeerId::new([1; 64]);
2759 let peer_id_2 = PeerId::new([2; 64]);
2760 let eth_version = EthVersion::Eth66;
2761 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2762
2763 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2764 peer_1.seen_transactions.insert(seen_hashes[0]);
2767 peer_1.seen_transactions.insert(seen_hashes[1]);
2768 tx_manager.peers.insert(peer_id_1, peer_1);
2769
2770 let retries = 1;
2773 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2774 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2775
2776 assert!(tx_fetcher.is_idle(&peer_id_1));
2778 assert_eq!(tx_fetcher.active_peers.len(), 0);
2779
2780 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2782
2783 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2784
2785 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2786 assert!(!tx_fetcher.is_idle(&peer_id_1));
2788 assert_eq!(tx_fetcher.active_peers.len(), 1);
2789
2790 let req = to_mock_session_rx
2792 .recv()
2793 .await
2794 .expect("peer_1 session should receive request with buffered hashes");
2795 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2796 let GetPooledTransactions(hashes) = request;
2797
2798 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2799
2800 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2801
2802 response
2804 .send(Err(RequestError::BadResponse))
2805 .expect("should send peer_1 response to tx manager");
2806 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2807 unreachable!()
2808 };
2809
2810 assert!(tx_fetcher.is_idle(&peer_id));
2812 assert_eq!(tx_fetcher.active_peers.len(), 0);
2813 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2815
2816 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2817 tx_manager.peers.insert(peer_id_2, peer_2);
2818
2819 let msg =
2821 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2822 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2823
2824 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2825
2826 assert_eq!(tx_fetcher.active_peers.len(), 1);
2828
2829 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2831 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2833
2834 let req = to_mock_session_rx
2836 .recv()
2837 .await
2838 .expect("peer_2 session should receive request with buffered hashes");
2839 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2840
2841 response
2843 .send(Err(RequestError::BadResponse))
2844 .expect("should send peer_2 response to tx manager");
2845 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2846
2847 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2850 assert_eq!(tx_fetcher.active_peers.len(), 0);
2851 }
2852
/// A fresh `pooled` builder is empty; after one push it builds a single pooled entry and no
/// full transactions.
#[test]
fn test_transaction_builder_empty() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let eip1559 = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    builder.push(&eip1559);
    assert!(!builder.is_empty());

    let built = builder.build();
    assert!(built.full.is_none());
    assert_eq!(built.pooled.unwrap().len(), 1);
}
2869
/// Pushes a transaction whose size is one byte above the broadcast soft limit into a `full`
/// builder: the first push yields one full entry, a second push of the same transaction
/// yields one pooled entry in addition.
#[test]
fn test_transaction_builder_large() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let mut oversized = factory.create_eip1559();
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let oversized = PropagateTransaction::pool_tx(Arc::new(oversized));

    builder.push(&oversized);
    assert!(!builder.is_empty());

    // After the first push: nothing pooled, one full transaction.
    let after_first = builder.clone().build();
    assert!(after_first.pooled.is_none());
    assert_eq!(after_first.full.unwrap().len(), 1);

    builder.push(&oversized);

    // After the second push: one pooled entry and still one full transaction.
    let after_second = builder.clone().build();
    assert_eq!(after_second.pooled.unwrap().len(), 1);
    assert_eq!(after_second.full.unwrap().len(), 1);
}
2898
/// An EIP-4844 transaction pushed into a `full` builder ends up in the pooled part only; a
/// subsequent EIP-1559 transaction ends up in the full part.
#[test]
fn test_transaction_builder_eip4844() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let blob_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
    builder.push(&blob_tx);
    assert!(!builder.is_empty());

    // Only the pooled part is populated after the EIP-4844 push.
    let after_blob = builder.clone().build();
    assert!(after_blob.full.is_none());
    assert_eq!(after_blob.pooled.unwrap().len(), 1);

    let eip1559 = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    builder.push(&eip1559);

    // Each part holds exactly one entry after the EIP-1559 push.
    let after_both = builder.clone().build();
    assert_eq!(after_both.pooled.unwrap().len(), 1);
    assert_eq!(after_both.full.unwrap().len(), 1);
}
2924
2925 #[tokio::test]
2926 async fn test_propagate_full() {
2927 reth_tracing::init_test_tracing();
2928
2929 let (mut tx_manager, network) = new_tx_manager().await;
2930 let peer_id = PeerId::random();
2931
2932 network.handle().update_sync_state(SyncState::Idle);
2934
2935 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2937
2938 let session_info = SessionInfo {
2939 peer_id,
2940 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2941 client_version: Arc::from(""),
2942 capabilities: Arc::new(vec![].into()),
2943 status: Arc::new(Default::default()),
2944 version: EthVersion::Eth68,
2945 peer_kind: PeerKind::Basic,
2946 };
2947 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2948 tx_manager
2949 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2950 let mut propagate = vec![];
2951 let mut factory = MockTransactionFactory::default();
2952 let eip1559_tx = Arc::new(factory.create_eip1559());
2953 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2954 let eip4844_tx = Arc::new(factory.create_eip4844());
2955 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2956
2957 let propagated =
2958 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2959 assert_eq!(propagated.len(), 2);
2960 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
2961 assert_eq!(prop_txs.len(), 1);
2962 assert!(prop_txs[0].is_full());
2963
2964 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
2965 assert_eq!(prop_txs.len(), 1);
2966 assert!(prop_txs[0].is_hash());
2967
2968 let peer = tx_manager.peers.get(&peer_id).unwrap();
2969 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2970 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2971 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2972
2973 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2975 assert!(propagated.is_empty());
2976 }
2977
2978 #[tokio::test]
2979 async fn test_propagate_pending_txs_while_initially_syncing() {
2980 reth_tracing::init_test_tracing();
2981
2982 let (mut tx_manager, network) = new_tx_manager().await;
2983 let peer_id = PeerId::random();
2984
2985 network.handle().update_sync_state(SyncState::Syncing);
2987 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
2988
2989 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
2991 tx_manager.peers.insert(peer_id, peer);
2992
2993 let tx = MockTransaction::eip1559();
2994 tx_manager
2995 .pool
2996 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2997 .await
2998 .expect("transaction should be accepted into the pool");
2999
3000 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3001
3002 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3003 assert!(peer.seen_transactions.contains(tx.get_hash()));
3004 }
3005
3006 #[tokio::test]
3007 async fn test_relaxed_filter_ignores_unknown_tx_types() {
3008 reth_tracing::init_test_tracing();
3009
3010 let transactions_manager_config = TransactionsManagerConfig::default();
3011
3012 let propagation_policy = TransactionPropagationKind::default();
3013 let announcement_policy = RelaxedEthAnnouncementFilter::default();
3014
3015 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3016
3017 let pool = testing_pool();
3018 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3019 let client = NoopProvider::default();
3020
3021 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3022 .listener_port(0)
3023 .disable_discovery()
3024 .build(client.clone());
3025
3026 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3027 let (to_tx_manager_tx, from_network_rx) =
3028 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3029 network_manager.set_transactions(to_tx_manager_tx);
3030 let network_handle = network_manager.handle().clone();
3031 let network_service_handle = tokio::spawn(network_manager);
3032
3033 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3034 network_handle.clone(),
3035 pool.clone(),
3036 from_network_rx,
3037 transactions_manager_config,
3038 policy_bundle,
3039 );
3040
3041 let peer_id = PeerId::random();
3042 let eth_version = EthVersion::Eth68;
3043 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3044 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3045
3046 let mut tx_factory = MockTransactionFactory::default();
3047
3048 let valid_known_tx = tx_factory.create_eip1559();
3049 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3050
3051 let known_tx_hash = *known_tx_signed.hash();
3052 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3053 let known_tx_size = known_tx_signed.encoded_length();
3054
3055 let unknown_tx_hash = B256::random();
3056 let unknown_tx_type_byte = 0xff_u8;
3057 let unknown_tx_size = 150;
3058
3059 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3060 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3061 sizes: vec![known_tx_size, unknown_tx_size],
3062 hashes: vec![known_tx_hash, unknown_tx_hash],
3063 });
3064
3065 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3066
3067 poll_fn(|cx| {
3068 let _ = tx_manager.poll_unpin(cx);
3069 Poll::Ready(())
3070 })
3071 .await;
3072
3073 let mut requested_hashes_in_getpooled = HashSet::new();
3074 let mut unexpected_request_received = false;
3075
3076 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3077 .await
3078 {
3079 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3080 let GetPooledTransactions(hashes) = request;
3081 for hash in hashes {
3082 requested_hashes_in_getpooled.insert(hash);
3083 }
3084 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3085 }
3086 Ok(Some(other_request)) => {
3087 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3088 unexpected_request_received = true;
3089 }
3090 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3091 Err(_timeout_err) => {
3092 tracing::info!("Timeout: No GetPooledTransactions request received.")
3093 }
3094 }
3095
3096 assert!(
3097 requested_hashes_in_getpooled.contains(&known_tx_hash),
3098 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3099 );
3100 assert!(
3101 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3102 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3103 );
3104 assert!(
3105 !unexpected_request_received,
3106 "An unexpected P2P request was received by the mock peer."
3107 );
3108
3109 network_service_handle.abort();
3110 }
3111}