1use alloy_consensus::transaction::TxHashRef;
4
5pub mod config;
7pub mod constants;
9pub mod fetcher;
11pub mod policy;
13
14pub use self::constants::{
15 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::AnnouncementAcceptance;
19pub use config::{
20 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
21 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::NetworkPolicies;
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29 budget::{
30 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
32 },
33 cache::LruCache,
34 duration_metered_exec, metered_poll_nested_stream_with_budget,
35 metrics::{
36 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
37 },
38 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
39 NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{TxHash, B256};
42use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
43use futures::{stream::FuturesUnordered, Future, StreamExt};
44use reth_eth_wire::{
45 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
46 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
47 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
48 RequestTxHashes, Transactions, ValidAnnouncementData,
49};
50use reth_ethereum_primitives::{TransactionSigned, TxType};
51use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
52use reth_network_api::{
53 events::{PeerEvent, SessionInfo},
54 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
55};
56use reth_network_p2p::{
57 error::{RequestError, RequestResult},
58 sync::SyncStateProvider,
59};
60use reth_network_peers::PeerId;
61use reth_network_types::ReputationChangeKind;
62use reth_primitives_traits::SignedTransaction;
63use reth_tokio_util::EventStream;
64use reth_transaction_pool::{
65 error::{PoolError, PoolResult},
66 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
67 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
68};
69use std::{
70 collections::{hash_map::Entry, HashMap, HashSet},
71 pin::Pin,
72 sync::{
73 atomic::{AtomicUsize, Ordering},
74 Arc,
75 },
76 task::{Context, Poll},
77 time::{Duration, Instant},
78};
79use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
80use tokio_stream::wrappers::UnboundedReceiverStream;
81use tracing::{debug, trace};
82
/// Boxed, pinned future that resolves to a batch of pool import results.
///
/// The future is `Send + 'static`; the manager keeps such futures in its `pool_imports`
/// collection.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
88
/// Cloneable handle used to send [`TransactionsCommand`]s to a [`TransactionsManager`].
///
/// Created by [`TransactionsManager::handle`], which clones the manager's command sender.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sending half of the manager's unbounded command channel.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
101
102impl<N: NetworkPrimitives> TransactionsHandle<N> {
103 fn send(&self, cmd: TransactionsCommand<N>) {
104 let _ = self.manager_tx.send(cmd);
105 }
106
107 async fn peer_handle(
109 &self,
110 peer_id: PeerId,
111 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
112 let (tx, rx) = oneshot::channel();
113 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
114 rx.await
115 }
116
117 pub fn propagate(&self, hash: TxHash) {
119 self.send(TransactionsCommand::PropagateHash(hash))
120 }
121
122 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
126 self.propagate_hashes_to(Some(hash), peer)
127 }
128
129 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
133 let hashes = hash.into_iter().collect::<Vec<_>>();
134 if hashes.is_empty() {
135 return
136 }
137 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
138 }
139
140 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
142 let (tx, rx) = oneshot::channel();
143 self.send(TransactionsCommand::GetActivePeers(tx));
144 rx.await
145 }
146
147 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
151 if transactions.is_empty() {
152 return
153 }
154 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
155 }
156
157 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
162 if transactions.is_empty() {
163 return
164 }
165 self.send(TransactionsCommand::PropagateTransactions(transactions))
166 }
167
168 pub fn broadcast_transactions(
173 &self,
174 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
175 ) {
176 let transactions =
177 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
178 if transactions.is_empty() {
179 return
180 }
181 self.send(TransactionsCommand::BroadcastTransactions(transactions))
182 }
183
184 pub async fn get_transaction_hashes(
186 &self,
187 peers: Vec<PeerId>,
188 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
189 if peers.is_empty() {
190 return Ok(Default::default())
191 }
192 let (tx, rx) = oneshot::channel();
193 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
194 rx.await
195 }
196
197 pub async fn get_peer_transaction_hashes(
199 &self,
200 peer: PeerId,
201 ) -> Result<HashSet<TxHash>, RecvError> {
202 let res = self.get_transaction_hashes(vec![peer]).await?;
203 Ok(res.into_values().next().unwrap_or_default())
204 }
205
206 pub async fn get_pooled_transactions_from(
212 &self,
213 peer_id: PeerId,
214 hashes: Vec<B256>,
215 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218 let (tx, rx) = oneshot::channel();
219 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220 peer.try_send(request).ok();
221
222 rx.await?.map(|res| Some(res.0))
223 }
224}
225
/// Manages transaction gossip for the network: handles announcements and requests from peers,
/// propagates pool transactions to peers and tracks pool imports.
///
/// The manager is driven by polling (see the `must_use` note) and is controlled from the outside
/// through [`TransactionsHandle`].
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Transaction pool; queried for transactions to propagate and used to filter out already
    /// known hashes.
    pool: Pool,
    /// Handle to the network; used to send messages to peers, report reputation changes and
    /// query the sync/gossip state.
    network: NetworkHandle<N>,
    /// Stream of network events, obtained from `network.event_listener()`.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Fetcher that buffers announced hashes and issues `GetPooledTransactions` requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// Peers associated with a transaction hash. Entries are removed once an import result for
    /// the hash arrives; on a bad import those peers are penalized.
    // NOTE(review): presumably these are the peers that sent the transaction — the insertion
    // site is not visible here, confirm.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Collection of [`PoolImportFuture`]s.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Bookkeeping for pending pool imports; its `max_pending_pool_imports` limit is consulted
    /// before fetching more hashes.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Bounded cache of hashes whose import failed as a bad transaction; consulted when
    /// filtering announcements.
    bad_imports: LruCache<TxHash>,
    /// Metadata of tracked peers, keyed by peer id.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Command sender; cloned into every [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Receiving side of the command channel.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Receiver returned by the pool's `pending_transactions_listener()`.
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Transaction-related events coming from the network, wrapped in a metered receiver.
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// Manager configuration (propagation mode, ingress policy, fetcher settings, ...).
    config: TransactionsManagerConfig,
    /// Propagation policy and announcement filter.
    policies: NetworkPolicies<N>,
    /// Metrics of the manager.
    metrics: TransactionsManagerMetrics,
    /// Metrics about transaction types seen in eth68 announcements.
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
345
346impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
347 pub fn new(
351 network: NetworkHandle<N>,
352 pool: Pool,
353 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
354 transactions_manager_config: TransactionsManagerConfig,
355 ) -> Self {
356 Self::with_policy(
357 network,
358 pool,
359 from_network,
360 transactions_manager_config,
361 NetworkPolicies::new(
362 TransactionPropagationKind::default(),
363 StrictEthAnnouncementFilter::default(),
364 ),
365 )
366 }
367}
368
369impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
370 pub fn with_policy(
374 network: NetworkHandle<N>,
375 pool: Pool,
376 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
377 transactions_manager_config: TransactionsManagerConfig,
378 policies: NetworkPolicies<N>,
379 ) -> Self {
380 let network_events = network.event_listener();
381
382 let (command_tx, command_rx) = mpsc::unbounded_channel();
383
384 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
385 &transactions_manager_config.transaction_fetcher_config,
386 );
387
388 let pending = pool.pending_transactions_listener();
391 let pending_pool_imports_info = PendingPoolImportsInfo::default();
392 let metrics = TransactionsManagerMetrics::default();
393 metrics
394 .capacity_pending_pool_imports
395 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
396
397 Self {
398 pool,
399 network,
400 network_events,
401 transaction_fetcher,
402 transactions_by_peers: Default::default(),
403 pool_imports: Default::default(),
404 pending_pool_imports_info: PendingPoolImportsInfo::new(
405 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
406 ),
407 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
408 peers: Default::default(),
409 command_tx,
410 command_rx: UnboundedReceiverStream::new(command_rx),
411 pending_transactions: pending,
412 transaction_events: UnboundedMeteredReceiver::new(
413 from_network,
414 NETWORK_POOL_TRANSACTIONS_SCOPE,
415 ),
416 config: transactions_manager_config,
417 policies,
418 metrics,
419 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
420 }
421 }
422
423 pub fn handle(&self) -> TransactionsHandle<N> {
425 TransactionsHandle { manager_tx: self.command_tx.clone() }
426 }
427
428 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
431 self.pending_pool_imports_info
432 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
437 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
438 self.metrics.reported_bad_transactions.increment(1);
439 }
440
441 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
442 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
443 self.network.reputation_change(peer_id, kind);
444 }
445
446 fn report_already_seen(&self, peer_id: PeerId) {
447 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
448 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
449 }
450
451 fn on_good_import(&mut self, hash: TxHash) {
453 self.transactions_by_peers.remove(&hash);
454 }
455
456 fn on_bad_import(&mut self, err: PoolError) {
480 let peers = self.transactions_by_peers.remove(&err.hash);
481
482 if !err.is_bad_transaction() || self.network.is_syncing() {
484 return
485 }
486 if let Some(peers) = peers {
489 for peer_id in peers {
490 self.report_peer_bad_transactions(peer_id);
491 }
492 }
493 self.metrics.bad_imports.increment(1);
494 self.bad_imports.insert(err.hash);
495 }
496
497 fn on_fetch_hashes_pending_fetch(&mut self) {
499 let info = &self.pending_pool_imports_info;
501 let max_pending_pool_imports = info.max_pending_pool_imports;
502 let has_capacity_wrt_pending_pool_imports =
503 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
504
505 self.transaction_fetcher
506 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
507 }
508
509 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
510 let kind = match req_err {
511 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
512 RequestError::Timeout => ReputationChangeKind::Timeout,
513 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
514 return
516 }
517 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
518 };
519 self.report_peer(peer_id, kind);
520 }
521
522 #[inline]
523 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
524 let metrics = &self.metrics;
525
526 let TxManagerPollDurations {
527 acc_network_events,
528 acc_pending_imports,
529 acc_tx_events,
530 acc_imported_txns,
531 acc_fetch_events,
532 acc_pending_fetch,
533 acc_cmds,
534 } = poll_durations;
535
536 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
538 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
540 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
541 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
542 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
543 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
544 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
545 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
546 }
547}
548
549impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
550 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
552 for res in batch_results {
553 match res {
554 Ok(AddedTransactionOutcome { hash, .. }) => {
555 self.on_good_import(hash);
556 }
557 Err(err) => {
558 self.on_bad_import(err);
559 }
560 }
561 }
562 }
563
/// Handles a `NewPooledTransactionHashes` announcement from `peer_id`.
///
/// The announcement is filtered in several stages (duplicates, hashes already tracked in
/// `transactions_by_peers`, hashes known to the pool, the announcement filter policy, and the
/// fetcher's own filter). Whatever remains is either buffered in the fetcher (peer busy, or
/// hashes that did not fit / could not be sent) or requested from the peer via
/// `GetPooledTransactions`.
///
/// The peer's reputation is reduced for already-seen hashes, empty or duplicated
/// announcements, and metadata/filter violations.
fn on_new_pooled_transaction_hashes(
    &mut self,
    peer_id: PeerId,
    msg: NewPooledTransactionHashes,
) {
    // Announcements are ignored during initial sync and when gossip is disabled.
    if self.network.is_initially_syncing() {
        return
    }
    if self.network.tx_gossip_disabled() {
        return
    }

    // Only announcements from tracked peers are processed.
    let Some(peer) = self.peers.get_mut(&peer_id) else {
        trace!(
            peer_id = format!("{peer_id:#}"),
            ?msg,
            "discarding announcement from inactive peer"
        );

        return
    };
    let client = peer.client_version.clone();

    // Mark every announced hash as seen by the peer and count the ones that were already
    // marked.
    let mut count_txns_already_seen_by_peer = 0;
    for tx in msg.iter_hashes().copied() {
        if !peer.seen_transactions.insert(tx) {
            count_txns_already_seen_by_peer += 1;
        }
    }
    if count_txns_already_seen_by_peer > 0 {
        self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
        self.metrics
            .occurrences_hash_already_seen_by_peer
            .increment(count_txns_already_seen_by_peer);

        trace!(target: "net::tx",
            %count_txns_already_seen_by_peer,
            peer_id=format!("{peer_id:#}"),
            ?client,
            "Peer sent hashes that have already been marked as seen by peer"
        );

        self.report_already_seen(peer_id);
    }

    // An empty announcement is penalized and dropped.
    if msg.is_empty() {
        self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        return;
    }

    // Deduplicate; a length change means the peer sent duplicate entries, which is penalized
    // but processing continues with the deduplicated data.
    let original_len = msg.len();
    let mut partially_valid_msg = msg.dedup();

    if partially_valid_msg.len() != original_len {
        self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
    }

    // Drop hashes that already have an entry in `transactions_by_peers`.
    partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

    // Drop hashes the pool already knows and record how many were dropped.
    let hashes_count_pre_pool_filter = partially_valid_msg.len();
    self.pool.retain_unknown(&mut partially_valid_msg);
    if hashes_count_pre_pool_filter > partially_valid_msg.len() {
        let already_known_hashes_count =
            hashes_count_pre_pool_filter - partially_valid_msg.len();
        self.metrics
            .occurrences_hashes_already_in_pool
            .increment(already_known_hashes_count as u64);
    }

    if partially_valid_msg.is_empty() {
        return
    }

    // Set by the closure below whenever an entry warrants a penalty; the peer is reported at
    // most once afterwards.
    let mut should_report_peer = false;
    let mut tx_types_counter = TxTypesCounter::default();

    let is_eth68_message = partially_valid_msg
        .msg_version()
        .expect("partially valid announcement should have a version")
        .is_eth68();

    partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
        // Resolve the (type, size) metadata handed to the announcement filter.
        let (ty_byte, size_val) = match *metadata_ref_mut {
            Some((ty, size)) => {
                // Metadata on a non-eth68 message is flagged, but the entry is still evaluated.
                if !is_eth68_message {
                    should_report_peer = true;
                }
                (ty, size)
            }
            None => {
                // An eth68 entry without metadata is flagged and dropped.
                if is_eth68_message {
                    should_report_peer = true;
                    return false;
                }
                // Non-eth68 entries carry no metadata; zeroes are passed to the filter.
                (0u8, 0)
            }
        };

        // Count announced transaction types for eth68 entries whose type byte parses.
        if is_eth68_message &&
            let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
            let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
        {
            tx_types_counter.increase_by_tx_type(parsed_tx_type);
        }

        let decision = self
            .policies
            .announcement_filter()
            .decide_on_announcement(ty_byte, tx_hash, size_val);

        // Only `Accept` keeps the entry; `Reject` may additionally request a penalty.
        match decision {
            AnnouncementAcceptance::Accept => true,
            AnnouncementAcceptance::Ignore => false,
            AnnouncementAcceptance::Reject { penalize_peer } => {
                if penalize_peer {
                    should_report_peer = true;
                }
                false
            }
        }
    });

    if is_eth68_message {
        self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
    }

    if should_report_peer {
        self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
    }

    let mut valid_announcement_data =
        ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);

    if valid_announcement_data.is_empty() {
        return
    }

    // Let the fetcher filter the remaining hashes; the closure tells it which hashes are in
    // the `bad_imports` cache.
    // NOTE(review): presumably the fetcher drops hashes it already tracks — confirm against
    // `filter_unseen_and_pending_hashes`.
    let bad_imports = &self.bad_imports;
    self.transaction_fetcher.filter_unseen_and_pending_hashes(
        &mut valid_announcement_data,
        |hash| bad_imports.contains(hash),
        &peer_id,
        &client,
    );

    if valid_announcement_data.is_empty() {
        return
    }

    trace!(target: "net::tx::propagation",
        peer_id=format!("{peer_id:#}"),
        hashes_len=valid_announcement_data.len(),
        hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
        msg_version=%valid_announcement_data.msg_version(),
        client_version=%client,
        "received previously unseen and pending hashes in announcement from peer"
    );

    // If the fetcher does not consider the peer idle, buffer all hashes instead of requesting.
    if !self.transaction_fetcher.is_idle(&peer_id) {
        // Read the version before the announcement data is consumed.
        let msg_version = valid_announcement_data.msg_version();
        let (hashes, _version) = valid_announcement_data.into_request_hashes();

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes,
            %msg_version,
            %client,
            "buffering hashes announced by busy peer"
        );

        self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

        return
    }

    // Pack a request; hashes that do not fit are returned as surplus and buffered.
    let mut hashes_to_request =
        RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
    let surplus_hashes =
        self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

    if !surplus_hashes.is_empty() {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            surplus_hashes=?*surplus_hashes,
            %client,
            "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
        );

        self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
    }

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        hashes=?*hashes_to_request,
        %client,
        "sending hashes in `GetPooledTransactions` request to peer's session"
    );

    // Look the peer up again; the earlier mutable borrow ended before the `self` method calls
    // above.
    let Some(peer) = self.peers.get_mut(&peer_id) else { return };
    // A returned value holds the hashes that could not be requested; buffer them.
    if let Some(failed_to_request_hashes) =
        self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
    {
        let conn_eth_version = peer.version;

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            failed_to_request_hashes=?*failed_to_request_hashes,
            %conn_eth_version,
            %client,
            "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
        );
        self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
    }
}
814}
815
816impl<Pool, N> TransactionsManager<Pool, N>
817where
818 Pool: TransactionPool + Unpin + 'static,
819 N: NetworkPrimitives<
820 BroadcastedTransaction: SignedTransaction,
821 PooledTransaction: SignedTransaction,
822 > + Unpin,
823 Pool::Transaction:
824 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
825{
826 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
838 if self.network.is_initially_syncing() {
840 return
841 }
842 if self.network.tx_gossip_disabled() {
843 return
844 }
845
846 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
847
848 self.propagate_all(hashes);
849 }
850
851 fn propagate_full_transactions_to_peer(
855 &mut self,
856 txs: Vec<TxHash>,
857 peer_id: PeerId,
858 propagation_mode: PropagationMode,
859 ) -> Option<PropagatedTransactions> {
860 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
861
862 let peer = self.peers.get_mut(&peer_id)?;
863 let mut propagated = PropagatedTransactions::default();
864
865 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
867
868 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
869
870 if propagation_mode.is_forced() {
871 full_transactions.extend(to_propagate);
873 } else {
874 for tx in to_propagate {
877 if !peer.seen_transactions.contains(tx.tx_hash()) {
878 full_transactions.push(&tx);
880 }
881 }
882 }
883
884 if full_transactions.is_empty() {
885 return None
887 }
888
889 let PropagateTransactions { pooled, full } = full_transactions.build();
890
891 if let Some(new_pooled_hashes) = pooled {
893 for hash in new_pooled_hashes.iter_hashes().copied() {
894 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
895 peer.seen_transactions.insert(hash);
897 }
898
899 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
901 }
902
903 if let Some(new_full_transactions) = full {
905 for tx in &new_full_transactions {
906 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
907 peer.seen_transactions.insert(*tx.tx_hash());
909 }
910
911 self.network.send_transactions(peer_id, new_full_transactions);
913 }
914
915 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
917
918 Some(propagated)
919 }
920
921 fn propagate_hashes_to(
925 &mut self,
926 hashes: Vec<TxHash>,
927 peer_id: PeerId,
928 propagation_mode: PropagationMode,
929 ) {
930 trace!(target: "net::tx", "Start propagating transactions as hashes");
931
932 let propagated = {
935 let Some(peer) = self.peers.get_mut(&peer_id) else {
936 return
938 };
939
940 let to_propagate = self
941 .pool
942 .get_all(hashes)
943 .into_iter()
944 .map(PropagateTransaction::pool_tx)
945 .collect::<Vec<_>>();
946
947 let mut propagated = PropagatedTransactions::default();
948
949 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
951
952 if propagation_mode.is_forced() {
953 hashes.extend(to_propagate)
954 } else {
955 for tx in to_propagate {
956 if !peer.seen_transactions.contains(tx.tx_hash()) {
957 hashes.push(&tx);
959 }
960 }
961 }
962
963 let new_pooled_hashes = hashes.build();
964
965 if new_pooled_hashes.is_empty() {
966 return
968 }
969
970 for hash in new_pooled_hashes.iter_hashes().copied() {
971 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
972 }
973
974 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
975
976 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
978
979 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
981
982 propagated
983 };
984
985 self.pool.on_propagated(propagated);
987 }
988
/// Propagates `to_propagate` to all tracked peers the propagation policy allows.
///
/// Depending on the peer's position in the iteration, it receives either a builder for full
/// transactions or one for hash announcements. In forced mode all transactions are offered to
/// the builder; otherwise transactions the peer is already marked as having seen are skipped.
/// Everything sent is recorded in the returned [`PropagatedTransactions`] and marked as seen
/// by the peer.
///
/// Returns an empty result when transaction gossip is disabled.
fn propagate_transactions(
    &mut self,
    to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
    propagation_mode: PropagationMode,
) -> PropagatedTransactions {
    let mut propagated = PropagatedTransactions::default();
    if self.network.tx_gossip_disabled() {
        return propagated
    }

    // Number of peers that should get full transactions, derived from the configured mode and
    // the current peer count.
    let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());

    // `peer_idx` counts every peer in map iteration order, including peers skipped by the
    // policy below.
    for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
        if !self.policies.propagation_policy().can_propagate(peer) {
            continue
        }
        // NOTE(review): with `>` the peers at indices `0..=max_num_full` get the full builder,
        // i.e. one more than `max_num_full` — confirm this is intended.
        let mut builder = if peer_idx > max_num_full {
            PropagateTransactionsBuilder::pooled(peer.version)
        } else {
            PropagateTransactionsBuilder::full(peer.version)
        };

        if propagation_mode.is_forced() {
            // Forced mode bypasses the seen-cache check.
            builder.extend(to_propagate.iter());
        } else {
            for tx in &to_propagate {
                // Only offer transactions the peer is not yet marked as having seen.
                if !peer.seen_transactions.contains(tx.tx_hash()) {
                    builder.push(tx);
                }
            }
        }

        if builder.is_empty() {
            trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
            continue
        }

        let PropagateTransactions { pooled, full } = builder.build();

        if let Some(mut new_pooled_hashes) = pooled {
            // Cap the announcement at the soft limit for hash broadcast messages.
            new_pooled_hashes
                .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);

            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
                peer.seen_transactions.insert(hash);
            }

            trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");

            self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
        }

        if let Some(new_full_transactions) = full {
            for tx in &new_full_transactions {
                propagated
                    .0
                    .entry(*tx.tx_hash())
                    .or_default()
                    .push(PropagateKind::Full(*peer_id));
                peer.seen_transactions.insert(*tx.tx_hash());
            }

            trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");

            self.network.send_transactions(*peer_id, new_full_transactions);
        }
    }

    // The metric counts distinct propagated hashes, not individual sends.
    self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

    propagated
}
1086
1087 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1092 if self.peers.is_empty() {
1093 return
1095 }
1096 let propagated = self.propagate_transactions(
1097 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1098 PropagationMode::Basic,
1099 );
1100
1101 self.pool.on_propagated(propagated);
1103 }
1104
1105 fn on_get_pooled_transactions(
1107 &mut self,
1108 peer_id: PeerId,
1109 request: GetPooledTransactions,
1110 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1111 ) {
1112 if let Some(peer) = self.peers.get_mut(&peer_id) {
1113 if self.network.tx_gossip_disabled() {
1114 let _ = response.send(Ok(PooledTransactions::default()));
1115 return
1116 }
1117 let transactions = self.pool.get_pooled_transaction_elements(
1118 request.0,
1119 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1120 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1121 ),
1122 );
1123 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1124
1125 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1128
1129 let resp = PooledTransactions(transactions);
1130 let _ = response.send(Ok(resp));
1131 }
1132 }
1133
1134 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1136 match cmd {
1137 TransactionsCommand::PropagateHash(hash) => {
1138 self.on_new_pending_transactions(vec![hash])
1139 }
1140 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1141 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1142 }
1143 TransactionsCommand::GetActivePeers(tx) => {
1144 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1145 tx.send(peers).ok();
1146 }
1147 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1148 if let Some(propagated) =
1149 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1150 {
1151 self.pool.on_propagated(propagated);
1152 }
1153 }
1154 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1155 TransactionsCommand::BroadcastTransactions(txs) => {
1156 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1157 self.pool.on_propagated(propagated);
1158 }
1159 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1160 let mut res = HashMap::with_capacity(peers.len());
1161 for peer_id in peers {
1162 let hashes = self
1163 .peers
1164 .get(&peer_id)
1165 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1166 .unwrap_or_default();
1167 res.insert(peer_id, hashes);
1168 }
1169 tx.send(res).ok();
1170 }
1171 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1172 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1173 peer_request_sender.send(sender).ok();
1174 }
1175 }
1176 }
1177
1178 fn handle_peer_session(
1182 &mut self,
1183 info: SessionInfo,
1184 messages: PeerRequestSender<PeerRequest<N>>,
1185 ) {
1186 let SessionInfo { peer_id, client_version, version, .. } = info;
1187
1188 let peer = PeerMetadata::<N>::new(
1190 messages,
1191 version,
1192 client_version,
1193 self.config.max_transactions_seen_by_peer_history,
1194 info.peer_kind,
1195 );
1196 let peer = match self.peers.entry(peer_id) {
1197 Entry::Occupied(mut entry) => {
1198 entry.insert(peer);
1199 entry.into_mut()
1200 }
1201 Entry::Vacant(entry) => entry.insert(peer),
1202 };
1203
1204 self.policies.propagation_policy_mut().on_session_established(peer);
1205
1206 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1210 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1211 return
1212 }
1213
1214 let pooled_txs = self.pool.pooled_transactions_max(
1216 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1217 );
1218 if pooled_txs.is_empty() {
1219 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1220 return;
1221 }
1222
1223 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1225 for pooled_tx in pooled_txs {
1226 peer.seen_transactions.insert(*pooled_tx.hash());
1227 msg_builder.push_pooled(pooled_tx);
1228 }
1229
1230 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1231 let msg = msg_builder.build();
1232 self.network.send_transactions_hashes(peer_id, msg);
1233 }
1234
1235 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1237 match event_result {
1238 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1239 let peer = self.peers.remove(&peer_id);
1242 if let Some(mut peer) = peer {
1243 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1244 }
1245 self.transaction_fetcher.remove_peer(&peer_id);
1246 }
1247 NetworkEvent::ActivePeerSession { info, messages } => {
1248 self.handle_peer_session(info, messages);
1250 }
1251 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1252 let peer_id = info.peer_id;
1253 let messages = match self.peers.get(&peer_id) {
1255 Some(p) => p.request_tx.clone(),
1256 None => {
1257 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1258 return;
1259 }
1260 };
1261 self.handle_peer_session(info, messages);
1262 }
1263 _ => {}
1264 }
1265 }
1266
1267 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1269 if self.config.ingress_policy.allows_all() {
1270 return true;
1271 }
1272 let Some(peer) = self.peers.get(peer_id) else {
1273 return false;
1274 };
1275 self.config.ingress_policy.allows(peer.peer_kind())
1276 }
1277
1278 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1280 match event {
1281 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1282 if !self.accepts_incoming_from(&peer_id) {
1283 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1284 return;
1285 }
1286 let has_blob_txs = msg.has_eip4844();
1290
1291 let non_blob_txs = msg
1292 .0
1293 .into_iter()
1294 .map(N::PooledTransaction::try_from)
1295 .filter_map(Result::ok)
1296 .collect();
1297
1298 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1299
1300 if has_blob_txs {
1301 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1302 self.report_peer_bad_transactions(peer_id);
1303 }
1304 }
1305 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1306 if !self.accepts_incoming_from(&peer_id) {
1307 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1308 return;
1309 }
1310 self.on_new_pooled_transaction_hashes(peer_id, msg)
1311 }
1312 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1313 self.on_get_pooled_transactions(peer_id, request, response)
1314 }
1315 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1316 let _ = response.send(Some(self.handle()));
1317 }
1318 }
1319 }
1320
    /// Starts the import of transactions received from `peer_id` into the pool.
    ///
    /// `source` tells whether the transactions arrived as an unsolicited broadcast or as the
    /// response to a request; only broadcasts update the peer's `seen_transactions` cache here.
    ///
    /// Nothing is imported while the network reports that it is initially syncing, while
    /// transaction gossip is disabled, or if the peer is not tracked in `self.peers`.
    ///
    /// The import itself is asynchronous: the new transactions are handed to the pool in a single
    /// batch inside a future that is pushed to `self.pool_imports`.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // Ignore the transactions entirely while the node is initially syncing.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let mut transactions = transactions.0;

        // These hashes have been received now, so the fetcher no longer needs to track them.
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));

        // For broadcasts, record the hashes as seen by the peer and count how many of them the
        // peer was already known to have (`insert` returning `false`). Responses are not
        // recorded here.
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // 1. Drop transactions that the pool already knows and record how many were dropped.
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        // Set if any transaction fails signer recovery or is a known bad import.
        let mut has_bad_transactions = false;

        // 2. Collect the transactions that are new to us. The vector is sized for the upper
        //    bound and shrunk after the loop.
        let mut new_txs = Vec::with_capacity(transactions.len());
        for tx in transactions {
            // Recover the signer; transactions that fail recovery are skipped.
            let tx = match tx.try_into_recovered() {
                Ok(tx) => tx,
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%peer.client_version,
                        "failed ecrecovery for transaction"
                    );
                    has_bad_transactions = true;
                    continue
                }
            };

            match self.transactions_by_peers.entry(*tx.tx_hash()) {
                Entry::Occupied(mut entry) => {
                    // The hash is already tracked: only remember that this peer sent it too.
                    entry.get_mut().insert(peer_id);
                }
                Entry::Vacant(entry) => {
                    if self.bad_imports.contains(tx.tx_hash()) {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            hash=%tx.tx_hash(),
                            client_version=%peer.client_version,
                            "received a known bad transaction from peer"
                        );
                        has_bad_transactions = true;
                    } else {
                        // First time we see this hash: queue it for import and start tracking
                        // which peers sent it.
                        let pool_transaction = Pool::Transaction::from_pooled(tx);
                        new_txs.push(pool_transaction);

                        entry.insert(HashSet::from([peer_id]));
                    }
                }
            }
        }
        new_txs.shrink_to_fit();

        // 3. Import all new transactions as one batch.
        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            // The gauge and the shared counter are incremented now and decremented by the import
            // future once the pool call has returned.
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                // Undo the bookkeeping done when the import was scheduled.
                metric_pending_pool_imports.decrement(added as f64);
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            self.pool_imports.push(import);
        }

        // Metrics and logging that still read from `peer` happen before the `self.report_*`
        // calls below.
        if num_already_seen_by_peer > 0 {
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
        }

        if has_bad_transactions {
            // The peer sent at least one invalid or known bad transaction.
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }
    }
1461
1462 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1464 match fetch_event {
1465 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1466 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1467 if report_peer {
1468 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1469 }
1470 }
1471 FetchEvent::FetchError { peer_id, error } => {
1472 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1473 self.on_request_error(peer_id, error);
1474 }
1475 FetchEvent::EmptyResponse { peer_id } => {
1476 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1477 }
1478 }
1479 }
1480}
1481
/// Drives the [`TransactionsManager`].
///
/// Every call to `poll` returns [`Poll::Pending`], so this future never resolves on its own.
/// Each poll works through the manager's event sources in a fixed order and, if any of them
/// may have more work left, wakes itself so that it is polled again.
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // NOTE(review): `metered_poll_nested_stream_with_budget!` is defined elsewhere; from its
        // usage here it polls the given stream with a budget, feeds each item to the closure,
        // accumulates the time spent into the given duration and evaluates to a flag telling
        // whether more items may be available — confirm against the macro definition.

        // Network/session events (keeps the peer map up to date).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // New pending transactions are received in batches of at most
        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` items and handed
        // to `on_new_pending_transactions` in one call.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // The batch is full, so there may be more items waiting.
                    true
                } else {
                    // The batch is not full: poll once more for the remaining capacity. If this
                    // is ready as well, more work is reported; otherwise the result is `false`.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Transaction related messages forwarded by the network.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Events produced by the transaction fetcher. The flag is mutable because it is also set
        // below when pending hashes are fetched.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Results of the batched pool imports scheduled by `import_transactions`.
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Fetch hashes that are pending fetch if there is capacity for it. In that case the
        // fetcher is flagged as possibly having more events.
        // NOTE(review): `duration_metered_exec!` presumably runs the block and adds the elapsed
        // time to the given duration — confirm against the macro definition.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() {
                    this.on_fetch_hashes_pending_fetch();
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Commands sent to the manager.
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // If any source may have more work, request another poll right away. Note that the poll
        // metrics below are not updated on this path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1666
/// Selects how transactions are propagated.
///
/// NOTE(review): the code that interprets the two modes is outside this excerpt; only the
/// distinction exposed by [`PropagationMode::is_forced`] is visible here.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The regular propagation mode.
    Basic,
    /// The forced propagation mode.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for
    /// [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1689
/// A transaction prepared for propagation, together with its size.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction as reported by `length()` / `encoded_length()` at construction.
    size: usize,
    /// The shared transaction.
    transaction: Arc<T>,
}
1696
1697impl<T: SignedTransaction> PropagateTransaction<T> {
1698 pub fn new(transaction: T) -> Self {
1700 let size = transaction.length();
1701 Self { size, transaction: Arc::new(transaction) }
1702 }
1703
1704 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1706 where
1707 P: PoolTransaction<Consensus = T>,
1708 {
1709 let size = tx.encoded_length();
1710 let transaction = tx.transaction.clone_into_consensus();
1711 let transaction = Arc::new(transaction.into_inner());
1712 Self { size, transaction }
1713 }
1714
1715 fn tx_hash(&self) -> &TxHash {
1716 self.transaction.tx_hash()
1717 }
1718}
1719
/// Builder for the messages used to propagate transactions to a peer.
///
/// Depending on the variant, pushed transactions end up as hashes only or as full transactions
/// (with a hash fallback handled by [`FullTransactionsBuilder`]).
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Collects only hashes of the pushed transactions.
    Pooled(PooledTransactionsHashesBuilder),
    /// Collects full transactions.
    Full(FullTransactionsBuilder<T>),
}
1727
1728impl<T> PropagateTransactionsBuilder<T> {
1729 fn pooled(version: EthVersion) -> Self {
1731 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1732 }
1733
1734 fn full(version: EthVersion) -> Self {
1736 Self::Full(FullTransactionsBuilder::new(version))
1737 }
1738
1739 fn is_empty(&self) -> bool {
1741 match self {
1742 Self::Pooled(builder) => builder.is_empty(),
1743 Self::Full(builder) => builder.is_empty(),
1744 }
1745 }
1746
1747 fn build(self) -> PropagateTransactions<T> {
1749 match self {
1750 Self::Pooled(pooled) => {
1751 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1752 }
1753 Self::Full(full) => full.build(),
1754 }
1755 }
1756}
1757
1758impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1759 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1761 for tx in txs {
1762 self.push(tx);
1763 }
1764 }
1765
1766 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1768 match self {
1769 Self::Pooled(builder) => builder.push(transaction),
1770 Self::Full(builder) => builder.push(transaction),
1771 }
1772 }
1773}
1774
/// The output of a [`PropagateTransactionsBuilder`]: what should be sent to a peer.
struct PropagateTransactions<T> {
    /// Hashes to announce, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// Transactions to send in full, if any.
    full: Option<Vec<Arc<T>>>,
}
1782
/// Builder that collects transactions to be sent in full.
///
/// Transactions that are not sent in full (see `push`) are collected as hashes in `pooled`
/// instead.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Accumulated size of all entries in `transactions`.
    total_size: usize,
    /// Transactions that will be sent in full.
    transactions: Vec<Arc<T>>,
    /// Hashes of the transactions that are not sent in full.
    pooled: PooledTransactionsHashesBuilder,
}
1796
1797impl<T> FullTransactionsBuilder<T> {
1798 fn new(version: EthVersion) -> Self {
1800 Self {
1801 total_size: 0,
1802 pooled: PooledTransactionsHashesBuilder::new(version),
1803 transactions: vec![],
1804 }
1805 }
1806
1807 fn is_empty(&self) -> bool {
1809 self.transactions.is_empty() && self.pooled.is_empty()
1810 }
1811
1812 fn build(self) -> PropagateTransactions<T> {
1814 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1815 let full = Some(self.transactions).filter(|full| !full.is_empty());
1816 PropagateTransactions { pooled, full }
1817 }
1818}
1819
1820impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1821 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1823 for tx in txs {
1824 self.push(&tx)
1825 }
1826 }
1827
1828 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1838 if !transaction.transaction.is_broadcastable_in_full() {
1847 self.pooled.push(transaction);
1848 return
1849 }
1850
1851 let new_size = self.total_size + transaction.size;
1852 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1853 self.total_size > 0
1854 {
1855 self.pooled.push(transaction);
1857 return
1858 }
1859
1860 self.total_size = new_size;
1861 self.transactions.push(Arc::clone(&transaction.transaction));
1862 }
1863}
1864
/// Builder for a [`NewPooledTransactionHashes`] announcement.
///
/// The variant, and thereby the message format, is selected from the eth version in `new`.
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Message format carrying hashes only.
    Eth66(NewPooledTransactionHashes66),
    /// Message format carrying hashes together with sizes and types.
    Eth68(NewPooledTransactionHashes68),
}
1872
1873impl PooledTransactionsHashesBuilder {
1876 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1878 match self {
1879 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1880 Self::Eth68(msg) => {
1881 msg.hashes.push(*pooled_tx.hash());
1882 msg.sizes.push(pooled_tx.encoded_length());
1883 msg.types.push(pooled_tx.transaction.ty());
1884 }
1885 }
1886 }
1887
1888 fn is_empty(&self) -> bool {
1890 match self {
1891 Self::Eth66(hashes) => hashes.is_empty(),
1892 Self::Eth68(hashes) => hashes.is_empty(),
1893 }
1894 }
1895
1896 fn extend<T: SignedTransaction>(
1898 &mut self,
1899 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1900 ) {
1901 for tx in txs {
1902 self.push(&tx);
1903 }
1904 }
1905
1906 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1907 match self {
1908 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1909 Self::Eth68(msg) => {
1910 msg.hashes.push(*tx.tx_hash());
1911 msg.sizes.push(tx.size);
1912 msg.types.push(tx.transaction.ty());
1913 }
1914 }
1915 }
1916
1917 fn new(version: EthVersion) -> Self {
1919 match version {
1920 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1921 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1922 }
1923 }
1924
1925 fn build(self) -> NewPooledTransactionHashes {
1926 match self {
1927 Self::Eth66(msg) => msg.into(),
1928 Self::Eth68(msg) => msg.into(),
1929 }
1930 }
1931}
1932
/// How a batch of transactions reached the manager.
enum TransactionSource {
    /// The transactions were broadcast by the peer.
    Broadcast,
    /// The transactions were received as the response to a request.
    Response,
}

impl TransactionSource {
    /// Returns `true` for [`TransactionSource::Broadcast`] and `false` for
    /// [`TransactionSource::Response`].
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1949
/// Per-peer state tracked by the transactions manager.
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Bounded cache of transaction hashes recorded as seen by this peer.
    seen_transactions: LruCache<TxHash>,
    /// Sender used to send requests to this peer's session.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The eth protocol version of the session.
    version: EthVersion,
    /// The peer's client version string.
    client_version: Arc<str>,
    /// The kind of the peer, checked against the ingress policy.
    peer_kind: PeerKind,
}
1966
1967impl<N: NetworkPrimitives> PeerMetadata<N> {
1968 pub fn new(
1970 request_tx: PeerRequestSender<PeerRequest<N>>,
1971 version: EthVersion,
1972 client_version: Arc<str>,
1973 max_transactions_seen_by_peer: u32,
1974 peer_kind: PeerKind,
1975 ) -> Self {
1976 Self {
1977 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1978 request_tx,
1979 version,
1980 client_version,
1981 peer_kind,
1982 }
1983 }
1984
1985 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1987 &self.request_tx
1988 }
1989
1990 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1992 &mut self.seen_transactions
1993 }
1994
1995 pub const fn version(&self) -> EthVersion {
1997 self.version
1998 }
1999
2000 pub fn client_version(&self) -> &str {
2002 &self.client_version
2003 }
2004
2005 pub const fn peer_kind(&self) -> PeerKind {
2007 self.peer_kind
2008 }
2009}
2010
/// Commands that can be sent to the transactions manager.
///
/// NOTE(review): the command handler (`on_command`) is outside this excerpt; the variant
/// descriptions below are derived from names and payload types only — confirm against the
/// handler.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Carries a single hash; presumably requests propagation of that hash.
    PropagateHash(B256),
    /// Carries hashes and a peer id; presumably requests propagation of the hashes to that
    /// peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Carries a oneshot sender for a set of peer ids.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Carries transaction hashes and a peer id; presumably requests propagation of those
    /// transactions to that peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Carries transaction hashes; presumably requests propagation of those transactions.
    PropagateTransactions(Vec<TxHash>),
    /// Carries already prepared transactions; presumably requests that they are broadcast.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Requests transaction hashes for a list of peers.
    GetTransactionHashes {
        /// The peers to answer for.
        peers: Vec<PeerId>,
        /// Channel for the answer: a set of transaction hashes per peer.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests the request sender of a peer.
    GetPeerSender {
        /// The peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the answer; the payload is an `Option` of the peer's request sender.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2037
/// Transaction related events that the network forwards to the transactions manager.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A peer broadcast full transactions.
    IncomingTransactions {
        /// The peer that sent the message.
        peer_id: PeerId,
        /// The broadcast transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// A peer announced transaction hashes.
    IncomingPooledTransactionHashes {
        /// The peer that sent the message.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer requested pooled transactions.
    GetPooledTransactions {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// The request.
        request: GetPooledTransactions,
        /// Channel for the response to the request.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// A request for a handle to the transactions manager; the manager answers with
    /// `Some(handle)`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2069
/// Tracks the number of transactions that are currently being imported into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions whose pool import has not finished yet.
    pending_pool_imports: Arc<AtomicUsize>,
    /// The limit this instance was created with.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a new instance with the given limit and a counter starting at zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the
    /// `max_pending_pool_imports` argument.
    ///
    /// Note that the comparison uses the argument, not the limit stored at construction.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2090
2091impl Default for PendingPoolImportsInfo {
2092 fn default() -> Self {
2093 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2094 }
2095}
2096
/// Durations accumulated by the individual stages of one `poll` call of the transactions
/// manager.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time accounted to the network events stream.
    acc_network_events: Duration,
    /// Time accounted to the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time accounted to the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the `poll` implementation visible in this file section;
    /// presumably time accounted to handling imported transactions — confirm.
    acc_imported_txns: Duration,
    /// Time accounted to the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time accounted to fetching hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time accounted to the commands channel.
    acc_cmds: Duration,
}
2107
2108#[cfg(test)]
2109mod tests {
2110 use super::*;
2111 use crate::{
2112 test_utils::{
2113 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2114 Testnet,
2115 },
2116 transactions::config::RelaxedEthAnnouncementFilter,
2117 NetworkConfigBuilder, NetworkManager,
2118 };
2119 use alloy_consensus::{TxEip1559, TxLegacy};
2120 use alloy_primitives::{hex, Signature, TxKind, U256};
2121 use alloy_rlp::Decodable;
2122 use futures::FutureExt;
2123 use reth_chainspec::MIN_TRANSACTION_GAS;
2124 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2125 use reth_network_api::{NetworkInfo, PeerKind};
2126 use reth_network_p2p::{
2127 error::{RequestError, RequestResult},
2128 sync::{NetworkSyncUpdater, SyncState},
2129 };
2130 use reth_storage_api::noop::NoopProvider;
2131 use reth_transaction_pool::test_utils::{
2132 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2133 };
2134 use secp256k1::SecretKey;
2135 use std::{
2136 future::poll_fn,
2137 net::{IpAddr, Ipv4Addr, SocketAddr},
2138 str::FromStr,
2139 };
2140 use tracing::error;
2141
    /// A full transaction broadcast that arrives while the node is initially syncing must not
    /// end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network together with the transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Switch to syncing; this is the first sync, so it also counts as initially syncing.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // Feed the session events seen by `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction that is broadcast to the manager below.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2211
    /// After a first sync has finished, a broadcast received during a later sync is imported
    /// into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network together with the transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Syncing -> idle -> syncing: the second sync is no longer the initial one (asserted at
        // the end of the test).
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // Feed the session events seen by `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction that is broadcast to the manager below.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2286
    /// Announced hashes are requested from the announcing peer and the transactions returned in
    /// the response are imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions_hashes() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
        let client = NoopProvider::default();

        let config = NetworkConfigBuilder::new(secret_key)
            .listener_port(0)
            .disable_discovery()
            .build(client);

        let pool = testing_pool();

        let transactions_manager_config = config.transactions_manager_config.clone();
        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        let peer_id_1 = PeerId::new([1; 64]);
        let eth_version = EthVersion::Eth66;

        // The transaction that the mocked peer announces and later delivers.
        let txs = vec![TransactionSigned::new_unhashed(
            Transaction::Legacy(TxLegacy {
                chain_id: Some(4),
                nonce: 15u64,
                gas_price: 2200000000,
                gas_limit: 34811,
                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
                value: U256::from(1234u64),
                input: Default::default(),
            }),
            Signature::new(
                U256::from_str(
                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
                )
                .unwrap(),
                U256::from_str(
                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
                )
                .unwrap(),
                true,
            ),
        )];

        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

        // Register a mocked peer session directly in the manager's peer map.
        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        tx_manager.peers.insert(peer_id_1, peer_1);

        assert!(pool.is_empty());

        // The peer announces the hashes.
        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
            peer_id: peer_id_1,
            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
                txs_hashes.clone(),
            )),
        });

        // The manager is expected to request exactly the announced hashes from the peer.
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

        let message: Vec<PooledTransactionVariant> = txs
            .into_iter()
            .map(|tx| {
                PooledTransactionVariant::try_from(tx)
                    .expect("Failed to convert MockTransaction to PooledTransaction")
            })
            .collect();

        // Answer the request on behalf of the mocked peer.
        response
            .send(Ok(PooledTransactions(message)))
            .expect("should send peer_1 response to tx manager");

        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = tx_manager.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        // All announced transactions must now be in the pool.
        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
    }
2384
    /// A full transaction broadcast received while the node is idle is tracked for the sending
    /// peer and imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network together with the transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the session events seen by `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // An encoded signed transaction that is broadcast to the manager below.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // The sender must be recorded for the hash right away, before the import has completed.
        assert!(transactions
            .transactions_by_peers
            .get(signed_tx.tx_hash())
            .unwrap()
            .contains(handle1.peer_id()));

        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert!(!pool.is_empty());
        assert!(pool.get(signed_tx.tx_hash()).is_some());
        handle.terminate().await;
    }
2462
    /// A `GetPooledTransactions` request for a transaction that is in the pool is answered with
    /// that transaction.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_on_get_pooled_transactions_network() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(2).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network together with the transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the session events seen by `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        handle.terminate().await;

        // Put a transaction into the pool so that it can be requested.
        let tx = MockTransaction::eip1559();
        let _ = transactions
            .pool
            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
            .await;

        let request = GetPooledTransactions(vec![*tx.get_hash()]);

        let (send, receive) =
            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();

        // Deliver the request as if it had been sent by the connected peer.
        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
            peer_id: *handle1.peer_id(),
            request,
            response: send,
        });

        // The response must contain exactly the requested transaction.
        match receive.await.unwrap() {
            Ok(PooledTransactions(transactions)) => {
                assert_eq!(transactions.len(), 1);
            }
            Err(e) => {
                panic!("error: {e:?}");
            }
        }
    }
2541
2542 #[tokio::test]
2546 async fn test_partially_tx_response() {
2547 reth_tracing::init_test_tracing();
2548
2549 let mut tx_manager = new_tx_manager().await.0;
2550 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2551
2552 let peer_id_1 = PeerId::new([1; 64]);
2553 let eth_version = EthVersion::Eth66;
2554
2555 let txs = vec![
2556 TransactionSigned::new_unhashed(
2557 Transaction::Legacy(TxLegacy {
2558 chain_id: Some(4),
2559 nonce: 15u64,
2560 gas_price: 2200000000,
2561 gas_limit: 34811,
2562 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2563 value: U256::from(1234u64),
2564 input: Default::default(),
2565 }),
2566 Signature::new(
2567 U256::from_str(
2568 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2569 )
2570 .unwrap(),
2571 U256::from_str(
2572 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2573 )
2574 .unwrap(),
2575 true,
2576 ),
2577 ),
2578 TransactionSigned::new_unhashed(
2579 Transaction::Eip1559(TxEip1559 {
2580 chain_id: 4,
2581 nonce: 26u64,
2582 max_priority_fee_per_gas: 1500000000,
2583 max_fee_per_gas: 1500000013,
2584 gas_limit: MIN_TRANSACTION_GAS,
2585 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2586 value: U256::from(3000000000000000000u64),
2587 input: Default::default(),
2588 access_list: Default::default(),
2589 }),
2590 Signature::new(
2591 U256::from_str(
2592 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2593 )
2594 .unwrap(),
2595 U256::from_str(
2596 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2597 )
2598 .unwrap(),
2599 true,
2600 ),
2601 ),
2602 ];
2603
2604 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2605
2606 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2607 peer_1.seen_transactions.insert(txs_hashes[0]);
2610 peer_1.seen_transactions.insert(txs_hashes[1]);
2611 tx_manager.peers.insert(peer_id_1, peer_1);
2612
2613 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2614 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2615
2616 assert!(tx_fetcher.is_idle(&peer_id_1));
2618 assert_eq!(tx_fetcher.active_peers.len(), 0);
2619
2620 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2622
2623 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2624 assert!(!tx_fetcher.is_idle(&peer_id_1));
2626 assert_eq!(tx_fetcher.active_peers.len(), 1);
2627
2628 let req = to_mock_session_rx
2630 .recv()
2631 .await
2632 .expect("peer_1 session should receive request with buffered hashes");
2633 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2634
2635 let message: Vec<PooledTransactionVariant> = txs
2636 .into_iter()
2637 .take(1)
2638 .map(|tx| {
2639 PooledTransactionVariant::try_from(tx)
2640 .expect("Failed to convert MockTransaction to PooledTransaction")
2641 })
2642 .collect();
2643 response
2645 .send(Ok(PooledTransactions(message)))
2646 .expect("should send peer_1 response to tx manager");
2647 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2648 unreachable!()
2649 };
2650
2651 assert!(tx_fetcher.is_idle(&peer_id));
2653 assert_eq!(tx_fetcher.active_peers.len(), 0);
2654 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2656 }
2657
2658 #[tokio::test]
2659 async fn test_max_retries_tx_request() {
2660 reth_tracing::init_test_tracing();
2661
2662 let mut tx_manager = new_tx_manager().await.0;
2663 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2664
2665 let peer_id_1 = PeerId::new([1; 64]);
2666 let peer_id_2 = PeerId::new([2; 64]);
2667 let eth_version = EthVersion::Eth66;
2668 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2669
2670 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2671 peer_1.seen_transactions.insert(seen_hashes[0]);
2674 peer_1.seen_transactions.insert(seen_hashes[1]);
2675 tx_manager.peers.insert(peer_id_1, peer_1);
2676
2677 let retries = 1;
2680 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2681 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2682
2683 assert!(tx_fetcher.is_idle(&peer_id_1));
2685 assert_eq!(tx_fetcher.active_peers.len(), 0);
2686
2687 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2689
2690 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2691
2692 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2693 assert!(!tx_fetcher.is_idle(&peer_id_1));
2695 assert_eq!(tx_fetcher.active_peers.len(), 1);
2696
2697 let req = to_mock_session_rx
2699 .recv()
2700 .await
2701 .expect("peer_1 session should receive request with buffered hashes");
2702 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2703 let GetPooledTransactions(hashes) = request;
2704
2705 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2706
2707 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2708
2709 response
2711 .send(Err(RequestError::BadResponse))
2712 .expect("should send peer_1 response to tx manager");
2713 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2714 unreachable!()
2715 };
2716
2717 assert!(tx_fetcher.is_idle(&peer_id));
2719 assert_eq!(tx_fetcher.active_peers.len(), 0);
2720 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2722
2723 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2724 tx_manager.peers.insert(peer_id_2, peer_2);
2725
2726 let msg =
2728 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2729 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2730
2731 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2732
2733 assert_eq!(tx_fetcher.active_peers.len(), 1);
2735
2736 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2738 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2740
2741 let req = to_mock_session_rx
2743 .recv()
2744 .await
2745 .expect("peer_2 session should receive request with buffered hashes");
2746 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2747
2748 response
2750 .send(Err(RequestError::BadResponse))
2751 .expect("should send peer_2 response to tx manager");
2752 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2753
2754 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2757 assert_eq!(tx_fetcher.active_peers.len(), 0);
2758 }
2759
2760 #[test]
2761 fn test_transaction_builder_empty() {
2762 let mut builder =
2763 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2764 assert!(builder.is_empty());
2765
2766 let mut factory = MockTransactionFactory::default();
2767 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2768 builder.push(&tx);
2769 assert!(!builder.is_empty());
2770
2771 let txs = builder.build();
2772 assert!(txs.full.is_none());
2773 let txs = txs.pooled.unwrap();
2774 assert_eq!(txs.len(), 1);
2775 }
2776
2777 #[test]
2778 fn test_transaction_builder_large() {
2779 let mut builder =
2780 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2781 assert!(builder.is_empty());
2782
2783 let mut factory = MockTransactionFactory::default();
2784 let mut tx = factory.create_eip1559();
2785 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2787 let tx = Arc::new(tx);
2788 let tx = PropagateTransaction::pool_tx(tx);
2789 builder.push(&tx);
2790 assert!(!builder.is_empty());
2791
2792 let txs = builder.clone().build();
2793 assert!(txs.pooled.is_none());
2794 let txs = txs.full.unwrap();
2795 assert_eq!(txs.len(), 1);
2796
2797 builder.push(&tx);
2798
2799 let txs = builder.clone().build();
2800 let pooled = txs.pooled.unwrap();
2801 assert_eq!(pooled.len(), 1);
2802 let txs = txs.full.unwrap();
2803 assert_eq!(txs.len(), 1);
2804 }
2805
2806 #[test]
2807 fn test_transaction_builder_eip4844() {
2808 let mut builder =
2809 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2810 assert!(builder.is_empty());
2811
2812 let mut factory = MockTransactionFactory::default();
2813 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2814 builder.push(&tx);
2815 assert!(!builder.is_empty());
2816
2817 let txs = builder.clone().build();
2818 assert!(txs.full.is_none());
2819 let txs = txs.pooled.unwrap();
2820 assert_eq!(txs.len(), 1);
2821
2822 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2823 builder.push(&tx);
2824
2825 let txs = builder.clone().build();
2826 let pooled = txs.pooled.unwrap();
2827 assert_eq!(pooled.len(), 1);
2828 let txs = txs.full.unwrap();
2829 assert_eq!(txs.len(), 1);
2830 }
2831
2832 #[tokio::test]
2833 async fn test_propagate_full() {
2834 reth_tracing::init_test_tracing();
2835
2836 let (mut tx_manager, network) = new_tx_manager().await;
2837 let peer_id = PeerId::random();
2838
2839 network.handle().update_sync_state(SyncState::Idle);
2841
2842 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2844
2845 let session_info = SessionInfo {
2846 peer_id,
2847 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2848 client_version: Arc::from(""),
2849 capabilities: Arc::new(vec![].into()),
2850 status: Arc::new(Default::default()),
2851 version: EthVersion::Eth68,
2852 peer_kind: PeerKind::Basic,
2853 };
2854 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2855 tx_manager
2856 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2857 let mut propagate = vec![];
2858 let mut factory = MockTransactionFactory::default();
2859 let eip1559_tx = Arc::new(factory.create_eip1559());
2860 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2861 let eip4844_tx = Arc::new(factory.create_eip4844());
2862 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2863
2864 let propagated =
2865 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2866 assert_eq!(propagated.0.len(), 2);
2867 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2868 assert_eq!(prop_txs.len(), 1);
2869 assert!(prop_txs[0].is_full());
2870
2871 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2872 assert_eq!(prop_txs.len(), 1);
2873 assert!(prop_txs[0].is_hash());
2874
2875 let peer = tx_manager.peers.get(&peer_id).unwrap();
2876 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2877 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2878 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2879
2880 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2882 assert!(propagated.0.is_empty());
2883 }
2884
2885 #[tokio::test]
2886 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2887 reth_tracing::init_test_tracing();
2888
2889 let transactions_manager_config = TransactionsManagerConfig::default();
2890
2891 let propagation_policy = TransactionPropagationKind::default();
2892 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2893
2894 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2895
2896 let pool = testing_pool();
2897 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2898 let client = NoopProvider::default();
2899
2900 let network_config = NetworkConfigBuilder::new(secret_key)
2901 .listener_port(0)
2902 .disable_discovery()
2903 .build(client.clone());
2904
2905 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2906 let (to_tx_manager_tx, from_network_rx) =
2907 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2908 network_manager.set_transactions(to_tx_manager_tx);
2909 let network_handle = network_manager.handle().clone();
2910 let network_service_handle = tokio::spawn(network_manager);
2911
2912 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2913 network_handle.clone(),
2914 pool.clone(),
2915 from_network_rx,
2916 transactions_manager_config,
2917 policy_bundle,
2918 );
2919
2920 let peer_id = PeerId::random();
2921 let eth_version = EthVersion::Eth68;
2922 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2923 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2924
2925 let mut tx_factory = MockTransactionFactory::default();
2926
2927 let valid_known_tx = tx_factory.create_eip1559();
2928 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2929
2930 let known_tx_hash = *known_tx_signed.hash();
2931 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2932 let known_tx_size = known_tx_signed.encoded_length();
2933
2934 let unknown_tx_hash = B256::random();
2935 let unknown_tx_type_byte = 0xff_u8;
2936 let unknown_tx_size = 150;
2937
2938 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2939 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2940 sizes: vec![known_tx_size, unknown_tx_size],
2941 hashes: vec![known_tx_hash, unknown_tx_hash],
2942 });
2943
2944 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2945
2946 poll_fn(|cx| {
2947 let _ = tx_manager.poll_unpin(cx);
2948 Poll::Ready(())
2949 })
2950 .await;
2951
2952 let mut requested_hashes_in_getpooled = HashSet::new();
2953 let mut unexpected_request_received = false;
2954
2955 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2956 .await
2957 {
2958 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2959 let GetPooledTransactions(hashes) = request;
2960 for hash in hashes {
2961 requested_hashes_in_getpooled.insert(hash);
2962 }
2963 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2964 }
2965 Ok(Some(other_request)) => {
2966 tracing::error!(?other_request, "Received unexpected PeerRequest type");
2967 unexpected_request_received = true;
2968 }
2969 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2970 Err(_timeout_err) => {
2971 tracing::info!("Timeout: No GetPooledTransactions request received.")
2972 }
2973 }
2974
2975 assert!(
2976 requested_hashes_in_getpooled.contains(&known_tx_hash),
2977 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2978 );
2979 assert!(
2980 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2981 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2982 );
2983 assert!(
2984 !unexpected_request_received,
2985 "An unexpected P2P request was received by the mock peer."
2986 );
2987
2988 network_service_handle.abort();
2989 }
2990}