1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6pub mod config;
8pub mod constants;
10pub mod fetcher;
12pub mod policy;
14
15pub use self::constants::{
16 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30 budget::{
31 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{
37 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38 },
39 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40 NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49 RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54 events::{PeerEvent, SessionInfo},
55 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58 error::{RequestError, RequestResult},
59 sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66 error::{PoolError, PoolResult},
67 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71 collections::{hash_map::Entry, HashMap, HashSet},
72 pin::Pin,
73 sync::{
74 atomic::{AtomicUsize, Ordering},
75 Arc,
76 },
77 task::{Context, Poll},
78 time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
84pub type PoolImportFuture =
88 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
90#[derive(Debug, Clone)]
98pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
99 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
101}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104 fn send(&self, cmd: TransactionsCommand<N>) {
105 let _ = self.manager_tx.send(cmd);
106 }
107
108 async fn peer_handle(
110 &self,
111 peer_id: PeerId,
112 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113 let (tx, rx) = oneshot::channel();
114 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115 rx.await
116 }
117
118 pub fn propagate(&self, hash: TxHash) {
120 self.send(TransactionsCommand::PropagateHash(hash))
121 }
122
123 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127 self.propagate_hashes_to(Some(hash), peer)
128 }
129
130 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134 let hashes = hash.into_iter().collect::<Vec<_>>();
135 if hashes.is_empty() {
136 return
137 }
138 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139 }
140
141 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143 let (tx, rx) = oneshot::channel();
144 self.send(TransactionsCommand::GetActivePeers(tx));
145 rx.await
146 }
147
148 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152 if transactions.is_empty() {
153 return
154 }
155 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156 }
157
158 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163 if transactions.is_empty() {
164 return
165 }
166 self.send(TransactionsCommand::PropagateTransactions(transactions))
167 }
168
169 pub fn broadcast_transactions(
174 &self,
175 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176 ) {
177 let transactions =
178 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179 if transactions.is_empty() {
180 return
181 }
182 self.send(TransactionsCommand::BroadcastTransactions(transactions))
183 }
184
185 pub async fn get_transaction_hashes(
187 &self,
188 peers: Vec<PeerId>,
189 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190 if peers.is_empty() {
191 return Ok(Default::default())
192 }
193 let (tx, rx) = oneshot::channel();
194 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195 rx.await
196 }
197
198 pub async fn get_peer_transaction_hashes(
200 &self,
201 peer: PeerId,
202 ) -> Result<HashSet<TxHash>, RecvError> {
203 let res = self.get_transaction_hashes(vec![peer]).await?;
204 Ok(res.into_values().next().unwrap_or_default())
205 }
206
207 pub async fn get_pooled_transactions_from(
213 &self,
214 peer_id: PeerId,
215 hashes: Vec<B256>,
216 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219 let (tx, rx) = oneshot::channel();
220 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221 peer.try_send(request).ok();
222
223 rx.await?.map(|res| Some(res.0))
224 }
225}
226
227#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
284 pool: Pool,
286 network: NetworkHandle<N>,
288 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
292 transaction_fetcher: TransactionFetcher<N>,
294 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
299 pool_imports: FuturesUnordered<PoolImportFuture>,
311 pending_pool_imports_info: PendingPoolImportsInfo,
313 bad_imports: LruCache<TxHash>,
315 peers: HashMap<PeerId, PeerMetadata<N>>,
317 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
321 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
326 pending_transactions: mpsc::Receiver<TxHash>,
335 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
337 config: TransactionsManagerConfig,
339 policies: NetworkPolicies<N>,
341 metrics: TransactionsManagerMetrics,
343 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
345}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348 pub fn new(
352 network: NetworkHandle<N>,
353 pool: Pool,
354 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355 transactions_manager_config: TransactionsManagerConfig,
356 ) -> Self {
357 Self::with_policy(
358 network,
359 pool,
360 from_network,
361 transactions_manager_config,
362 NetworkPolicies::new(
363 TransactionPropagationKind::default(),
364 StrictEthAnnouncementFilter::default(),
365 ),
366 )
367 }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371 pub fn with_policy(
375 network: NetworkHandle<N>,
376 pool: Pool,
377 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378 transactions_manager_config: TransactionsManagerConfig,
379 policies: NetworkPolicies<N>,
380 ) -> Self {
381 let network_events = network.event_listener();
382
383 let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386 &transactions_manager_config.transaction_fetcher_config,
387 );
388
389 let pending = pool.pending_transactions_listener();
392 let pending_pool_imports_info = PendingPoolImportsInfo::default();
393 let metrics = TransactionsManagerMetrics::default();
394 metrics
395 .capacity_pending_pool_imports
396 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398 Self {
399 pool,
400 network,
401 network_events,
402 transaction_fetcher,
403 transactions_by_peers: Default::default(),
404 pool_imports: Default::default(),
405 pending_pool_imports_info: PendingPoolImportsInfo::new(
406 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
407 ),
408 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409 peers: Default::default(),
410 command_tx,
411 command_rx: UnboundedReceiverStream::new(command_rx),
412 pending_transactions: pending,
413 transaction_events: UnboundedMeteredReceiver::new(
414 from_network,
415 NETWORK_POOL_TRANSACTIONS_SCOPE,
416 ),
417 config: transactions_manager_config,
418 policies,
419 metrics,
420 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421 }
422 }
423
424 pub fn handle(&self) -> TransactionsHandle<N> {
426 TransactionsHandle { manager_tx: self.command_tx.clone() }
427 }
428
429 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432 self.pending_pool_imports_info
433 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
434 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
435 }
436
437 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
438 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
439 self.metrics.reported_bad_transactions.increment(1);
440 }
441
442 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
443 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
444 self.network.reputation_change(peer_id, kind);
445 }
446
447 fn report_already_seen(&self, peer_id: PeerId) {
448 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
449 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
450 }
451
452 fn on_good_import(&mut self, hash: TxHash) {
454 self.transactions_by_peers.remove(&hash);
455 }
456
457 fn on_bad_import(&mut self, err: PoolError) {
481 let peers = self.transactions_by_peers.remove(&err.hash);
482
483 if !err.is_bad_transaction() || self.network.is_syncing() {
485 return
486 }
487 if let Some(peers) = peers {
490 for peer_id in peers {
491 self.report_peer_bad_transactions(peer_id);
492 }
493 }
494 self.metrics.bad_imports.increment(1);
495 self.bad_imports.insert(err.hash);
496 }
497
498 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
502 let info = &self.pending_pool_imports_info;
504 let max_pending_pool_imports = info.max_pending_pool_imports;
505 let has_capacity_wrt_pending_pool_imports =
506 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
507
508 self.transaction_fetcher
509 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
510 }
511
512 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
513 let kind = match req_err {
514 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
515 RequestError::Timeout => ReputationChangeKind::Timeout,
516 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
517 return
519 }
520 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
521 };
522 self.report_peer(peer_id, kind);
523 }
524
525 #[inline]
526 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
527 let metrics = &self.metrics;
528
529 let TxManagerPollDurations {
530 acc_network_events,
531 acc_pending_imports,
532 acc_tx_events,
533 acc_imported_txns,
534 acc_fetch_events,
535 acc_pending_fetch,
536 acc_cmds,
537 } = poll_durations;
538
539 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
541 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
543 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
544 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
545 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
546 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
547 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
548 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
549 }
550}
551
552impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
553 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
555 for res in batch_results {
556 match res {
557 Ok(AddedTransactionOutcome { hash, .. }) => {
558 self.on_good_import(hash);
559 }
560 Err(err) => {
561 self.on_bad_import(err);
562 }
563 }
564 }
565 }
566
567 fn on_new_pooled_transaction_hashes(
569 &mut self,
570 peer_id: PeerId,
571 msg: NewPooledTransactionHashes,
572 ) {
573 if self.network.is_initially_syncing() {
575 return
576 }
577 if self.network.tx_gossip_disabled() {
578 return
579 }
580
581 let Some(peer) = self.peers.get_mut(&peer_id) else {
583 trace!(
584 peer_id = format!("{peer_id:#}"),
585 ?msg,
586 "discarding announcement from inactive peer"
587 );
588
589 return
590 };
591 let client = peer.client_version.clone();
592
593 let mut count_txns_already_seen_by_peer = 0;
595 for tx in msg.iter_hashes().copied() {
596 if !peer.seen_transactions.insert(tx) {
597 count_txns_already_seen_by_peer += 1;
598 }
599 }
600 if count_txns_already_seen_by_peer > 0 {
601 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
606 self.metrics
607 .occurrences_hash_already_seen_by_peer
608 .increment(count_txns_already_seen_by_peer);
609
610 trace!(target: "net::tx",
611 %count_txns_already_seen_by_peer,
612 peer_id=format!("{peer_id:#}"),
613 ?client,
614 "Peer sent hashes that have already been marked as seen by peer"
615 );
616
617 self.report_already_seen(peer_id);
618 }
619
620 if msg.is_empty() {
622 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
623 return;
624 }
625
626 let original_len = msg.len();
627 let mut partially_valid_msg = msg.dedup();
628
629 if partially_valid_msg.len() != original_len {
630 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
631 }
632
633 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
635
636 let hashes_count_pre_pool_filter = partially_valid_msg.len();
644 self.pool.retain_unknown(&mut partially_valid_msg);
645 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
646 let already_known_hashes_count =
647 hashes_count_pre_pool_filter - partially_valid_msg.len();
648 self.metrics
649 .occurrences_hashes_already_in_pool
650 .increment(already_known_hashes_count as u64);
651 }
652
653 if partially_valid_msg.is_empty() {
654 return
656 }
657
658 let mut should_report_peer = false;
663 let mut tx_types_counter = TxTypesCounter::default();
664
665 let is_eth68_message = partially_valid_msg
666 .msg_version()
667 .expect("partially valid announcement should have a version")
668 .is_eth68();
669
670 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
671 let (ty_byte, size_val) = match *metadata_ref_mut {
672 Some((ty, size)) => {
673 if !is_eth68_message {
674 should_report_peer = true;
675 }
676 (ty, size)
677 }
678 None => {
679 if is_eth68_message {
680 should_report_peer = true;
681 return false;
682 }
683 (0u8, 0)
684 }
685 };
686
687 if is_eth68_message &&
688 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
689 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
690 {
691 tx_types_counter.increase_by_tx_type(parsed_tx_type);
692 }
693
694 let decision = self
695 .policies
696 .announcement_filter()
697 .decide_on_announcement(ty_byte, tx_hash, size_val);
698
699 match decision {
700 AnnouncementAcceptance::Accept => true,
701 AnnouncementAcceptance::Ignore => false,
702 AnnouncementAcceptance::Reject { penalize_peer } => {
703 if penalize_peer {
704 should_report_peer = true;
705 }
706 false
707 }
708 }
709 });
710
711 if is_eth68_message {
712 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
713 }
714
715 if should_report_peer {
716 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
717 }
718
719 let mut valid_announcement_data =
720 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
721
722 if valid_announcement_data.is_empty() {
723 return
725 }
726
727 let bad_imports = &self.bad_imports;
734 self.transaction_fetcher.filter_unseen_and_pending_hashes(
735 &mut valid_announcement_data,
736 |hash| bad_imports.contains(hash),
737 &peer_id,
738 &client,
739 );
740
741 if valid_announcement_data.is_empty() {
742 return
744 }
745
746 trace!(target: "net::tx::propagation",
747 peer_id=format!("{peer_id:#}"),
748 hashes_len=valid_announcement_data.len(),
749 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
750 msg_version=%valid_announcement_data.msg_version(),
751 client_version=%client,
752 "received previously unseen and pending hashes in announcement from peer"
753 );
754
755 if !self.transaction_fetcher.is_idle(&peer_id) {
758 let msg_version = valid_announcement_data.msg_version();
760 let (hashes, _version) = valid_announcement_data.into_request_hashes();
761
762 trace!(target: "net::tx",
763 peer_id=format!("{peer_id:#}"),
764 hashes=?*hashes,
765 %msg_version,
766 %client,
767 "buffering hashes announced by busy peer"
768 );
769
770 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
771
772 return
773 }
774
775 let mut hashes_to_request =
776 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
777 let surplus_hashes =
778 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
779
780 if !surplus_hashes.is_empty() {
781 trace!(target: "net::tx",
782 peer_id=format!("{peer_id:#}"),
783 surplus_hashes=?*surplus_hashes,
784 %client,
785 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
786 );
787
788 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
789 }
790
791 trace!(target: "net::tx",
792 peer_id=format!("{peer_id:#}"),
793 hashes=?*hashes_to_request,
794 %client,
795 "sending hashes in `GetPooledTransactions` request to peer's session"
796 );
797
798 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
802 if let Some(failed_to_request_hashes) =
803 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
804 {
805 let conn_eth_version = peer.version;
806
807 trace!(target: "net::tx",
808 peer_id=format!("{peer_id:#}"),
809 failed_to_request_hashes=?*failed_to_request_hashes,
810 %conn_eth_version,
811 %client,
812 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
813 );
814 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
815 }
816 }
817}
818
819impl<Pool, N> TransactionsManager<Pool, N>
820where
821 Pool: TransactionPool + Unpin + 'static,
822 N: NetworkPrimitives<
823 BroadcastedTransaction: SignedTransaction,
824 PooledTransaction: SignedTransaction,
825 > + Unpin,
826 Pool::Transaction:
827 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
828{
829 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
841 if self.network.is_initially_syncing() {
843 return
844 }
845 if self.network.tx_gossip_disabled() {
846 return
847 }
848
849 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
850
851 self.propagate_all(hashes);
852 }
853
854 fn propagate_full_transactions_to_peer(
858 &mut self,
859 txs: Vec<TxHash>,
860 peer_id: PeerId,
861 propagation_mode: PropagationMode,
862 ) -> Option<PropagatedTransactions> {
863 let peer = self.peers.get_mut(&peer_id)?;
864 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
865 let mut propagated = PropagatedTransactions::default();
866
867 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
869
870 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
871
872 if propagation_mode.is_forced() {
873 full_transactions.extend(to_propagate);
875 } else {
876 for tx in to_propagate {
879 if !peer.seen_transactions.contains(tx.tx_hash()) {
880 full_transactions.push(&tx);
882 }
883 }
884 }
885
886 if full_transactions.is_empty() {
887 return None
889 }
890
891 let PropagateTransactions { pooled, full } = full_transactions.build();
892
893 if let Some(new_pooled_hashes) = pooled {
895 for hash in new_pooled_hashes.iter_hashes().copied() {
896 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
897 peer.seen_transactions.insert(hash);
899 }
900
901 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
903 }
904
905 if let Some(new_full_transactions) = full {
907 for tx in &new_full_transactions {
908 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
909 peer.seen_transactions.insert(*tx.tx_hash());
911 }
912
913 self.network.send_transactions(peer_id, new_full_transactions);
915 }
916
917 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
919
920 Some(propagated)
921 }
922
923 fn propagate_hashes_to(
927 &mut self,
928 hashes: Vec<TxHash>,
929 peer_id: PeerId,
930 propagation_mode: PropagationMode,
931 ) {
932 trace!(target: "net::tx", "Start propagating transactions as hashes");
933
934 let propagated = {
937 let Some(peer) = self.peers.get_mut(&peer_id) else {
938 return
940 };
941
942 let to_propagate = self
943 .pool
944 .get_all(hashes)
945 .into_iter()
946 .map(PropagateTransaction::pool_tx)
947 .collect::<Vec<_>>();
948
949 let mut propagated = PropagatedTransactions::default();
950
951 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
953
954 if propagation_mode.is_forced() {
955 hashes.extend(to_propagate)
956 } else {
957 for tx in to_propagate {
958 if !peer.seen_transactions.contains(tx.tx_hash()) {
959 hashes.push(&tx);
961 }
962 }
963 }
964
965 let new_pooled_hashes = hashes.build();
966
967 if new_pooled_hashes.is_empty() {
968 return
970 }
971
972 for hash in new_pooled_hashes.iter_hashes().copied() {
973 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
974 }
975
976 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
977
978 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
980
981 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
983
984 propagated
985 };
986
987 self.pool.on_propagated(propagated);
989 }
990
991 fn propagate_transactions(
998 &mut self,
999 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1000 propagation_mode: PropagationMode,
1001 ) -> PropagatedTransactions {
1002 let mut propagated = PropagatedTransactions::default();
1003 if self.network.tx_gossip_disabled() {
1004 return propagated
1005 }
1006
1007 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1009
1010 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1012 if !self.policies.propagation_policy().can_propagate(peer) {
1013 continue
1015 }
1016 let mut builder = if peer_idx > max_num_full {
1018 PropagateTransactionsBuilder::pooled(peer.version)
1019 } else {
1020 PropagateTransactionsBuilder::full(peer.version)
1021 };
1022
1023 if propagation_mode.is_forced() {
1024 builder.extend(to_propagate.iter());
1025 } else {
1026 for tx in &to_propagate {
1030 if !peer.seen_transactions.contains(tx.tx_hash()) {
1033 builder.push(tx);
1034 }
1035 }
1036 }
1037
1038 if builder.is_empty() {
1039 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1040 continue
1041 }
1042
1043 let PropagateTransactions { pooled, full } = builder.build();
1044
1045 if let Some(mut new_pooled_hashes) = pooled {
1047 new_pooled_hashes
1050 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1051
1052 for hash in new_pooled_hashes.iter_hashes().copied() {
1053 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1054 peer.seen_transactions.insert(hash);
1056 }
1057
1058 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1059
1060 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1062 }
1063
1064 if let Some(new_full_transactions) = full {
1066 for tx in &new_full_transactions {
1067 propagated
1068 .0
1069 .entry(*tx.tx_hash())
1070 .or_default()
1071 .push(PropagateKind::Full(*peer_id));
1072 peer.seen_transactions.insert(*tx.tx_hash());
1074 }
1075
1076 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1077
1078 self.network.send_transactions(*peer_id, new_full_transactions);
1080 }
1081 }
1082
1083 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1085
1086 propagated
1087 }
1088
1089 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1094 if self.peers.is_empty() {
1095 return
1097 }
1098 let propagated = self.propagate_transactions(
1099 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1100 PropagationMode::Basic,
1101 );
1102
1103 self.pool.on_propagated(propagated);
1105 }
1106
1107 fn on_get_pooled_transactions(
1109 &mut self,
1110 peer_id: PeerId,
1111 request: GetPooledTransactions,
1112 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1113 ) {
1114 if let Some(peer) = self.peers.get_mut(&peer_id) {
1115 if self.network.tx_gossip_disabled() {
1116 let _ = response.send(Ok(PooledTransactions::default()));
1117 return
1118 }
1119 let transactions = self.pool.get_pooled_transaction_elements(
1120 request.0,
1121 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1122 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1123 ),
1124 );
1125 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1126
1127 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1130
1131 let resp = PooledTransactions(transactions);
1132 let _ = response.send(Ok(resp));
1133 }
1134 }
1135
1136 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1138 match cmd {
1139 TransactionsCommand::PropagateHash(hash) => {
1140 self.on_new_pending_transactions(vec![hash])
1141 }
1142 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1143 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1144 }
1145 TransactionsCommand::GetActivePeers(tx) => {
1146 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1147 tx.send(peers).ok();
1148 }
1149 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1150 if let Some(propagated) =
1151 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1152 {
1153 self.pool.on_propagated(propagated);
1154 }
1155 }
1156 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1157 TransactionsCommand::BroadcastTransactions(txs) => {
1158 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1159 self.pool.on_propagated(propagated);
1160 }
1161 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1162 let mut res = HashMap::with_capacity(peers.len());
1163 for peer_id in peers {
1164 let hashes = self
1165 .peers
1166 .get(&peer_id)
1167 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1168 .unwrap_or_default();
1169 res.insert(peer_id, hashes);
1170 }
1171 tx.send(res).ok();
1172 }
1173 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1174 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1175 peer_request_sender.send(sender).ok();
1176 }
1177 }
1178 }
1179
1180 fn handle_peer_session(
1184 &mut self,
1185 info: SessionInfo,
1186 messages: PeerRequestSender<PeerRequest<N>>,
1187 ) {
1188 let SessionInfo { peer_id, client_version, version, .. } = info;
1189
1190 let peer = PeerMetadata::<N>::new(
1192 messages,
1193 version,
1194 client_version,
1195 self.config.max_transactions_seen_by_peer_history,
1196 info.peer_kind,
1197 );
1198 let peer = match self.peers.entry(peer_id) {
1199 Entry::Occupied(mut entry) => {
1200 entry.insert(peer);
1201 entry.into_mut()
1202 }
1203 Entry::Vacant(entry) => entry.insert(peer),
1204 };
1205
1206 self.policies.propagation_policy_mut().on_session_established(peer);
1207
1208 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1212 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1213 return
1214 }
1215
1216 let pooled_txs = self.pool.pooled_transactions_max(
1218 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1219 );
1220 if pooled_txs.is_empty() {
1221 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1222 return;
1223 }
1224
1225 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1227 for pooled_tx in pooled_txs {
1228 peer.seen_transactions.insert(*pooled_tx.hash());
1229 msg_builder.push_pooled(pooled_tx);
1230 }
1231
1232 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1233 let msg = msg_builder.build();
1234 self.network.send_transactions_hashes(peer_id, msg);
1235 }
1236
1237 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1239 match event_result {
1240 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1241 let peer = self.peers.remove(&peer_id);
1244 if let Some(mut peer) = peer {
1245 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1246 }
1247 self.transaction_fetcher.remove_peer(&peer_id);
1248 }
1249 NetworkEvent::ActivePeerSession { info, messages } => {
1250 self.handle_peer_session(info, messages);
1252 }
1253 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1254 let peer_id = info.peer_id;
1255 let messages = match self.peers.get(&peer_id) {
1257 Some(p) => p.request_tx.clone(),
1258 None => {
1259 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1260 return;
1261 }
1262 };
1263 self.handle_peer_session(info, messages);
1264 }
1265 _ => {}
1266 }
1267 }
1268
1269 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1271 if self.config.ingress_policy.allows_all() {
1272 return true;
1273 }
1274 let Some(peer) = self.peers.get(peer_id) else {
1275 return false;
1276 };
1277 self.config.ingress_policy.allows(peer.peer_kind())
1278 }
1279
1280 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1282 match event {
1283 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1284 if !self.accepts_incoming_from(&peer_id) {
1285 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1286 return;
1287 }
1288 let has_blob_txs = msg.has_eip4844();
1292
1293 let non_blob_txs = msg
1294 .0
1295 .into_iter()
1296 .map(N::PooledTransaction::try_from)
1297 .filter_map(Result::ok)
1298 .collect();
1299
1300 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1301
1302 if has_blob_txs {
1303 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1304 self.report_peer_bad_transactions(peer_id);
1305 }
1306 }
1307 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1308 if !self.accepts_incoming_from(&peer_id) {
1309 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1310 return;
1311 }
1312 self.on_new_pooled_transaction_hashes(peer_id, msg)
1313 }
1314 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1315 self.on_get_pooled_transactions(peer_id, request, response)
1316 }
1317 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1318 let _ = response.send(Some(self.handle()));
1319 }
1320 }
1321 }
1322
1323 fn import_transactions(
1325 &mut self,
1326 peer_id: PeerId,
1327 transactions: PooledTransactions<N::PooledTransaction>,
1328 source: TransactionSource,
1329 ) {
1330 if self.network.is_initially_syncing() {
1332 return
1333 }
1334 if self.network.tx_gossip_disabled() {
1335 return
1336 }
1337
1338 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1339 let mut transactions = transactions.0;
1340
1341 let start = Instant::now();
1342
1343 self.transaction_fetcher
1345 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1346
1347 let mut num_already_seen_by_peer = 0;
1352 for tx in &transactions {
1353 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1354 num_already_seen_by_peer += 1;
1355 }
1356 }
1357
1358 let txns_count_pre_pool_filter = transactions.len();
1360 self.pool.retain_unknown(&mut transactions);
1361 if txns_count_pre_pool_filter > transactions.len() {
1362 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1363 self.metrics
1364 .occurrences_transactions_already_in_pool
1365 .increment(already_known_txns_count as u64);
1366 }
1367
1368 let mut has_bad_transactions = false;
1370
1371 transactions.retain(|tx| {
1373 if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1374 entry.get_mut().insert(peer_id);
1375 return false
1376 }
1377 if self.bad_imports.contains(tx.tx_hash()) {
1378 trace!(target: "net::tx",
1379 peer_id=format!("{peer_id:#}"),
1380 hash=%tx.tx_hash(),
1381 client_version=%peer.client_version,
1382 "received a known bad transaction from peer"
1383 );
1384 has_bad_transactions = true;
1385 return false;
1386 }
1387 true
1388 });
1389
1390 let txs_len = transactions.len();
1391
1392 let new_txs = transactions
1393 .into_par_iter()
1394 .filter_map(|tx| match tx.try_into_recovered() {
1395 Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1396 Err(badtx) => {
1397 trace!(target: "net::tx",
1398 peer_id=format!("{peer_id:#}"),
1399 hash=%badtx.tx_hash(),
1400 client_version=%peer.client_version,
1401 "failed ecrecovery for transaction"
1402 );
1403 None
1404 }
1405 })
1406 .collect::<Vec<_>>();
1407
1408 has_bad_transactions |= new_txs.len() != txs_len;
1409
1410 for tx in &new_txs {
1412 self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1413 }
1414
1415 if !new_txs.is_empty() {
1418 let pool = self.pool.clone();
1419 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1421 metric_pending_pool_imports.increment(new_txs.len() as f64);
1422
1423 self.pending_pool_imports_info
1425 .pending_pool_imports
1426 .fetch_add(new_txs.len(), Ordering::Relaxed);
1427 let tx_manager_info_pending_pool_imports =
1428 self.pending_pool_imports_info.pending_pool_imports.clone();
1429
1430 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1431 let import = Box::pin(async move {
1432 let added = new_txs.len();
1433 let res = pool.add_external_transactions(new_txs).await;
1434
1435 metric_pending_pool_imports.decrement(added as f64);
1437 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1439
1440 res
1441 });
1442
1443 self.pool_imports.push(import);
1444 }
1445
1446 if num_already_seen_by_peer > 0 {
1447 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1448 self.metrics
1449 .occurrences_of_transaction_already_seen_by_peer
1450 .increment(num_already_seen_by_peer);
1451 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1452 }
1453
1454 if has_bad_transactions {
1455 self.report_peer_bad_transactions(peer_id)
1457 }
1458
1459 if num_already_seen_by_peer > 0 {
1460 self.report_already_seen(peer_id);
1461 }
1462
1463 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1464 }
1465
1466 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1468 match fetch_event {
1469 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1470 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1471 if report_peer {
1472 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1473 }
1474 }
1475 FetchEvent::FetchError { peer_id, error } => {
1476 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1477 self.on_request_error(peer_id, error);
1478 }
1479 FetchEvent::EmptyResponse { peer_id } => {
1480 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1481 }
1482 }
1483 }
1484}
1485
1486impl<
1494 Pool: TransactionPool + Unpin + 'static,
1495 N: NetworkPrimitives<
1496 BroadcastedTransaction: SignedTransaction,
1497 PooledTransaction: SignedTransaction,
1498 > + Unpin,
1499 > Future for TransactionsManager<Pool, N>
1500where
1501 Pool::Transaction:
1502 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1503{
1504 type Output = ();
1505
1506 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1507 let start = Instant::now();
1508 let mut poll_durations = TxManagerPollDurations::default();
1509
1510 let this = self.get_mut();
1511
1512 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1518 poll_durations.acc_network_events,
1519 "net::tx",
1520 "Network events stream",
1521 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1522 this.network_events.poll_next_unpin(cx),
1523 |event| this.on_network_event(event)
1524 );
1525
1526 let mut new_txs = Vec::new();
1535 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1536 cx,
1537 &mut new_txs,
1538 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1539 ) {
1540 Poll::Ready(count) => {
1541 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1542 true
1545 } else {
1546 let limit =
1550 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1551 new_txs.len();
1552 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1553 }
1554 }
1555 Poll::Pending => false,
1556 };
1557 if !new_txs.is_empty() {
1558 this.on_new_pending_transactions(new_txs);
1559 }
1560
1561 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1576 poll_durations.acc_tx_events,
1577 "net::tx",
1578 "Network transaction events stream",
1579 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1580 this.transaction_events.poll_next_unpin(cx),
1581 |event| this.on_network_tx_event(event),
1582 );
1583
1584 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1595 poll_durations.acc_fetch_events,
1596 "net::tx",
1597 "Transaction fetch events stream",
1598 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1599 this.transaction_fetcher.poll_next_unpin(cx),
1600 |event| this.on_fetch_event(event),
1601 );
1602
1603 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1618 poll_durations.acc_pending_imports,
1619 "net::tx",
1620 "Batched pool imports stream",
1621 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1622 this.pool_imports.poll_next_unpin(cx),
1623 |batch_results| this.on_batch_import_result(batch_results)
1624 );
1625
1626 duration_metered_exec!(
1631 {
1632 if this.has_capacity_for_fetching_pending_hashes() &&
1633 this.on_fetch_hashes_pending_fetch()
1634 {
1635 maybe_more_tx_fetch_events = true;
1636 }
1637 },
1638 poll_durations.acc_pending_fetch
1639 );
1640
1641 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1643 poll_durations.acc_cmds,
1644 "net::tx",
1645 "Commands channel",
1646 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1647 this.command_rx.poll_next_unpin(cx),
1648 |cmd| this.on_command(cmd)
1649 );
1650
1651 this.transaction_fetcher.update_metrics();
1652
1653 if maybe_more_network_events ||
1655 maybe_more_commands ||
1656 maybe_more_tx_events ||
1657 maybe_more_tx_fetch_events ||
1658 maybe_more_pool_imports ||
1659 maybe_more_pending_txns
1660 {
1661 cx.waker().wake_by_ref();
1663 return Poll::Pending
1664 }
1665
1666 this.update_poll_metrics(start, poll_durations);
1667
1668 Poll::Pending
1669 }
1670}
1671
1672#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1676enum PropagationMode {
1677 Basic,
1681 Forced,
1686}
1687
1688impl PropagationMode {
1689 const fn is_forced(self) -> bool {
1691 matches!(self, Self::Forced)
1692 }
1693}
1694
1695#[derive(Debug, Clone)]
1697struct PropagateTransaction<T = TransactionSigned> {
1698 size: usize,
1699 transaction: Arc<T>,
1700}
1701
1702impl<T: SignedTransaction> PropagateTransaction<T> {
1703 pub fn new(transaction: T) -> Self {
1705 let size = transaction.length();
1706 Self { size, transaction: Arc::new(transaction) }
1707 }
1708
1709 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1711 where
1712 P: PoolTransaction<Consensus = T>,
1713 {
1714 let size = tx.encoded_length();
1715 let transaction = tx.transaction.clone_into_consensus();
1716 let transaction = Arc::new(transaction.into_inner());
1717 Self { size, transaction }
1718 }
1719
1720 fn tx_hash(&self) -> &TxHash {
1721 self.transaction.tx_hash()
1722 }
1723}
1724
1725#[derive(Debug, Clone)]
1728enum PropagateTransactionsBuilder<T> {
1729 Pooled(PooledTransactionsHashesBuilder),
1730 Full(FullTransactionsBuilder<T>),
1731}
1732
1733impl<T> PropagateTransactionsBuilder<T> {
1734 fn pooled(version: EthVersion) -> Self {
1736 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1737 }
1738
1739 fn full(version: EthVersion) -> Self {
1741 Self::Full(FullTransactionsBuilder::new(version))
1742 }
1743
1744 fn is_empty(&self) -> bool {
1746 match self {
1747 Self::Pooled(builder) => builder.is_empty(),
1748 Self::Full(builder) => builder.is_empty(),
1749 }
1750 }
1751
1752 fn build(self) -> PropagateTransactions<T> {
1754 match self {
1755 Self::Pooled(pooled) => {
1756 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1757 }
1758 Self::Full(full) => full.build(),
1759 }
1760 }
1761}
1762
1763impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1764 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1766 for tx in txs {
1767 self.push(tx);
1768 }
1769 }
1770
1771 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1773 match self {
1774 Self::Pooled(builder) => builder.push(transaction),
1775 Self::Full(builder) => builder.push(transaction),
1776 }
1777 }
1778}
1779
1780struct PropagateTransactions<T> {
1782 pooled: Option<NewPooledTransactionHashes>,
1784 full: Option<Vec<Arc<T>>>,
1786}
1787
1788#[derive(Debug, Clone)]
1793struct FullTransactionsBuilder<T> {
1794 total_size: usize,
1796 transactions: Vec<Arc<T>>,
1798 pooled: PooledTransactionsHashesBuilder,
1800}
1801
1802impl<T> FullTransactionsBuilder<T> {
1803 fn new(version: EthVersion) -> Self {
1805 Self {
1806 total_size: 0,
1807 pooled: PooledTransactionsHashesBuilder::new(version),
1808 transactions: vec![],
1809 }
1810 }
1811
1812 fn is_empty(&self) -> bool {
1814 self.transactions.is_empty() && self.pooled.is_empty()
1815 }
1816
1817 fn build(self) -> PropagateTransactions<T> {
1819 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1820 let full = Some(self.transactions).filter(|full| !full.is_empty());
1821 PropagateTransactions { pooled, full }
1822 }
1823}
1824
1825impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1826 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1828 for tx in txs {
1829 self.push(&tx)
1830 }
1831 }
1832
1833 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1843 if !transaction.transaction.is_broadcastable_in_full() {
1852 self.pooled.push(transaction);
1853 return
1854 }
1855
1856 let new_size = self.total_size + transaction.size;
1857 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1858 self.total_size > 0
1859 {
1860 self.pooled.push(transaction);
1862 return
1863 }
1864
1865 self.total_size = new_size;
1866 self.transactions.push(Arc::clone(&transaction.transaction));
1867 }
1868}
1869
1870#[derive(Debug, Clone)]
1873enum PooledTransactionsHashesBuilder {
1874 Eth66(NewPooledTransactionHashes66),
1875 Eth68(NewPooledTransactionHashes68),
1876}
1877
1878impl PooledTransactionsHashesBuilder {
1881 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1883 match self {
1884 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1885 Self::Eth68(msg) => {
1886 msg.hashes.push(*pooled_tx.hash());
1887 msg.sizes.push(pooled_tx.encoded_length());
1888 msg.types.push(pooled_tx.transaction.ty());
1889 }
1890 }
1891 }
1892
1893 fn is_empty(&self) -> bool {
1895 match self {
1896 Self::Eth66(hashes) => hashes.is_empty(),
1897 Self::Eth68(hashes) => hashes.is_empty(),
1898 }
1899 }
1900
1901 fn extend<T: SignedTransaction>(
1903 &mut self,
1904 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1905 ) {
1906 for tx in txs {
1907 self.push(&tx);
1908 }
1909 }
1910
1911 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1912 match self {
1913 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1914 Self::Eth68(msg) => {
1915 msg.hashes.push(*tx.tx_hash());
1916 msg.sizes.push(tx.size);
1917 msg.types.push(tx.transaction.ty());
1918 }
1919 }
1920 }
1921
1922 fn new(version: EthVersion) -> Self {
1924 match version {
1925 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1926 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
1927 Self::Eth68(Default::default())
1928 }
1929 }
1930 }
1931
1932 fn build(self) -> NewPooledTransactionHashes {
1933 match self {
1934 Self::Eth66(mut msg) => {
1935 msg.0.shrink_to_fit();
1936 msg.into()
1937 }
1938 Self::Eth68(mut msg) => {
1939 msg.shrink_to_fit();
1940 msg.into()
1941 }
1942 }
1943 }
1944}
1945
1946enum TransactionSource {
1948 Broadcast,
1950 Response,
1952}
1953
1954impl TransactionSource {
1957 const fn is_broadcast(&self) -> bool {
1959 matches!(self, Self::Broadcast)
1960 }
1961}
1962
1963#[derive(Debug)]
1965pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1966 seen_transactions: LruCache<TxHash>,
1970 request_tx: PeerRequestSender<PeerRequest<N>>,
1972 version: EthVersion,
1974 client_version: Arc<str>,
1976 peer_kind: PeerKind,
1978}
1979
1980impl<N: NetworkPrimitives> PeerMetadata<N> {
1981 pub fn new(
1983 request_tx: PeerRequestSender<PeerRequest<N>>,
1984 version: EthVersion,
1985 client_version: Arc<str>,
1986 max_transactions_seen_by_peer: u32,
1987 peer_kind: PeerKind,
1988 ) -> Self {
1989 Self {
1990 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1991 request_tx,
1992 version,
1993 client_version,
1994 peer_kind,
1995 }
1996 }
1997
1998 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2000 &self.request_tx
2001 }
2002
2003 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2005 &mut self.seen_transactions
2006 }
2007
2008 pub const fn version(&self) -> EthVersion {
2010 self.version
2011 }
2012
2013 pub fn client_version(&self) -> &str {
2015 &self.client_version
2016 }
2017
2018 pub const fn peer_kind(&self) -> PeerKind {
2020 self.peer_kind
2021 }
2022}
2023
2024#[derive(Debug)]
2026enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2027 PropagateHash(B256),
2029 PropagateHashesTo(Vec<B256>, PeerId),
2031 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2033 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2035 PropagateTransactions(Vec<TxHash>),
2037 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2039 GetTransactionHashes {
2041 peers: Vec<PeerId>,
2042 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2043 },
2044 GetPeerSender {
2046 peer_id: PeerId,
2047 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2048 },
2049}
2050
2051#[derive(Debug)]
2053pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2054 IncomingTransactions {
2058 peer_id: PeerId,
2060 msg: Transactions<N::BroadcastedTransaction>,
2062 },
2063 IncomingPooledTransactionHashes {
2065 peer_id: PeerId,
2067 msg: NewPooledTransactionHashes,
2069 },
2070 GetPooledTransactions {
2072 peer_id: PeerId,
2074 request: GetPooledTransactions,
2076 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2078 },
2079 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2081}
2082
2083#[derive(Debug)]
2085pub struct PendingPoolImportsInfo {
2086 pending_pool_imports: Arc<AtomicUsize>,
2088 max_pending_pool_imports: usize,
2090}
2091
2092impl PendingPoolImportsInfo {
2093 pub fn new(max_pending_pool_imports: usize) -> Self {
2095 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2096 }
2097
2098 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2100 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2101 }
2102}
2103
2104impl Default for PendingPoolImportsInfo {
2105 fn default() -> Self {
2106 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2107 }
2108}
2109
2110#[derive(Debug, Default)]
2111struct TxManagerPollDurations {
2112 acc_network_events: Duration,
2113 acc_pending_imports: Duration,
2114 acc_tx_events: Duration,
2115 acc_imported_txns: Duration,
2116 acc_fetch_events: Duration,
2117 acc_pending_fetch: Duration,
2118 acc_cmds: Duration,
2119}
2120
2121#[cfg(test)]
2122mod tests {
2123 use super::*;
2124 use crate::{
2125 test_utils::{
2126 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2127 Testnet,
2128 },
2129 transactions::config::RelaxedEthAnnouncementFilter,
2130 NetworkConfigBuilder, NetworkManager,
2131 };
2132 use alloy_consensus::{TxEip1559, TxLegacy};
2133 use alloy_primitives::{hex, Signature, TxKind, U256};
2134 use alloy_rlp::Decodable;
2135 use futures::FutureExt;
2136 use reth_chainspec::MIN_TRANSACTION_GAS;
2137 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2138 use reth_network_api::{NetworkInfo, PeerKind};
2139 use reth_network_p2p::{
2140 error::{RequestError, RequestResult},
2141 sync::{NetworkSyncUpdater, SyncState},
2142 };
2143 use reth_storage_api::noop::NoopProvider;
2144 use reth_transaction_pool::test_utils::{
2145 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2146 };
2147 use secp256k1::SecretKey;
2148 use std::{
2149 future::poll_fn,
2150 net::{IpAddr, Ipv4Addr, SocketAddr},
2151 str::FromStr,
2152 };
2153 use tracing::error;
2154
2155 #[tokio::test(flavor = "multi_thread")]
2156 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2157 reth_tracing::init_test_tracing();
2158 let net = Testnet::create(3).await;
2159
2160 let mut handles = net.handles();
2161 let handle0 = handles.next().unwrap();
2162 let handle1 = handles.next().unwrap();
2163
2164 drop(handles);
2165 let handle = net.spawn();
2166
2167 let listener0 = handle0.event_listener();
2168 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2169 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2170
2171 let client = NoopProvider::default();
2172 let pool = testing_pool();
2173 let config = NetworkConfigBuilder::eth(secret_key)
2174 .disable_discovery()
2175 .listener_port(0)
2176 .build(client);
2177 let transactions_manager_config = config.transactions_manager_config.clone();
2178 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2179 .await
2180 .unwrap()
2181 .into_builder()
2182 .transactions(pool.clone(), transactions_manager_config)
2183 .split_with_handle();
2184
2185 tokio::task::spawn(network);
2186
2187 network_handle.update_sync_state(SyncState::Syncing);
2189 assert!(NetworkInfo::is_syncing(&network_handle));
2190 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2191
2192 let mut established = listener0.take(2);
2194 while let Some(ev) = established.next().await {
2195 match ev {
2196 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2197 transactions
2199 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2200 }
2201 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2202 ev => {
2203 error!("unexpected event {ev:?}")
2204 }
2205 }
2206 }
2207 let input = hex!(
2209 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2210 );
2211 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2212 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2213 peer_id: *handle1.peer_id(),
2214 msg: Transactions(vec![signed_tx.clone()]),
2215 });
2216 poll_fn(|cx| {
2217 let _ = transactions.poll_unpin(cx);
2218 Poll::Ready(())
2219 })
2220 .await;
2221 assert!(pool.is_empty());
2222 handle.terminate().await;
2223 }
2224
2225 #[tokio::test(flavor = "multi_thread")]
2226 async fn test_tx_broadcasts_through_two_syncs() {
2227 reth_tracing::init_test_tracing();
2228 let net = Testnet::create(3).await;
2229
2230 let mut handles = net.handles();
2231 let handle0 = handles.next().unwrap();
2232 let handle1 = handles.next().unwrap();
2233
2234 drop(handles);
2235 let handle = net.spawn();
2236
2237 let listener0 = handle0.event_listener();
2238 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2239 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2240
2241 let client = NoopProvider::default();
2242 let pool = testing_pool();
2243 let config = NetworkConfigBuilder::new(secret_key)
2244 .disable_discovery()
2245 .listener_port(0)
2246 .build(client);
2247 let transactions_manager_config = config.transactions_manager_config.clone();
2248 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2249 .await
2250 .unwrap()
2251 .into_builder()
2252 .transactions(pool.clone(), transactions_manager_config)
2253 .split_with_handle();
2254
2255 tokio::task::spawn(network);
2256
2257 network_handle.update_sync_state(SyncState::Syncing);
2259 assert!(NetworkInfo::is_syncing(&network_handle));
2260 network_handle.update_sync_state(SyncState::Idle);
2261 assert!(!NetworkInfo::is_syncing(&network_handle));
2262 network_handle.update_sync_state(SyncState::Syncing);
2263 assert!(NetworkInfo::is_syncing(&network_handle));
2264
2265 let mut established = listener0.take(2);
2267 while let Some(ev) = established.next().await {
2268 match ev {
2269 NetworkEvent::ActivePeerSession { .. } |
2270 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2271 transactions.on_network_event(ev);
2273 }
2274 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2275 _ => {
2276 error!("unexpected event {ev:?}")
2277 }
2278 }
2279 }
2280 let input = hex!(
2282 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2283 );
2284 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2285 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2286 peer_id: *handle1.peer_id(),
2287 msg: Transactions(vec![signed_tx.clone()]),
2288 });
2289 poll_fn(|cx| {
2290 let _ = transactions.poll_unpin(cx);
2291 Poll::Ready(())
2292 })
2293 .await;
2294 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2295 assert!(NetworkInfo::is_syncing(&network_handle));
2296 assert!(!pool.is_empty());
2297 handle.terminate().await;
2298 }
2299
2300 #[tokio::test(flavor = "multi_thread")]
2303 async fn test_handle_incoming_transactions_hashes() {
2304 reth_tracing::init_test_tracing();
2305
2306 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2307 let client = NoopProvider::default();
2308
2309 let config = NetworkConfigBuilder::new(secret_key)
2310 .listener_port(0)
2312 .disable_discovery()
2313 .build(client);
2314
2315 let pool = testing_pool();
2316
2317 let transactions_manager_config = config.transactions_manager_config.clone();
2318 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2319 .await
2320 .unwrap()
2321 .into_builder()
2322 .transactions(pool.clone(), transactions_manager_config)
2323 .split_with_handle();
2324
2325 let peer_id_1 = PeerId::new([1; 64]);
2326 let eth_version = EthVersion::Eth66;
2327
2328 let txs = vec![TransactionSigned::new_unhashed(
2329 Transaction::Legacy(TxLegacy {
2330 chain_id: Some(4),
2331 nonce: 15u64,
2332 gas_price: 2200000000,
2333 gas_limit: 34811,
2334 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2335 value: U256::from(1234u64),
2336 input: Default::default(),
2337 }),
2338 Signature::new(
2339 U256::from_str(
2340 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2341 )
2342 .unwrap(),
2343 U256::from_str(
2344 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2345 )
2346 .unwrap(),
2347 true,
2348 ),
2349 )];
2350
2351 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2352
2353 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2354 tx_manager.peers.insert(peer_id_1, peer_1);
2355
2356 assert!(pool.is_empty());
2357
2358 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2359 peer_id: peer_id_1,
2360 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2361 txs_hashes.clone(),
2362 )),
2363 });
2364
2365 let req = to_mock_session_rx
2367 .recv()
2368 .await
2369 .expect("peer_1 session should receive request with buffered hashes");
2370 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2371 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2372
2373 let message: Vec<PooledTransactionVariant> = txs
2374 .into_iter()
2375 .map(|tx| {
2376 PooledTransactionVariant::try_from(tx)
2377 .expect("Failed to convert MockTransaction to PooledTransaction")
2378 })
2379 .collect();
2380
2381 response
2383 .send(Ok(PooledTransactions(message)))
2384 .expect("should send peer_1 response to tx manager");
2385
2386 poll_fn(|cx| {
2388 let _ = tx_manager.poll_unpin(cx);
2389 Poll::Ready(())
2390 })
2391 .await;
2392
2393 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2396 }
2397
2398 #[tokio::test(flavor = "multi_thread")]
2399 async fn test_handle_incoming_transactions() {
2400 reth_tracing::init_test_tracing();
2401 let net = Testnet::create(3).await;
2402
2403 let mut handles = net.handles();
2404 let handle0 = handles.next().unwrap();
2405 let handle1 = handles.next().unwrap();
2406
2407 drop(handles);
2408 let handle = net.spawn();
2409
2410 let listener0 = handle0.event_listener();
2411
2412 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2413 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2414
2415 let client = NoopProvider::default();
2416 let pool = testing_pool();
2417 let config = NetworkConfigBuilder::new(secret_key)
2418 .disable_discovery()
2419 .listener_port(0)
2420 .build(client);
2421 let transactions_manager_config = config.transactions_manager_config.clone();
2422 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2423 .await
2424 .unwrap()
2425 .into_builder()
2426 .transactions(pool.clone(), transactions_manager_config)
2427 .split_with_handle();
2428 tokio::task::spawn(network);
2429
2430 network_handle.update_sync_state(SyncState::Idle);
2431
2432 assert!(!NetworkInfo::is_syncing(&network_handle));
2433
2434 let mut established = listener0.take(2);
2436 while let Some(ev) = established.next().await {
2437 match ev {
2438 NetworkEvent::ActivePeerSession { .. } |
2439 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2440 transactions.on_network_event(ev);
2442 }
2443 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2444 ev => {
2445 error!("unexpected event {ev:?}")
2446 }
2447 }
2448 }
2449 let input = hex!(
2451 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2452 );
2453 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2454 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2455 peer_id: *handle1.peer_id(),
2456 msg: Transactions(vec![signed_tx.clone()]),
2457 });
2458 assert!(transactions
2459 .transactions_by_peers
2460 .get(signed_tx.tx_hash())
2461 .unwrap()
2462 .contains(handle1.peer_id()));
2463
2464 poll_fn(|cx| {
2466 let _ = transactions.poll_unpin(cx);
2467 Poll::Ready(())
2468 })
2469 .await;
2470
2471 assert!(!pool.is_empty());
2472 assert!(pool.get(signed_tx.tx_hash()).is_some());
2473 handle.terminate().await;
2474 }
2475
2476 #[tokio::test(flavor = "multi_thread")]
2477 async fn test_on_get_pooled_transactions_network() {
2478 reth_tracing::init_test_tracing();
2479 let net = Testnet::create(2).await;
2480
2481 let mut handles = net.handles();
2482 let handle0 = handles.next().unwrap();
2483 let handle1 = handles.next().unwrap();
2484
2485 drop(handles);
2486 let handle = net.spawn();
2487
2488 let listener0 = handle0.event_listener();
2489
2490 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2491 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2492
2493 let client = NoopProvider::default();
2494 let pool = testing_pool();
2495 let config = NetworkConfigBuilder::new(secret_key)
2496 .disable_discovery()
2497 .listener_port(0)
2498 .build(client);
2499 let transactions_manager_config = config.transactions_manager_config.clone();
2500 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2501 .await
2502 .unwrap()
2503 .into_builder()
2504 .transactions(pool.clone(), transactions_manager_config)
2505 .split_with_handle();
2506 tokio::task::spawn(network);
2507
2508 network_handle.update_sync_state(SyncState::Idle);
2509
2510 assert!(!NetworkInfo::is_syncing(&network_handle));
2511
2512 let mut established = listener0.take(2);
2514 while let Some(ev) = established.next().await {
2515 match ev {
2516 NetworkEvent::ActivePeerSession { .. } |
2517 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2518 transactions.on_network_event(ev);
2519 }
2520 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2521 ev => {
2522 error!("unexpected event {ev:?}")
2523 }
2524 }
2525 }
2526 handle.terminate().await;
2527
2528 let tx = MockTransaction::eip1559();
2529 let _ = transactions
2530 .pool
2531 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2532 .await;
2533
2534 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2535
2536 let (send, receive) =
2537 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2538
2539 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2540 peer_id: *handle1.peer_id(),
2541 request,
2542 response: send,
2543 });
2544
2545 match receive.await.unwrap() {
2546 Ok(PooledTransactions(transactions)) => {
2547 assert_eq!(transactions.len(), 1);
2548 }
2549 Err(e) => {
2550 panic!("error: {e:?}");
2551 }
2552 }
2553 }
2554
2555 #[tokio::test]
2559 async fn test_partially_tx_response() {
2560 reth_tracing::init_test_tracing();
2561
2562 let mut tx_manager = new_tx_manager().await.0;
2563 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2564
2565 let peer_id_1 = PeerId::new([1; 64]);
2566 let eth_version = EthVersion::Eth66;
2567
2568 let txs = vec![
2569 TransactionSigned::new_unhashed(
2570 Transaction::Legacy(TxLegacy {
2571 chain_id: Some(4),
2572 nonce: 15u64,
2573 gas_price: 2200000000,
2574 gas_limit: 34811,
2575 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2576 value: U256::from(1234u64),
2577 input: Default::default(),
2578 }),
2579 Signature::new(
2580 U256::from_str(
2581 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2582 )
2583 .unwrap(),
2584 U256::from_str(
2585 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2586 )
2587 .unwrap(),
2588 true,
2589 ),
2590 ),
2591 TransactionSigned::new_unhashed(
2592 Transaction::Eip1559(TxEip1559 {
2593 chain_id: 4,
2594 nonce: 26u64,
2595 max_priority_fee_per_gas: 1500000000,
2596 max_fee_per_gas: 1500000013,
2597 gas_limit: MIN_TRANSACTION_GAS,
2598 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2599 value: U256::from(3000000000000000000u64),
2600 input: Default::default(),
2601 access_list: Default::default(),
2602 }),
2603 Signature::new(
2604 U256::from_str(
2605 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2606 )
2607 .unwrap(),
2608 U256::from_str(
2609 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2610 )
2611 .unwrap(),
2612 true,
2613 ),
2614 ),
2615 ];
2616
2617 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2618
2619 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2620 peer_1.seen_transactions.insert(txs_hashes[0]);
2623 peer_1.seen_transactions.insert(txs_hashes[1]);
2624 tx_manager.peers.insert(peer_id_1, peer_1);
2625
2626 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2627 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2628
2629 assert!(tx_fetcher.is_idle(&peer_id_1));
2631 assert_eq!(tx_fetcher.active_peers.len(), 0);
2632
2633 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2635
2636 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2637 assert!(!tx_fetcher.is_idle(&peer_id_1));
2639 assert_eq!(tx_fetcher.active_peers.len(), 1);
2640
2641 let req = to_mock_session_rx
2643 .recv()
2644 .await
2645 .expect("peer_1 session should receive request with buffered hashes");
2646 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2647
2648 let message: Vec<PooledTransactionVariant> = txs
2649 .into_iter()
2650 .take(1)
2651 .map(|tx| {
2652 PooledTransactionVariant::try_from(tx)
2653 .expect("Failed to convert MockTransaction to PooledTransaction")
2654 })
2655 .collect();
2656 response
2658 .send(Ok(PooledTransactions(message)))
2659 .expect("should send peer_1 response to tx manager");
2660 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2661 unreachable!()
2662 };
2663
2664 assert!(tx_fetcher.is_idle(&peer_id));
2666 assert_eq!(tx_fetcher.active_peers.len(), 0);
2667 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2669 }
2670
2671 #[tokio::test]
2672 async fn test_max_retries_tx_request() {
2673 reth_tracing::init_test_tracing();
2674
2675 let mut tx_manager = new_tx_manager().await.0;
2676 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2677
2678 let peer_id_1 = PeerId::new([1; 64]);
2679 let peer_id_2 = PeerId::new([2; 64]);
2680 let eth_version = EthVersion::Eth66;
2681 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2682
2683 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2684 peer_1.seen_transactions.insert(seen_hashes[0]);
2687 peer_1.seen_transactions.insert(seen_hashes[1]);
2688 tx_manager.peers.insert(peer_id_1, peer_1);
2689
2690 let retries = 1;
2693 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2694 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2695
2696 assert!(tx_fetcher.is_idle(&peer_id_1));
2698 assert_eq!(tx_fetcher.active_peers.len(), 0);
2699
2700 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2702
2703 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2704
2705 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2706 assert!(!tx_fetcher.is_idle(&peer_id_1));
2708 assert_eq!(tx_fetcher.active_peers.len(), 1);
2709
2710 let req = to_mock_session_rx
2712 .recv()
2713 .await
2714 .expect("peer_1 session should receive request with buffered hashes");
2715 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2716 let GetPooledTransactions(hashes) = request;
2717
2718 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2719
2720 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2721
2722 response
2724 .send(Err(RequestError::BadResponse))
2725 .expect("should send peer_1 response to tx manager");
2726 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2727 unreachable!()
2728 };
2729
2730 assert!(tx_fetcher.is_idle(&peer_id));
2732 assert_eq!(tx_fetcher.active_peers.len(), 0);
2733 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2735
2736 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2737 tx_manager.peers.insert(peer_id_2, peer_2);
2738
2739 let msg =
2741 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2742 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2743
2744 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2745
2746 assert_eq!(tx_fetcher.active_peers.len(), 1);
2748
2749 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2751 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2753
2754 let req = to_mock_session_rx
2756 .recv()
2757 .await
2758 .expect("peer_2 session should receive request with buffered hashes");
2759 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2760
2761 response
2763 .send(Err(RequestError::BadResponse))
2764 .expect("should send peer_2 response to tx manager");
2765 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2766
2767 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2770 assert_eq!(tx_fetcher.active_peers.len(), 0);
2771 }
2772
2773 #[test]
2774 fn test_transaction_builder_empty() {
2775 let mut builder =
2776 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2777 assert!(builder.is_empty());
2778
2779 let mut factory = MockTransactionFactory::default();
2780 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2781 builder.push(&tx);
2782 assert!(!builder.is_empty());
2783
2784 let txs = builder.build();
2785 assert!(txs.full.is_none());
2786 let txs = txs.pooled.unwrap();
2787 assert_eq!(txs.len(), 1);
2788 }
2789
2790 #[test]
2791 fn test_transaction_builder_large() {
2792 let mut builder =
2793 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2794 assert!(builder.is_empty());
2795
2796 let mut factory = MockTransactionFactory::default();
2797 let mut tx = factory.create_eip1559();
2798 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2800 let tx = Arc::new(tx);
2801 let tx = PropagateTransaction::pool_tx(tx);
2802 builder.push(&tx);
2803 assert!(!builder.is_empty());
2804
2805 let txs = builder.clone().build();
2806 assert!(txs.pooled.is_none());
2807 let txs = txs.full.unwrap();
2808 assert_eq!(txs.len(), 1);
2809
2810 builder.push(&tx);
2811
2812 let txs = builder.clone().build();
2813 let pooled = txs.pooled.unwrap();
2814 assert_eq!(pooled.len(), 1);
2815 let txs = txs.full.unwrap();
2816 assert_eq!(txs.len(), 1);
2817 }
2818
2819 #[test]
2820 fn test_transaction_builder_eip4844() {
2821 let mut builder =
2822 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2823 assert!(builder.is_empty());
2824
2825 let mut factory = MockTransactionFactory::default();
2826 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2827 builder.push(&tx);
2828 assert!(!builder.is_empty());
2829
2830 let txs = builder.clone().build();
2831 assert!(txs.full.is_none());
2832 let txs = txs.pooled.unwrap();
2833 assert_eq!(txs.len(), 1);
2834
2835 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2836 builder.push(&tx);
2837
2838 let txs = builder.clone().build();
2839 let pooled = txs.pooled.unwrap();
2840 assert_eq!(pooled.len(), 1);
2841 let txs = txs.full.unwrap();
2842 assert_eq!(txs.len(), 1);
2843 }
2844
2845 #[tokio::test]
2846 async fn test_propagate_full() {
2847 reth_tracing::init_test_tracing();
2848
2849 let (mut tx_manager, network) = new_tx_manager().await;
2850 let peer_id = PeerId::random();
2851
2852 network.handle().update_sync_state(SyncState::Idle);
2854
2855 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2857
2858 let session_info = SessionInfo {
2859 peer_id,
2860 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2861 client_version: Arc::from(""),
2862 capabilities: Arc::new(vec![].into()),
2863 status: Arc::new(Default::default()),
2864 version: EthVersion::Eth68,
2865 peer_kind: PeerKind::Basic,
2866 };
2867 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2868 tx_manager
2869 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2870 let mut propagate = vec![];
2871 let mut factory = MockTransactionFactory::default();
2872 let eip1559_tx = Arc::new(factory.create_eip1559());
2873 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2874 let eip4844_tx = Arc::new(factory.create_eip4844());
2875 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2876
2877 let propagated =
2878 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2879 assert_eq!(propagated.0.len(), 2);
2880 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2881 assert_eq!(prop_txs.len(), 1);
2882 assert!(prop_txs[0].is_full());
2883
2884 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2885 assert_eq!(prop_txs.len(), 1);
2886 assert!(prop_txs[0].is_hash());
2887
2888 let peer = tx_manager.peers.get(&peer_id).unwrap();
2889 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2890 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2891 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2892
2893 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2895 assert!(propagated.0.is_empty());
2896 }
2897
2898 #[tokio::test]
2899 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2900 reth_tracing::init_test_tracing();
2901
2902 let transactions_manager_config = TransactionsManagerConfig::default();
2903
2904 let propagation_policy = TransactionPropagationKind::default();
2905 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2906
2907 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2908
2909 let pool = testing_pool();
2910 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2911 let client = NoopProvider::default();
2912
2913 let network_config = NetworkConfigBuilder::new(secret_key)
2914 .listener_port(0)
2915 .disable_discovery()
2916 .build(client.clone());
2917
2918 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2919 let (to_tx_manager_tx, from_network_rx) =
2920 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2921 network_manager.set_transactions(to_tx_manager_tx);
2922 let network_handle = network_manager.handle().clone();
2923 let network_service_handle = tokio::spawn(network_manager);
2924
2925 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2926 network_handle.clone(),
2927 pool.clone(),
2928 from_network_rx,
2929 transactions_manager_config,
2930 policy_bundle,
2931 );
2932
2933 let peer_id = PeerId::random();
2934 let eth_version = EthVersion::Eth68;
2935 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2936 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2937
2938 let mut tx_factory = MockTransactionFactory::default();
2939
2940 let valid_known_tx = tx_factory.create_eip1559();
2941 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2942
2943 let known_tx_hash = *known_tx_signed.hash();
2944 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2945 let known_tx_size = known_tx_signed.encoded_length();
2946
2947 let unknown_tx_hash = B256::random();
2948 let unknown_tx_type_byte = 0xff_u8;
2949 let unknown_tx_size = 150;
2950
2951 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2952 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2953 sizes: vec![known_tx_size, unknown_tx_size],
2954 hashes: vec![known_tx_hash, unknown_tx_hash],
2955 });
2956
2957 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2958
2959 poll_fn(|cx| {
2960 let _ = tx_manager.poll_unpin(cx);
2961 Poll::Ready(())
2962 })
2963 .await;
2964
2965 let mut requested_hashes_in_getpooled = HashSet::new();
2966 let mut unexpected_request_received = false;
2967
2968 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2969 .await
2970 {
2971 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2972 let GetPooledTransactions(hashes) = request;
2973 for hash in hashes {
2974 requested_hashes_in_getpooled.insert(hash);
2975 }
2976 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2977 }
2978 Ok(Some(other_request)) => {
2979 tracing::error!(?other_request, "Received unexpected PeerRequest type");
2980 unexpected_request_received = true;
2981 }
2982 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2983 Err(_timeout_err) => {
2984 tracing::info!("Timeout: No GetPooledTransactions request received.")
2985 }
2986 }
2987
2988 assert!(
2989 requested_hashes_in_getpooled.contains(&known_tx_hash),
2990 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2991 );
2992 assert!(
2993 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2994 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2995 );
2996 assert!(
2997 !unexpected_request_received,
2998 "An unexpected P2P request was received by the mock peer."
2999 );
3000
3001 network_service_handle.abort();
3002 }
3003}