1use alloy_consensus::transaction::TxHashRef;
4
5pub mod config;
7pub mod constants;
9pub mod fetcher;
11pub mod policy;
13
14pub use self::constants::{
15 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::{AnnouncementAcceptance, StrictEthAnnouncementFilter, TransactionPropagationKind};
19pub use config::{
20 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionPropagationMode,
21 TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::{NetworkPolicies, TransactionPolicies};
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29 budget::{
30 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
32 },
33 cache::LruCache,
34 duration_metered_exec, metered_poll_nested_stream_with_budget,
35 metrics::{
36 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
37 },
38 NetworkHandle, TxTypesCounter,
39};
40use alloy_primitives::{TxHash, B256};
41use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
42use futures::{stream::FuturesUnordered, Future, StreamExt};
43use reth_eth_wire::{
44 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
45 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
46 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
47 RequestTxHashes, Transactions, ValidAnnouncementData,
48};
49use reth_ethereum_primitives::{TransactionSigned, TxType};
50use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
51use reth_network_api::{
52 events::{PeerEvent, SessionInfo},
53 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
54};
55use reth_network_p2p::{
56 error::{RequestError, RequestResult},
57 sync::SyncStateProvider,
58};
59use reth_network_peers::PeerId;
60use reth_network_types::ReputationChangeKind;
61use reth_primitives_traits::SignedTransaction;
62use reth_tokio_util::EventStream;
63use reth_transaction_pool::{
64 error::{PoolError, PoolResult},
65 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
66 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
67};
68use std::{
69 collections::{hash_map::Entry, HashMap, HashSet},
70 pin::Pin,
71 sync::{
72 atomic::{AtomicUsize, Ordering},
73 Arc,
74 },
75 task::{Context, Poll},
76 time::{Duration, Instant},
77};
78use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
79use tokio_stream::wrappers::UnboundedReceiverStream;
80use tracing::{debug, trace};
81
82pub type PoolImportFuture =
86 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
87
88#[derive(Debug, Clone)]
96pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
97 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
99}
100
101impl<N: NetworkPrimitives> TransactionsHandle<N> {
104 fn send(&self, cmd: TransactionsCommand<N>) {
105 let _ = self.manager_tx.send(cmd);
106 }
107
108 async fn peer_handle(
110 &self,
111 peer_id: PeerId,
112 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113 let (tx, rx) = oneshot::channel();
114 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115 rx.await
116 }
117
118 pub fn propagate(&self, hash: TxHash) {
120 self.send(TransactionsCommand::PropagateHash(hash))
121 }
122
123 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127 self.propagate_hashes_to(Some(hash), peer)
128 }
129
130 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134 let hashes = hash.into_iter().collect::<Vec<_>>();
135 if hashes.is_empty() {
136 return
137 }
138 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139 }
140
141 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143 let (tx, rx) = oneshot::channel();
144 self.send(TransactionsCommand::GetActivePeers(tx));
145 rx.await
146 }
147
148 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152 if transactions.is_empty() {
153 return
154 }
155 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156 }
157
158 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163 if transactions.is_empty() {
164 return
165 }
166 self.send(TransactionsCommand::PropagateTransactions(transactions))
167 }
168
169 pub fn broadcast_transactions(
174 &self,
175 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176 ) {
177 let transactions =
178 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179 if transactions.is_empty() {
180 return
181 }
182 self.send(TransactionsCommand::BroadcastTransactions(transactions))
183 }
184
185 pub async fn get_transaction_hashes(
187 &self,
188 peers: Vec<PeerId>,
189 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190 if peers.is_empty() {
191 return Ok(Default::default())
192 }
193 let (tx, rx) = oneshot::channel();
194 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195 rx.await
196 }
197
198 pub async fn get_peer_transaction_hashes(
200 &self,
201 peer: PeerId,
202 ) -> Result<HashSet<TxHash>, RecvError> {
203 let res = self.get_transaction_hashes(vec![peer]).await?;
204 Ok(res.into_values().next().unwrap_or_default())
205 }
206
207 pub async fn get_pooled_transactions_from(
213 &self,
214 peer_id: PeerId,
215 hashes: Vec<B256>,
216 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219 let (tx, rx) = oneshot::channel();
220 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221 peer.try_send(request).ok();
222
223 rx.await?.map(|res| Some(res.0))
224 }
225}
226
227#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<
284 Pool,
285 N: NetworkPrimitives = EthNetworkPrimitives,
286 PBundle: TransactionPolicies = NetworkPolicies<
287 TransactionPropagationKind,
288 StrictEthAnnouncementFilter,
289 >,
290> {
291 pool: Pool,
293 network: NetworkHandle<N>,
295 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
299 transaction_fetcher: TransactionFetcher<N>,
301 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
306 pool_imports: FuturesUnordered<PoolImportFuture>,
318 pending_pool_imports_info: PendingPoolImportsInfo,
320 bad_imports: LruCache<TxHash>,
322 peers: HashMap<PeerId, PeerMetadata<N>>,
324 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
328 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
333 pending_transactions: mpsc::Receiver<TxHash>,
342 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
344 config: TransactionsManagerConfig,
346 policies: PBundle,
348 metrics: TransactionsManagerMetrics,
350 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
352}
353
354impl<Pool: TransactionPool, N: NetworkPrimitives>
355 TransactionsManager<
356 Pool,
357 N,
358 NetworkPolicies<TransactionPropagationKind, StrictEthAnnouncementFilter>,
359 >
360{
361 pub fn new(
365 network: NetworkHandle<N>,
366 pool: Pool,
367 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
368 transactions_manager_config: TransactionsManagerConfig,
369 ) -> Self {
370 Self::with_policy(
371 network,
372 pool,
373 from_network,
374 transactions_manager_config,
375 NetworkPolicies::default(),
376 )
377 }
378}
379
380impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
381 TransactionsManager<Pool, N, PBundle>
382{
383 pub fn with_policy(
387 network: NetworkHandle<N>,
388 pool: Pool,
389 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
390 transactions_manager_config: TransactionsManagerConfig,
391 policies: PBundle,
392 ) -> Self {
393 let network_events = network.event_listener();
394
395 let (command_tx, command_rx) = mpsc::unbounded_channel();
396
397 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
398 &transactions_manager_config.transaction_fetcher_config,
399 );
400
401 let pending = pool.pending_transactions_listener();
404 let pending_pool_imports_info = PendingPoolImportsInfo::default();
405 let metrics = TransactionsManagerMetrics::default();
406 metrics
407 .capacity_pending_pool_imports
408 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
409
410 Self {
411 pool,
412 network,
413 network_events,
414 transaction_fetcher,
415 transactions_by_peers: Default::default(),
416 pool_imports: Default::default(),
417 pending_pool_imports_info: PendingPoolImportsInfo::new(
418 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
419 ),
420 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
421 peers: Default::default(),
422 command_tx,
423 command_rx: UnboundedReceiverStream::new(command_rx),
424 pending_transactions: pending,
425 transaction_events: UnboundedMeteredReceiver::new(
426 from_network,
427 NETWORK_POOL_TRANSACTIONS_SCOPE,
428 ),
429 config: transactions_manager_config,
430 policies,
431 metrics,
432 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
433 }
434 }
435
436 pub fn handle(&self) -> TransactionsHandle<N> {
438 TransactionsHandle { manager_tx: self.command_tx.clone() }
439 }
440
441 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
444 self.pending_pool_imports_info
445 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
446 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
447 }
448
449 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
450 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
451 self.metrics.reported_bad_transactions.increment(1);
452 }
453
454 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
455 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
456 self.network.reputation_change(peer_id, kind);
457 }
458
459 fn report_already_seen(&self, peer_id: PeerId) {
460 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
461 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
462 }
463
464 fn on_good_import(&mut self, hash: TxHash) {
466 self.transactions_by_peers.remove(&hash);
467 }
468
469 fn on_bad_import(&mut self, err: PoolError) {
493 let peers = self.transactions_by_peers.remove(&err.hash);
494
495 if !err.is_bad_transaction() || self.network.is_syncing() {
497 return
498 }
499 if let Some(peers) = peers {
502 for peer_id in peers {
503 self.report_peer_bad_transactions(peer_id);
504 }
505 }
506 self.metrics.bad_imports.increment(1);
507 self.bad_imports.insert(err.hash);
508 }
509
510 fn on_fetch_hashes_pending_fetch(&mut self) {
512 let info = &self.pending_pool_imports_info;
514 let max_pending_pool_imports = info.max_pending_pool_imports;
515 let has_capacity_wrt_pending_pool_imports =
516 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
517
518 self.transaction_fetcher
519 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
520 }
521
522 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
523 let kind = match req_err {
524 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
525 RequestError::Timeout => ReputationChangeKind::Timeout,
526 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
527 return
529 }
530 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
531 };
532 self.report_peer(peer_id, kind);
533 }
534
535 #[inline]
536 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
537 let metrics = &self.metrics;
538
539 let TxManagerPollDurations {
540 acc_network_events,
541 acc_pending_imports,
542 acc_tx_events,
543 acc_imported_txns,
544 acc_fetch_events,
545 acc_pending_fetch,
546 acc_cmds,
547 } = poll_durations;
548
549 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
551 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
553 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
554 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
555 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
556 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
557 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
558 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
559 }
560}
561
562impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
563 TransactionsManager<Pool, N, PBundle>
564{
565 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
567 for res in batch_results {
568 match res {
569 Ok(AddedTransactionOutcome { hash, .. }) => {
570 self.on_good_import(hash);
571 }
572 Err(err) => {
573 self.on_bad_import(err);
574 }
575 }
576 }
577 }
578
579 fn on_new_pooled_transaction_hashes(
581 &mut self,
582 peer_id: PeerId,
583 msg: NewPooledTransactionHashes,
584 ) {
585 if self.network.is_initially_syncing() {
587 return
588 }
589 if self.network.tx_gossip_disabled() {
590 return
591 }
592
593 let Some(peer) = self.peers.get_mut(&peer_id) else {
595 trace!(
596 peer_id = format!("{peer_id:#}"),
597 ?msg,
598 "discarding announcement from inactive peer"
599 );
600
601 return
602 };
603 let client = peer.client_version.clone();
604
605 let mut count_txns_already_seen_by_peer = 0;
607 for tx in msg.iter_hashes().copied() {
608 if !peer.seen_transactions.insert(tx) {
609 count_txns_already_seen_by_peer += 1;
610 }
611 }
612 if count_txns_already_seen_by_peer > 0 {
613 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
618 self.metrics
619 .occurrences_hash_already_seen_by_peer
620 .increment(count_txns_already_seen_by_peer);
621
622 trace!(target: "net::tx",
623 %count_txns_already_seen_by_peer,
624 peer_id=format!("{peer_id:#}"),
625 ?client,
626 "Peer sent hashes that have already been marked as seen by peer"
627 );
628
629 self.report_already_seen(peer_id);
630 }
631
632 if msg.is_empty() {
634 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
635 return;
636 }
637
638 let original_len = msg.len();
639 let mut partially_valid_msg = msg.dedup();
640
641 if partially_valid_msg.len() != original_len {
642 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
643 }
644
645 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
647
648 let hashes_count_pre_pool_filter = partially_valid_msg.len();
656 self.pool.retain_unknown(&mut partially_valid_msg);
657 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
658 let already_known_hashes_count =
659 hashes_count_pre_pool_filter - partially_valid_msg.len();
660 self.metrics
661 .occurrences_hashes_already_in_pool
662 .increment(already_known_hashes_count as u64);
663 }
664
665 if partially_valid_msg.is_empty() {
666 return
668 }
669
670 let mut should_report_peer = false;
675 let mut tx_types_counter = TxTypesCounter::default();
676
677 let is_eth68_message = partially_valid_msg
678 .msg_version()
679 .expect("partially valid announcement should have a version")
680 .is_eth68();
681
682 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
683 let (ty_byte, size_val) = match *metadata_ref_mut {
684 Some((ty, size)) => {
685 if !is_eth68_message {
686 should_report_peer = true;
687 }
688 (ty, size)
689 }
690 None => {
691 if is_eth68_message {
692 should_report_peer = true;
693 return false;
694 }
695 (0u8, 0)
696 }
697 };
698
699 if is_eth68_message &&
700 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
701 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
702 {
703 tx_types_counter.increase_by_tx_type(parsed_tx_type);
704 }
705
706 let decision = self
707 .policies
708 .announcement_filter()
709 .decide_on_announcement(ty_byte, tx_hash, size_val);
710
711 match decision {
712 AnnouncementAcceptance::Accept => true,
713 AnnouncementAcceptance::Ignore => false,
714 AnnouncementAcceptance::Reject { penalize_peer } => {
715 if penalize_peer {
716 should_report_peer = true;
717 }
718 false
719 }
720 }
721 });
722
723 if is_eth68_message {
724 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
725 }
726
727 if should_report_peer {
728 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
729 }
730
731 let mut valid_announcement_data =
732 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
733
734 if valid_announcement_data.is_empty() {
735 return
737 }
738
739 let bad_imports = &self.bad_imports;
746 self.transaction_fetcher.filter_unseen_and_pending_hashes(
747 &mut valid_announcement_data,
748 |hash| bad_imports.contains(hash),
749 &peer_id,
750 &client,
751 );
752
753 if valid_announcement_data.is_empty() {
754 return
756 }
757
758 trace!(target: "net::tx::propagation",
759 peer_id=format!("{peer_id:#}"),
760 hashes_len=valid_announcement_data.len(),
761 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
762 msg_version=%valid_announcement_data.msg_version(),
763 client_version=%client,
764 "received previously unseen and pending hashes in announcement from peer"
765 );
766
767 if !self.transaction_fetcher.is_idle(&peer_id) {
770 let msg_version = valid_announcement_data.msg_version();
772 let (hashes, _version) = valid_announcement_data.into_request_hashes();
773
774 trace!(target: "net::tx",
775 peer_id=format!("{peer_id:#}"),
776 hashes=?*hashes,
777 %msg_version,
778 %client,
779 "buffering hashes announced by busy peer"
780 );
781
782 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
783
784 return
785 }
786
787 let mut hashes_to_request =
788 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
789 let surplus_hashes =
790 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
791
792 if !surplus_hashes.is_empty() {
793 trace!(target: "net::tx",
794 peer_id=format!("{peer_id:#}"),
795 surplus_hashes=?*surplus_hashes,
796 %client,
797 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
798 );
799
800 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
801 }
802
803 trace!(target: "net::tx",
804 peer_id=format!("{peer_id:#}"),
805 hashes=?*hashes_to_request,
806 %client,
807 "sending hashes in `GetPooledTransactions` request to peer's session"
808 );
809
810 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
814 if let Some(failed_to_request_hashes) =
815 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
816 {
817 let conn_eth_version = peer.version;
818
819 trace!(target: "net::tx",
820 peer_id=format!("{peer_id:#}"),
821 failed_to_request_hashes=?*failed_to_request_hashes,
822 %conn_eth_version,
823 %client,
824 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
825 );
826 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
827 }
828 }
829}
830
831impl<Pool, N, PBundle> TransactionsManager<Pool, N, PBundle>
832where
833 Pool: TransactionPool + Unpin + 'static,
834
835 N: NetworkPrimitives<
836 BroadcastedTransaction: SignedTransaction,
837 PooledTransaction: SignedTransaction,
838 > + Unpin,
839
840 PBundle: TransactionPolicies,
841 Pool::Transaction:
842 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
843{
844 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
856 if self.network.is_initially_syncing() {
858 return
859 }
860 if self.network.tx_gossip_disabled() {
861 return
862 }
863
864 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
865
866 self.propagate_all(hashes);
867 }
868
869 fn propagate_full_transactions_to_peer(
873 &mut self,
874 txs: Vec<TxHash>,
875 peer_id: PeerId,
876 propagation_mode: PropagationMode,
877 ) -> Option<PropagatedTransactions> {
878 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
879
880 let peer = self.peers.get_mut(&peer_id)?;
881 let mut propagated = PropagatedTransactions::default();
882
883 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
885
886 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
887
888 if propagation_mode.is_forced() {
889 full_transactions.extend(to_propagate);
891 } else {
892 for tx in to_propagate {
895 if !peer.seen_transactions.contains(tx.tx_hash()) {
896 full_transactions.push(&tx);
898 }
899 }
900 }
901
902 if full_transactions.is_empty() {
903 return None
905 }
906
907 let PropagateTransactions { pooled, full } = full_transactions.build();
908
909 if let Some(new_pooled_hashes) = pooled {
911 for hash in new_pooled_hashes.iter_hashes().copied() {
912 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
913 peer.seen_transactions.insert(hash);
915 }
916
917 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
919 }
920
921 if let Some(new_full_transactions) = full {
923 for tx in &new_full_transactions {
924 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
925 peer.seen_transactions.insert(*tx.tx_hash());
927 }
928
929 self.network.send_transactions(peer_id, new_full_transactions);
931 }
932
933 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
935
936 Some(propagated)
937 }
938
939 fn propagate_hashes_to(
943 &mut self,
944 hashes: Vec<TxHash>,
945 peer_id: PeerId,
946 propagation_mode: PropagationMode,
947 ) {
948 trace!(target: "net::tx", "Start propagating transactions as hashes");
949
950 let propagated = {
953 let Some(peer) = self.peers.get_mut(&peer_id) else {
954 return
956 };
957
958 let to_propagate = self
959 .pool
960 .get_all(hashes)
961 .into_iter()
962 .map(PropagateTransaction::pool_tx)
963 .collect::<Vec<_>>();
964
965 let mut propagated = PropagatedTransactions::default();
966
967 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
969
970 if propagation_mode.is_forced() {
971 hashes.extend(to_propagate)
972 } else {
973 for tx in to_propagate {
974 if !peer.seen_transactions.contains(tx.tx_hash()) {
975 hashes.push(&tx);
977 }
978 }
979 }
980
981 let new_pooled_hashes = hashes.build();
982
983 if new_pooled_hashes.is_empty() {
984 return
986 }
987
988 for hash in new_pooled_hashes.iter_hashes().copied() {
989 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
990 }
991
992 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
993
994 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
996
997 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
999
1000 propagated
1001 };
1002
1003 self.pool.on_propagated(propagated);
1005 }
1006
1007 fn propagate_transactions(
1014 &mut self,
1015 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1016 propagation_mode: PropagationMode,
1017 ) -> PropagatedTransactions {
1018 let mut propagated = PropagatedTransactions::default();
1019 if self.network.tx_gossip_disabled() {
1020 return propagated
1021 }
1022
1023 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1025
1026 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1028 if !self.policies.propagation_policy().can_propagate(peer) {
1029 continue
1031 }
1032 let mut builder = if peer_idx > max_num_full {
1034 PropagateTransactionsBuilder::pooled(peer.version)
1035 } else {
1036 PropagateTransactionsBuilder::full(peer.version)
1037 };
1038
1039 if propagation_mode.is_forced() {
1040 builder.extend(to_propagate.iter());
1041 } else {
1042 for tx in &to_propagate {
1046 if !peer.seen_transactions.contains(tx.tx_hash()) {
1049 builder.push(tx);
1050 }
1051 }
1052 }
1053
1054 if builder.is_empty() {
1055 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1056 continue
1057 }
1058
1059 let PropagateTransactions { pooled, full } = builder.build();
1060
1061 if let Some(mut new_pooled_hashes) = pooled {
1063 new_pooled_hashes
1066 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1067
1068 for hash in new_pooled_hashes.iter_hashes().copied() {
1069 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1070 peer.seen_transactions.insert(hash);
1072 }
1073
1074 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1075
1076 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1078 }
1079
1080 if let Some(new_full_transactions) = full {
1082 for tx in &new_full_transactions {
1083 propagated
1084 .0
1085 .entry(*tx.tx_hash())
1086 .or_default()
1087 .push(PropagateKind::Full(*peer_id));
1088 peer.seen_transactions.insert(*tx.tx_hash());
1090 }
1091
1092 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1093
1094 self.network.send_transactions(*peer_id, new_full_transactions);
1096 }
1097 }
1098
1099 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1101
1102 propagated
1103 }
1104
1105 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1110 if self.peers.is_empty() {
1111 return
1113 }
1114 let propagated = self.propagate_transactions(
1115 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1116 PropagationMode::Basic,
1117 );
1118
1119 self.pool.on_propagated(propagated);
1121 }
1122
1123 fn on_get_pooled_transactions(
1125 &mut self,
1126 peer_id: PeerId,
1127 request: GetPooledTransactions,
1128 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1129 ) {
1130 if let Some(peer) = self.peers.get_mut(&peer_id) {
1131 if self.network.tx_gossip_disabled() {
1132 let _ = response.send(Ok(PooledTransactions::default()));
1133 return
1134 }
1135 let transactions = self.pool.get_pooled_transaction_elements(
1136 request.0,
1137 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1138 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1139 ),
1140 );
1141 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1142
1143 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1146
1147 let resp = PooledTransactions(transactions);
1148 let _ = response.send(Ok(resp));
1149 }
1150 }
1151
1152 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1154 match cmd {
1155 TransactionsCommand::PropagateHash(hash) => {
1156 self.on_new_pending_transactions(vec![hash])
1157 }
1158 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1159 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1160 }
1161 TransactionsCommand::GetActivePeers(tx) => {
1162 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1163 tx.send(peers).ok();
1164 }
1165 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1166 if let Some(propagated) =
1167 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1168 {
1169 self.pool.on_propagated(propagated);
1170 }
1171 }
1172 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1173 TransactionsCommand::BroadcastTransactions(txs) => {
1174 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1175 self.pool.on_propagated(propagated);
1176 }
1177 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1178 let mut res = HashMap::with_capacity(peers.len());
1179 for peer_id in peers {
1180 let hashes = self
1181 .peers
1182 .get(&peer_id)
1183 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1184 .unwrap_or_default();
1185 res.insert(peer_id, hashes);
1186 }
1187 tx.send(res).ok();
1188 }
1189 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1190 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1191 peer_request_sender.send(sender).ok();
1192 }
1193 }
1194 }
1195
1196 fn handle_peer_session(
1200 &mut self,
1201 info: SessionInfo,
1202 messages: PeerRequestSender<PeerRequest<N>>,
1203 ) {
1204 let SessionInfo { peer_id, client_version, version, .. } = info;
1205
1206 let peer = PeerMetadata::<N>::new(
1208 messages,
1209 version,
1210 client_version,
1211 self.config.max_transactions_seen_by_peer_history,
1212 info.peer_kind,
1213 );
1214 let peer = match self.peers.entry(peer_id) {
1215 Entry::Occupied(mut entry) => {
1216 entry.insert(peer);
1217 entry.into_mut()
1218 }
1219 Entry::Vacant(entry) => entry.insert(peer),
1220 };
1221
1222 self.policies.propagation_policy_mut().on_session_established(peer);
1223
1224 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1228 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1229 return
1230 }
1231
1232 let pooled_txs = self.pool.pooled_transactions_max(
1234 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1235 );
1236 if pooled_txs.is_empty() {
1237 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1238 return;
1239 }
1240
1241 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1243 for pooled_tx in pooled_txs {
1244 peer.seen_transactions.insert(*pooled_tx.hash());
1245 msg_builder.push_pooled(pooled_tx);
1246 }
1247
1248 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1249 let msg = msg_builder.build();
1250 self.network.send_transactions_hashes(peer_id, msg);
1251 }
1252
1253 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1255 match event_result {
1256 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1257 let peer = self.peers.remove(&peer_id);
1260 if let Some(mut peer) = peer {
1261 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1262 }
1263 self.transaction_fetcher.remove_peer(&peer_id);
1264 }
1265 NetworkEvent::ActivePeerSession { info, messages } => {
1266 self.handle_peer_session(info, messages);
1268 }
1269 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1270 let peer_id = info.peer_id;
1271 let messages = match self.peers.get(&peer_id) {
1273 Some(p) => p.request_tx.clone(),
1274 None => {
1275 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1276 return;
1277 }
1278 };
1279 self.handle_peer_session(info, messages);
1280 }
1281 _ => {}
1282 }
1283 }
1284
1285 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1287 match event {
1288 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1289 let has_blob_txs = msg.has_eip4844();
1293
1294 let non_blob_txs = msg
1295 .0
1296 .into_iter()
1297 .map(N::PooledTransaction::try_from)
1298 .filter_map(Result::ok)
1299 .collect();
1300
1301 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1302
1303 if has_blob_txs {
1304 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1305 self.report_peer_bad_transactions(peer_id);
1306 }
1307 }
1308 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1309 self.on_new_pooled_transaction_hashes(peer_id, msg)
1310 }
1311 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1312 self.on_get_pooled_transactions(peer_id, request, response)
1313 }
1314 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1315 let _ = response.send(Some(self.handle()));
1316 }
1317 }
1318 }
1319
1320 fn import_transactions(
1322 &mut self,
1323 peer_id: PeerId,
1324 transactions: PooledTransactions<N::PooledTransaction>,
1325 source: TransactionSource,
1326 ) {
1327 if self.network.is_initially_syncing() {
1329 return
1330 }
1331 if self.network.tx_gossip_disabled() {
1332 return
1333 }
1334
1335 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1336 let mut transactions = transactions.0;
1337
1338 self.transaction_fetcher
1340 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1341
1342 let mut num_already_seen_by_peer = 0;
1347 for tx in &transactions {
1348 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1349 num_already_seen_by_peer += 1;
1350 }
1351 }
1352
1353 let txns_count_pre_pool_filter = transactions.len();
1355 self.pool.retain_unknown(&mut transactions);
1356 if txns_count_pre_pool_filter > transactions.len() {
1357 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1358 self.metrics
1359 .occurrences_transactions_already_in_pool
1360 .increment(already_known_txns_count as u64);
1361 }
1362
1363 let mut has_bad_transactions = false;
1365
1366 let mut new_txs = Vec::with_capacity(transactions.len());
1369 for tx in transactions {
1370 let tx = match tx.try_into_recovered() {
1372 Ok(tx) => tx,
1373 Err(badtx) => {
1374 trace!(target: "net::tx",
1375 peer_id=format!("{peer_id:#}"),
1376 hash=%badtx.tx_hash(),
1377 client_version=%peer.client_version,
1378 "failed ecrecovery for transaction"
1379 );
1380 has_bad_transactions = true;
1381 continue
1382 }
1383 };
1384
1385 match self.transactions_by_peers.entry(*tx.tx_hash()) {
1386 Entry::Occupied(mut entry) => {
1387 entry.get_mut().insert(peer_id);
1389 }
1390 Entry::Vacant(entry) => {
1391 if self.bad_imports.contains(tx.tx_hash()) {
1392 trace!(target: "net::tx",
1393 peer_id=format!("{peer_id:#}"),
1394 hash=%tx.tx_hash(),
1395 client_version=%peer.client_version,
1396 "received a known bad transaction from peer"
1397 );
1398 has_bad_transactions = true;
1399 } else {
1400 let pool_transaction = Pool::Transaction::from_pooled(tx);
1403 new_txs.push(pool_transaction);
1404
1405 entry.insert(HashSet::from([peer_id]));
1406 }
1407 }
1408 }
1409 }
1410 new_txs.shrink_to_fit();
1411
1412 if !new_txs.is_empty() {
1415 let pool = self.pool.clone();
1416 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1418 metric_pending_pool_imports.increment(new_txs.len() as f64);
1419
1420 self.pending_pool_imports_info
1422 .pending_pool_imports
1423 .fetch_add(new_txs.len(), Ordering::Relaxed);
1424 let tx_manager_info_pending_pool_imports =
1425 self.pending_pool_imports_info.pending_pool_imports.clone();
1426
1427 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1428 let import = Box::pin(async move {
1429 let added = new_txs.len();
1430 let res = pool.add_external_transactions(new_txs).await;
1431
1432 metric_pending_pool_imports.decrement(added as f64);
1434 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1436
1437 res
1438 });
1439
1440 self.pool_imports.push(import);
1441 }
1442
1443 if num_already_seen_by_peer > 0 {
1444 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1445 self.metrics
1446 .occurrences_of_transaction_already_seen_by_peer
1447 .increment(num_already_seen_by_peer);
1448 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1449 }
1450
1451 if has_bad_transactions {
1452 self.report_peer_bad_transactions(peer_id)
1454 }
1455
1456 if num_already_seen_by_peer > 0 {
1457 self.report_already_seen(peer_id);
1458 }
1459 }
1460
1461 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1463 match fetch_event {
1464 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1465 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1466 if report_peer {
1467 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1468 }
1469 }
1470 FetchEvent::FetchError { peer_id, error } => {
1471 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1472 self.on_request_error(peer_id, error);
1473 }
1474 FetchEvent::EmptyResponse { peer_id } => {
1475 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1476 }
1477 }
1478 }
1479}
1480
1481impl<
1489 Pool: TransactionPool + Unpin + 'static,
1490 N: NetworkPrimitives<
1491 BroadcastedTransaction: SignedTransaction,
1492 PooledTransaction: SignedTransaction,
1493 > + Unpin,
1494 PBundle: TransactionPolicies + Unpin,
1495 > Future for TransactionsManager<Pool, N, PBundle>
1496where
1497 Pool::Transaction:
1498 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1499{
1500 type Output = ();
1501
1502 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1503 let start = Instant::now();
1504 let mut poll_durations = TxManagerPollDurations::default();
1505
1506 let this = self.get_mut();
1507
1508 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1514 poll_durations.acc_network_events,
1515 "net::tx",
1516 "Network events stream",
1517 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1518 this.network_events.poll_next_unpin(cx),
1519 |event| this.on_network_event(event)
1520 );
1521
1522 let mut new_txs = Vec::new();
1531 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1532 cx,
1533 &mut new_txs,
1534 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1535 ) {
1536 Poll::Ready(count) => {
1537 count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE
1538 }
1539 Poll::Pending => false,
1540 };
1541 if !new_txs.is_empty() {
1542 this.on_new_pending_transactions(new_txs);
1543 }
1544
1545 let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1556 poll_durations.acc_fetch_events,
1557 "net::tx",
1558 "Transaction fetch events stream",
1559 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1560 this.transaction_fetcher.poll_next_unpin(cx),
1561 |event| this.on_fetch_event(event),
1562 );
1563
1564 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1579 poll_durations.acc_tx_events,
1580 "net::tx",
1581 "Network transaction events stream",
1582 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1583 this.transaction_events.poll_next_unpin(cx),
1584 |event| this.on_network_tx_event(event),
1585 );
1586
1587 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1602 poll_durations.acc_pending_imports,
1603 "net::tx",
1604 "Batched pool imports stream",
1605 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1606 this.pool_imports.poll_next_unpin(cx),
1607 |batch_results| this.on_batch_import_result(batch_results)
1608 );
1609
1610 duration_metered_exec!(
1615 {
1616 if this.has_capacity_for_fetching_pending_hashes() {
1617 this.on_fetch_hashes_pending_fetch();
1618 }
1619 },
1620 poll_durations.acc_pending_fetch
1621 );
1622
1623 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1625 poll_durations.acc_cmds,
1626 "net::tx",
1627 "Commands channel",
1628 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1629 this.command_rx.poll_next_unpin(cx),
1630 |cmd| this.on_command(cmd)
1631 );
1632
1633 this.transaction_fetcher.update_metrics();
1634
1635 if maybe_more_network_events ||
1637 maybe_more_commands ||
1638 maybe_more_tx_events ||
1639 maybe_more_tx_fetch_events ||
1640 maybe_more_pool_imports ||
1641 maybe_more_pending_txns
1642 {
1643 cx.waker().wake_by_ref();
1645 return Poll::Pending
1646 }
1647
1648 this.update_poll_metrics(start, poll_durations);
1649
1650 Poll::Pending
1651 }
1652}
1653
1654#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1658enum PropagationMode {
1659 Basic,
1663 Forced,
1668}
1669
1670impl PropagationMode {
1671 const fn is_forced(self) -> bool {
1673 matches!(self, Self::Forced)
1674 }
1675}
1676
1677#[derive(Debug, Clone)]
1679struct PropagateTransaction<T = TransactionSigned> {
1680 size: usize,
1681 transaction: Arc<T>,
1682}
1683
1684impl<T: SignedTransaction> PropagateTransaction<T> {
1685 pub fn new(transaction: T) -> Self {
1687 let size = transaction.length();
1688 Self { size, transaction: Arc::new(transaction) }
1689 }
1690
1691 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1693 where
1694 P: PoolTransaction<Consensus = T>,
1695 {
1696 let size = tx.encoded_length();
1697 let transaction = tx.transaction.clone_into_consensus();
1698 let transaction = Arc::new(transaction.into_inner());
1699 Self { size, transaction }
1700 }
1701
1702 fn tx_hash(&self) -> &TxHash {
1703 self.transaction.tx_hash()
1704 }
1705}
1706
1707#[derive(Debug, Clone)]
1710enum PropagateTransactionsBuilder<T> {
1711 Pooled(PooledTransactionsHashesBuilder),
1712 Full(FullTransactionsBuilder<T>),
1713}
1714
1715impl<T> PropagateTransactionsBuilder<T> {
1716 fn pooled(version: EthVersion) -> Self {
1718 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1719 }
1720
1721 fn full(version: EthVersion) -> Self {
1723 Self::Full(FullTransactionsBuilder::new(version))
1724 }
1725
1726 fn is_empty(&self) -> bool {
1728 match self {
1729 Self::Pooled(builder) => builder.is_empty(),
1730 Self::Full(builder) => builder.is_empty(),
1731 }
1732 }
1733
1734 fn build(self) -> PropagateTransactions<T> {
1736 match self {
1737 Self::Pooled(pooled) => {
1738 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1739 }
1740 Self::Full(full) => full.build(),
1741 }
1742 }
1743}
1744
1745impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1746 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1748 for tx in txs {
1749 self.push(tx);
1750 }
1751 }
1752
1753 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1755 match self {
1756 Self::Pooled(builder) => builder.push(transaction),
1757 Self::Full(builder) => builder.push(transaction),
1758 }
1759 }
1760}
1761
1762struct PropagateTransactions<T> {
1764 pooled: Option<NewPooledTransactionHashes>,
1766 full: Option<Vec<Arc<T>>>,
1768}
1769
1770#[derive(Debug, Clone)]
1775struct FullTransactionsBuilder<T> {
1776 total_size: usize,
1778 transactions: Vec<Arc<T>>,
1780 pooled: PooledTransactionsHashesBuilder,
1782}
1783
1784impl<T> FullTransactionsBuilder<T> {
1785 fn new(version: EthVersion) -> Self {
1787 Self {
1788 total_size: 0,
1789 pooled: PooledTransactionsHashesBuilder::new(version),
1790 transactions: vec![],
1791 }
1792 }
1793
1794 fn is_empty(&self) -> bool {
1796 self.transactions.is_empty() && self.pooled.is_empty()
1797 }
1798
1799 fn build(self) -> PropagateTransactions<T> {
1801 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1802 let full = Some(self.transactions).filter(|full| !full.is_empty());
1803 PropagateTransactions { pooled, full }
1804 }
1805}
1806
1807impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1808 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1810 for tx in txs {
1811 self.push(&tx)
1812 }
1813 }
1814
1815 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1825 if !transaction.transaction.is_broadcastable_in_full() {
1834 self.pooled.push(transaction);
1835 return
1836 }
1837
1838 let new_size = self.total_size + transaction.size;
1839 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1840 self.total_size > 0
1841 {
1842 self.pooled.push(transaction);
1844 return
1845 }
1846
1847 self.total_size = new_size;
1848 self.transactions.push(Arc::clone(&transaction.transaction));
1849 }
1850}
1851
1852#[derive(Debug, Clone)]
1855enum PooledTransactionsHashesBuilder {
1856 Eth66(NewPooledTransactionHashes66),
1857 Eth68(NewPooledTransactionHashes68),
1858}
1859
1860impl PooledTransactionsHashesBuilder {
1863 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1865 match self {
1866 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1867 Self::Eth68(msg) => {
1868 msg.hashes.push(*pooled_tx.hash());
1869 msg.sizes.push(pooled_tx.encoded_length());
1870 msg.types.push(pooled_tx.transaction.ty());
1871 }
1872 }
1873 }
1874
1875 fn is_empty(&self) -> bool {
1877 match self {
1878 Self::Eth66(hashes) => hashes.is_empty(),
1879 Self::Eth68(hashes) => hashes.is_empty(),
1880 }
1881 }
1882
1883 fn extend<T: SignedTransaction>(
1885 &mut self,
1886 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1887 ) {
1888 for tx in txs {
1889 self.push(&tx);
1890 }
1891 }
1892
1893 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1894 match self {
1895 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1896 Self::Eth68(msg) => {
1897 msg.hashes.push(*tx.tx_hash());
1898 msg.sizes.push(tx.size);
1899 msg.types.push(tx.transaction.ty());
1900 }
1901 }
1902 }
1903
1904 fn new(version: EthVersion) -> Self {
1906 match version {
1907 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1908 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1909 }
1910 }
1911
1912 fn build(self) -> NewPooledTransactionHashes {
1913 match self {
1914 Self::Eth66(msg) => msg.into(),
1915 Self::Eth68(msg) => msg.into(),
1916 }
1917 }
1918}
1919
1920enum TransactionSource {
1922 Broadcast,
1924 Response,
1926}
1927
1928impl TransactionSource {
1931 const fn is_broadcast(&self) -> bool {
1933 matches!(self, Self::Broadcast)
1934 }
1935}
1936
1937#[derive(Debug)]
1939pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1940 seen_transactions: LruCache<TxHash>,
1944 request_tx: PeerRequestSender<PeerRequest<N>>,
1946 version: EthVersion,
1948 client_version: Arc<str>,
1950 peer_kind: PeerKind,
1952}
1953
1954impl<N: NetworkPrimitives> PeerMetadata<N> {
1955 pub fn new(
1957 request_tx: PeerRequestSender<PeerRequest<N>>,
1958 version: EthVersion,
1959 client_version: Arc<str>,
1960 max_transactions_seen_by_peer: u32,
1961 peer_kind: PeerKind,
1962 ) -> Self {
1963 Self {
1964 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1965 request_tx,
1966 version,
1967 client_version,
1968 peer_kind,
1969 }
1970 }
1971
1972 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1974 &self.request_tx
1975 }
1976
1977 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1979 &mut self.seen_transactions
1980 }
1981
1982 pub const fn version(&self) -> EthVersion {
1984 self.version
1985 }
1986
1987 pub fn client_version(&self) -> &str {
1989 &self.client_version
1990 }
1991
1992 pub const fn peer_kind(&self) -> PeerKind {
1994 self.peer_kind
1995 }
1996}
1997
1998#[derive(Debug)]
2000enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2001 PropagateHash(B256),
2003 PropagateHashesTo(Vec<B256>, PeerId),
2005 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2007 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2009 PropagateTransactions(Vec<TxHash>),
2011 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2013 GetTransactionHashes {
2015 peers: Vec<PeerId>,
2016 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2017 },
2018 GetPeerSender {
2020 peer_id: PeerId,
2021 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2022 },
2023}
2024
2025#[derive(Debug)]
2027pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2028 IncomingTransactions {
2032 peer_id: PeerId,
2034 msg: Transactions<N::BroadcastedTransaction>,
2036 },
2037 IncomingPooledTransactionHashes {
2039 peer_id: PeerId,
2041 msg: NewPooledTransactionHashes,
2043 },
2044 GetPooledTransactions {
2046 peer_id: PeerId,
2048 request: GetPooledTransactions,
2050 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2052 },
2053 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2055}
2056
2057#[derive(Debug)]
2059pub struct PendingPoolImportsInfo {
2060 pending_pool_imports: Arc<AtomicUsize>,
2062 max_pending_pool_imports: usize,
2064}
2065
2066impl PendingPoolImportsInfo {
2067 pub fn new(max_pending_pool_imports: usize) -> Self {
2069 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2070 }
2071
2072 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2074 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2075 }
2076}
2077
2078impl Default for PendingPoolImportsInfo {
2079 fn default() -> Self {
2080 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2081 }
2082}
2083
2084#[derive(Debug, Default)]
2085struct TxManagerPollDurations {
2086 acc_network_events: Duration,
2087 acc_pending_imports: Duration,
2088 acc_tx_events: Duration,
2089 acc_imported_txns: Duration,
2090 acc_fetch_events: Duration,
2091 acc_pending_fetch: Duration,
2092 acc_cmds: Duration,
2093}
2094
2095#[cfg(test)]
2096mod tests {
2097 use super::*;
2098 use crate::{
2099 test_utils::{
2100 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2101 Testnet,
2102 },
2103 transactions::config::RelaxedEthAnnouncementFilter,
2104 NetworkConfigBuilder, NetworkManager,
2105 };
2106 use alloy_consensus::{TxEip1559, TxLegacy};
2107 use alloy_primitives::{hex, Signature, TxKind, U256};
2108 use alloy_rlp::Decodable;
2109 use futures::FutureExt;
2110 use reth_chainspec::MIN_TRANSACTION_GAS;
2111 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2112 use reth_network_api::{NetworkInfo, PeerKind};
2113 use reth_network_p2p::{
2114 error::{RequestError, RequestResult},
2115 sync::{NetworkSyncUpdater, SyncState},
2116 };
2117 use reth_storage_api::noop::NoopProvider;
2118 use reth_transaction_pool::test_utils::{
2119 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2120 };
2121 use secp256k1::SecretKey;
2122 use std::{
2123 future::poll_fn,
2124 net::{IpAddr, Ipv4Addr, SocketAddr},
2125 str::FromStr,
2126 };
2127 use tracing::error;
2128
2129 #[tokio::test(flavor = "multi_thread")]
2130 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2131 reth_tracing::init_test_tracing();
2132 let net = Testnet::create(3).await;
2133
2134 let mut handles = net.handles();
2135 let handle0 = handles.next().unwrap();
2136 let handle1 = handles.next().unwrap();
2137
2138 drop(handles);
2139 let handle = net.spawn();
2140
2141 let listener0 = handle0.event_listener();
2142 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2143 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2144
2145 let client = NoopProvider::default();
2146 let pool = testing_pool();
2147 let config = NetworkConfigBuilder::eth(secret_key)
2148 .disable_discovery()
2149 .listener_port(0)
2150 .build(client);
2151 let transactions_manager_config = config.transactions_manager_config.clone();
2152 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2153 .await
2154 .unwrap()
2155 .into_builder()
2156 .transactions(pool.clone(), transactions_manager_config)
2157 .split_with_handle();
2158
2159 tokio::task::spawn(network);
2160
2161 network_handle.update_sync_state(SyncState::Syncing);
2163 assert!(NetworkInfo::is_syncing(&network_handle));
2164 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2165
2166 let mut established = listener0.take(2);
2168 while let Some(ev) = established.next().await {
2169 match ev {
2170 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2171 transactions
2173 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2174 }
2175 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2176 ev => {
2177 error!("unexpected event {ev:?}")
2178 }
2179 }
2180 }
2181 let input = hex!(
2183 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2184 );
2185 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2186 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2187 peer_id: *handle1.peer_id(),
2188 msg: Transactions(vec![signed_tx.clone()]),
2189 });
2190 poll_fn(|cx| {
2191 let _ = transactions.poll_unpin(cx);
2192 Poll::Ready(())
2193 })
2194 .await;
2195 assert!(pool.is_empty());
2196 handle.terminate().await;
2197 }
2198
2199 #[tokio::test(flavor = "multi_thread")]
2200 async fn test_tx_broadcasts_through_two_syncs() {
2201 reth_tracing::init_test_tracing();
2202 let net = Testnet::create(3).await;
2203
2204 let mut handles = net.handles();
2205 let handle0 = handles.next().unwrap();
2206 let handle1 = handles.next().unwrap();
2207
2208 drop(handles);
2209 let handle = net.spawn();
2210
2211 let listener0 = handle0.event_listener();
2212 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2213 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2214
2215 let client = NoopProvider::default();
2216 let pool = testing_pool();
2217 let config = NetworkConfigBuilder::new(secret_key)
2218 .disable_discovery()
2219 .listener_port(0)
2220 .build(client);
2221 let transactions_manager_config = config.transactions_manager_config.clone();
2222 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2223 .await
2224 .unwrap()
2225 .into_builder()
2226 .transactions(pool.clone(), transactions_manager_config)
2227 .split_with_handle();
2228
2229 tokio::task::spawn(network);
2230
2231 network_handle.update_sync_state(SyncState::Syncing);
2233 assert!(NetworkInfo::is_syncing(&network_handle));
2234 network_handle.update_sync_state(SyncState::Idle);
2235 assert!(!NetworkInfo::is_syncing(&network_handle));
2236 network_handle.update_sync_state(SyncState::Syncing);
2237 assert!(NetworkInfo::is_syncing(&network_handle));
2238
2239 let mut established = listener0.take(2);
2241 while let Some(ev) = established.next().await {
2242 match ev {
2243 NetworkEvent::ActivePeerSession { .. } |
2244 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2245 transactions.on_network_event(ev);
2247 }
2248 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2249 _ => {
2250 error!("unexpected event {ev:?}")
2251 }
2252 }
2253 }
2254 let input = hex!(
2256 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2257 );
2258 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2259 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2260 peer_id: *handle1.peer_id(),
2261 msg: Transactions(vec![signed_tx.clone()]),
2262 });
2263 poll_fn(|cx| {
2264 let _ = transactions.poll_unpin(cx);
2265 Poll::Ready(())
2266 })
2267 .await;
2268 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2269 assert!(NetworkInfo::is_syncing(&network_handle));
2270 assert!(!pool.is_empty());
2271 handle.terminate().await;
2272 }
2273
2274 #[tokio::test(flavor = "multi_thread")]
2277 async fn test_handle_incoming_transactions_hashes() {
2278 reth_tracing::init_test_tracing();
2279
2280 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2281 let client = NoopProvider::default();
2282
2283 let config = NetworkConfigBuilder::new(secret_key)
2284 .listener_port(0)
2286 .disable_discovery()
2287 .build(client);
2288
2289 let pool = testing_pool();
2290
2291 let transactions_manager_config = config.transactions_manager_config.clone();
2292 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2293 .await
2294 .unwrap()
2295 .into_builder()
2296 .transactions(pool.clone(), transactions_manager_config)
2297 .split_with_handle();
2298
2299 let peer_id_1 = PeerId::new([1; 64]);
2300 let eth_version = EthVersion::Eth66;
2301
2302 let txs = vec![TransactionSigned::new_unhashed(
2303 Transaction::Legacy(TxLegacy {
2304 chain_id: Some(4),
2305 nonce: 15u64,
2306 gas_price: 2200000000,
2307 gas_limit: 34811,
2308 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2309 value: U256::from(1234u64),
2310 input: Default::default(),
2311 }),
2312 Signature::new(
2313 U256::from_str(
2314 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2315 )
2316 .unwrap(),
2317 U256::from_str(
2318 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2319 )
2320 .unwrap(),
2321 true,
2322 ),
2323 )];
2324
2325 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2326
2327 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2328 tx_manager.peers.insert(peer_id_1, peer_1);
2329
2330 assert!(pool.is_empty());
2331
2332 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2333 peer_id: peer_id_1,
2334 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2335 txs_hashes.clone(),
2336 )),
2337 });
2338
2339 let req = to_mock_session_rx
2341 .recv()
2342 .await
2343 .expect("peer_1 session should receive request with buffered hashes");
2344 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2345 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2346
2347 let message: Vec<PooledTransactionVariant> = txs
2348 .into_iter()
2349 .map(|tx| {
2350 PooledTransactionVariant::try_from(tx)
2351 .expect("Failed to convert MockTransaction to PooledTransaction")
2352 })
2353 .collect();
2354
2355 response
2357 .send(Ok(PooledTransactions(message)))
2358 .expect("should send peer_1 response to tx manager");
2359
2360 poll_fn(|cx| {
2362 let _ = tx_manager.poll_unpin(cx);
2363 Poll::Ready(())
2364 })
2365 .await;
2366
2367 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2370 }
2371
2372 #[tokio::test(flavor = "multi_thread")]
2373 async fn test_handle_incoming_transactions() {
2374 reth_tracing::init_test_tracing();
2375 let net = Testnet::create(3).await;
2376
2377 let mut handles = net.handles();
2378 let handle0 = handles.next().unwrap();
2379 let handle1 = handles.next().unwrap();
2380
2381 drop(handles);
2382 let handle = net.spawn();
2383
2384 let listener0 = handle0.event_listener();
2385
2386 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2387 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2388
2389 let client = NoopProvider::default();
2390 let pool = testing_pool();
2391 let config = NetworkConfigBuilder::new(secret_key)
2392 .disable_discovery()
2393 .listener_port(0)
2394 .build(client);
2395 let transactions_manager_config = config.transactions_manager_config.clone();
2396 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2397 .await
2398 .unwrap()
2399 .into_builder()
2400 .transactions(pool.clone(), transactions_manager_config)
2401 .split_with_handle();
2402 tokio::task::spawn(network);
2403
2404 network_handle.update_sync_state(SyncState::Idle);
2405
2406 assert!(!NetworkInfo::is_syncing(&network_handle));
2407
2408 let mut established = listener0.take(2);
2410 while let Some(ev) = established.next().await {
2411 match ev {
2412 NetworkEvent::ActivePeerSession { .. } |
2413 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2414 transactions.on_network_event(ev);
2416 }
2417 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2418 ev => {
2419 error!("unexpected event {ev:?}")
2420 }
2421 }
2422 }
2423 let input = hex!(
2425 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2426 );
2427 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2428 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2429 peer_id: *handle1.peer_id(),
2430 msg: Transactions(vec![signed_tx.clone()]),
2431 });
2432 assert!(transactions
2433 .transactions_by_peers
2434 .get(signed_tx.tx_hash())
2435 .unwrap()
2436 .contains(handle1.peer_id()));
2437
2438 poll_fn(|cx| {
2440 let _ = transactions.poll_unpin(cx);
2441 Poll::Ready(())
2442 })
2443 .await;
2444
2445 assert!(!pool.is_empty());
2446 assert!(pool.get(signed_tx.tx_hash()).is_some());
2447 handle.terminate().await;
2448 }
2449
2450 #[tokio::test(flavor = "multi_thread")]
2451 async fn test_on_get_pooled_transactions_network() {
2452 reth_tracing::init_test_tracing();
2453 let net = Testnet::create(2).await;
2454
2455 let mut handles = net.handles();
2456 let handle0 = handles.next().unwrap();
2457 let handle1 = handles.next().unwrap();
2458
2459 drop(handles);
2460 let handle = net.spawn();
2461
2462 let listener0 = handle0.event_listener();
2463
2464 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2465 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2466
2467 let client = NoopProvider::default();
2468 let pool = testing_pool();
2469 let config = NetworkConfigBuilder::new(secret_key)
2470 .disable_discovery()
2471 .listener_port(0)
2472 .build(client);
2473 let transactions_manager_config = config.transactions_manager_config.clone();
2474 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2475 .await
2476 .unwrap()
2477 .into_builder()
2478 .transactions(pool.clone(), transactions_manager_config)
2479 .split_with_handle();
2480 tokio::task::spawn(network);
2481
2482 network_handle.update_sync_state(SyncState::Idle);
2483
2484 assert!(!NetworkInfo::is_syncing(&network_handle));
2485
2486 let mut established = listener0.take(2);
2488 while let Some(ev) = established.next().await {
2489 match ev {
2490 NetworkEvent::ActivePeerSession { .. } |
2491 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2492 transactions.on_network_event(ev);
2493 }
2494 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2495 ev => {
2496 error!("unexpected event {ev:?}")
2497 }
2498 }
2499 }
2500 handle.terminate().await;
2501
2502 let tx = MockTransaction::eip1559();
2503 let _ = transactions
2504 .pool
2505 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2506 .await;
2507
2508 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2509
2510 let (send, receive) =
2511 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2512
2513 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2514 peer_id: *handle1.peer_id(),
2515 request,
2516 response: send,
2517 });
2518
2519 match receive.await.unwrap() {
2520 Ok(PooledTransactions(transactions)) => {
2521 assert_eq!(transactions.len(), 1);
2522 }
2523 Err(e) => {
2524 panic!("error: {e:?}");
2525 }
2526 }
2527 }
2528
2529 #[tokio::test]
2533 async fn test_partially_tx_response() {
2534 reth_tracing::init_test_tracing();
2535
2536 let mut tx_manager = new_tx_manager().await.0;
2537 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2538
2539 let peer_id_1 = PeerId::new([1; 64]);
2540 let eth_version = EthVersion::Eth66;
2541
2542 let txs = vec![
2543 TransactionSigned::new_unhashed(
2544 Transaction::Legacy(TxLegacy {
2545 chain_id: Some(4),
2546 nonce: 15u64,
2547 gas_price: 2200000000,
2548 gas_limit: 34811,
2549 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2550 value: U256::from(1234u64),
2551 input: Default::default(),
2552 }),
2553 Signature::new(
2554 U256::from_str(
2555 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2556 )
2557 .unwrap(),
2558 U256::from_str(
2559 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2560 )
2561 .unwrap(),
2562 true,
2563 ),
2564 ),
2565 TransactionSigned::new_unhashed(
2566 Transaction::Eip1559(TxEip1559 {
2567 chain_id: 4,
2568 nonce: 26u64,
2569 max_priority_fee_per_gas: 1500000000,
2570 max_fee_per_gas: 1500000013,
2571 gas_limit: MIN_TRANSACTION_GAS,
2572 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2573 value: U256::from(3000000000000000000u64),
2574 input: Default::default(),
2575 access_list: Default::default(),
2576 }),
2577 Signature::new(
2578 U256::from_str(
2579 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2580 )
2581 .unwrap(),
2582 U256::from_str(
2583 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2584 )
2585 .unwrap(),
2586 true,
2587 ),
2588 ),
2589 ];
2590
2591 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2592
2593 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2594 peer_1.seen_transactions.insert(txs_hashes[0]);
2597 peer_1.seen_transactions.insert(txs_hashes[1]);
2598 tx_manager.peers.insert(peer_id_1, peer_1);
2599
2600 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2601 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2602
2603 assert!(tx_fetcher.is_idle(&peer_id_1));
2605 assert_eq!(tx_fetcher.active_peers.len(), 0);
2606
2607 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2609
2610 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2611 assert!(!tx_fetcher.is_idle(&peer_id_1));
2613 assert_eq!(tx_fetcher.active_peers.len(), 1);
2614
2615 let req = to_mock_session_rx
2617 .recv()
2618 .await
2619 .expect("peer_1 session should receive request with buffered hashes");
2620 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2621
2622 let message: Vec<PooledTransactionVariant> = txs
2623 .into_iter()
2624 .take(1)
2625 .map(|tx| {
2626 PooledTransactionVariant::try_from(tx)
2627 .expect("Failed to convert MockTransaction to PooledTransaction")
2628 })
2629 .collect();
2630 response
2632 .send(Ok(PooledTransactions(message)))
2633 .expect("should send peer_1 response to tx manager");
2634 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2635 unreachable!()
2636 };
2637
2638 assert!(tx_fetcher.is_idle(&peer_id));
2640 assert_eq!(tx_fetcher.active_peers.len(), 0);
2641 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2643 }
2644
2645 #[tokio::test]
2646 async fn test_max_retries_tx_request() {
2647 reth_tracing::init_test_tracing();
2648
2649 let mut tx_manager = new_tx_manager().await.0;
2650 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2651
2652 let peer_id_1 = PeerId::new([1; 64]);
2653 let peer_id_2 = PeerId::new([2; 64]);
2654 let eth_version = EthVersion::Eth66;
2655 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2656
2657 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2658 peer_1.seen_transactions.insert(seen_hashes[0]);
2661 peer_1.seen_transactions.insert(seen_hashes[1]);
2662 tx_manager.peers.insert(peer_id_1, peer_1);
2663
2664 let retries = 1;
2667 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2668 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2669
2670 assert!(tx_fetcher.is_idle(&peer_id_1));
2672 assert_eq!(tx_fetcher.active_peers.len(), 0);
2673
2674 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2676
2677 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2678
2679 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2680 assert!(!tx_fetcher.is_idle(&peer_id_1));
2682 assert_eq!(tx_fetcher.active_peers.len(), 1);
2683
2684 let req = to_mock_session_rx
2686 .recv()
2687 .await
2688 .expect("peer_1 session should receive request with buffered hashes");
2689 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2690 let GetPooledTransactions(hashes) = request;
2691
2692 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2693
2694 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2695
2696 response
2698 .send(Err(RequestError::BadResponse))
2699 .expect("should send peer_1 response to tx manager");
2700 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2701 unreachable!()
2702 };
2703
2704 assert!(tx_fetcher.is_idle(&peer_id));
2706 assert_eq!(tx_fetcher.active_peers.len(), 0);
2707 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2709
2710 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2711 tx_manager.peers.insert(peer_id_2, peer_2);
2712
2713 let msg =
2715 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2716 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2717
2718 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2719
2720 assert_eq!(tx_fetcher.active_peers.len(), 1);
2722
2723 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2725 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2727
2728 let req = to_mock_session_rx
2730 .recv()
2731 .await
2732 .expect("peer_2 session should receive request with buffered hashes");
2733 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2734
2735 response
2737 .send(Err(RequestError::BadResponse))
2738 .expect("should send peer_2 response to tx manager");
2739 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2740
2741 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2744 assert_eq!(tx_fetcher.active_peers.len(), 0);
2745 }
2746
2747 #[test]
2748 fn test_transaction_builder_empty() {
2749 let mut builder =
2750 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2751 assert!(builder.is_empty());
2752
2753 let mut factory = MockTransactionFactory::default();
2754 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2755 builder.push(&tx);
2756 assert!(!builder.is_empty());
2757
2758 let txs = builder.build();
2759 assert!(txs.full.is_none());
2760 let txs = txs.pooled.unwrap();
2761 assert_eq!(txs.len(), 1);
2762 }
2763
2764 #[test]
2765 fn test_transaction_builder_large() {
2766 let mut builder =
2767 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2768 assert!(builder.is_empty());
2769
2770 let mut factory = MockTransactionFactory::default();
2771 let mut tx = factory.create_eip1559();
2772 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2774 let tx = Arc::new(tx);
2775 let tx = PropagateTransaction::pool_tx(tx);
2776 builder.push(&tx);
2777 assert!(!builder.is_empty());
2778
2779 let txs = builder.clone().build();
2780 assert!(txs.pooled.is_none());
2781 let txs = txs.full.unwrap();
2782 assert_eq!(txs.len(), 1);
2783
2784 builder.push(&tx);
2785
2786 let txs = builder.clone().build();
2787 let pooled = txs.pooled.unwrap();
2788 assert_eq!(pooled.len(), 1);
2789 let txs = txs.full.unwrap();
2790 assert_eq!(txs.len(), 1);
2791 }
2792
2793 #[test]
2794 fn test_transaction_builder_eip4844() {
2795 let mut builder =
2796 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2797 assert!(builder.is_empty());
2798
2799 let mut factory = MockTransactionFactory::default();
2800 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2801 builder.push(&tx);
2802 assert!(!builder.is_empty());
2803
2804 let txs = builder.clone().build();
2805 assert!(txs.full.is_none());
2806 let txs = txs.pooled.unwrap();
2807 assert_eq!(txs.len(), 1);
2808
2809 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2810 builder.push(&tx);
2811
2812 let txs = builder.clone().build();
2813 let pooled = txs.pooled.unwrap();
2814 assert_eq!(pooled.len(), 1);
2815 let txs = txs.full.unwrap();
2816 assert_eq!(txs.len(), 1);
2817 }
2818
2819 #[tokio::test]
2820 async fn test_propagate_full() {
2821 reth_tracing::init_test_tracing();
2822
2823 let (mut tx_manager, network) = new_tx_manager().await;
2824 let peer_id = PeerId::random();
2825
2826 network.handle().update_sync_state(SyncState::Idle);
2828
2829 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2831
2832 let session_info = SessionInfo {
2833 peer_id,
2834 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2835 client_version: Arc::from(""),
2836 capabilities: Arc::new(vec![].into()),
2837 status: Arc::new(Default::default()),
2838 version: EthVersion::Eth68,
2839 peer_kind: PeerKind::Basic,
2840 };
2841 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2842 tx_manager
2843 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2844 let mut propagate = vec![];
2845 let mut factory = MockTransactionFactory::default();
2846 let eip1559_tx = Arc::new(factory.create_eip1559());
2847 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2848 let eip4844_tx = Arc::new(factory.create_eip4844());
2849 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2850
2851 let propagated =
2852 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2853 assert_eq!(propagated.0.len(), 2);
2854 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2855 assert_eq!(prop_txs.len(), 1);
2856 assert!(prop_txs[0].is_full());
2857
2858 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2859 assert_eq!(prop_txs.len(), 1);
2860 assert!(prop_txs[0].is_hash());
2861
2862 let peer = tx_manager.peers.get(&peer_id).unwrap();
2863 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2864 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2865 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2866
2867 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2869 assert!(propagated.0.is_empty());
2870 }
2871
2872 #[tokio::test]
2873 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2874 reth_tracing::init_test_tracing();
2875
2876 let transactions_manager_config = TransactionsManagerConfig::default();
2877
2878 let propagation_policy = TransactionPropagationKind::default();
2879 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2880
2881 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2882
2883 let pool = testing_pool();
2884 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2885 let client = NoopProvider::default();
2886
2887 let network_config = NetworkConfigBuilder::new(secret_key)
2888 .listener_port(0)
2889 .disable_discovery()
2890 .build(client.clone());
2891
2892 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2893 let (to_tx_manager_tx, from_network_rx) =
2894 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2895 network_manager.set_transactions(to_tx_manager_tx);
2896 let network_handle = network_manager.handle().clone();
2897 let network_service_handle = tokio::spawn(network_manager);
2898
2899 let mut tx_manager = TransactionsManager::<
2900 TestPool,
2901 EthNetworkPrimitives,
2902 NetworkPolicies<TransactionPropagationKind, RelaxedEthAnnouncementFilter>,
2903 >::with_policy(
2904 network_handle.clone(),
2905 pool.clone(),
2906 from_network_rx,
2907 transactions_manager_config,
2908 policy_bundle,
2909 );
2910
2911 let peer_id = PeerId::random();
2912 let eth_version = EthVersion::Eth68;
2913 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2914 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2915
2916 let mut tx_factory = MockTransactionFactory::default();
2917
2918 let valid_known_tx = tx_factory.create_eip1559();
2919 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2920
2921 let known_tx_hash = *known_tx_signed.hash();
2922 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2923 let known_tx_size = known_tx_signed.encoded_length();
2924
2925 let unknown_tx_hash = B256::random();
2926 let unknown_tx_type_byte = 0xff_u8;
2927 let unknown_tx_size = 150;
2928
2929 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2930 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2931 sizes: vec![known_tx_size, unknown_tx_size],
2932 hashes: vec![known_tx_hash, unknown_tx_hash],
2933 });
2934
2935 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2936
2937 poll_fn(|cx| {
2938 let _ = tx_manager.poll_unpin(cx);
2939 Poll::Ready(())
2940 })
2941 .await;
2942
2943 let mut requested_hashes_in_getpooled = HashSet::new();
2944 let mut unexpected_request_received = false;
2945
2946 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2947 .await
2948 {
2949 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2950 let GetPooledTransactions(hashes) = request;
2951 for hash in hashes {
2952 requested_hashes_in_getpooled.insert(hash);
2953 }
2954 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2955 }
2956 Ok(Some(other_request)) => {
2957 tracing::error!(?other_request, "Received unexpected PeerRequest type");
2958 unexpected_request_received = true;
2959 }
2960 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2961 Err(_timeout_err) => {
2962 tracing::info!("Timeout: No GetPooledTransactions request received.")
2963 }
2964 }
2965
2966 assert!(
2967 requested_hashes_in_getpooled.contains(&known_tx_hash),
2968 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2969 );
2970 assert!(
2971 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2972 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2973 );
2974 assert!(
2975 !unexpected_request_received,
2976 "An unexpected P2P request was received by the mock peer."
2977 );
2978
2979 network_service_handle.abort();
2980 }
2981}