1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6pub mod config;
8pub mod constants;
10pub mod fetcher;
12pub mod policy;
14
15pub use self::constants::{
16 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30 budget::{
31 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{
37 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38 },
39 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40 NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49 RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54 events::{PeerEvent, SessionInfo},
55 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58 error::{RequestError, RequestResult},
59 sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66 error::{PoolError, PoolResult},
67 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71 collections::{hash_map::Entry, HashMap, HashSet},
72 pin::Pin,
73 sync::{
74 atomic::{AtomicUsize, Ordering},
75 Arc,
76 },
77 task::{Context, Poll},
78 time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
84pub type PoolImportFuture =
88 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
90#[derive(Debug, Clone)]
98pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
99 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
101}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104 fn send(&self, cmd: TransactionsCommand<N>) {
105 let _ = self.manager_tx.send(cmd);
106 }
107
108 async fn peer_handle(
110 &self,
111 peer_id: PeerId,
112 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113 let (tx, rx) = oneshot::channel();
114 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115 rx.await
116 }
117
118 pub fn propagate(&self, hash: TxHash) {
120 self.send(TransactionsCommand::PropagateHash(hash))
121 }
122
123 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127 self.propagate_hashes_to(Some(hash), peer)
128 }
129
130 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134 let hashes = hash.into_iter().collect::<Vec<_>>();
135 if hashes.is_empty() {
136 return
137 }
138 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139 }
140
141 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143 let (tx, rx) = oneshot::channel();
144 self.send(TransactionsCommand::GetActivePeers(tx));
145 rx.await
146 }
147
148 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152 if transactions.is_empty() {
153 return
154 }
155 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156 }
157
158 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163 if transactions.is_empty() {
164 return
165 }
166 self.send(TransactionsCommand::PropagateTransactions(transactions))
167 }
168
169 pub fn broadcast_transactions(
174 &self,
175 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176 ) {
177 let transactions =
178 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179 if transactions.is_empty() {
180 return
181 }
182 self.send(TransactionsCommand::BroadcastTransactions(transactions))
183 }
184
185 pub async fn get_transaction_hashes(
187 &self,
188 peers: Vec<PeerId>,
189 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190 if peers.is_empty() {
191 return Ok(Default::default())
192 }
193 let (tx, rx) = oneshot::channel();
194 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195 rx.await
196 }
197
198 pub async fn get_peer_transaction_hashes(
200 &self,
201 peer: PeerId,
202 ) -> Result<HashSet<TxHash>, RecvError> {
203 let res = self.get_transaction_hashes(vec![peer]).await?;
204 Ok(res.into_values().next().unwrap_or_default())
205 }
206
207 pub async fn get_pooled_transactions_from(
213 &self,
214 peer_id: PeerId,
215 hashes: Vec<B256>,
216 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219 let (tx, rx) = oneshot::channel();
220 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221 peer.try_send(request).ok();
222
223 rx.await?.map(|res| Some(res.0))
224 }
225}
226
227#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
284 pool: Pool,
286 network: NetworkHandle<N>,
288 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
292 transaction_fetcher: TransactionFetcher<N>,
294 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
299 pool_imports: FuturesUnordered<PoolImportFuture>,
311 pending_pool_imports_info: PendingPoolImportsInfo,
313 bad_imports: LruCache<TxHash>,
315 peers: HashMap<PeerId, PeerMetadata<N>>,
317 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
321 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
326 pending_transactions: mpsc::Receiver<TxHash>,
335 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
337 config: TransactionsManagerConfig,
339 policies: NetworkPolicies<N>,
341 metrics: TransactionsManagerMetrics,
343 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
345}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348 pub fn new(
352 network: NetworkHandle<N>,
353 pool: Pool,
354 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355 transactions_manager_config: TransactionsManagerConfig,
356 ) -> Self {
357 Self::with_policy(
358 network,
359 pool,
360 from_network,
361 transactions_manager_config,
362 NetworkPolicies::new(
363 TransactionPropagationKind::default(),
364 StrictEthAnnouncementFilter::default(),
365 ),
366 )
367 }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371 pub fn with_policy(
375 network: NetworkHandle<N>,
376 pool: Pool,
377 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378 transactions_manager_config: TransactionsManagerConfig,
379 policies: NetworkPolicies<N>,
380 ) -> Self {
381 let network_events = network.event_listener();
382
383 let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386 &transactions_manager_config.transaction_fetcher_config,
387 );
388
389 let pending = pool.pending_transactions_listener();
392 let pending_pool_imports_info =
393 PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
394 let metrics = TransactionsManagerMetrics::default();
395 metrics
396 .capacity_pending_pool_imports
397 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
398
399 Self {
400 pool,
401 network,
402 network_events,
403 transaction_fetcher,
404 transactions_by_peers: Default::default(),
405 pool_imports: Default::default(),
406 pending_pool_imports_info,
407 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
408 peers: Default::default(),
409 command_tx,
410 command_rx: UnboundedReceiverStream::new(command_rx),
411 pending_transactions: pending,
412 transaction_events: UnboundedMeteredReceiver::new(
413 from_network,
414 NETWORK_POOL_TRANSACTIONS_SCOPE,
415 ),
416 config: transactions_manager_config,
417 policies,
418 metrics,
419 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
420 }
421 }
422
423 pub fn handle(&self) -> TransactionsHandle<N> {
425 TransactionsHandle { manager_tx: self.command_tx.clone() }
426 }
427
428 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
431 self.has_capacity_for_pending_pool_imports() &&
432 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
433 }
434
435 fn has_capacity_for_pending_pool_imports(&self) -> bool {
437 self.remaining_pool_import_capacity() > 0
438 }
439
440 fn remaining_pool_import_capacity(&self) -> usize {
442 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
443 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
444 )
445 }
446
447 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
448 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
449 self.metrics.reported_bad_transactions.increment(1);
450 }
451
452 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
453 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
454 self.network.reputation_change(peer_id, kind);
455 }
456
457 fn report_already_seen(&self, peer_id: PeerId) {
458 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
459 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
460 }
461
462 fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
464 if let Some(mut peer) = self.peers.remove(peer_id) {
465 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
466 }
467 self.transaction_fetcher.remove_peer(peer_id);
468 }
469
470 fn on_good_import(&mut self, hash: TxHash) {
472 self.transactions_by_peers.remove(&hash);
473 }
474
475 fn on_bad_import(&mut self, err: PoolError) {
505 let peers = self.transactions_by_peers.remove(&err.hash);
506
507 if err.is_bad_blob_sidecar() {
508 if let Some(peers) = peers {
512 for peer_id in peers {
513 self.report_peer_bad_transactions(peer_id);
514 }
515 }
516 return
517 }
518
519 if !err.is_bad_transaction() || self.network.is_syncing() {
521 return
522 }
523 if let Some(peers) = peers {
526 for peer_id in peers {
527 self.report_peer_bad_transactions(peer_id);
528 }
529 }
530 self.metrics.bad_imports.increment(1);
531 self.bad_imports.insert(err.hash);
532 }
533
534 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
538 let info = &self.pending_pool_imports_info;
540 let max_pending_pool_imports = info.max_pending_pool_imports;
541 let has_capacity_wrt_pending_pool_imports =
542 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
543
544 self.transaction_fetcher
545 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
546 }
547
548 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
549 let kind = match req_err {
550 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
551 RequestError::Timeout => ReputationChangeKind::Timeout,
552 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
553 return
555 }
556 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
557 };
558 self.report_peer(peer_id, kind);
559 }
560
561 #[inline]
562 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
563 let metrics = &self.metrics;
564
565 let TxManagerPollDurations {
566 acc_network_events,
567 acc_pending_imports,
568 acc_tx_events,
569 acc_imported_txns,
570 acc_fetch_events,
571 acc_pending_fetch,
572 acc_cmds,
573 } = poll_durations;
574
575 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
577 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
579 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
580 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
581 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
582 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
583 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
584 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
585 }
586}
587
588impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
589 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
591 for res in batch_results {
592 match res {
593 Ok(AddedTransactionOutcome { hash, .. }) => {
594 self.on_good_import(hash);
595 }
596 Err(err) => {
597 self.on_bad_import(err);
598 }
599 }
600 }
601 }
602
603 fn on_new_pooled_transaction_hashes(
605 &mut self,
606 peer_id: PeerId,
607 msg: NewPooledTransactionHashes,
608 ) {
609 if self.network.is_initially_syncing() {
611 return
612 }
613 if self.network.tx_gossip_disabled() {
614 return
615 }
616
617 let Some(peer) = self.peers.get_mut(&peer_id) else {
619 trace!(
620 peer_id = format!("{peer_id:#}"),
621 ?msg,
622 "discarding announcement from inactive peer"
623 );
624
625 return
626 };
627 let client = peer.client_version.clone();
628
629 let mut count_txns_already_seen_by_peer = 0;
631 for tx in msg.iter_hashes().copied() {
632 if !peer.seen_transactions.insert(tx) {
633 count_txns_already_seen_by_peer += 1;
634 }
635 }
636 if count_txns_already_seen_by_peer > 0 {
637 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
642 self.metrics
643 .occurrences_hash_already_seen_by_peer
644 .increment(count_txns_already_seen_by_peer);
645
646 trace!(target: "net::tx",
647 %count_txns_already_seen_by_peer,
648 peer_id=format!("{peer_id:#}"),
649 ?client,
650 "Peer sent hashes that have already been marked as seen by peer"
651 );
652
653 self.report_already_seen(peer_id);
654 }
655
656 if msg.is_empty() {
658 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
659 return;
660 }
661
662 let original_len = msg.len();
663 let mut partially_valid_msg = msg.dedup();
664
665 if partially_valid_msg.len() != original_len {
666 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
667 }
668
669 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
671
672 let mut should_report_peer = false;
679 let mut tx_types_counter = TxTypesCounter::default();
680
681 let is_eth68_message = partially_valid_msg
682 .msg_version()
683 .expect("partially valid announcement should have a version")
684 .is_eth68();
685
686 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
687 let (ty_byte, size_val) = match *metadata_ref_mut {
688 Some((ty, size)) => {
689 if !is_eth68_message {
690 should_report_peer = true;
691 }
692 (ty, size)
693 }
694 None => {
695 if is_eth68_message {
696 should_report_peer = true;
697 return false;
698 }
699 (0u8, 0)
700 }
701 };
702
703 if is_eth68_message &&
704 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
705 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
706 {
707 tx_types_counter.increase_by_tx_type(parsed_tx_type);
708 }
709
710 let decision = self
711 .policies
712 .announcement_filter()
713 .decide_on_announcement(ty_byte, tx_hash, size_val);
714
715 match decision {
716 AnnouncementAcceptance::Accept => true,
717 AnnouncementAcceptance::Ignore => false,
718 AnnouncementAcceptance::Reject { penalize_peer } => {
719 if penalize_peer {
720 should_report_peer = true;
721 }
722 false
723 }
724 }
725 });
726
727 if is_eth68_message {
728 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
729 }
730
731 if should_report_peer {
732 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
733 }
734
735 let hashes_count_pre_pool_filter = partially_valid_msg.len();
743 self.pool.retain_unknown(&mut partially_valid_msg);
744 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
745 let already_known_hashes_count =
746 hashes_count_pre_pool_filter - partially_valid_msg.len();
747 self.metrics
748 .occurrences_hashes_already_in_pool
749 .increment(already_known_hashes_count as u64);
750 }
751
752 if partially_valid_msg.is_empty() {
753 return
755 }
756
757 let mut valid_announcement_data =
758 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
759
760 if valid_announcement_data.is_empty() {
761 return
763 }
764
765 let bad_imports = &self.bad_imports;
772 self.transaction_fetcher.filter_unseen_and_pending_hashes(
773 &mut valid_announcement_data,
774 |hash| bad_imports.contains(hash),
775 &peer_id,
776 &client,
777 );
778
779 if valid_announcement_data.is_empty() {
780 return
782 }
783
784 trace!(target: "net::tx::propagation",
785 peer_id=format!("{peer_id:#}"),
786 hashes_len=valid_announcement_data.len(),
787 hashes=?valid_announcement_data.keys(),
788 msg_version=%valid_announcement_data.msg_version(),
789 client_version=%client,
790 "received previously unseen and pending hashes in announcement from peer"
791 );
792
793 if !self.transaction_fetcher.is_idle(&peer_id) {
796 let msg_version = valid_announcement_data.msg_version();
798 let (hashes, _version) = valid_announcement_data.into_request_hashes();
799
800 trace!(target: "net::tx",
801 peer_id=format!("{peer_id:#}"),
802 hashes=?*hashes,
803 %msg_version,
804 %client,
805 "buffering hashes announced by busy peer"
806 );
807
808 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
809
810 return
811 }
812
813 let mut hashes_to_request =
814 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
815 let surplus_hashes =
816 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
817
818 if !surplus_hashes.is_empty() {
819 trace!(target: "net::tx",
820 peer_id=format!("{peer_id:#}"),
821 surplus_hashes=?*surplus_hashes,
822 %client,
823 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
824 );
825
826 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
827 }
828
829 trace!(target: "net::tx",
830 peer_id=format!("{peer_id:#}"),
831 hashes=?*hashes_to_request,
832 %client,
833 "sending hashes in `GetPooledTransactions` request to peer's session"
834 );
835
836 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
840 if let Some(failed_to_request_hashes) =
841 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
842 {
843 let conn_eth_version = peer.version;
844
845 trace!(target: "net::tx",
846 peer_id=format!("{peer_id:#}"),
847 failed_to_request_hashes=?*failed_to_request_hashes,
848 %conn_eth_version,
849 %client,
850 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
851 );
852 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
853 }
854 }
855}
856
857impl<Pool, N> TransactionsManager<Pool, N>
858where
859 Pool: TransactionPool + Unpin + 'static,
860 N: NetworkPrimitives<
861 BroadcastedTransaction: SignedTransaction,
862 PooledTransaction: SignedTransaction,
863 > + Unpin,
864 Pool::Transaction:
865 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
866{
867 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
879 if self.network.tx_gossip_disabled() {
883 return
884 }
885
886 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
887
888 self.propagate_all(hashes);
889 }
890
891 fn propagate_full_transactions_to_peer(
895 &mut self,
896 txs: Vec<TxHash>,
897 peer_id: PeerId,
898 propagation_mode: PropagationMode,
899 ) -> Option<PropagatedTransactions> {
900 let peer = self.peers.get_mut(&peer_id)?;
901 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
902 let mut propagated = PropagatedTransactions::default();
903
904 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
906
907 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
908
909 if propagation_mode.is_forced() {
910 full_transactions.extend(to_propagate);
912 } else {
913 for tx in to_propagate {
916 if !peer.seen_transactions.contains(tx.tx_hash()) {
917 full_transactions.push(&tx);
919 }
920 }
921 }
922
923 if full_transactions.is_empty() {
924 return None
926 }
927
928 let PropagateTransactions { pooled, full } = full_transactions.build();
929
930 if let Some(new_pooled_hashes) = pooled {
932 for hash in new_pooled_hashes.iter_hashes().copied() {
933 propagated.record(hash, PropagateKind::Hash(peer_id));
934 peer.seen_transactions.insert(hash);
936 }
937
938 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
940 }
941
942 if let Some(new_full_transactions) = full {
944 for tx in &new_full_transactions {
945 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
946 peer.seen_transactions.insert(*tx.tx_hash());
948 }
949
950 self.network.send_transactions(peer_id, new_full_transactions);
952 }
953
954 self.metrics.propagated_transactions.increment(propagated.len() as u64);
956
957 Some(propagated)
958 }
959
960 fn propagate_hashes_to(
964 &mut self,
965 hashes: Vec<TxHash>,
966 peer_id: PeerId,
967 propagation_mode: PropagationMode,
968 ) {
969 trace!(target: "net::tx", "Start propagating transactions as hashes");
970
971 let propagated = {
974 let Some(peer) = self.peers.get_mut(&peer_id) else {
975 return
977 };
978
979 let to_propagate =
980 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
981
982 let mut propagated = PropagatedTransactions::default();
983
984 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
986
987 if propagation_mode.is_forced() {
988 hashes.extend(to_propagate)
989 } else {
990 for tx in to_propagate {
991 if !peer.seen_transactions.contains(tx.tx_hash()) {
992 hashes.push(&tx);
994 }
995 }
996 }
997
998 let new_pooled_hashes = hashes.build();
999
1000 if new_pooled_hashes.is_empty() {
1001 return
1003 }
1004
1005 if let Some(peer) = self.peers.get_mut(&peer_id) {
1006 for hash in new_pooled_hashes.iter_hashes().copied() {
1007 propagated.record(hash, PropagateKind::Hash(peer_id));
1008 peer.seen_transactions.insert(hash);
1009 }
1010 }
1011
1012 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1013
1014 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1016
1017 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1019
1020 propagated
1021 };
1022
1023 self.pool.on_propagated(propagated);
1025 }
1026
1027 fn propagate_transactions(
1034 &mut self,
1035 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1036 propagation_mode: PropagationMode,
1037 ) -> PropagatedTransactions {
1038 let mut propagated = PropagatedTransactions::default();
1039 if self.network.tx_gossip_disabled() {
1040 return propagated
1041 }
1042
1043 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1045
1046 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1048 if !self.policies.propagation_policy().can_propagate(peer) {
1049 continue
1051 }
1052 let mut builder = if peer_idx > max_num_full {
1054 PropagateTransactionsBuilder::pooled(peer.version)
1055 } else {
1056 PropagateTransactionsBuilder::full(peer.version)
1057 };
1058
1059 if propagation_mode.is_forced() {
1060 builder.extend(to_propagate.iter());
1061 } else {
1062 for tx in &to_propagate {
1066 if !peer.seen_transactions.contains(tx.tx_hash()) {
1069 builder.push(tx);
1070 }
1071 }
1072 }
1073
1074 if builder.is_empty() {
1075 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1076 continue
1077 }
1078
1079 let PropagateTransactions { pooled, full } = builder.build();
1080
1081 if let Some(mut new_pooled_hashes) = pooled {
1083 new_pooled_hashes
1086 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1087
1088 for hash in new_pooled_hashes.iter_hashes().copied() {
1089 propagated.record(hash, PropagateKind::Hash(*peer_id));
1090 peer.seen_transactions.insert(hash);
1092 }
1093
1094 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1095
1096 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1098 }
1099
1100 if let Some(new_full_transactions) = full {
1102 for tx in &new_full_transactions {
1103 propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1104 peer.seen_transactions.insert(*tx.tx_hash());
1106 }
1107
1108 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1109
1110 self.network.send_transactions(*peer_id, new_full_transactions);
1112 }
1113 }
1114
1115 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1117
1118 propagated
1119 }
1120
1121 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1126 if self.peers.is_empty() {
1127 return
1129 }
1130 let propagated = self.propagate_transactions(
1131 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1132 PropagationMode::Basic,
1133 );
1134
1135 self.pool.on_propagated(propagated);
1137 }
1138
1139 fn on_get_pooled_transactions(
1141 &mut self,
1142 peer_id: PeerId,
1143 request: GetPooledTransactions,
1144 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1145 ) {
1146 if self.network.tx_gossip_disabled() {
1148 let _ = response.send(Ok(PooledTransactions::default()));
1149 return
1150 }
1151 if let Some(peer) = self.peers.get_mut(&peer_id) {
1152 let transactions = self.pool.get_pooled_transaction_elements(
1153 request.0,
1154 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1155 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1156 ),
1157 );
1158 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1159
1160 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1163
1164 let resp = PooledTransactions(transactions);
1165 let _ = response.send(Ok(resp));
1166 }
1167 }
1168
1169 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1171 match cmd {
1172 TransactionsCommand::PropagateHash(hash) => {
1173 self.on_new_pending_transactions(vec![hash])
1174 }
1175 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1176 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1177 }
1178 TransactionsCommand::GetActivePeers(tx) => {
1179 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1180 tx.send(peers).ok();
1181 }
1182 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1183 if let Some(propagated) =
1184 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1185 {
1186 self.pool.on_propagated(propagated);
1187 }
1188 }
1189 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1190 TransactionsCommand::BroadcastTransactions(txs) => {
1191 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1192 self.pool.on_propagated(propagated);
1193 }
1194 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1195 let mut res = HashMap::with_capacity(peers.len());
1196 for peer_id in peers {
1197 let hashes = self
1198 .peers
1199 .get(&peer_id)
1200 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1201 .unwrap_or_default();
1202 res.insert(peer_id, hashes);
1203 }
1204 tx.send(res).ok();
1205 }
1206 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1207 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1208 peer_request_sender.send(sender).ok();
1209 }
1210 }
1211 }
1212
1213 fn handle_peer_session(
1217 &mut self,
1218 info: SessionInfo,
1219 messages: PeerRequestSender<PeerRequest<N>>,
1220 ) {
1221 let SessionInfo { peer_id, client_version, version, .. } = info;
1222
1223 let peer = PeerMetadata::<N>::new(
1225 messages,
1226 version,
1227 client_version,
1228 self.config.max_transactions_seen_by_peer_history,
1229 info.peer_kind,
1230 );
1231 let peer = match self.peers.entry(peer_id) {
1232 Entry::Occupied(mut entry) => {
1233 entry.insert(peer);
1234 entry.into_mut()
1235 }
1236 Entry::Vacant(entry) => entry.insert(peer),
1237 };
1238
1239 self.policies.propagation_policy_mut().on_session_established(peer);
1240
1241 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1245 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1246 return
1247 }
1248
1249 let pooled_txs = self.pool.pooled_transactions_max(
1251 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1252 );
1253 if pooled_txs.is_empty() {
1254 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1255 return;
1256 }
1257
1258 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1260 for pooled_tx in pooled_txs {
1261 peer.seen_transactions.insert(*pooled_tx.hash());
1262 msg_builder.push_pooled(pooled_tx);
1263 }
1264
1265 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1266 let msg = msg_builder.build();
1267 self.network.send_transactions_hashes(peer_id, msg);
1268 }
1269
1270 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1272 match event_result {
1273 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1274 self.on_peer_session_closed(&peer_id);
1275 }
1276 NetworkEvent::ActivePeerSession { info, messages } => {
1277 self.handle_peer_session(info, messages);
1279 }
1280 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1281 let peer_id = info.peer_id;
1282 let messages = match self.peers.get(&peer_id) {
1284 Some(p) => p.request_tx.clone(),
1285 None => {
1286 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1287 return;
1288 }
1289 };
1290 self.handle_peer_session(info, messages);
1291 }
1292 _ => {}
1293 }
1294 }
1295
1296 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1298 if self.config.ingress_policy.allows_all() {
1299 return true;
1300 }
1301 let Some(peer) = self.peers.get(peer_id) else {
1302 return false;
1303 };
1304 self.config.ingress_policy.allows(peer.peer_kind())
1305 }
1306
1307 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1309 match event {
1310 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1311 if !self.accepts_incoming_from(&peer_id) {
1312 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1313 return;
1314 }
1315
1316 let has_blob_txs = msg.has_eip4844();
1320
1321 let non_blob_txs = msg
1322 .into_iter()
1323 .map(N::PooledTransaction::try_from)
1324 .filter_map(Result::ok)
1325 .collect();
1326
1327 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1328
1329 if has_blob_txs {
1330 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1331 self.report_peer_bad_transactions(peer_id);
1332 }
1333 }
1334 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1335 if !self.accepts_incoming_from(&peer_id) {
1336 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1337 return;
1338 }
1339 self.on_new_pooled_transaction_hashes(peer_id, msg)
1340 }
1341 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1342 self.on_get_pooled_transactions(peer_id, request, response)
1343 }
1344 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1345 let _ = response.send(Some(self.handle()));
1346 }
1347 }
1348 }
1349
    /// Prepares transactions received from `peer_id` and queues the new ones for pool import.
    ///
    /// `source` tells whether the transactions arrived as an unsolicited broadcast or as the
    /// response to a request; only broadcasts are recorded in the peer's `seen_transactions`
    /// cache here.
    ///
    /// The batch is skipped entirely while the node is initially syncing, while transaction
    /// gossip is disabled, when there is no capacity for pending pool imports, or when the peer
    /// is not tracked. Otherwise the batch is truncated to the remaining import capacity,
    /// filtered against locally known state, signature-recovered in parallel and pushed onto
    /// `self.pool_imports` as a single import future. The peer is reported if the batch
    /// contained bad transactions and/or transactions it had already sent.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // Incoming transactions are ignored while the node is initially syncing.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        // Bail out before doing any work if no further imports can be queued.
        if !self.has_capacity_for_pending_pool_imports() {
            return
        }

        let mut transactions = transactions.0;

        // Truncate early so all subsequent processing is bounded by the remaining capacity; the
        // number of dropped transactions is recorded in a metric.
        let capacity = self.remaining_pool_import_capacity();
        if transactions.len() > capacity {
            let skipped = transactions.len() - capacity;
            transactions.truncate(capacity);
            self.metrics
                .skipped_transactions_pending_pool_imports_at_capacity
                .increment(skipped as u64);
            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let client_version = peer.client_version.clone();

        // Measures the preparation work only; recorded at the end of this function.
        let start = Instant::now();

        // These hashes have arrived, so the fetcher is told to drop them.
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));

        // Only broadcasts are recorded as seen by the peer here. `insert` presumably returns
        // `false` when the hash was already cached, which is counted as "already seen" - TODO
        // confirm against `LruCache::insert`.
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // Tracks whether the peer sent anything that should cost it reputation.
        let mut has_bad_transactions = false;

        // Cheap in-memory checks first: drop transactions that are already tracked in
        // `transactions_by_peers` (remembering this peer as an additional sender) and
        // transactions that are cached as bad imports.
        transactions.retain(|tx| {
            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
                entry.get_mut().insert(peer_id);
                return false
            }
            if self.bad_imports.contains(tx.tx_hash()) {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    hash=%tx.tx_hash(),
                    %client_version,
                    "received a known bad transaction from peer"
                );
                has_bad_transactions = true;
                return false;
            }
            true
        });

        // Let the pool remove what it already knows and count how many were dropped.
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        let txs_len = transactions.len();

        // Recover signers in parallel; transactions that fail recovery are logged and dropped.
        let new_txs = transactions
            .into_par_iter()
            .filter_map(|tx| match tx.try_into_recovered() {
                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%client_version,
                        "failed ecrecovery for transaction"
                    );
                    None
                }
            })
            .collect::<Vec<_>>();

        // A shorter result means at least one transaction failed recovery.
        has_bad_transactions |= new_txs.len() != txs_len;

        // Start tracking this peer as the (first) sender of every surviving transaction.
        for tx in &new_txs {
            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
        }

        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            // The gauge and the shared counter are both raised now and lowered again inside
            // the import future once the pool has processed the batch.
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                metric_pending_pool_imports.decrement(added as f64);
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            self.pool_imports.push(import);
        }

        if num_already_seen_by_peer > 0 {
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
        }

        // Reputation reports: bad transactions first, then repeated transactions.
        if has_bad_transactions {
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }

        self.metrics.pool_import_prepare_duration.record(start.elapsed());
    }
1513
1514 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1516 match fetch_event {
1517 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1518 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1519 if report_peer {
1520 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1521 }
1522 }
1523 FetchEvent::FetchError { peer_id, error } => {
1524 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1525 self.on_request_error(peer_id, error);
1526 }
1527 FetchEvent::EmptyResponse { peer_id } => {
1528 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1529 }
1530 }
1531 }
1532}
1533
/// Drives the transactions manager.
///
/// The future never resolves: every `poll` returns [`Poll::Pending`]. Each call polls the
/// manager's event sources in a fixed order and requests an immediate re-poll when any of
/// them reported that more work may be available.
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // NOTE(review): each `metered_poll_nested_stream_with_budget!` invocation below
        // presumably drains its stream up to the given budget, accumulates the time spent into
        // the given duration field and evaluates to `true` when the budget ran out - confirm
        // against the macro definition.

        // Session lifecycle events from the network.
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Pending transactions reported by the pool, collected in batches of at most
        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` items.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // The batch is full, so the channel may hold more; poll again right away.
                    true
                } else {
                    // Poll once more for the remaining room: if the channel turns out to be
                    // empty this returns pending (and does not ask for a re-poll), otherwise
                    // more items were added and another round is requested.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Transaction messages and requests from peers.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Results of requests issued by the transaction fetcher. Mutable because scheduling
        // new fetches further down can also require another poll.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Completed pool import batches queued by `import_transactions`.
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Request hashes that are still waiting to be fetched, if there is capacity for it.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Commands sent through the manager's handle.
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // Ask to be polled again immediately. Poll metrics are not updated on this path.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1719
/// Selects how transactions are propagated.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular propagation.
    Basic,
    /// Forced propagation; see [`PropagationMode::is_forced`].
    Forced,
}

impl PropagationMode {
    /// Returns `true` only for [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1742
/// A transaction prepared for propagation to peers.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Length of the transaction as computed by the constructors; used for message size
    /// accounting and for the size field of `eth/68`-style announcements.
    size: usize,
    /// The shared transaction itself.
    transaction: Arc<T>,
}
1749
1750impl<T: SignedTransaction> PropagateTransaction<T> {
1751 pub fn new(transaction: T) -> Self {
1753 let size = transaction.length();
1754 Self { size, transaction: Arc::new(transaction) }
1755 }
1756
1757 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1759 where
1760 P: PoolTransaction<Consensus = T>,
1761 {
1762 let size = tx.encoded_length();
1763 let transaction = tx.transaction.clone_into_consensus();
1764 let transaction = Arc::new(transaction.into_inner());
1765 Self { size, transaction }
1766 }
1767
1768 fn tx_hash(&self) -> &TxHash {
1769 self.transaction.tx_hash()
1770 }
1771}
1772
/// Builder for the message(s) that propagate a set of transactions to one peer.
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Announce every transaction by hash only.
    Pooled(PooledTransactionsHashesBuilder),
    /// Send full transactions where possible; see [`FullTransactionsBuilder`].
    Full(FullTransactionsBuilder<T>),
}
1780
1781impl<T> PropagateTransactionsBuilder<T> {
1782 fn pooled(version: EthVersion) -> Self {
1784 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1785 }
1786
1787 fn full(version: EthVersion) -> Self {
1789 Self::Full(FullTransactionsBuilder::new(version))
1790 }
1791
1792 fn is_empty(&self) -> bool {
1794 match self {
1795 Self::Pooled(builder) => builder.is_empty(),
1796 Self::Full(builder) => builder.is_empty(),
1797 }
1798 }
1799
1800 fn build(self) -> PropagateTransactions<T> {
1802 match self {
1803 Self::Pooled(pooled) => {
1804 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1805 }
1806 Self::Full(full) => full.build(),
1807 }
1808 }
1809}
1810
1811impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1812 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1814 for tx in txs {
1815 self.push(tx);
1816 }
1817 }
1818
1819 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1821 match self {
1822 Self::Pooled(builder) => builder.push(transaction),
1823 Self::Full(builder) => builder.push(transaction),
1824 }
1825 }
1826}
1827
/// The messages produced by a [`PropagateTransactionsBuilder`] for one peer.
struct PropagateTransactions<T> {
    /// Hash announcement to send, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// Full transactions to send, if any.
    full: Option<Vec<Arc<T>>>,
}
1835
/// Collects full transactions for a broadcast message, falling back to hash announcements for
/// transactions that cannot or should not be sent in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Sum of the `size` of all transactions accepted into `transactions` so far.
    total_size: usize,
    /// Transactions that will be sent in full.
    transactions: Vec<Arc<T>>,
    /// Announcements for the transactions that were not accepted into `transactions`.
    pooled: PooledTransactionsHashesBuilder,
}
1849
1850impl<T> FullTransactionsBuilder<T> {
1851 fn new(version: EthVersion) -> Self {
1853 Self {
1854 total_size: 0,
1855 pooled: PooledTransactionsHashesBuilder::new(version),
1856 transactions: vec![],
1857 }
1858 }
1859
1860 fn is_empty(&self) -> bool {
1862 self.transactions.is_empty() && self.pooled.is_empty()
1863 }
1864
1865 fn build(self) -> PropagateTransactions<T> {
1867 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1868 let full = Some(self.transactions).filter(|full| !full.is_empty());
1869 PropagateTransactions { pooled, full }
1870 }
1871}
1872
1873impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1874 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1876 for tx in txs {
1877 self.push(&tx)
1878 }
1879 }
1880
1881 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1891 if !transaction.transaction.is_broadcastable_in_full() {
1900 self.pooled.push(transaction);
1901 return
1902 }
1903
1904 let new_size = self.total_size + transaction.size;
1905 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1906 self.total_size > 0
1907 {
1908 self.pooled.push(transaction);
1910 return
1911 }
1912
1913 self.total_size = new_size;
1914 self.transactions.push(Arc::clone(&transaction.transaction));
1915 }
1916}
1917
/// Builder for a hash announcement in the format matching the peer's protocol version.
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Hash-only announcement, selected for `Eth66` and `Eth67`.
    Eth66(NewPooledTransactionHashes66),
    /// Announcement carrying hashes, sizes and types, selected for `Eth68` and later versions.
    Eth68(NewPooledTransactionHashes68),
}
1925
1926impl PooledTransactionsHashesBuilder {
1929 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1931 match self {
1932 Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1933 Self::Eth68(msg) => {
1934 msg.hashes.push(*pooled_tx.hash());
1935 msg.sizes.push(pooled_tx.encoded_length());
1936 msg.types.push(pooled_tx.transaction.ty());
1937 }
1938 }
1939 }
1940
1941 fn is_empty(&self) -> bool {
1943 match self {
1944 Self::Eth66(hashes) => hashes.is_empty(),
1945 Self::Eth68(hashes) => hashes.is_empty(),
1946 }
1947 }
1948
1949 fn len(&self) -> usize {
1951 match self {
1952 Self::Eth66(hashes) => hashes.len(),
1953 Self::Eth68(hashes) => hashes.len(),
1954 }
1955 }
1956
1957 fn extend<T: SignedTransaction>(
1959 &mut self,
1960 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1961 ) {
1962 for tx in txs {
1963 self.push(&tx);
1964 }
1965 }
1966
1967 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1968 match self {
1969 Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1970 Self::Eth68(msg) => {
1971 msg.hashes.push(*tx.tx_hash());
1972 msg.sizes.push(tx.size);
1973 msg.types.push(tx.transaction.ty());
1974 }
1975 }
1976 }
1977
1978 fn new(version: EthVersion) -> Self {
1980 match version {
1981 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1982 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1983 Self::Eth68(Default::default())
1984 }
1985 }
1986 }
1987
1988 fn build(self) -> NewPooledTransactionHashes {
1989 match self {
1990 Self::Eth66(mut msg) => {
1991 msg.shrink_to_fit();
1992 msg.into()
1993 }
1994 Self::Eth68(mut msg) => {
1995 msg.shrink_to_fit();
1996 msg.into()
1997 }
1998 }
1999 }
2000}
2001
/// Describes how a batch of transactions reached this node.
///
/// The standard value-type traits are derived so the source can be logged, compared and
/// copied, consistent with the other small mode enums in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionSource {
    /// The transactions were broadcast to us unsolicited.
    Broadcast,
    /// The transactions were returned in response to a request we issued.
    Response,
}

impl TransactionSource {
    /// Returns `true` only for [`TransactionSource::Broadcast`].
    const fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
2018
/// State the transactions manager keeps for a connected peer.
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Cache of transaction hashes associated with this peer: hashes we announced to it and
    /// hashes of transactions it broadcast to us are inserted here.
    seen_transactions: LruCache<TxHash>,
    /// Channel for sending requests to the peer's session.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Negotiated `eth` protocol version.
    version: EthVersion,
    /// Client version string of the peer.
    client_version: Arc<str>,
    /// Kind of the peer; consulted by the ingress policy.
    peer_kind: PeerKind,
}
2035
2036impl<N: NetworkPrimitives> PeerMetadata<N> {
2037 pub fn new(
2039 request_tx: PeerRequestSender<PeerRequest<N>>,
2040 version: EthVersion,
2041 client_version: Arc<str>,
2042 max_transactions_seen_by_peer: u32,
2043 peer_kind: PeerKind,
2044 ) -> Self {
2045 Self {
2046 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2047 request_tx,
2048 version,
2049 client_version,
2050 peer_kind,
2051 }
2052 }
2053
2054 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2056 &self.request_tx
2057 }
2058
2059 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2061 &mut self.seen_transactions
2062 }
2063
2064 pub const fn version(&self) -> EthVersion {
2066 self.version
2067 }
2068
2069 pub fn client_version(&self) -> &str {
2071 &self.client_version
2072 }
2073
2074 pub const fn peer_kind(&self) -> PeerKind {
2076 self.peer_kind
2077 }
2078}
2079
/// Commands received by the transactions manager on its command channel.
///
/// NOTE(review): the handler (`on_command`) is not part of this section, so the variant
/// descriptions below are derived from the variant names and payload types only - confirm
/// against the handler.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Carries a single transaction hash to propagate.
    PropagateHash(B256),
    /// Carries transaction hashes to propagate to one specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Requests the set of active peers, answered through the oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Carries hashes of transactions to propagate to one specific peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Carries hashes of transactions to propagate.
    PropagateTransactions(Vec<TxHash>),
    /// Carries already prepared transactions to broadcast.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Requests transaction hashes per peer for the given peers.
    GetTransactionHashes {
        /// Peers to report on.
        peers: Vec<PeerId>,
        /// Channel for the answer: a set of transaction hashes per peer.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests the request sender of a peer.
    GetPeerSender {
        /// Peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the answer; the payload is optional.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2106
/// Transaction-related events the network forwards to the transactions manager.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A peer broadcast full transactions to us.
    ///
    /// Subject to the ingress policy; blob transactions in such a broadcast get the peer
    /// reported.
    IncomingTransactions {
        /// The peer that sent the broadcast.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// A peer announced transaction hashes to us. Subject to the ingress policy.
    IncomingPooledTransactionHashes {
        /// The peer that sent the announcement.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer requested pooled transactions from us.
    GetPooledTransactions {
        /// The requesting peer.
        peer_id: PeerId,
        /// The request.
        request: GetPooledTransactions,
        /// Channel on which the response is sent back.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Requests a handle to the transactions manager; the manager replies with `Some(handle)`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2138
/// Tracks how many transactions are currently waiting to be imported into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions in pending pool imports.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Maximum number of pending pool imports this instance was configured with.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the tracker with a zeroed counter and the given maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the
    /// `max_pending_pool_imports` passed by the caller.
    ///
    /// Note that the comparison uses the argument, not the maximum stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let pending = self.pending_pool_imports.load(Ordering::Relaxed);
        max_pending_pool_imports > pending
    }
}
2159
2160impl Default for PendingPoolImportsInfo {
2161 fn default() -> Self {
2162 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2163 }
2164}
2165
/// Durations accumulated during a single call to the manager's `poll`, one per polled source.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time attributed to the network events stream.
    acc_network_events: Duration,
    /// Time attributed to the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time attributed to the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the `poll` implementation in this file section - presumably
    /// time attributed to newly imported pool transactions; confirm where it is used.
    acc_imported_txns: Duration,
    /// Time attributed to the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time attributed to scheduling fetches for hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Time attributed to the commands channel.
    acc_cmds: Duration,
}
2176
2177#[cfg(test)]
2178mod tests {
2179 use super::*;
2180 use crate::{
2181 test_utils::{
2182 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2183 Testnet,
2184 },
2185 transactions::config::RelaxedEthAnnouncementFilter,
2186 NetworkConfigBuilder, NetworkManager,
2187 };
2188 use alloy_consensus::{TxEip1559, TxLegacy};
2189 use alloy_eips::eip4844::BlobTransactionValidationError;
2190 use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2191 use alloy_rlp::Decodable;
2192 use futures::FutureExt;
2193 use reth_chainspec::MIN_TRANSACTION_GAS;
2194 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2195 use reth_network_api::{NetworkInfo, PeerKind};
2196 use reth_network_p2p::{
2197 error::{RequestError, RequestResult},
2198 sync::{NetworkSyncUpdater, SyncState},
2199 };
2200 use reth_storage_api::noop::NoopProvider;
2201 use reth_tasks::Runtime;
2202 use reth_transaction_pool::{
2203 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2204 test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2205 };
2206 use secp256k1::SecretKey;
2207 use std::{
2208 collections::HashSet,
2209 future::poll_fn,
2210 net::{IpAddr, Ipv4Addr, SocketAddr},
2211 str::FromStr,
2212 };
2213 use tracing::error;
2214
    /// A full transaction broadcast received while the node is initially syncing must not end
    /// up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Enter the syncing state for the first time, i.e. the initial sync.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // Feed the first two events of the test network into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // Raw encoded transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once so any queued work gets a chance to run.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2284
    /// After the node has been idle once, a later syncing phase no longer counts as the initial
    /// sync, so a broadcast transaction is imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Syncing -> Idle -> Syncing: the second syncing phase is not the initial one.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events of the test network into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // Raw encoded transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once so the queued import gets a chance to run.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2359
    /// An announced transaction hash triggers a `GetPooledTransactions` request to the
    /// announcing peer, and the transactions returned in the response end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions_hashes() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
        let client = NoopProvider::default();

        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .listener_port(0)
            .disable_discovery()
            .build(client);

        let pool = testing_pool();

        let transactions_manager_config = config.transactions_manager_config.clone();
        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        let peer_id_1 = PeerId::new([1; 64]);
        let eth_version = EthVersion::Eth66;

        let txs = vec![TransactionSigned::new_unhashed(
            Transaction::Legacy(TxLegacy {
                chain_id: Some(4),
                nonce: 15u64,
                gas_price: 2200000000,
                gas_limit: 34811,
                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
                value: U256::from(1234u64),
                input: Default::default(),
            }),
            Signature::new(
                U256::from_str(
                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
                )
                .unwrap(),
                U256::from_str(
                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
                )
                .unwrap(),
                true,
            ),
        )];

        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

        // Register a mock peer session whose request channel the test can read from.
        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        tx_manager.peers.insert(peer_id_1, peer_1);

        assert!(pool.is_empty());

        // The peer announces the hashes.
        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
            peer_id: peer_id_1,
            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
                txs_hashes.clone(),
            )),
        });

        // The manager must ask the announcing peer for exactly those hashes.
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

        let message: Vec<PooledTransactionVariant> = txs
            .into_iter()
            .map(|tx| {
                PooledTransactionVariant::try_from(tx)
                    .expect("Failed to convert MockTransaction to PooledTransaction")
            })
            .collect();

        // Answer the request on behalf of the mock peer.
        response
            .send(Ok(PooledTransactions(message)))
            .expect("should send peer_1 response to tx manager");

        // Poll the manager once so it processes the response and imports the transactions.
        poll_fn(|cx| {
            let _ = tx_manager.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
    }
2457
    /// A full transaction broadcast received while the node is idle is tracked for the sending
    /// peer and imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events of the test network into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // Raw encoded transaction used as the broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // The sender is recorded for the transaction before the import has completed.
        assert!(transactions
            .transactions_by_peers
            .get(signed_tx.tx_hash())
            .unwrap()
            .contains(handle1.peer_id()));

        // Poll the manager once so the queued import gets a chance to run.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert!(!pool.is_empty());
        assert!(pool.get(signed_tx.tx_hash()).is_some());
        handle.terminate().await;
    }
2535
    /// Closing a peer's session removes the peer from the manager and from the fetcher's active
    /// peers, while another peer that announced the same hash stays available as a fallback.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_session_closed_cleans_transaction_peer_state() {
        let (mut tx_manager, _network) = new_tx_manager().await;
        let peer_id = PeerId::new([1; 64]);
        let fallback_peer = PeerId::new([2; 64]);
        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
        let hash_shared = B256::from_slice(&[1; 32]);

        // Both peers are registered in the fetcher for the same hash.
        tx_manager.peers.insert(peer_id, peer);
        buffer_hash_to_tx_fetcher(
            &mut tx_manager.transaction_fetcher,
            hash_shared,
            peer_id,
            0,
            None,
        );
        buffer_hash_to_tx_fetcher(
            &mut tx_manager.transaction_fetcher,
            hash_shared,
            fallback_peer,
            0,
            None,
        );
        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);

        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
            peer_id,
            reason: None,
        }));

        assert!(!tx_manager.peers.contains_key(&peer_id));
        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
        // Only the fallback peer remains as a candidate for the shared hash.
        assert_eq!(
            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
            Some(&fallback_peer)
        );
    }
2575
2576 #[tokio::test(flavor = "multi_thread")]
2577 async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2578 let (mut tx_manager, _network) = new_tx_manager().await;
2579 let peer_id = PeerId::new([1; 64]);
2580 let hash = B256::from_slice(&[1; 32]);
2581
2582 tx_manager.network.update_sync_state(SyncState::Idle);
2583 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2584
2585 let err = PoolError::new(
2586 hash,
2587 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2588 BlobTransactionValidationError::InvalidProof,
2589 )),
2590 );
2591
2592 tx_manager.on_bad_import(err);
2593
2594 assert!(!tx_manager.bad_imports.contains(&hash));
2595 }
2596
2597 #[tokio::test(flavor = "multi_thread")]
2598 async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2599 let (mut tx_manager, _network) = new_tx_manager().await;
2600 let peer_id = PeerId::new([1; 64]);
2601 let hash = B256::from_slice(&[3; 32]);
2602
2603 tx_manager.network.update_sync_state(SyncState::Idle);
2604 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2605
2606 let err = PoolError::new(
2607 hash,
2608 InvalidPoolTransactionError::Eip4844(
2609 Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2610 ),
2611 );
2612
2613 tx_manager.on_bad_import(err);
2614
2615 assert!(!tx_manager.bad_imports.contains(&hash));
2616 }
2617
2618 #[tokio::test(flavor = "multi_thread")]
2619 async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2620 let (mut tx_manager, _network) = new_tx_manager().await;
2621 let peer_id = PeerId::new([1; 64]);
2622 let hash = B256::from_slice(&[2; 32]);
2623
2624 tx_manager.network.update_sync_state(SyncState::Idle);
2625 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2626
2627 let err = PoolError::new(
2628 hash,
2629 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2630 );
2631
2632 tx_manager.on_bad_import(err);
2633
2634 assert!(tx_manager.bad_imports.contains(&hash));
2635 }
2636
2637 #[tokio::test(flavor = "multi_thread")]
2638 async fn test_on_get_pooled_transactions_network() {
2639 reth_tracing::init_test_tracing();
2640 let net = Testnet::create(2).await;
2641
2642 let mut handles = net.handles();
2643 let handle0 = handles.next().unwrap();
2644 let handle1 = handles.next().unwrap();
2645
2646 drop(handles);
2647 let handle = net.spawn();
2648
2649 let listener0 = handle0.event_listener();
2650
2651 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2652 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2653
2654 let client = NoopProvider::default();
2655 let pool = testing_pool();
2656 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2657 .disable_discovery()
2658 .listener_port(0)
2659 .build(client);
2660 let transactions_manager_config = config.transactions_manager_config.clone();
2661 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2662 .await
2663 .unwrap()
2664 .into_builder()
2665 .transactions(pool.clone(), transactions_manager_config)
2666 .split_with_handle();
2667 tokio::task::spawn(network);
2668
2669 network_handle.update_sync_state(SyncState::Idle);
2670
2671 assert!(!NetworkInfo::is_syncing(&network_handle));
2672
2673 let mut established = listener0.take(2);
2675 while let Some(ev) = established.next().await {
2676 match ev {
2677 NetworkEvent::ActivePeerSession { .. } |
2678 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2679 transactions.on_network_event(ev);
2680 }
2681 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2682 ev => {
2683 error!("unexpected event {ev:?}")
2684 }
2685 }
2686 }
2687 handle.terminate().await;
2688
2689 let tx = MockTransaction::eip1559();
2690 let _ = transactions
2691 .pool
2692 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2693 .await;
2694
2695 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2696
2697 let (send, receive) =
2698 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2699
2700 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2701 peer_id: *handle1.peer_id(),
2702 request,
2703 response: send,
2704 });
2705
2706 match receive.await.unwrap() {
2707 Ok(PooledTransactions(transactions)) => {
2708 assert_eq!(transactions.len(), 1);
2709 }
2710 Err(e) => {
2711 panic!("error: {e:?}");
2712 }
2713 }
2714 }
2715
2716 #[tokio::test]
2720 async fn test_partially_tx_response() {
2721 reth_tracing::init_test_tracing();
2722
2723 let mut tx_manager = new_tx_manager().await.0;
2724 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2725
2726 let peer_id_1 = PeerId::new([1; 64]);
2727 let eth_version = EthVersion::Eth66;
2728
2729 let txs = vec![
2730 TransactionSigned::new_unhashed(
2731 Transaction::Legacy(TxLegacy {
2732 chain_id: Some(4),
2733 nonce: 15u64,
2734 gas_price: 2200000000,
2735 gas_limit: 34811,
2736 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2737 value: U256::from(1234u64),
2738 input: Default::default(),
2739 }),
2740 Signature::new(
2741 U256::from_str(
2742 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2743 )
2744 .unwrap(),
2745 U256::from_str(
2746 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2747 )
2748 .unwrap(),
2749 true,
2750 ),
2751 ),
2752 TransactionSigned::new_unhashed(
2753 Transaction::Eip1559(TxEip1559 {
2754 chain_id: 4,
2755 nonce: 26u64,
2756 max_priority_fee_per_gas: 1500000000,
2757 max_fee_per_gas: 1500000013,
2758 gas_limit: MIN_TRANSACTION_GAS,
2759 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2760 value: U256::from(3000000000000000000u64),
2761 input: Default::default(),
2762 access_list: Default::default(),
2763 }),
2764 Signature::new(
2765 U256::from_str(
2766 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2767 )
2768 .unwrap(),
2769 U256::from_str(
2770 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2771 )
2772 .unwrap(),
2773 true,
2774 ),
2775 ),
2776 ];
2777
2778 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2779
2780 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2781 peer_1.seen_transactions.insert(txs_hashes[0]);
2784 peer_1.seen_transactions.insert(txs_hashes[1]);
2785 tx_manager.peers.insert(peer_id_1, peer_1);
2786
2787 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2788 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2789
2790 assert!(tx_fetcher.is_idle(&peer_id_1));
2792 assert_eq!(tx_fetcher.active_peers.len(), 0);
2793
2794 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2796
2797 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2798 assert!(!tx_fetcher.is_idle(&peer_id_1));
2800 assert_eq!(tx_fetcher.active_peers.len(), 1);
2801
2802 let req = to_mock_session_rx
2804 .recv()
2805 .await
2806 .expect("peer_1 session should receive request with buffered hashes");
2807 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2808
2809 let message: Vec<PooledTransactionVariant> = txs
2810 .into_iter()
2811 .take(1)
2812 .map(|tx| {
2813 PooledTransactionVariant::try_from(tx)
2814 .expect("Failed to convert MockTransaction to PooledTransaction")
2815 })
2816 .collect();
2817 response
2819 .send(Ok(PooledTransactions(message)))
2820 .expect("should send peer_1 response to tx manager");
2821 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2822 unreachable!()
2823 };
2824
2825 assert!(tx_fetcher.is_idle(&peer_id));
2827 assert_eq!(tx_fetcher.active_peers.len(), 0);
2828 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2830 }
2831
2832 #[tokio::test]
2833 async fn test_max_retries_tx_request() {
2834 reth_tracing::init_test_tracing();
2835
2836 let mut tx_manager = new_tx_manager().await.0;
2837 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2838
2839 let peer_id_1 = PeerId::new([1; 64]);
2840 let peer_id_2 = PeerId::new([2; 64]);
2841 let eth_version = EthVersion::Eth66;
2842 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2843
2844 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2845 peer_1.seen_transactions.insert(seen_hashes[0]);
2848 peer_1.seen_transactions.insert(seen_hashes[1]);
2849 tx_manager.peers.insert(peer_id_1, peer_1);
2850
2851 let retries = 1;
2854 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2855 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2856
2857 assert!(tx_fetcher.is_idle(&peer_id_1));
2859 assert_eq!(tx_fetcher.active_peers.len(), 0);
2860
2861 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2863
2864 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2865
2866 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2867 assert!(!tx_fetcher.is_idle(&peer_id_1));
2869 assert_eq!(tx_fetcher.active_peers.len(), 1);
2870
2871 let req = to_mock_session_rx
2873 .recv()
2874 .await
2875 .expect("peer_1 session should receive request with buffered hashes");
2876 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2877 let GetPooledTransactions(hashes) = request;
2878
2879 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2880
2881 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2882
2883 response
2885 .send(Err(RequestError::BadResponse))
2886 .expect("should send peer_1 response to tx manager");
2887 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2888 unreachable!()
2889 };
2890
2891 assert!(tx_fetcher.is_idle(&peer_id));
2893 assert_eq!(tx_fetcher.active_peers.len(), 0);
2894 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2896
2897 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2898 tx_manager.peers.insert(peer_id_2, peer_2);
2899
2900 let msg =
2902 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2903 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2904
2905 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2906
2907 assert_eq!(tx_fetcher.active_peers.len(), 1);
2909
2910 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2912 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2914
2915 let req = to_mock_session_rx
2917 .recv()
2918 .await
2919 .expect("peer_2 session should receive request with buffered hashes");
2920 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2921
2922 response
2924 .send(Err(RequestError::BadResponse))
2925 .expect("should send peer_2 response to tx manager");
2926 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2927
2928 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2931 assert_eq!(tx_fetcher.active_peers.len(), 0);
2932 }
2933
/// A freshly created pooled builder is empty; after pushing one EIP-1559 transaction it builds
/// a pooled announcement with one entry and no full-transaction payload.
#[test]
fn test_transaction_builder_empty() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let pool_tx = Arc::new(factory.create_eip1559());
    let to_propagate = PropagateTransaction::pool_tx(pool_tx);
    builder.push(&to_propagate);
    assert!(!builder.is_empty());

    let built = builder.build();
    assert!(built.full.is_none());
    let pooled = built.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
}
2950
/// Pushes a transaction whose size is one byte above the broadcast soft limit into a full
/// builder: the first push yields only a full payload, a second push of the same transaction
/// yields one full entry and one pooled entry.
#[test]
fn test_transaction_builder_large() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let mut oversized = factory.create_eip1559();
    // One byte above the soft limit for a transactions broadcast message.
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let to_propagate = PropagateTransaction::pool_tx(Arc::new(oversized));
    builder.push(&to_propagate);
    assert!(!builder.is_empty());

    // First push: the transaction is part of the full payload only.
    let after_first = builder.clone().build();
    assert!(after_first.pooled.is_none());
    let full = after_first.full.unwrap();
    assert_eq!(full.len(), 1);

    builder.push(&to_propagate);

    // Second push: one entry in each of the pooled and full payloads.
    let after_second = builder.build();
    let pooled = after_second.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    let full = after_second.full.unwrap();
    assert_eq!(full.len(), 1);
}
2979
/// An EIP-4844 transaction pushed into a full builder ends up in the pooled payload only; a
/// subsequently pushed EIP-1559 transaction ends up in the full payload.
#[test]
fn test_transaction_builder_eip4844() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let blob_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
    builder.push(&blob_tx);
    assert!(!builder.is_empty());

    // Only the blob transaction so far: pooled payload with one entry, no full payload.
    let after_blob = builder.clone().build();
    assert!(after_blob.full.is_none());
    let pooled = after_blob.pooled.unwrap();
    assert_eq!(pooled.len(), 1);

    let dynamic_fee_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    builder.push(&dynamic_fee_tx);

    // With the EIP-1559 transaction added: one entry in each payload.
    let after_both = builder.build();
    let pooled = after_both.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
    let full = after_both.full.unwrap();
    assert_eq!(full.len(), 1);
}
3005
3006 #[tokio::test]
3007 async fn test_propagate_full() {
3008 reth_tracing::init_test_tracing();
3009
3010 let (mut tx_manager, network) = new_tx_manager().await;
3011 let peer_id = PeerId::random();
3012
3013 network.handle().update_sync_state(SyncState::Idle);
3015
3016 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3018
3019 let session_info = SessionInfo {
3020 peer_id,
3021 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3022 client_version: Arc::from(""),
3023 capabilities: Arc::new(vec![].into()),
3024 status: Arc::new(Default::default()),
3025 version: EthVersion::Eth68,
3026 peer_kind: PeerKind::Basic,
3027 };
3028 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3029 tx_manager
3030 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3031 let mut propagate = vec![];
3032 let mut factory = MockTransactionFactory::default();
3033 let eip1559_tx = Arc::new(factory.create_eip1559());
3034 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3035 let eip4844_tx = Arc::new(factory.create_eip4844());
3036 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3037
3038 let propagated =
3039 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3040 assert_eq!(propagated.len(), 2);
3041 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3042 assert_eq!(prop_txs.len(), 1);
3043 assert!(prop_txs[0].is_full());
3044
3045 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3046 assert_eq!(prop_txs.len(), 1);
3047 assert!(prop_txs[0].is_hash());
3048
3049 let peer = tx_manager.peers.get(&peer_id).unwrap();
3050 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3051 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3052 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3053
3054 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3056 assert!(propagated.is_empty());
3057 }
3058
3059 #[tokio::test]
3060 async fn test_propagate_pending_txs_while_initially_syncing() {
3061 reth_tracing::init_test_tracing();
3062
3063 let (mut tx_manager, network) = new_tx_manager().await;
3064 let peer_id = PeerId::random();
3065
3066 network.handle().update_sync_state(SyncState::Syncing);
3068 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3069
3070 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3072 tx_manager.peers.insert(peer_id, peer);
3073
3074 let tx = MockTransaction::eip1559();
3075 tx_manager
3076 .pool
3077 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3078 .await
3079 .expect("transaction should be accepted into the pool");
3080
3081 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3082
3083 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3084 assert!(peer.seen_transactions.contains(tx.get_hash()));
3085 }
3086
3087 #[tokio::test]
3088 async fn test_relaxed_filter_ignores_unknown_tx_types() {
3089 reth_tracing::init_test_tracing();
3090
3091 let transactions_manager_config = TransactionsManagerConfig::default();
3092
3093 let propagation_policy = TransactionPropagationKind::default();
3094 let announcement_policy = RelaxedEthAnnouncementFilter::default();
3095
3096 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3097
3098 let pool = testing_pool();
3099 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3100 let client = NoopProvider::default();
3101
3102 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3103 .listener_port(0)
3104 .disable_discovery()
3105 .build(client.clone());
3106
3107 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3108 let (to_tx_manager_tx, from_network_rx) =
3109 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3110 network_manager.set_transactions(to_tx_manager_tx);
3111 let network_handle = network_manager.handle().clone();
3112 let network_service_handle = tokio::spawn(network_manager);
3113
3114 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3115 network_handle.clone(),
3116 pool.clone(),
3117 from_network_rx,
3118 transactions_manager_config,
3119 policy_bundle,
3120 );
3121
3122 let peer_id = PeerId::random();
3123 let eth_version = EthVersion::Eth68;
3124 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3125 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3126
3127 let mut tx_factory = MockTransactionFactory::default();
3128
3129 let valid_known_tx = tx_factory.create_eip1559();
3130 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3131
3132 let known_tx_hash = *known_tx_signed.hash();
3133 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3134 let known_tx_size = known_tx_signed.encoded_length();
3135
3136 let unknown_tx_hash = B256::random();
3137 let unknown_tx_type_byte = 0xff_u8;
3138 let unknown_tx_size = 150;
3139
3140 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3141 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3142 sizes: vec![known_tx_size, unknown_tx_size],
3143 hashes: vec![known_tx_hash, unknown_tx_hash],
3144 });
3145
3146 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3147
3148 poll_fn(|cx| {
3149 let _ = tx_manager.poll_unpin(cx);
3150 Poll::Ready(())
3151 })
3152 .await;
3153
3154 let mut requested_hashes_in_getpooled = HashSet::new();
3155 let mut unexpected_request_received = false;
3156
3157 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3158 .await
3159 {
3160 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3161 let GetPooledTransactions(hashes) = request;
3162 for hash in hashes {
3163 requested_hashes_in_getpooled.insert(hash);
3164 }
3165 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3166 }
3167 Ok(Some(other_request)) => {
3168 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3169 unexpected_request_received = true;
3170 }
3171 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3172 Err(_timeout_err) => {
3173 tracing::info!("Timeout: No GetPooledTransactions request received.")
3174 }
3175 }
3176
3177 assert!(
3178 requested_hashes_in_getpooled.contains(&known_tx_hash),
3179 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3180 );
3181 assert!(
3182 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3183 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3184 );
3185 assert!(
3186 !unexpected_request_received,
3187 "An unexpected P2P request was received by the mock peer."
3188 );
3189
3190 network_service_handle.abort();
3191 }
3192}