1use alloy_consensus::transaction::TxHashRef;
4
5pub mod config;
7pub mod constants;
9pub mod fetcher;
11pub mod policy;
13
14pub use self::constants::{
15 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::AnnouncementAcceptance;
19pub use config::{
20 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
21 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::NetworkPolicies;
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29 budget::{
30 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
32 },
33 cache::LruCache,
34 duration_metered_exec, metered_poll_nested_stream_with_budget,
35 metrics::{
36 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
37 },
38 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
39 NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{TxHash, B256};
42use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
43use futures::{stream::FuturesUnordered, Future, StreamExt};
44use reth_eth_wire::{
45 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
46 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
47 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
48 RequestTxHashes, Transactions, ValidAnnouncementData,
49};
50use reth_ethereum_primitives::{TransactionSigned, TxType};
51use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
52use reth_network_api::{
53 events::{PeerEvent, SessionInfo},
54 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
55};
56use reth_network_p2p::{
57 error::{RequestError, RequestResult},
58 sync::SyncStateProvider,
59};
60use reth_network_peers::PeerId;
61use reth_network_types::ReputationChangeKind;
62use reth_primitives_traits::SignedTransaction;
63use reth_tokio_util::EventStream;
64use reth_transaction_pool::{
65 error::{PoolError, PoolResult},
66 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
67 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
68};
69use std::{
70 collections::{hash_map::Entry, HashMap, HashSet},
71 pin::Pin,
72 sync::{
73 atomic::{AtomicUsize, Ordering},
74 Arc,
75 },
76 task::{Context, Poll},
77 time::{Duration, Instant},
78};
79use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
80use tokio_stream::wrappers::UnboundedReceiverStream;
81use tracing::{debug, trace};
82
83pub type PoolImportFuture =
87 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
88
89#[derive(Debug, Clone)]
97pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
98 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
100}
101
102impl<N: NetworkPrimitives> TransactionsHandle<N> {
103 fn send(&self, cmd: TransactionsCommand<N>) {
104 let _ = self.manager_tx.send(cmd);
105 }
106
107 async fn peer_handle(
109 &self,
110 peer_id: PeerId,
111 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
112 let (tx, rx) = oneshot::channel();
113 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
114 rx.await
115 }
116
117 pub fn propagate(&self, hash: TxHash) {
119 self.send(TransactionsCommand::PropagateHash(hash))
120 }
121
122 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
126 self.propagate_hashes_to(Some(hash), peer)
127 }
128
129 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
133 let hashes = hash.into_iter().collect::<Vec<_>>();
134 if hashes.is_empty() {
135 return
136 }
137 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
138 }
139
140 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
142 let (tx, rx) = oneshot::channel();
143 self.send(TransactionsCommand::GetActivePeers(tx));
144 rx.await
145 }
146
147 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
151 if transactions.is_empty() {
152 return
153 }
154 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
155 }
156
157 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
162 if transactions.is_empty() {
163 return
164 }
165 self.send(TransactionsCommand::PropagateTransactions(transactions))
166 }
167
168 pub fn broadcast_transactions(
173 &self,
174 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
175 ) {
176 let transactions =
177 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
178 if transactions.is_empty() {
179 return
180 }
181 self.send(TransactionsCommand::BroadcastTransactions(transactions))
182 }
183
184 pub async fn get_transaction_hashes(
186 &self,
187 peers: Vec<PeerId>,
188 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
189 if peers.is_empty() {
190 return Ok(Default::default())
191 }
192 let (tx, rx) = oneshot::channel();
193 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
194 rx.await
195 }
196
197 pub async fn get_peer_transaction_hashes(
199 &self,
200 peer: PeerId,
201 ) -> Result<HashSet<TxHash>, RecvError> {
202 let res = self.get_transaction_hashes(vec![peer]).await?;
203 Ok(res.into_values().next().unwrap_or_default())
204 }
205
206 pub async fn get_pooled_transactions_from(
212 &self,
213 peer_id: PeerId,
214 hashes: Vec<B256>,
215 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218 let (tx, rx) = oneshot::channel();
219 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220 peer.try_send(request).ok();
221
222 rx.await?.map(|res| Some(res.0))
223 }
224}
225
226#[derive(Debug)]
281#[must_use = "Manager does nothing unless polled."]
282pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
283 pool: Pool,
285 network: NetworkHandle<N>,
287 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
291 transaction_fetcher: TransactionFetcher<N>,
293 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
298 pool_imports: FuturesUnordered<PoolImportFuture>,
310 pending_pool_imports_info: PendingPoolImportsInfo,
312 bad_imports: LruCache<TxHash>,
314 peers: HashMap<PeerId, PeerMetadata<N>>,
316 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
320 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
325 pending_transactions: mpsc::Receiver<TxHash>,
334 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
336 config: TransactionsManagerConfig,
338 policies: NetworkPolicies<N>,
340 metrics: TransactionsManagerMetrics,
342 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
344}
345
346impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
347 pub fn new(
351 network: NetworkHandle<N>,
352 pool: Pool,
353 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
354 transactions_manager_config: TransactionsManagerConfig,
355 ) -> Self {
356 Self::with_policy(
357 network,
358 pool,
359 from_network,
360 transactions_manager_config,
361 NetworkPolicies::new(
362 TransactionPropagationKind::default(),
363 StrictEthAnnouncementFilter::default(),
364 ),
365 )
366 }
367}
368
369impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
370 pub fn with_policy(
374 network: NetworkHandle<N>,
375 pool: Pool,
376 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
377 transactions_manager_config: TransactionsManagerConfig,
378 policies: NetworkPolicies<N>,
379 ) -> Self {
380 let network_events = network.event_listener();
381
382 let (command_tx, command_rx) = mpsc::unbounded_channel();
383
384 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
385 &transactions_manager_config.transaction_fetcher_config,
386 );
387
388 let pending = pool.pending_transactions_listener();
391 let pending_pool_imports_info = PendingPoolImportsInfo::default();
392 let metrics = TransactionsManagerMetrics::default();
393 metrics
394 .capacity_pending_pool_imports
395 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
396
397 Self {
398 pool,
399 network,
400 network_events,
401 transaction_fetcher,
402 transactions_by_peers: Default::default(),
403 pool_imports: Default::default(),
404 pending_pool_imports_info: PendingPoolImportsInfo::new(
405 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
406 ),
407 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
408 peers: Default::default(),
409 command_tx,
410 command_rx: UnboundedReceiverStream::new(command_rx),
411 pending_transactions: pending,
412 transaction_events: UnboundedMeteredReceiver::new(
413 from_network,
414 NETWORK_POOL_TRANSACTIONS_SCOPE,
415 ),
416 config: transactions_manager_config,
417 policies,
418 metrics,
419 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
420 }
421 }
422
423 pub fn handle(&self) -> TransactionsHandle<N> {
425 TransactionsHandle { manager_tx: self.command_tx.clone() }
426 }
427
428 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
431 self.pending_pool_imports_info
432 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
437 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
438 self.metrics.reported_bad_transactions.increment(1);
439 }
440
441 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
442 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
443 self.network.reputation_change(peer_id, kind);
444 }
445
446 fn report_already_seen(&self, peer_id: PeerId) {
447 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
448 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
449 }
450
451 fn on_good_import(&mut self, hash: TxHash) {
453 self.transactions_by_peers.remove(&hash);
454 }
455
456 fn on_bad_import(&mut self, err: PoolError) {
480 let peers = self.transactions_by_peers.remove(&err.hash);
481
482 if !err.is_bad_transaction() || self.network.is_syncing() {
484 return
485 }
486 if let Some(peers) = peers {
489 for peer_id in peers {
490 self.report_peer_bad_transactions(peer_id);
491 }
492 }
493 self.metrics.bad_imports.increment(1);
494 self.bad_imports.insert(err.hash);
495 }
496
497 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
501 let info = &self.pending_pool_imports_info;
503 let max_pending_pool_imports = info.max_pending_pool_imports;
504 let has_capacity_wrt_pending_pool_imports =
505 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
506
507 self.transaction_fetcher
508 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
509 }
510
511 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
512 let kind = match req_err {
513 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
514 RequestError::Timeout => ReputationChangeKind::Timeout,
515 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
516 return
518 }
519 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
520 };
521 self.report_peer(peer_id, kind);
522 }
523
524 #[inline]
525 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
526 let metrics = &self.metrics;
527
528 let TxManagerPollDurations {
529 acc_network_events,
530 acc_pending_imports,
531 acc_tx_events,
532 acc_imported_txns,
533 acc_fetch_events,
534 acc_pending_fetch,
535 acc_cmds,
536 } = poll_durations;
537
538 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
540 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
542 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
543 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
544 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
545 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
546 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
547 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
548 }
549}
550
551impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
552 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
554 for res in batch_results {
555 match res {
556 Ok(AddedTransactionOutcome { hash, .. }) => {
557 self.on_good_import(hash);
558 }
559 Err(err) => {
560 self.on_bad_import(err);
561 }
562 }
563 }
564 }
565
566 fn on_new_pooled_transaction_hashes(
568 &mut self,
569 peer_id: PeerId,
570 msg: NewPooledTransactionHashes,
571 ) {
572 if self.network.is_initially_syncing() {
574 return
575 }
576 if self.network.tx_gossip_disabled() {
577 return
578 }
579
580 let Some(peer) = self.peers.get_mut(&peer_id) else {
582 trace!(
583 peer_id = format!("{peer_id:#}"),
584 ?msg,
585 "discarding announcement from inactive peer"
586 );
587
588 return
589 };
590 let client = peer.client_version.clone();
591
592 let mut count_txns_already_seen_by_peer = 0;
594 for tx in msg.iter_hashes().copied() {
595 if !peer.seen_transactions.insert(tx) {
596 count_txns_already_seen_by_peer += 1;
597 }
598 }
599 if count_txns_already_seen_by_peer > 0 {
600 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
605 self.metrics
606 .occurrences_hash_already_seen_by_peer
607 .increment(count_txns_already_seen_by_peer);
608
609 trace!(target: "net::tx",
610 %count_txns_already_seen_by_peer,
611 peer_id=format!("{peer_id:#}"),
612 ?client,
613 "Peer sent hashes that have already been marked as seen by peer"
614 );
615
616 self.report_already_seen(peer_id);
617 }
618
619 if msg.is_empty() {
621 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
622 return;
623 }
624
625 let original_len = msg.len();
626 let mut partially_valid_msg = msg.dedup();
627
628 if partially_valid_msg.len() != original_len {
629 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
630 }
631
632 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
634
635 let hashes_count_pre_pool_filter = partially_valid_msg.len();
643 self.pool.retain_unknown(&mut partially_valid_msg);
644 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
645 let already_known_hashes_count =
646 hashes_count_pre_pool_filter - partially_valid_msg.len();
647 self.metrics
648 .occurrences_hashes_already_in_pool
649 .increment(already_known_hashes_count as u64);
650 }
651
652 if partially_valid_msg.is_empty() {
653 return
655 }
656
657 let mut should_report_peer = false;
662 let mut tx_types_counter = TxTypesCounter::default();
663
664 let is_eth68_message = partially_valid_msg
665 .msg_version()
666 .expect("partially valid announcement should have a version")
667 .is_eth68();
668
669 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
670 let (ty_byte, size_val) = match *metadata_ref_mut {
671 Some((ty, size)) => {
672 if !is_eth68_message {
673 should_report_peer = true;
674 }
675 (ty, size)
676 }
677 None => {
678 if is_eth68_message {
679 should_report_peer = true;
680 return false;
681 }
682 (0u8, 0)
683 }
684 };
685
686 if is_eth68_message &&
687 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
688 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
689 {
690 tx_types_counter.increase_by_tx_type(parsed_tx_type);
691 }
692
693 let decision = self
694 .policies
695 .announcement_filter()
696 .decide_on_announcement(ty_byte, tx_hash, size_val);
697
698 match decision {
699 AnnouncementAcceptance::Accept => true,
700 AnnouncementAcceptance::Ignore => false,
701 AnnouncementAcceptance::Reject { penalize_peer } => {
702 if penalize_peer {
703 should_report_peer = true;
704 }
705 false
706 }
707 }
708 });
709
710 if is_eth68_message {
711 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
712 }
713
714 if should_report_peer {
715 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
716 }
717
718 let mut valid_announcement_data =
719 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
720
721 if valid_announcement_data.is_empty() {
722 return
724 }
725
726 let bad_imports = &self.bad_imports;
733 self.transaction_fetcher.filter_unseen_and_pending_hashes(
734 &mut valid_announcement_data,
735 |hash| bad_imports.contains(hash),
736 &peer_id,
737 &client,
738 );
739
740 if valid_announcement_data.is_empty() {
741 return
743 }
744
745 trace!(target: "net::tx::propagation",
746 peer_id=format!("{peer_id:#}"),
747 hashes_len=valid_announcement_data.len(),
748 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
749 msg_version=%valid_announcement_data.msg_version(),
750 client_version=%client,
751 "received previously unseen and pending hashes in announcement from peer"
752 );
753
754 if !self.transaction_fetcher.is_idle(&peer_id) {
757 let msg_version = valid_announcement_data.msg_version();
759 let (hashes, _version) = valid_announcement_data.into_request_hashes();
760
761 trace!(target: "net::tx",
762 peer_id=format!("{peer_id:#}"),
763 hashes=?*hashes,
764 %msg_version,
765 %client,
766 "buffering hashes announced by busy peer"
767 );
768
769 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
770
771 return
772 }
773
774 let mut hashes_to_request =
775 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
776 let surplus_hashes =
777 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
778
779 if !surplus_hashes.is_empty() {
780 trace!(target: "net::tx",
781 peer_id=format!("{peer_id:#}"),
782 surplus_hashes=?*surplus_hashes,
783 %client,
784 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
785 );
786
787 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
788 }
789
790 trace!(target: "net::tx",
791 peer_id=format!("{peer_id:#}"),
792 hashes=?*hashes_to_request,
793 %client,
794 "sending hashes in `GetPooledTransactions` request to peer's session"
795 );
796
797 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
801 if let Some(failed_to_request_hashes) =
802 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
803 {
804 let conn_eth_version = peer.version;
805
806 trace!(target: "net::tx",
807 peer_id=format!("{peer_id:#}"),
808 failed_to_request_hashes=?*failed_to_request_hashes,
809 %conn_eth_version,
810 %client,
811 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
812 );
813 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
814 }
815 }
816}
817
818impl<Pool, N> TransactionsManager<Pool, N>
819where
820 Pool: TransactionPool + Unpin + 'static,
821 N: NetworkPrimitives<
822 BroadcastedTransaction: SignedTransaction,
823 PooledTransaction: SignedTransaction,
824 > + Unpin,
825 Pool::Transaction:
826 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
827{
828 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
840 if self.network.is_initially_syncing() {
842 return
843 }
844 if self.network.tx_gossip_disabled() {
845 return
846 }
847
848 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
849
850 self.propagate_all(hashes);
851 }
852
853 fn propagate_full_transactions_to_peer(
857 &mut self,
858 txs: Vec<TxHash>,
859 peer_id: PeerId,
860 propagation_mode: PropagationMode,
861 ) -> Option<PropagatedTransactions> {
862 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
863
864 let peer = self.peers.get_mut(&peer_id)?;
865 let mut propagated = PropagatedTransactions::default();
866
867 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
869
870 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
871
872 if propagation_mode.is_forced() {
873 full_transactions.extend(to_propagate);
875 } else {
876 for tx in to_propagate {
879 if !peer.seen_transactions.contains(tx.tx_hash()) {
880 full_transactions.push(&tx);
882 }
883 }
884 }
885
886 if full_transactions.is_empty() {
887 return None
889 }
890
891 let PropagateTransactions { pooled, full } = full_transactions.build();
892
893 if let Some(new_pooled_hashes) = pooled {
895 for hash in new_pooled_hashes.iter_hashes().copied() {
896 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
897 peer.seen_transactions.insert(hash);
899 }
900
901 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
903 }
904
905 if let Some(new_full_transactions) = full {
907 for tx in &new_full_transactions {
908 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
909 peer.seen_transactions.insert(*tx.tx_hash());
911 }
912
913 self.network.send_transactions(peer_id, new_full_transactions);
915 }
916
917 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
919
920 Some(propagated)
921 }
922
923 fn propagate_hashes_to(
927 &mut self,
928 hashes: Vec<TxHash>,
929 peer_id: PeerId,
930 propagation_mode: PropagationMode,
931 ) {
932 trace!(target: "net::tx", "Start propagating transactions as hashes");
933
934 let propagated = {
937 let Some(peer) = self.peers.get_mut(&peer_id) else {
938 return
940 };
941
942 let to_propagate = self
943 .pool
944 .get_all(hashes)
945 .into_iter()
946 .map(PropagateTransaction::pool_tx)
947 .collect::<Vec<_>>();
948
949 let mut propagated = PropagatedTransactions::default();
950
951 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
953
954 if propagation_mode.is_forced() {
955 hashes.extend(to_propagate)
956 } else {
957 for tx in to_propagate {
958 if !peer.seen_transactions.contains(tx.tx_hash()) {
959 hashes.push(&tx);
961 }
962 }
963 }
964
965 let new_pooled_hashes = hashes.build();
966
967 if new_pooled_hashes.is_empty() {
968 return
970 }
971
972 for hash in new_pooled_hashes.iter_hashes().copied() {
973 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
974 }
975
976 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
977
978 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
980
981 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
983
984 propagated
985 };
986
987 self.pool.on_propagated(propagated);
989 }
990
991 fn propagate_transactions(
998 &mut self,
999 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1000 propagation_mode: PropagationMode,
1001 ) -> PropagatedTransactions {
1002 let mut propagated = PropagatedTransactions::default();
1003 if self.network.tx_gossip_disabled() {
1004 return propagated
1005 }
1006
1007 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1009
1010 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1012 if !self.policies.propagation_policy().can_propagate(peer) {
1013 continue
1015 }
1016 let mut builder = if peer_idx > max_num_full {
1018 PropagateTransactionsBuilder::pooled(peer.version)
1019 } else {
1020 PropagateTransactionsBuilder::full(peer.version)
1021 };
1022
1023 if propagation_mode.is_forced() {
1024 builder.extend(to_propagate.iter());
1025 } else {
1026 for tx in &to_propagate {
1030 if !peer.seen_transactions.contains(tx.tx_hash()) {
1033 builder.push(tx);
1034 }
1035 }
1036 }
1037
1038 if builder.is_empty() {
1039 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1040 continue
1041 }
1042
1043 let PropagateTransactions { pooled, full } = builder.build();
1044
1045 if let Some(mut new_pooled_hashes) = pooled {
1047 new_pooled_hashes
1050 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1051
1052 for hash in new_pooled_hashes.iter_hashes().copied() {
1053 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1054 peer.seen_transactions.insert(hash);
1056 }
1057
1058 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1059
1060 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1062 }
1063
1064 if let Some(new_full_transactions) = full {
1066 for tx in &new_full_transactions {
1067 propagated
1068 .0
1069 .entry(*tx.tx_hash())
1070 .or_default()
1071 .push(PropagateKind::Full(*peer_id));
1072 peer.seen_transactions.insert(*tx.tx_hash());
1074 }
1075
1076 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1077
1078 self.network.send_transactions(*peer_id, new_full_transactions);
1080 }
1081 }
1082
1083 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1085
1086 propagated
1087 }
1088
1089 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1094 if self.peers.is_empty() {
1095 return
1097 }
1098 let propagated = self.propagate_transactions(
1099 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1100 PropagationMode::Basic,
1101 );
1102
1103 self.pool.on_propagated(propagated);
1105 }
1106
1107 fn on_get_pooled_transactions(
1109 &mut self,
1110 peer_id: PeerId,
1111 request: GetPooledTransactions,
1112 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1113 ) {
1114 if let Some(peer) = self.peers.get_mut(&peer_id) {
1115 if self.network.tx_gossip_disabled() {
1116 let _ = response.send(Ok(PooledTransactions::default()));
1117 return
1118 }
1119 let transactions = self.pool.get_pooled_transaction_elements(
1120 request.0,
1121 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1122 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1123 ),
1124 );
1125 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1126
1127 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1130
1131 let resp = PooledTransactions(transactions);
1132 let _ = response.send(Ok(resp));
1133 }
1134 }
1135
1136 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1138 match cmd {
1139 TransactionsCommand::PropagateHash(hash) => {
1140 self.on_new_pending_transactions(vec![hash])
1141 }
1142 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1143 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1144 }
1145 TransactionsCommand::GetActivePeers(tx) => {
1146 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1147 tx.send(peers).ok();
1148 }
1149 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1150 if let Some(propagated) =
1151 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1152 {
1153 self.pool.on_propagated(propagated);
1154 }
1155 }
1156 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1157 TransactionsCommand::BroadcastTransactions(txs) => {
1158 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1159 self.pool.on_propagated(propagated);
1160 }
1161 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1162 let mut res = HashMap::with_capacity(peers.len());
1163 for peer_id in peers {
1164 let hashes = self
1165 .peers
1166 .get(&peer_id)
1167 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1168 .unwrap_or_default();
1169 res.insert(peer_id, hashes);
1170 }
1171 tx.send(res).ok();
1172 }
1173 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1174 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1175 peer_request_sender.send(sender).ok();
1176 }
1177 }
1178 }
1179
1180 fn handle_peer_session(
1184 &mut self,
1185 info: SessionInfo,
1186 messages: PeerRequestSender<PeerRequest<N>>,
1187 ) {
1188 let SessionInfo { peer_id, client_version, version, .. } = info;
1189
1190 let peer = PeerMetadata::<N>::new(
1192 messages,
1193 version,
1194 client_version,
1195 self.config.max_transactions_seen_by_peer_history,
1196 info.peer_kind,
1197 );
1198 let peer = match self.peers.entry(peer_id) {
1199 Entry::Occupied(mut entry) => {
1200 entry.insert(peer);
1201 entry.into_mut()
1202 }
1203 Entry::Vacant(entry) => entry.insert(peer),
1204 };
1205
1206 self.policies.propagation_policy_mut().on_session_established(peer);
1207
1208 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1212 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1213 return
1214 }
1215
1216 let pooled_txs = self.pool.pooled_transactions_max(
1218 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1219 );
1220 if pooled_txs.is_empty() {
1221 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1222 return;
1223 }
1224
1225 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1227 for pooled_tx in pooled_txs {
1228 peer.seen_transactions.insert(*pooled_tx.hash());
1229 msg_builder.push_pooled(pooled_tx);
1230 }
1231
1232 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1233 let msg = msg_builder.build();
1234 self.network.send_transactions_hashes(peer_id, msg);
1235 }
1236
1237 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1239 match event_result {
1240 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1241 let peer = self.peers.remove(&peer_id);
1244 if let Some(mut peer) = peer {
1245 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1246 }
1247 self.transaction_fetcher.remove_peer(&peer_id);
1248 }
1249 NetworkEvent::ActivePeerSession { info, messages } => {
1250 self.handle_peer_session(info, messages);
1252 }
1253 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1254 let peer_id = info.peer_id;
1255 let messages = match self.peers.get(&peer_id) {
1257 Some(p) => p.request_tx.clone(),
1258 None => {
1259 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1260 return;
1261 }
1262 };
1263 self.handle_peer_session(info, messages);
1264 }
1265 _ => {}
1266 }
1267 }
1268
1269 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1271 if self.config.ingress_policy.allows_all() {
1272 return true;
1273 }
1274 let Some(peer) = self.peers.get(peer_id) else {
1275 return false;
1276 };
1277 self.config.ingress_policy.allows(peer.peer_kind())
1278 }
1279
1280 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1282 match event {
1283 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1284 if !self.accepts_incoming_from(&peer_id) {
1285 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1286 return;
1287 }
1288 let has_blob_txs = msg.has_eip4844();
1292
1293 let non_blob_txs = msg
1294 .0
1295 .into_iter()
1296 .map(N::PooledTransaction::try_from)
1297 .filter_map(Result::ok)
1298 .collect();
1299
1300 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1301
1302 if has_blob_txs {
1303 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1304 self.report_peer_bad_transactions(peer_id);
1305 }
1306 }
1307 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1308 if !self.accepts_incoming_from(&peer_id) {
1309 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1310 return;
1311 }
1312 self.on_new_pooled_transaction_hashes(peer_id, msg)
1313 }
1314 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1315 self.on_get_pooled_transactions(peer_id, request, response)
1316 }
1317 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1318 let _ = response.send(Some(self.handle()));
1319 }
1320 }
1321 }
1322
1323 fn import_transactions(
1325 &mut self,
1326 peer_id: PeerId,
1327 transactions: PooledTransactions<N::PooledTransaction>,
1328 source: TransactionSource,
1329 ) {
1330 if self.network.is_initially_syncing() {
1332 return
1333 }
1334 if self.network.tx_gossip_disabled() {
1335 return
1336 }
1337
1338 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1339 let mut transactions = transactions.0;
1340
1341 let start = Instant::now();
1342
1343 self.transaction_fetcher
1345 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1346
1347 let mut num_already_seen_by_peer = 0;
1352 for tx in &transactions {
1353 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1354 num_already_seen_by_peer += 1;
1355 }
1356 }
1357
1358 let txns_count_pre_pool_filter = transactions.len();
1360 self.pool.retain_unknown(&mut transactions);
1361 if txns_count_pre_pool_filter > transactions.len() {
1362 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1363 self.metrics
1364 .occurrences_transactions_already_in_pool
1365 .increment(already_known_txns_count as u64);
1366 }
1367
1368 let mut has_bad_transactions = false;
1370
1371 let mut new_txs = Vec::with_capacity(transactions.len());
1374 for tx in transactions {
1375 match self.transactions_by_peers.entry(*tx.tx_hash()) {
1376 Entry::Occupied(mut entry) => {
1377 entry.get_mut().insert(peer_id);
1379 }
1380 Entry::Vacant(entry) => {
1381 if self.bad_imports.contains(tx.tx_hash()) {
1382 trace!(target: "net::tx",
1383 peer_id=format!("{peer_id:#}"),
1384 hash=%tx.tx_hash(),
1385 client_version=%peer.client_version,
1386 "received a known bad transaction from peer"
1387 );
1388 has_bad_transactions = true;
1389 } else {
1390 let tx = match tx.try_into_recovered() {
1394 Ok(tx) => tx,
1395 Err(badtx) => {
1396 trace!(target: "net::tx",
1397 peer_id=format!("{peer_id:#}"),
1398 hash=%badtx.tx_hash(),
1399 client_version=%peer.client_version,
1400 "failed ecrecovery for transaction"
1401 );
1402 has_bad_transactions = true;
1403 continue
1404 }
1405 };
1406
1407 let pool_transaction = Pool::Transaction::from_pooled(tx);
1408 new_txs.push(pool_transaction);
1409
1410 entry.insert(HashSet::from([peer_id]));
1411 }
1412 }
1413 }
1414 }
1415 new_txs.shrink_to_fit();
1416
1417 if !new_txs.is_empty() {
1420 let pool = self.pool.clone();
1421 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1423 metric_pending_pool_imports.increment(new_txs.len() as f64);
1424
1425 self.pending_pool_imports_info
1427 .pending_pool_imports
1428 .fetch_add(new_txs.len(), Ordering::Relaxed);
1429 let tx_manager_info_pending_pool_imports =
1430 self.pending_pool_imports_info.pending_pool_imports.clone();
1431
1432 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1433 let import = Box::pin(async move {
1434 let added = new_txs.len();
1435 let res = pool.add_external_transactions(new_txs).await;
1436
1437 metric_pending_pool_imports.decrement(added as f64);
1439 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1441
1442 res
1443 });
1444
1445 self.pool_imports.push(import);
1446 }
1447
1448 if num_already_seen_by_peer > 0 {
1449 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1450 self.metrics
1451 .occurrences_of_transaction_already_seen_by_peer
1452 .increment(num_already_seen_by_peer);
1453 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1454 }
1455
1456 if has_bad_transactions {
1457 self.report_peer_bad_transactions(peer_id)
1459 }
1460
1461 if num_already_seen_by_peer > 0 {
1462 self.report_already_seen(peer_id);
1463 }
1464
1465 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1466 }
1467
1468 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1470 match fetch_event {
1471 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1472 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1473 if report_peer {
1474 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1475 }
1476 }
1477 FetchEvent::FetchError { peer_id, error } => {
1478 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1479 self.on_request_error(peer_id, error);
1480 }
1481 FetchEvent::EmptyResponse { peer_id } => {
1482 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1483 }
1484 }
1485 }
1486}
1487
1488impl<
1496 Pool: TransactionPool + Unpin + 'static,
1497 N: NetworkPrimitives<
1498 BroadcastedTransaction: SignedTransaction,
1499 PooledTransaction: SignedTransaction,
1500 > + Unpin,
1501 > Future for TransactionsManager<Pool, N>
1502where
1503 Pool::Transaction:
1504 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1505{
1506 type Output = ();
1507
1508 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1509 let start = Instant::now();
1510 let mut poll_durations = TxManagerPollDurations::default();
1511
1512 let this = self.get_mut();
1513
1514 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1520 poll_durations.acc_network_events,
1521 "net::tx",
1522 "Network events stream",
1523 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1524 this.network_events.poll_next_unpin(cx),
1525 |event| this.on_network_event(event)
1526 );
1527
1528 let mut new_txs = Vec::new();
1537 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1538 cx,
1539 &mut new_txs,
1540 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1541 ) {
1542 Poll::Ready(count) => {
1543 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1544 true
1547 } else {
1548 let limit =
1552 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1553 new_txs.len();
1554 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1555 }
1556 }
1557 Poll::Pending => false,
1558 };
1559 if !new_txs.is_empty() {
1560 this.on_new_pending_transactions(new_txs);
1561 }
1562
1563 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1578 poll_durations.acc_tx_events,
1579 "net::tx",
1580 "Network transaction events stream",
1581 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1582 this.transaction_events.poll_next_unpin(cx),
1583 |event| this.on_network_tx_event(event),
1584 );
1585
1586 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1597 poll_durations.acc_fetch_events,
1598 "net::tx",
1599 "Transaction fetch events stream",
1600 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1601 this.transaction_fetcher.poll_next_unpin(cx),
1602 |event| this.on_fetch_event(event),
1603 );
1604
1605 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1620 poll_durations.acc_pending_imports,
1621 "net::tx",
1622 "Batched pool imports stream",
1623 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1624 this.pool_imports.poll_next_unpin(cx),
1625 |batch_results| this.on_batch_import_result(batch_results)
1626 );
1627
1628 duration_metered_exec!(
1633 {
1634 if this.has_capacity_for_fetching_pending_hashes() &&
1635 this.on_fetch_hashes_pending_fetch()
1636 {
1637 maybe_more_tx_fetch_events = true;
1638 }
1639 },
1640 poll_durations.acc_pending_fetch
1641 );
1642
1643 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1645 poll_durations.acc_cmds,
1646 "net::tx",
1647 "Commands channel",
1648 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1649 this.command_rx.poll_next_unpin(cx),
1650 |cmd| this.on_command(cmd)
1651 );
1652
1653 this.transaction_fetcher.update_metrics();
1654
1655 if maybe_more_network_events ||
1657 maybe_more_commands ||
1658 maybe_more_tx_events ||
1659 maybe_more_tx_fetch_events ||
1660 maybe_more_pool_imports ||
1661 maybe_more_pending_txns
1662 {
1663 cx.waker().wake_by_ref();
1665 return Poll::Pending
1666 }
1667
1668 this.update_poll_metrics(start, poll_durations);
1669
1670 Poll::Pending
1671 }
1672}
1673
/// Selects how transactions are propagated to peers.
///
/// NOTE(review): the exact difference between the two modes is decided by the propagation code
/// outside this view; presumably `Forced` propagates even to peers that already saw the
/// transaction. Confirm against the callers of [`PropagationMode::is_forced`].
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular propagation.
    Basic,
    /// Forced propagation.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1697#[derive(Debug, Clone)]
1699struct PropagateTransaction<T = TransactionSigned> {
1700 size: usize,
1701 transaction: Arc<T>,
1702}
1703
1704impl<T: SignedTransaction> PropagateTransaction<T> {
1705 pub fn new(transaction: T) -> Self {
1707 let size = transaction.length();
1708 Self { size, transaction: Arc::new(transaction) }
1709 }
1710
1711 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1713 where
1714 P: PoolTransaction<Consensus = T>,
1715 {
1716 let size = tx.encoded_length();
1717 let transaction = tx.transaction.clone_into_consensus();
1718 let transaction = Arc::new(transaction.into_inner());
1719 Self { size, transaction }
1720 }
1721
1722 fn tx_hash(&self) -> &TxHash {
1723 self.transaction.tx_hash()
1724 }
1725}
1726
1727#[derive(Debug, Clone)]
1730enum PropagateTransactionsBuilder<T> {
1731 Pooled(PooledTransactionsHashesBuilder),
1732 Full(FullTransactionsBuilder<T>),
1733}
1734
1735impl<T> PropagateTransactionsBuilder<T> {
1736 fn pooled(version: EthVersion) -> Self {
1738 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1739 }
1740
1741 fn full(version: EthVersion) -> Self {
1743 Self::Full(FullTransactionsBuilder::new(version))
1744 }
1745
1746 fn is_empty(&self) -> bool {
1748 match self {
1749 Self::Pooled(builder) => builder.is_empty(),
1750 Self::Full(builder) => builder.is_empty(),
1751 }
1752 }
1753
1754 fn build(self) -> PropagateTransactions<T> {
1756 match self {
1757 Self::Pooled(pooled) => {
1758 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1759 }
1760 Self::Full(full) => full.build(),
1761 }
1762 }
1763}
1764
1765impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1766 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1768 for tx in txs {
1769 self.push(tx);
1770 }
1771 }
1772
1773 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1775 match self {
1776 Self::Pooled(builder) => builder.push(transaction),
1777 Self::Full(builder) => builder.push(transaction),
1778 }
1779 }
1780}
1781
/// Result of a propagation builder: what should be sent to a peer, if anything.
struct PropagateTransactions<T> {
    /// Hash announcement to send, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// Transactions to send in full, if any.
    full: Option<Vec<Arc<T>>>,
}
1789
1790#[derive(Debug, Clone)]
1795struct FullTransactionsBuilder<T> {
1796 total_size: usize,
1798 transactions: Vec<Arc<T>>,
1800 pooled: PooledTransactionsHashesBuilder,
1802}
1803
1804impl<T> FullTransactionsBuilder<T> {
1805 fn new(version: EthVersion) -> Self {
1807 Self {
1808 total_size: 0,
1809 pooled: PooledTransactionsHashesBuilder::new(version),
1810 transactions: vec![],
1811 }
1812 }
1813
1814 fn is_empty(&self) -> bool {
1816 self.transactions.is_empty() && self.pooled.is_empty()
1817 }
1818
1819 fn build(self) -> PropagateTransactions<T> {
1821 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1822 let full = Some(self.transactions).filter(|full| !full.is_empty());
1823 PropagateTransactions { pooled, full }
1824 }
1825}
1826
1827impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1828 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1830 for tx in txs {
1831 self.push(&tx)
1832 }
1833 }
1834
1835 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1845 if !transaction.transaction.is_broadcastable_in_full() {
1854 self.pooled.push(transaction);
1855 return
1856 }
1857
1858 let new_size = self.total_size + transaction.size;
1859 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1860 self.total_size > 0
1861 {
1862 self.pooled.push(transaction);
1864 return
1865 }
1866
1867 self.total_size = new_size;
1868 self.transactions.push(Arc::clone(&transaction.transaction));
1869 }
1870}
1871
1872#[derive(Debug, Clone)]
1875enum PooledTransactionsHashesBuilder {
1876 Eth66(NewPooledTransactionHashes66),
1877 Eth68(NewPooledTransactionHashes68),
1878}
1879
1880impl PooledTransactionsHashesBuilder {
1883 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1885 match self {
1886 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1887 Self::Eth68(msg) => {
1888 msg.hashes.push(*pooled_tx.hash());
1889 msg.sizes.push(pooled_tx.encoded_length());
1890 msg.types.push(pooled_tx.transaction.ty());
1891 }
1892 }
1893 }
1894
1895 fn is_empty(&self) -> bool {
1897 match self {
1898 Self::Eth66(hashes) => hashes.is_empty(),
1899 Self::Eth68(hashes) => hashes.is_empty(),
1900 }
1901 }
1902
1903 fn extend<T: SignedTransaction>(
1905 &mut self,
1906 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1907 ) {
1908 for tx in txs {
1909 self.push(&tx);
1910 }
1911 }
1912
1913 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1914 match self {
1915 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1916 Self::Eth68(msg) => {
1917 msg.hashes.push(*tx.tx_hash());
1918 msg.sizes.push(tx.size);
1919 msg.types.push(tx.transaction.ty());
1920 }
1921 }
1922 }
1923
1924 fn new(version: EthVersion) -> Self {
1926 match version {
1927 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1928 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1929 }
1930 }
1931
1932 fn build(self) -> NewPooledTransactionHashes {
1933 match self {
1934 Self::Eth66(msg) => msg.into(),
1935 Self::Eth68(msg) => msg.into(),
1936 }
1937 }
1938}
1939
/// How a batch of transactions reached the transactions manager.
///
/// The enum is a plain tag, so it derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq`; this
/// allows it to be logged, compared and passed around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TransactionSource {
    /// The transactions were pushed to us in a full-transaction broadcast.
    Broadcast,
    /// The transactions arrived as the response to a fetch request issued by us.
    Response,
}

impl TransactionSource {
    /// Returns `true` if the transactions were received via broadcast.
    const fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
1956
1957#[derive(Debug)]
1959pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1960 seen_transactions: LruCache<TxHash>,
1964 request_tx: PeerRequestSender<PeerRequest<N>>,
1966 version: EthVersion,
1968 client_version: Arc<str>,
1970 peer_kind: PeerKind,
1972}
1973
1974impl<N: NetworkPrimitives> PeerMetadata<N> {
1975 pub fn new(
1977 request_tx: PeerRequestSender<PeerRequest<N>>,
1978 version: EthVersion,
1979 client_version: Arc<str>,
1980 max_transactions_seen_by_peer: u32,
1981 peer_kind: PeerKind,
1982 ) -> Self {
1983 Self {
1984 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1985 request_tx,
1986 version,
1987 client_version,
1988 peer_kind,
1989 }
1990 }
1991
1992 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1994 &self.request_tx
1995 }
1996
1997 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1999 &mut self.seen_transactions
2000 }
2001
2002 pub const fn version(&self) -> EthVersion {
2004 self.version
2005 }
2006
2007 pub fn client_version(&self) -> &str {
2009 &self.client_version
2010 }
2011
2012 pub const fn peer_kind(&self) -> PeerKind {
2014 self.peer_kind
2015 }
2016}
2017
/// Commands that can be sent to the transactions manager.
///
/// NOTE(review): the handler for these commands is outside this view. The variant descriptions
/// below are inferred from the variant names and payload types only — confirm against the
/// command handler before relying on them.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Presumably propagates a single transaction hash.
    PropagateHash(B256),
    /// Presumably propagates the given hashes to the given peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request for a set of peer ids, answered through the oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Presumably propagates the transactions with the given hashes to the given peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Presumably propagates the transactions with the given hashes.
    PropagateTransactions(Vec<TxHash>),
    /// Presumably broadcasts the given, already prepared transactions.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request for transaction hashes per peer, answered through the oneshot sender.
    GetTransactionHashes {
        /// Peers the request is about.
        peers: Vec<PeerId>,
        /// Channel for the per-peer hash sets.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Request for a peer's request sender, answered through the oneshot sender.
    GetPeerSender {
        /// Peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the optional request sender.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2045#[derive(Debug)]
2047pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2048 IncomingTransactions {
2052 peer_id: PeerId,
2054 msg: Transactions<N::BroadcastedTransaction>,
2056 },
2057 IncomingPooledTransactionHashes {
2059 peer_id: PeerId,
2061 msg: NewPooledTransactionHashes,
2063 },
2064 GetPooledTransactions {
2066 peer_id: PeerId,
2068 request: GetPooledTransactions,
2070 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2072 },
2073 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2075}
2076
/// Tracks how many transactions are currently waiting to be imported into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions whose pool import has been queued but not finished.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Configured upper bound for pending pool imports.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the tracker with a zeroed counter and the given upper bound.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pending imports is strictly below the limit passed in.
    ///
    /// Only the `max_pending_pool_imports` argument is compared; the stored bound is not
    /// consulted here.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2097
impl Default for PendingPoolImportsInfo {
    /// Creates the tracker with `DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS` as the upper bound.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2103
/// Time accumulated in each section of one `poll` call of the transactions manager.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator for polling the network events stream.
    acc_network_events: Duration,
    /// Accumulator for polling the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Accumulator for polling the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the visible part of `poll`; presumably the time spent on
    /// newly imported/pending transactions — confirm where it is recorded.
    acc_imported_txns: Duration,
    /// Accumulator for polling the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Accumulator for fetching hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator for polling the commands channel.
    acc_cmds: Duration,
}
2114
2115#[cfg(test)]
2116mod tests {
2117 use super::*;
2118 use crate::{
2119 test_utils::{
2120 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2121 Testnet,
2122 },
2123 transactions::config::RelaxedEthAnnouncementFilter,
2124 NetworkConfigBuilder, NetworkManager,
2125 };
2126 use alloy_consensus::{TxEip1559, TxLegacy};
2127 use alloy_primitives::{hex, Signature, TxKind, U256};
2128 use alloy_rlp::Decodable;
2129 use futures::FutureExt;
2130 use reth_chainspec::MIN_TRANSACTION_GAS;
2131 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2132 use reth_network_api::{NetworkInfo, PeerKind};
2133 use reth_network_p2p::{
2134 error::{RequestError, RequestResult},
2135 sync::{NetworkSyncUpdater, SyncState},
2136 };
2137 use reth_storage_api::noop::NoopProvider;
2138 use reth_transaction_pool::test_utils::{
2139 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2140 };
2141 use secp256k1::SecretKey;
2142 use std::{
2143 future::poll_fn,
2144 net::{IpAddr, Ipv4Addr, SocketAddr},
2145 str::FromStr,
2146 };
2147 use tracing::error;
2148
/// A full-transaction broadcast received while the node is initially syncing must not end up
/// in the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Build a separate network + transactions manager pair that is driven manually below.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::eth(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    tokio::task::spawn(network);

    // Put the node into its first sync.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    assert!(NetworkInfo::is_initially_syncing(&network_handle));

    // Forward the session events observed by peer 0 to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                transactions
                    .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Hex-encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // Poll the manager once so any queued import would get a chance to run.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    // Nothing may have been imported during the initial sync.
    assert!(pool.is_empty());
    handle.terminate().await;
}
2218
/// Once the first sync has finished, a later sync must no longer block transaction imports.
#[tokio::test(flavor = "multi_thread")]
async fn test_tx_broadcasts_through_two_syncs() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Build a separate network + transactions manager pair that is driven manually below.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    tokio::task::spawn(network);

    // Syncing -> Idle -> Syncing: the node ends up in its second sync.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Idle);
    assert!(!NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));

    // Forward the session events observed by peer 0 to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            _ => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Hex-encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // Poll the manager once so the queued import can run.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    // Still syncing, but no longer *initially* syncing, so the transaction was imported.
    assert!(!NetworkInfo::is_initially_syncing(&network_handle));
    assert!(NetworkInfo::is_syncing(&network_handle));
    assert!(!pool.is_empty());
    handle.terminate().await;
}
2293
/// Announced hashes must trigger a `GetPooledTransactions` request to the announcing peer, and
/// the transactions returned in the response must be imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions_hashes() {
    reth_tracing::init_test_tracing();

    let secret_key = SecretKey::new(&mut rand_08::thread_rng());
    let client = NoopProvider::default();

    let config = NetworkConfigBuilder::new(secret_key)
        .listener_port(0)
        .disable_discovery()
        .build(client);

    let pool = testing_pool();

    let transactions_manager_config = config.transactions_manager_config.clone();
    let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    let peer_id_1 = PeerId::new([1; 64]);
    let eth_version = EthVersion::Eth66;

    // A single signed legacy transaction that the mock peer will announce and serve.
    let txs = vec![TransactionSigned::new_unhashed(
        Transaction::Legacy(TxLegacy {
            chain_id: Some(4),
            nonce: 15u64,
            gas_price: 2200000000,
            gas_limit: 34811,
            to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
            value: U256::from(1234u64),
            input: Default::default(),
        }),
        Signature::new(
            U256::from_str(
                "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
            )
            .unwrap(),
            U256::from_str(
                "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
            )
            .unwrap(),
            true,
        ),
    )];

    let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

    // Register a mock session for the peer; requests sent to it arrive on the receiver.
    let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
    tx_manager.peers.insert(peer_id_1, peer_1);

    assert!(pool.is_empty());

    // The peer announces the hashes.
    tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
        peer_id: peer_id_1,
        msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
            txs_hashes.clone(),
        )),
    });

    // The manager must ask the peer for exactly the announced hashes.
    let req = to_mock_session_rx
        .recv()
        .await
        .expect("peer_1 session should receive request with buffered hashes");
    let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
    assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

    let message: Vec<PooledTransactionVariant> = txs
        .into_iter()
        .map(|tx| {
            PooledTransactionVariant::try_from(tx)
                .expect("Failed to convert MockTransaction to PooledTransaction")
        })
        .collect();

    // Answer the request on behalf of the peer.
    response
        .send(Ok(PooledTransactions(message)))
        .expect("should send peer_1 response to tx manager");

    // Poll the manager once so it processes the response and the resulting import.
    poll_fn(|cx| {
        let _ = tx_manager.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;

    // Every announced transaction is now in the pool.
    assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
}
2391
/// A full-transaction broadcast from a connected peer on an idle node must be attributed to
/// that peer and imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();

    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Build a separate network + transactions manager pair that is driven manually below.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);

    network_handle.update_sync_state(SyncState::Idle);

    assert!(!NetworkInfo::is_syncing(&network_handle));

    // Forward the session events observed by peer 0 to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Hex-encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // The sender is recorded for the transaction synchronously, before the import runs.
    assert!(transactions
        .transactions_by_peers
        .get(signed_tx.tx_hash())
        .unwrap()
        .contains(handle1.peer_id()));

    // Poll the manager once so the queued import can run.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;

    assert!(!pool.is_empty());
    assert!(pool.get(signed_tx.tx_hash()).is_some());
    handle.terminate().await;
}
2469
/// A `GetPooledTransactions` request for a hash that is in the pool must be answered with that
/// transaction.
#[tokio::test(flavor = "multi_thread")]
async fn test_on_get_pooled_transactions_network() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(2).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();

    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Build a separate network + transactions manager pair that is driven manually below.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key)
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);

    network_handle.update_sync_state(SyncState::Idle);

    assert!(!NetworkInfo::is_syncing(&network_handle));

    // Forward the session events observed by peer 0 to the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // The testnet is shut down before the request is handled; the rest of the test only
    // talks to the manager directly.
    handle.terminate().await;

    // Seed the pool with one transaction; the result of the insertion is ignored.
    let tx = MockTransaction::eip1559();
    let _ = transactions
        .pool
        .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
        .await;

    let request = GetPooledTransactions(vec![*tx.get_hash()]);

    let (send, receive) =
        oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();

    transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
        peer_id: *handle1.peer_id(),
        request,
        response: send,
    });

    // The response must contain exactly the requested transaction.
    match receive.await.unwrap() {
        Ok(PooledTransactions(transactions)) => {
            assert_eq!(transactions.len(), 1);
        }
        Err(e) => {
            panic!("error: {e:?}");
        }
    }
}
2548
2549 #[tokio::test]
2553 async fn test_partially_tx_response() {
2554 reth_tracing::init_test_tracing();
2555
2556 let mut tx_manager = new_tx_manager().await.0;
2557 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2558
2559 let peer_id_1 = PeerId::new([1; 64]);
2560 let eth_version = EthVersion::Eth66;
2561
2562 let txs = vec![
2563 TransactionSigned::new_unhashed(
2564 Transaction::Legacy(TxLegacy {
2565 chain_id: Some(4),
2566 nonce: 15u64,
2567 gas_price: 2200000000,
2568 gas_limit: 34811,
2569 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2570 value: U256::from(1234u64),
2571 input: Default::default(),
2572 }),
2573 Signature::new(
2574 U256::from_str(
2575 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2576 )
2577 .unwrap(),
2578 U256::from_str(
2579 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2580 )
2581 .unwrap(),
2582 true,
2583 ),
2584 ),
2585 TransactionSigned::new_unhashed(
2586 Transaction::Eip1559(TxEip1559 {
2587 chain_id: 4,
2588 nonce: 26u64,
2589 max_priority_fee_per_gas: 1500000000,
2590 max_fee_per_gas: 1500000013,
2591 gas_limit: MIN_TRANSACTION_GAS,
2592 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2593 value: U256::from(3000000000000000000u64),
2594 input: Default::default(),
2595 access_list: Default::default(),
2596 }),
2597 Signature::new(
2598 U256::from_str(
2599 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2600 )
2601 .unwrap(),
2602 U256::from_str(
2603 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2604 )
2605 .unwrap(),
2606 true,
2607 ),
2608 ),
2609 ];
2610
2611 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2612
2613 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2614 peer_1.seen_transactions.insert(txs_hashes[0]);
2617 peer_1.seen_transactions.insert(txs_hashes[1]);
2618 tx_manager.peers.insert(peer_id_1, peer_1);
2619
2620 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2621 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2622
2623 assert!(tx_fetcher.is_idle(&peer_id_1));
2625 assert_eq!(tx_fetcher.active_peers.len(), 0);
2626
2627 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2629
2630 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2631 assert!(!tx_fetcher.is_idle(&peer_id_1));
2633 assert_eq!(tx_fetcher.active_peers.len(), 1);
2634
2635 let req = to_mock_session_rx
2637 .recv()
2638 .await
2639 .expect("peer_1 session should receive request with buffered hashes");
2640 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2641
2642 let message: Vec<PooledTransactionVariant> = txs
2643 .into_iter()
2644 .take(1)
2645 .map(|tx| {
2646 PooledTransactionVariant::try_from(tx)
2647 .expect("Failed to convert MockTransaction to PooledTransaction")
2648 })
2649 .collect();
2650 response
2652 .send(Ok(PooledTransactions(message)))
2653 .expect("should send peer_1 response to tx manager");
2654 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2655 unreachable!()
2656 };
2657
2658 assert!(tx_fetcher.is_idle(&peer_id));
2660 assert_eq!(tx_fetcher.active_peers.len(), 0);
2661 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2663 }
2664
2665 #[tokio::test]
2666 async fn test_max_retries_tx_request() {
2667 reth_tracing::init_test_tracing();
2668
2669 let mut tx_manager = new_tx_manager().await.0;
2670 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2671
2672 let peer_id_1 = PeerId::new([1; 64]);
2673 let peer_id_2 = PeerId::new([2; 64]);
2674 let eth_version = EthVersion::Eth66;
2675 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2676
2677 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2678 peer_1.seen_transactions.insert(seen_hashes[0]);
2681 peer_1.seen_transactions.insert(seen_hashes[1]);
2682 tx_manager.peers.insert(peer_id_1, peer_1);
2683
2684 let retries = 1;
2687 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2688 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2689
2690 assert!(tx_fetcher.is_idle(&peer_id_1));
2692 assert_eq!(tx_fetcher.active_peers.len(), 0);
2693
2694 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2696
2697 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2698
2699 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2700 assert!(!tx_fetcher.is_idle(&peer_id_1));
2702 assert_eq!(tx_fetcher.active_peers.len(), 1);
2703
2704 let req = to_mock_session_rx
2706 .recv()
2707 .await
2708 .expect("peer_1 session should receive request with buffered hashes");
2709 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2710 let GetPooledTransactions(hashes) = request;
2711
2712 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2713
2714 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2715
2716 response
2718 .send(Err(RequestError::BadResponse))
2719 .expect("should send peer_1 response to tx manager");
2720 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2721 unreachable!()
2722 };
2723
2724 assert!(tx_fetcher.is_idle(&peer_id));
2726 assert_eq!(tx_fetcher.active_peers.len(), 0);
2727 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2729
2730 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2731 tx_manager.peers.insert(peer_id_2, peer_2);
2732
2733 let msg =
2735 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2736 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2737
2738 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2739
2740 assert_eq!(tx_fetcher.active_peers.len(), 1);
2742
2743 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2745 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2747
2748 let req = to_mock_session_rx
2750 .recv()
2751 .await
2752 .expect("peer_2 session should receive request with buffered hashes");
2753 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2754
2755 response
2757 .send(Err(RequestError::BadResponse))
2758 .expect("should send peer_2 response to tx manager");
2759 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2760
2761 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2764 assert_eq!(tx_fetcher.active_peers.len(), 0);
2765 }
2766
/// A pooled-mode builder starts out empty and, after one EIP-1559 transaction is pushed,
/// builds a result that carries a single pooled entry and no full transactions.
#[test]
fn test_transaction_builder_empty() {
    let mut pooled_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(pooled_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();
    let pool_tx = Arc::new(tx_factory.create_eip1559());
    pooled_builder.push(&PropagateTransaction::pool_tx(pool_tx));
    assert!(!pooled_builder.is_empty());

    let built = pooled_builder.build();
    assert!(built.full.is_none());
    assert_eq!(built.pooled.unwrap().len(), 1);
}
2783
/// Checks how a full-mode builder handles a transaction whose size is set one byte above
/// `DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`.
///
/// The first push ends up in the full list only; pushing the same transaction a second time
/// yields one full entry plus one pooled entry.
#[test]
fn test_transaction_builder_large() {
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();
    let mut oversized = tx_factory.create_eip1559();
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let oversized = PropagateTransaction::pool_tx(Arc::new(oversized));

    full_builder.push(&oversized);
    assert!(!full_builder.is_empty());

    // One oversized transaction: full list only.
    let after_first = full_builder.clone().build();
    assert!(after_first.pooled.is_none());
    assert_eq!(after_first.full.unwrap().len(), 1);

    full_builder.push(&oversized);

    // Second push of the same transaction: one entry in each list.
    let after_second = full_builder.clone().build();
    assert_eq!(after_second.pooled.unwrap().len(), 1);
    assert_eq!(after_second.full.unwrap().len(), 1);
}
2812
/// Checks that a full-mode builder puts an EIP-4844 transaction into the pooled list rather
/// than the full list, while a subsequently pushed EIP-1559 transaction lands in the full list.
#[test]
fn test_transaction_builder_eip4844() {
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();

    let blob_tx = PropagateTransaction::pool_tx(Arc::new(tx_factory.create_eip4844()));
    full_builder.push(&blob_tx);
    assert!(!full_builder.is_empty());

    // Only the blob transaction so far: pooled list only.
    let blob_only = full_builder.clone().build();
    assert!(blob_only.full.is_none());
    assert_eq!(blob_only.pooled.unwrap().len(), 1);

    let dynamic_fee_tx = PropagateTransaction::pool_tx(Arc::new(tx_factory.create_eip1559()));
    full_builder.push(&dynamic_fee_tx);

    // Both transactions pushed: one entry in each list.
    let mixed = full_builder.clone().build();
    assert_eq!(mixed.pooled.unwrap().len(), 1);
    assert_eq!(mixed.full.unwrap().len(), 1);
}
2838
2839 #[tokio::test]
2840 async fn test_propagate_full() {
2841 reth_tracing::init_test_tracing();
2842
2843 let (mut tx_manager, network) = new_tx_manager().await;
2844 let peer_id = PeerId::random();
2845
2846 network.handle().update_sync_state(SyncState::Idle);
2848
2849 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2851
2852 let session_info = SessionInfo {
2853 peer_id,
2854 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2855 client_version: Arc::from(""),
2856 capabilities: Arc::new(vec![].into()),
2857 status: Arc::new(Default::default()),
2858 version: EthVersion::Eth68,
2859 peer_kind: PeerKind::Basic,
2860 };
2861 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2862 tx_manager
2863 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2864 let mut propagate = vec![];
2865 let mut factory = MockTransactionFactory::default();
2866 let eip1559_tx = Arc::new(factory.create_eip1559());
2867 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2868 let eip4844_tx = Arc::new(factory.create_eip4844());
2869 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2870
2871 let propagated =
2872 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2873 assert_eq!(propagated.0.len(), 2);
2874 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2875 assert_eq!(prop_txs.len(), 1);
2876 assert!(prop_txs[0].is_full());
2877
2878 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2879 assert_eq!(prop_txs.len(), 1);
2880 assert!(prop_txs[0].is_hash());
2881
2882 let peer = tx_manager.peers.get(&peer_id).unwrap();
2883 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2884 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2885 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2886
2887 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2889 assert!(propagated.0.is_empty());
2890 }
2891
2892 #[tokio::test]
2893 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2894 reth_tracing::init_test_tracing();
2895
2896 let transactions_manager_config = TransactionsManagerConfig::default();
2897
2898 let propagation_policy = TransactionPropagationKind::default();
2899 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2900
2901 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2902
2903 let pool = testing_pool();
2904 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2905 let client = NoopProvider::default();
2906
2907 let network_config = NetworkConfigBuilder::new(secret_key)
2908 .listener_port(0)
2909 .disable_discovery()
2910 .build(client.clone());
2911
2912 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2913 let (to_tx_manager_tx, from_network_rx) =
2914 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2915 network_manager.set_transactions(to_tx_manager_tx);
2916 let network_handle = network_manager.handle().clone();
2917 let network_service_handle = tokio::spawn(network_manager);
2918
2919 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2920 network_handle.clone(),
2921 pool.clone(),
2922 from_network_rx,
2923 transactions_manager_config,
2924 policy_bundle,
2925 );
2926
2927 let peer_id = PeerId::random();
2928 let eth_version = EthVersion::Eth68;
2929 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2930 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2931
2932 let mut tx_factory = MockTransactionFactory::default();
2933
2934 let valid_known_tx = tx_factory.create_eip1559();
2935 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2936
2937 let known_tx_hash = *known_tx_signed.hash();
2938 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2939 let known_tx_size = known_tx_signed.encoded_length();
2940
2941 let unknown_tx_hash = B256::random();
2942 let unknown_tx_type_byte = 0xff_u8;
2943 let unknown_tx_size = 150;
2944
2945 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2946 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2947 sizes: vec![known_tx_size, unknown_tx_size],
2948 hashes: vec![known_tx_hash, unknown_tx_hash],
2949 });
2950
2951 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2952
2953 poll_fn(|cx| {
2954 let _ = tx_manager.poll_unpin(cx);
2955 Poll::Ready(())
2956 })
2957 .await;
2958
2959 let mut requested_hashes_in_getpooled = HashSet::new();
2960 let mut unexpected_request_received = false;
2961
2962 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2963 .await
2964 {
2965 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2966 let GetPooledTransactions(hashes) = request;
2967 for hash in hashes {
2968 requested_hashes_in_getpooled.insert(hash);
2969 }
2970 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2971 }
2972 Ok(Some(other_request)) => {
2973 tracing::error!(?other_request, "Received unexpected PeerRequest type");
2974 unexpected_request_received = true;
2975 }
2976 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2977 Err(_timeout_err) => {
2978 tracing::info!("Timeout: No GetPooledTransactions request received.")
2979 }
2980 }
2981
2982 assert!(
2983 requested_hashes_in_getpooled.contains(&known_tx_hash),
2984 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2985 );
2986 assert!(
2987 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2988 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2989 );
2990 assert!(
2991 !unexpected_request_received,
2992 "An unexpected P2P request was received by the mock peer."
2993 );
2994
2995 network_service_handle.abort();
2996 }
2997}