1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6pub mod config;
8pub mod constants;
10pub mod fetcher;
12pub mod policy;
14
15pub use self::constants::{
16 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30 budget::{
31 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{
37 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38 },
39 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40 NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49 RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54 events::{PeerEvent, SessionInfo},
55 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58 error::{RequestError, RequestResult},
59 sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66 error::{PoolError, PoolResult},
67 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71 collections::{hash_map::Entry, HashMap, HashSet},
72 pin::Pin,
73 sync::{
74 atomic::{AtomicUsize, Ordering},
75 Arc,
76 },
77 task::{Context, Poll},
78 time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
84pub type PoolImportFuture =
88 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
90#[derive(Debug, Clone)]
98pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
99 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
101}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104 fn send(&self, cmd: TransactionsCommand<N>) {
105 let _ = self.manager_tx.send(cmd);
106 }
107
108 async fn peer_handle(
110 &self,
111 peer_id: PeerId,
112 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113 let (tx, rx) = oneshot::channel();
114 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115 rx.await
116 }
117
118 pub fn propagate(&self, hash: TxHash) {
120 self.send(TransactionsCommand::PropagateHash(hash))
121 }
122
123 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127 self.propagate_hashes_to(Some(hash), peer)
128 }
129
130 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134 let hashes = hash.into_iter().collect::<Vec<_>>();
135 if hashes.is_empty() {
136 return
137 }
138 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139 }
140
141 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143 let (tx, rx) = oneshot::channel();
144 self.send(TransactionsCommand::GetActivePeers(tx));
145 rx.await
146 }
147
148 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152 if transactions.is_empty() {
153 return
154 }
155 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156 }
157
158 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163 if transactions.is_empty() {
164 return
165 }
166 self.send(TransactionsCommand::PropagateTransactions(transactions))
167 }
168
169 pub fn broadcast_transactions(
174 &self,
175 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176 ) {
177 let transactions =
178 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179 if transactions.is_empty() {
180 return
181 }
182 self.send(TransactionsCommand::BroadcastTransactions(transactions))
183 }
184
185 pub async fn get_transaction_hashes(
187 &self,
188 peers: Vec<PeerId>,
189 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190 if peers.is_empty() {
191 return Ok(Default::default())
192 }
193 let (tx, rx) = oneshot::channel();
194 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195 rx.await
196 }
197
198 pub async fn get_peer_transaction_hashes(
200 &self,
201 peer: PeerId,
202 ) -> Result<HashSet<TxHash>, RecvError> {
203 let res = self.get_transaction_hashes(vec![peer]).await?;
204 Ok(res.into_values().next().unwrap_or_default())
205 }
206
207 pub async fn get_pooled_transactions_from(
213 &self,
214 peer_id: PeerId,
215 hashes: Vec<B256>,
216 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219 let (tx, rx) = oneshot::channel();
220 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221 peer.try_send(request).ok();
222
223 rx.await?.map(|res| Some(res.0))
224 }
225}
226
227#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
284 pool: Pool,
286 network: NetworkHandle<N>,
288 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
292 transaction_fetcher: TransactionFetcher<N>,
294 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
299 pool_imports: FuturesUnordered<PoolImportFuture>,
311 pending_pool_imports_info: PendingPoolImportsInfo,
313 bad_imports: LruCache<TxHash>,
315 peers: HashMap<PeerId, PeerMetadata<N>>,
317 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
321 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
326 pending_transactions: mpsc::Receiver<TxHash>,
335 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
337 config: TransactionsManagerConfig,
339 policies: NetworkPolicies<N>,
341 metrics: TransactionsManagerMetrics,
343 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
345}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348 pub fn new(
352 network: NetworkHandle<N>,
353 pool: Pool,
354 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355 transactions_manager_config: TransactionsManagerConfig,
356 ) -> Self {
357 Self::with_policy(
358 network,
359 pool,
360 from_network,
361 transactions_manager_config,
362 NetworkPolicies::new(
363 TransactionPropagationKind::default(),
364 StrictEthAnnouncementFilter::default(),
365 ),
366 )
367 }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371 pub fn with_policy(
375 network: NetworkHandle<N>,
376 pool: Pool,
377 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378 transactions_manager_config: TransactionsManagerConfig,
379 policies: NetworkPolicies<N>,
380 ) -> Self {
381 let network_events = network.event_listener();
382
383 let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386 &transactions_manager_config.transaction_fetcher_config,
387 );
388
389 let pending = pool.pending_transactions_listener();
392 let pending_pool_imports_info = PendingPoolImportsInfo::default();
393 let metrics = TransactionsManagerMetrics::default();
394 metrics
395 .capacity_pending_pool_imports
396 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398 Self {
399 pool,
400 network,
401 network_events,
402 transaction_fetcher,
403 transactions_by_peers: Default::default(),
404 pool_imports: Default::default(),
405 pending_pool_imports_info: PendingPoolImportsInfo::new(
406 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
407 ),
408 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409 peers: Default::default(),
410 command_tx,
411 command_rx: UnboundedReceiverStream::new(command_rx),
412 pending_transactions: pending,
413 transaction_events: UnboundedMeteredReceiver::new(
414 from_network,
415 NETWORK_POOL_TRANSACTIONS_SCOPE,
416 ),
417 config: transactions_manager_config,
418 policies,
419 metrics,
420 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421 }
422 }
423
424 pub fn handle(&self) -> TransactionsHandle<N> {
426 TransactionsHandle { manager_tx: self.command_tx.clone() }
427 }
428
429 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432 self.has_capacity_for_pending_pool_imports() &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn has_capacity_for_pending_pool_imports(&self) -> bool {
438 self.remaining_pool_import_capacity() > 0
439 }
440
441 fn remaining_pool_import_capacity(&self) -> usize {
443 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445 )
446 }
447
448 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450 self.metrics.reported_bad_transactions.increment(1);
451 }
452
453 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455 self.network.reputation_change(peer_id, kind);
456 }
457
458 fn report_already_seen(&self, peer_id: PeerId) {
459 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461 }
462
463 fn on_good_import(&mut self, hash: TxHash) {
465 self.transactions_by_peers.remove(&hash);
466 }
467
468 fn on_bad_import(&mut self, err: PoolError) {
492 let peers = self.transactions_by_peers.remove(&err.hash);
493
494 if !err.is_bad_transaction() || self.network.is_syncing() {
496 return
497 }
498 if let Some(peers) = peers {
501 for peer_id in peers {
502 self.report_peer_bad_transactions(peer_id);
503 }
504 }
505 self.metrics.bad_imports.increment(1);
506 self.bad_imports.insert(err.hash);
507 }
508
509 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
513 let info = &self.pending_pool_imports_info;
515 let max_pending_pool_imports = info.max_pending_pool_imports;
516 let has_capacity_wrt_pending_pool_imports =
517 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
518
519 self.transaction_fetcher
520 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
521 }
522
523 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
524 let kind = match req_err {
525 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
526 RequestError::Timeout => ReputationChangeKind::Timeout,
527 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
528 return
530 }
531 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
532 };
533 self.report_peer(peer_id, kind);
534 }
535
536 #[inline]
537 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
538 let metrics = &self.metrics;
539
540 let TxManagerPollDurations {
541 acc_network_events,
542 acc_pending_imports,
543 acc_tx_events,
544 acc_imported_txns,
545 acc_fetch_events,
546 acc_pending_fetch,
547 acc_cmds,
548 } = poll_durations;
549
550 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
552 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
554 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
555 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
556 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
557 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
558 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
559 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
560 }
561}
562
563impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
564 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
566 for res in batch_results {
567 match res {
568 Ok(AddedTransactionOutcome { hash, .. }) => {
569 self.on_good_import(hash);
570 }
571 Err(err) => {
572 self.on_bad_import(err);
573 }
574 }
575 }
576 }
577
578 fn on_new_pooled_transaction_hashes(
580 &mut self,
581 peer_id: PeerId,
582 msg: NewPooledTransactionHashes,
583 ) {
584 if self.network.is_initially_syncing() {
586 return
587 }
588 if self.network.tx_gossip_disabled() {
589 return
590 }
591
592 let Some(peer) = self.peers.get_mut(&peer_id) else {
594 trace!(
595 peer_id = format!("{peer_id:#}"),
596 ?msg,
597 "discarding announcement from inactive peer"
598 );
599
600 return
601 };
602 let client = peer.client_version.clone();
603
604 let mut count_txns_already_seen_by_peer = 0;
606 for tx in msg.iter_hashes().copied() {
607 if !peer.seen_transactions.insert(tx) {
608 count_txns_already_seen_by_peer += 1;
609 }
610 }
611 if count_txns_already_seen_by_peer > 0 {
612 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
617 self.metrics
618 .occurrences_hash_already_seen_by_peer
619 .increment(count_txns_already_seen_by_peer);
620
621 trace!(target: "net::tx",
622 %count_txns_already_seen_by_peer,
623 peer_id=format!("{peer_id:#}"),
624 ?client,
625 "Peer sent hashes that have already been marked as seen by peer"
626 );
627
628 self.report_already_seen(peer_id);
629 }
630
631 if msg.is_empty() {
633 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
634 return;
635 }
636
637 let original_len = msg.len();
638 let mut partially_valid_msg = msg.dedup();
639
640 if partially_valid_msg.len() != original_len {
641 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
642 }
643
644 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
646
647 let hashes_count_pre_pool_filter = partially_valid_msg.len();
655 self.pool.retain_unknown(&mut partially_valid_msg);
656 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
657 let already_known_hashes_count =
658 hashes_count_pre_pool_filter - partially_valid_msg.len();
659 self.metrics
660 .occurrences_hashes_already_in_pool
661 .increment(already_known_hashes_count as u64);
662 }
663
664 if partially_valid_msg.is_empty() {
665 return
667 }
668
669 let mut should_report_peer = false;
674 let mut tx_types_counter = TxTypesCounter::default();
675
676 let is_eth68_message = partially_valid_msg
677 .msg_version()
678 .expect("partially valid announcement should have a version")
679 .is_eth68();
680
681 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
682 let (ty_byte, size_val) = match *metadata_ref_mut {
683 Some((ty, size)) => {
684 if !is_eth68_message {
685 should_report_peer = true;
686 }
687 (ty, size)
688 }
689 None => {
690 if is_eth68_message {
691 should_report_peer = true;
692 return false;
693 }
694 (0u8, 0)
695 }
696 };
697
698 if is_eth68_message &&
699 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
700 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
701 {
702 tx_types_counter.increase_by_tx_type(parsed_tx_type);
703 }
704
705 let decision = self
706 .policies
707 .announcement_filter()
708 .decide_on_announcement(ty_byte, tx_hash, size_val);
709
710 match decision {
711 AnnouncementAcceptance::Accept => true,
712 AnnouncementAcceptance::Ignore => false,
713 AnnouncementAcceptance::Reject { penalize_peer } => {
714 if penalize_peer {
715 should_report_peer = true;
716 }
717 false
718 }
719 }
720 });
721
722 if is_eth68_message {
723 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
724 }
725
726 if should_report_peer {
727 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
728 }
729
730 let mut valid_announcement_data =
731 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
732
733 if valid_announcement_data.is_empty() {
734 return
736 }
737
738 let bad_imports = &self.bad_imports;
745 self.transaction_fetcher.filter_unseen_and_pending_hashes(
746 &mut valid_announcement_data,
747 |hash| bad_imports.contains(hash),
748 &peer_id,
749 &client,
750 );
751
752 if valid_announcement_data.is_empty() {
753 return
755 }
756
757 trace!(target: "net::tx::propagation",
758 peer_id=format!("{peer_id:#}"),
759 hashes_len=valid_announcement_data.len(),
760 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
761 msg_version=%valid_announcement_data.msg_version(),
762 client_version=%client,
763 "received previously unseen and pending hashes in announcement from peer"
764 );
765
766 if !self.transaction_fetcher.is_idle(&peer_id) {
769 let msg_version = valid_announcement_data.msg_version();
771 let (hashes, _version) = valid_announcement_data.into_request_hashes();
772
773 trace!(target: "net::tx",
774 peer_id=format!("{peer_id:#}"),
775 hashes=?*hashes,
776 %msg_version,
777 %client,
778 "buffering hashes announced by busy peer"
779 );
780
781 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
782
783 return
784 }
785
786 let mut hashes_to_request =
787 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
788 let surplus_hashes =
789 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
790
791 if !surplus_hashes.is_empty() {
792 trace!(target: "net::tx",
793 peer_id=format!("{peer_id:#}"),
794 surplus_hashes=?*surplus_hashes,
795 %client,
796 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
797 );
798
799 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
800 }
801
802 trace!(target: "net::tx",
803 peer_id=format!("{peer_id:#}"),
804 hashes=?*hashes_to_request,
805 %client,
806 "sending hashes in `GetPooledTransactions` request to peer's session"
807 );
808
809 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
813 if let Some(failed_to_request_hashes) =
814 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
815 {
816 let conn_eth_version = peer.version;
817
818 trace!(target: "net::tx",
819 peer_id=format!("{peer_id:#}"),
820 failed_to_request_hashes=?*failed_to_request_hashes,
821 %conn_eth_version,
822 %client,
823 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
824 );
825 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
826 }
827 }
828}
829
830impl<Pool, N> TransactionsManager<Pool, N>
831where
832 Pool: TransactionPool + Unpin + 'static,
833 N: NetworkPrimitives<
834 BroadcastedTransaction: SignedTransaction,
835 PooledTransaction: SignedTransaction,
836 > + Unpin,
837 Pool::Transaction:
838 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
839{
840 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
852 if self.network.is_initially_syncing() {
854 return
855 }
856 if self.network.tx_gossip_disabled() {
857 return
858 }
859
860 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
861
862 self.propagate_all(hashes);
863 }
864
865 fn propagate_full_transactions_to_peer(
869 &mut self,
870 txs: Vec<TxHash>,
871 peer_id: PeerId,
872 propagation_mode: PropagationMode,
873 ) -> Option<PropagatedTransactions> {
874 let peer = self.peers.get_mut(&peer_id)?;
875 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
876 let mut propagated = PropagatedTransactions::default();
877
878 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
880
881 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
882
883 if propagation_mode.is_forced() {
884 full_transactions.extend(to_propagate);
886 } else {
887 for tx in to_propagate {
890 if !peer.seen_transactions.contains(tx.tx_hash()) {
891 full_transactions.push(&tx);
893 }
894 }
895 }
896
897 if full_transactions.is_empty() {
898 return None
900 }
901
902 let PropagateTransactions { pooled, full } = full_transactions.build();
903
904 if let Some(new_pooled_hashes) = pooled {
906 for hash in new_pooled_hashes.iter_hashes().copied() {
907 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
908 peer.seen_transactions.insert(hash);
910 }
911
912 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
914 }
915
916 if let Some(new_full_transactions) = full {
918 for tx in &new_full_transactions {
919 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
920 peer.seen_transactions.insert(*tx.tx_hash());
922 }
923
924 self.network.send_transactions(peer_id, new_full_transactions);
926 }
927
928 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
930
931 Some(propagated)
932 }
933
934 fn propagate_hashes_to(
938 &mut self,
939 hashes: Vec<TxHash>,
940 peer_id: PeerId,
941 propagation_mode: PropagationMode,
942 ) {
943 trace!(target: "net::tx", "Start propagating transactions as hashes");
944
945 let propagated = {
948 let Some(peer) = self.peers.get_mut(&peer_id) else {
949 return
951 };
952
953 let to_propagate = self
954 .pool
955 .get_all(hashes)
956 .into_iter()
957 .map(PropagateTransaction::pool_tx)
958 .collect::<Vec<_>>();
959
960 let mut propagated = PropagatedTransactions::default();
961
962 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
964
965 if propagation_mode.is_forced() {
966 hashes.extend(to_propagate)
967 } else {
968 for tx in to_propagate {
969 if !peer.seen_transactions.contains(tx.tx_hash()) {
970 hashes.push(&tx);
972 }
973 }
974 }
975
976 let new_pooled_hashes = hashes.build();
977
978 if new_pooled_hashes.is_empty() {
979 return
981 }
982
983 for hash in new_pooled_hashes.iter_hashes().copied() {
984 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
985 }
986
987 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
988
989 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
991
992 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
994
995 propagated
996 };
997
998 self.pool.on_propagated(propagated);
1000 }
1001
1002 fn propagate_transactions(
1009 &mut self,
1010 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1011 propagation_mode: PropagationMode,
1012 ) -> PropagatedTransactions {
1013 let mut propagated = PropagatedTransactions::default();
1014 if self.network.tx_gossip_disabled() {
1015 return propagated
1016 }
1017
1018 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1020
1021 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1023 if !self.policies.propagation_policy().can_propagate(peer) {
1024 continue
1026 }
1027 let mut builder = if peer_idx > max_num_full {
1029 PropagateTransactionsBuilder::pooled(peer.version)
1030 } else {
1031 PropagateTransactionsBuilder::full(peer.version)
1032 };
1033
1034 if propagation_mode.is_forced() {
1035 builder.extend(to_propagate.iter());
1036 } else {
1037 for tx in &to_propagate {
1041 if !peer.seen_transactions.contains(tx.tx_hash()) {
1044 builder.push(tx);
1045 }
1046 }
1047 }
1048
1049 if builder.is_empty() {
1050 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1051 continue
1052 }
1053
1054 let PropagateTransactions { pooled, full } = builder.build();
1055
1056 if let Some(mut new_pooled_hashes) = pooled {
1058 new_pooled_hashes
1061 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1062
1063 for hash in new_pooled_hashes.iter_hashes().copied() {
1064 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1065 peer.seen_transactions.insert(hash);
1067 }
1068
1069 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1070
1071 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1073 }
1074
1075 if let Some(new_full_transactions) = full {
1077 for tx in &new_full_transactions {
1078 propagated
1079 .0
1080 .entry(*tx.tx_hash())
1081 .or_default()
1082 .push(PropagateKind::Full(*peer_id));
1083 peer.seen_transactions.insert(*tx.tx_hash());
1085 }
1086
1087 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1088
1089 self.network.send_transactions(*peer_id, new_full_transactions);
1091 }
1092 }
1093
1094 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1096
1097 propagated
1098 }
1099
1100 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1105 if self.peers.is_empty() {
1106 return
1108 }
1109 let propagated = self.propagate_transactions(
1110 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1111 PropagationMode::Basic,
1112 );
1113
1114 self.pool.on_propagated(propagated);
1116 }
1117
1118 fn on_get_pooled_transactions(
1120 &mut self,
1121 peer_id: PeerId,
1122 request: GetPooledTransactions,
1123 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1124 ) {
1125 if let Some(peer) = self.peers.get_mut(&peer_id) {
1126 if self.network.tx_gossip_disabled() {
1127 let _ = response.send(Ok(PooledTransactions::default()));
1128 return
1129 }
1130 let transactions = self.pool.get_pooled_transaction_elements(
1131 request.0,
1132 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1133 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1134 ),
1135 );
1136 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1137
1138 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1141
1142 let resp = PooledTransactions(transactions);
1143 let _ = response.send(Ok(resp));
1144 }
1145 }
1146
1147 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1149 match cmd {
1150 TransactionsCommand::PropagateHash(hash) => {
1151 self.on_new_pending_transactions(vec![hash])
1152 }
1153 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1154 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1155 }
1156 TransactionsCommand::GetActivePeers(tx) => {
1157 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1158 tx.send(peers).ok();
1159 }
1160 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1161 if let Some(propagated) =
1162 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1163 {
1164 self.pool.on_propagated(propagated);
1165 }
1166 }
1167 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1168 TransactionsCommand::BroadcastTransactions(txs) => {
1169 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1170 self.pool.on_propagated(propagated);
1171 }
1172 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1173 let mut res = HashMap::with_capacity(peers.len());
1174 for peer_id in peers {
1175 let hashes = self
1176 .peers
1177 .get(&peer_id)
1178 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1179 .unwrap_or_default();
1180 res.insert(peer_id, hashes);
1181 }
1182 tx.send(res).ok();
1183 }
1184 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1185 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1186 peer_request_sender.send(sender).ok();
1187 }
1188 }
1189 }
1190
1191 fn handle_peer_session(
1195 &mut self,
1196 info: SessionInfo,
1197 messages: PeerRequestSender<PeerRequest<N>>,
1198 ) {
1199 let SessionInfo { peer_id, client_version, version, .. } = info;
1200
1201 let peer = PeerMetadata::<N>::new(
1203 messages,
1204 version,
1205 client_version,
1206 self.config.max_transactions_seen_by_peer_history,
1207 info.peer_kind,
1208 );
1209 let peer = match self.peers.entry(peer_id) {
1210 Entry::Occupied(mut entry) => {
1211 entry.insert(peer);
1212 entry.into_mut()
1213 }
1214 Entry::Vacant(entry) => entry.insert(peer),
1215 };
1216
1217 self.policies.propagation_policy_mut().on_session_established(peer);
1218
1219 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1223 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1224 return
1225 }
1226
1227 let pooled_txs = self.pool.pooled_transactions_max(
1229 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1230 );
1231 if pooled_txs.is_empty() {
1232 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1233 return;
1234 }
1235
1236 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1238 for pooled_tx in pooled_txs {
1239 peer.seen_transactions.insert(*pooled_tx.hash());
1240 msg_builder.push_pooled(pooled_tx);
1241 }
1242
1243 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1244 let msg = msg_builder.build();
1245 self.network.send_transactions_hashes(peer_id, msg);
1246 }
1247
1248 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1250 match event_result {
1251 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1252 let peer = self.peers.remove(&peer_id);
1255 if let Some(mut peer) = peer {
1256 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1257 }
1258 self.transaction_fetcher.remove_peer(&peer_id);
1259 }
1260 NetworkEvent::ActivePeerSession { info, messages } => {
1261 self.handle_peer_session(info, messages);
1263 }
1264 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1265 let peer_id = info.peer_id;
1266 let messages = match self.peers.get(&peer_id) {
1268 Some(p) => p.request_tx.clone(),
1269 None => {
1270 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1271 return;
1272 }
1273 };
1274 self.handle_peer_session(info, messages);
1275 }
1276 _ => {}
1277 }
1278 }
1279
1280 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1282 if self.config.ingress_policy.allows_all() {
1283 return true;
1284 }
1285 let Some(peer) = self.peers.get(peer_id) else {
1286 return false;
1287 };
1288 self.config.ingress_policy.allows(peer.peer_kind())
1289 }
1290
1291 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1293 match event {
1294 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1295 if !self.accepts_incoming_from(&peer_id) {
1296 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1297 return;
1298 }
1299
1300 let has_blob_txs = msg.has_eip4844();
1304
1305 let non_blob_txs = msg
1306 .0
1307 .into_iter()
1308 .map(N::PooledTransaction::try_from)
1309 .filter_map(Result::ok)
1310 .collect();
1311
1312 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1313
1314 if has_blob_txs {
1315 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1316 self.report_peer_bad_transactions(peer_id);
1317 }
1318 }
1319 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1320 if !self.accepts_incoming_from(&peer_id) {
1321 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1322 return;
1323 }
1324 self.on_new_pooled_transaction_hashes(peer_id, msg)
1325 }
1326 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1327 self.on_get_pooled_transactions(peer_id, request, response)
1328 }
1329 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1330 let _ = response.send(Some(self.handle()));
1331 }
1332 }
1333 }
1334
1335 fn import_transactions(
1337 &mut self,
1338 peer_id: PeerId,
1339 transactions: PooledTransactions<N::PooledTransaction>,
1340 source: TransactionSource,
1341 ) {
1342 if self.network.is_initially_syncing() {
1344 return
1345 }
1346 if self.network.tx_gossip_disabled() {
1347 return
1348 }
1349
1350 if !self.has_capacity_for_pending_pool_imports() {
1352 return
1353 }
1354
1355 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1356 let client_version = peer.client_version.clone();
1357 let mut transactions = transactions.0;
1358
1359 let start = Instant::now();
1360
1361 self.transaction_fetcher
1363 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1364
1365 let mut num_already_seen_by_peer = 0;
1370 for tx in &transactions {
1371 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1372 num_already_seen_by_peer += 1;
1373 }
1374 }
1375
1376 let txns_count_pre_pool_filter = transactions.len();
1378 self.pool.retain_unknown(&mut transactions);
1379 if txns_count_pre_pool_filter > transactions.len() {
1380 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1381 self.metrics
1382 .occurrences_transactions_already_in_pool
1383 .increment(already_known_txns_count as u64);
1384 }
1385
1386 let mut has_bad_transactions = false;
1388
1389 transactions.retain(|tx| {
1391 if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1392 entry.get_mut().insert(peer_id);
1393 return false
1394 }
1395 if self.bad_imports.contains(tx.tx_hash()) {
1396 trace!(target: "net::tx",
1397 peer_id=format!("{peer_id:#}"),
1398 hash=%tx.tx_hash(),
1399 %client_version,
1400 "received a known bad transaction from peer"
1401 );
1402 has_bad_transactions = true;
1403 return false;
1404 }
1405 true
1406 });
1407
1408 let capacity = self.remaining_pool_import_capacity();
1411 if transactions.len() > capacity {
1412 let skipped = transactions.len() - capacity;
1413 transactions.truncate(capacity);
1414 self.metrics
1415 .skipped_transactions_pending_pool_imports_at_capacity
1416 .increment(skipped as u64);
1417 trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1418 }
1419
1420 let txs_len = transactions.len();
1421
1422 let new_txs = transactions
1423 .into_par_iter()
1424 .filter_map(|tx| match tx.try_into_recovered() {
1425 Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1426 Err(badtx) => {
1427 trace!(target: "net::tx",
1428 peer_id=format!("{peer_id:#}"),
1429 hash=%badtx.tx_hash(),
1430 client_version=%client_version,
1431 "failed ecrecovery for transaction"
1432 );
1433 None
1434 }
1435 })
1436 .collect::<Vec<_>>();
1437
1438 has_bad_transactions |= new_txs.len() != txs_len;
1439
1440 for tx in &new_txs {
1442 self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1443 }
1444
1445 if !new_txs.is_empty() {
1448 let pool = self.pool.clone();
1449 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1451 metric_pending_pool_imports.increment(new_txs.len() as f64);
1452
1453 self.pending_pool_imports_info
1455 .pending_pool_imports
1456 .fetch_add(new_txs.len(), Ordering::Relaxed);
1457 let tx_manager_info_pending_pool_imports =
1458 self.pending_pool_imports_info.pending_pool_imports.clone();
1459
1460 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1461 let import = Box::pin(async move {
1462 let added = new_txs.len();
1463 let res = pool.add_external_transactions(new_txs).await;
1464
1465 metric_pending_pool_imports.decrement(added as f64);
1467 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1469
1470 res
1471 });
1472
1473 self.pool_imports.push(import);
1474 }
1475
1476 if num_already_seen_by_peer > 0 {
1477 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1478 self.metrics
1479 .occurrences_of_transaction_already_seen_by_peer
1480 .increment(num_already_seen_by_peer);
1481 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1482 }
1483
1484 if has_bad_transactions {
1485 self.report_peer_bad_transactions(peer_id)
1487 }
1488
1489 if num_already_seen_by_peer > 0 {
1490 self.report_already_seen(peer_id);
1491 }
1492
1493 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1494 }
1495
1496 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1498 match fetch_event {
1499 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1500 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1501 if report_peer {
1502 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1503 }
1504 }
1505 FetchEvent::FetchError { peer_id, error } => {
1506 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1507 self.on_request_error(peer_id, error);
1508 }
1509 FetchEvent::EmptyResponse { peer_id } => {
1510 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1511 }
1512 }
1513 }
1514}
1515
1516impl<
1524 Pool: TransactionPool + Unpin + 'static,
1525 N: NetworkPrimitives<
1526 BroadcastedTransaction: SignedTransaction,
1527 PooledTransaction: SignedTransaction,
1528 > + Unpin,
1529 > Future for TransactionsManager<Pool, N>
1530where
1531 Pool::Transaction:
1532 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1533{
1534 type Output = ();
1535
1536 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1537 let start = Instant::now();
1538 let mut poll_durations = TxManagerPollDurations::default();
1539
1540 let this = self.get_mut();
1541
1542 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1548 poll_durations.acc_network_events,
1549 "net::tx",
1550 "Network events stream",
1551 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1552 this.network_events.poll_next_unpin(cx),
1553 |event| this.on_network_event(event)
1554 );
1555
1556 let mut new_txs = Vec::new();
1565 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1566 cx,
1567 &mut new_txs,
1568 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1569 ) {
1570 Poll::Ready(count) => {
1571 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1572 true
1575 } else {
1576 let limit =
1580 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1581 new_txs.len();
1582 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1583 }
1584 }
1585 Poll::Pending => false,
1586 };
1587 if !new_txs.is_empty() {
1588 this.on_new_pending_transactions(new_txs);
1589 }
1590
1591 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1606 poll_durations.acc_tx_events,
1607 "net::tx",
1608 "Network transaction events stream",
1609 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1610 this.transaction_events.poll_next_unpin(cx),
1611 |event| this.on_network_tx_event(event),
1612 );
1613
1614 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1625 poll_durations.acc_fetch_events,
1626 "net::tx",
1627 "Transaction fetch events stream",
1628 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1629 this.transaction_fetcher.poll_next_unpin(cx),
1630 |event| this.on_fetch_event(event),
1631 );
1632
1633 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1648 poll_durations.acc_pending_imports,
1649 "net::tx",
1650 "Batched pool imports stream",
1651 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1652 this.pool_imports.poll_next_unpin(cx),
1653 |batch_results| this.on_batch_import_result(batch_results)
1654 );
1655
1656 duration_metered_exec!(
1661 {
1662 if this.has_capacity_for_fetching_pending_hashes() &&
1663 this.on_fetch_hashes_pending_fetch()
1664 {
1665 maybe_more_tx_fetch_events = true;
1666 }
1667 },
1668 poll_durations.acc_pending_fetch
1669 );
1670
1671 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1673 poll_durations.acc_cmds,
1674 "net::tx",
1675 "Commands channel",
1676 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1677 this.command_rx.poll_next_unpin(cx),
1678 |cmd| this.on_command(cmd)
1679 );
1680
1681 this.transaction_fetcher.update_metrics();
1682
1683 if maybe_more_network_events ||
1685 maybe_more_commands ||
1686 maybe_more_tx_events ||
1687 maybe_more_tx_fetch_events ||
1688 maybe_more_pool_imports ||
1689 maybe_more_pending_txns
1690 {
1691 cx.waker().wake_by_ref();
1693 return Poll::Pending
1694 }
1695
1696 this.update_poll_metrics(start, poll_durations);
1697
1698 Poll::Pending
1699 }
1700}
1701
1702#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1706enum PropagationMode {
1707 Basic,
1711 Forced,
1716}
1717
1718impl PropagationMode {
1719 const fn is_forced(self) -> bool {
1721 matches!(self, Self::Forced)
1722 }
1723}
1724
1725#[derive(Debug, Clone)]
1727struct PropagateTransaction<T = TransactionSigned> {
1728 size: usize,
1729 transaction: Arc<T>,
1730}
1731
1732impl<T: SignedTransaction> PropagateTransaction<T> {
1733 pub fn new(transaction: T) -> Self {
1735 let size = transaction.length();
1736 Self { size, transaction: Arc::new(transaction) }
1737 }
1738
1739 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1741 where
1742 P: PoolTransaction<Consensus = T>,
1743 {
1744 let size = tx.encoded_length();
1745 let transaction = tx.transaction.clone_into_consensus();
1746 let transaction = Arc::new(transaction.into_inner());
1747 Self { size, transaction }
1748 }
1749
1750 fn tx_hash(&self) -> &TxHash {
1751 self.transaction.tx_hash()
1752 }
1753}
1754
1755#[derive(Debug, Clone)]
1758enum PropagateTransactionsBuilder<T> {
1759 Pooled(PooledTransactionsHashesBuilder),
1760 Full(FullTransactionsBuilder<T>),
1761}
1762
1763impl<T> PropagateTransactionsBuilder<T> {
1764 fn pooled(version: EthVersion) -> Self {
1766 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1767 }
1768
1769 fn full(version: EthVersion) -> Self {
1771 Self::Full(FullTransactionsBuilder::new(version))
1772 }
1773
1774 fn is_empty(&self) -> bool {
1776 match self {
1777 Self::Pooled(builder) => builder.is_empty(),
1778 Self::Full(builder) => builder.is_empty(),
1779 }
1780 }
1781
1782 fn build(self) -> PropagateTransactions<T> {
1784 match self {
1785 Self::Pooled(pooled) => {
1786 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1787 }
1788 Self::Full(full) => full.build(),
1789 }
1790 }
1791}
1792
1793impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1794 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1796 for tx in txs {
1797 self.push(tx);
1798 }
1799 }
1800
1801 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1803 match self {
1804 Self::Pooled(builder) => builder.push(transaction),
1805 Self::Full(builder) => builder.push(transaction),
1806 }
1807 }
1808}
1809
1810struct PropagateTransactions<T> {
1812 pooled: Option<NewPooledTransactionHashes>,
1814 full: Option<Vec<Arc<T>>>,
1816}
1817
1818#[derive(Debug, Clone)]
1823struct FullTransactionsBuilder<T> {
1824 total_size: usize,
1826 transactions: Vec<Arc<T>>,
1828 pooled: PooledTransactionsHashesBuilder,
1830}
1831
1832impl<T> FullTransactionsBuilder<T> {
1833 fn new(version: EthVersion) -> Self {
1835 Self {
1836 total_size: 0,
1837 pooled: PooledTransactionsHashesBuilder::new(version),
1838 transactions: vec![],
1839 }
1840 }
1841
1842 fn is_empty(&self) -> bool {
1844 self.transactions.is_empty() && self.pooled.is_empty()
1845 }
1846
1847 fn build(self) -> PropagateTransactions<T> {
1849 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1850 let full = Some(self.transactions).filter(|full| !full.is_empty());
1851 PropagateTransactions { pooled, full }
1852 }
1853}
1854
1855impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1856 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1858 for tx in txs {
1859 self.push(&tx)
1860 }
1861 }
1862
1863 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1873 if !transaction.transaction.is_broadcastable_in_full() {
1882 self.pooled.push(transaction);
1883 return
1884 }
1885
1886 let new_size = self.total_size + transaction.size;
1887 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1888 self.total_size > 0
1889 {
1890 self.pooled.push(transaction);
1892 return
1893 }
1894
1895 self.total_size = new_size;
1896 self.transactions.push(Arc::clone(&transaction.transaction));
1897 }
1898}
1899
1900#[derive(Debug, Clone)]
1903enum PooledTransactionsHashesBuilder {
1904 Eth66(NewPooledTransactionHashes66),
1905 Eth68(NewPooledTransactionHashes68),
1906}
1907
1908impl PooledTransactionsHashesBuilder {
1911 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1913 match self {
1914 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1915 Self::Eth68(msg) => {
1916 msg.hashes.push(*pooled_tx.hash());
1917 msg.sizes.push(pooled_tx.encoded_length());
1918 msg.types.push(pooled_tx.transaction.ty());
1919 }
1920 }
1921 }
1922
1923 fn is_empty(&self) -> bool {
1925 match self {
1926 Self::Eth66(hashes) => hashes.is_empty(),
1927 Self::Eth68(hashes) => hashes.is_empty(),
1928 }
1929 }
1930
1931 fn extend<T: SignedTransaction>(
1933 &mut self,
1934 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1935 ) {
1936 for tx in txs {
1937 self.push(&tx);
1938 }
1939 }
1940
1941 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1942 match self {
1943 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1944 Self::Eth68(msg) => {
1945 msg.hashes.push(*tx.tx_hash());
1946 msg.sizes.push(tx.size);
1947 msg.types.push(tx.transaction.ty());
1948 }
1949 }
1950 }
1951
1952 fn new(version: EthVersion) -> Self {
1954 match version {
1955 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1956 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
1957 Self::Eth68(Default::default())
1958 }
1959 }
1960 }
1961
1962 fn build(self) -> NewPooledTransactionHashes {
1963 match self {
1964 Self::Eth66(mut msg) => {
1965 msg.0.shrink_to_fit();
1966 msg.into()
1967 }
1968 Self::Eth68(mut msg) => {
1969 msg.shrink_to_fit();
1970 msg.into()
1971 }
1972 }
1973 }
1974}
1975
1976enum TransactionSource {
1978 Broadcast,
1980 Response,
1982}
1983
1984impl TransactionSource {
1987 const fn is_broadcast(&self) -> bool {
1989 matches!(self, Self::Broadcast)
1990 }
1991}
1992
1993#[derive(Debug)]
1995pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1996 seen_transactions: LruCache<TxHash>,
2000 request_tx: PeerRequestSender<PeerRequest<N>>,
2002 version: EthVersion,
2004 client_version: Arc<str>,
2006 peer_kind: PeerKind,
2008}
2009
2010impl<N: NetworkPrimitives> PeerMetadata<N> {
2011 pub fn new(
2013 request_tx: PeerRequestSender<PeerRequest<N>>,
2014 version: EthVersion,
2015 client_version: Arc<str>,
2016 max_transactions_seen_by_peer: u32,
2017 peer_kind: PeerKind,
2018 ) -> Self {
2019 Self {
2020 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2021 request_tx,
2022 version,
2023 client_version,
2024 peer_kind,
2025 }
2026 }
2027
2028 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2030 &self.request_tx
2031 }
2032
2033 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2035 &mut self.seen_transactions
2036 }
2037
2038 pub const fn version(&self) -> EthVersion {
2040 self.version
2041 }
2042
2043 pub fn client_version(&self) -> &str {
2045 &self.client_version
2046 }
2047
2048 pub const fn peer_kind(&self) -> PeerKind {
2050 self.peer_kind
2051 }
2052}
2053
2054#[derive(Debug)]
2056enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2057 PropagateHash(B256),
2059 PropagateHashesTo(Vec<B256>, PeerId),
2061 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2063 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2065 PropagateTransactions(Vec<TxHash>),
2067 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2069 GetTransactionHashes {
2071 peers: Vec<PeerId>,
2072 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2073 },
2074 GetPeerSender {
2076 peer_id: PeerId,
2077 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2078 },
2079}
2080
2081#[derive(Debug)]
2083pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2084 IncomingTransactions {
2088 peer_id: PeerId,
2090 msg: Transactions<N::BroadcastedTransaction>,
2092 },
2093 IncomingPooledTransactionHashes {
2095 peer_id: PeerId,
2097 msg: NewPooledTransactionHashes,
2099 },
2100 GetPooledTransactions {
2102 peer_id: PeerId,
2104 request: GetPooledTransactions,
2106 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2108 },
2109 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2111}
2112
2113#[derive(Debug)]
2115pub struct PendingPoolImportsInfo {
2116 pending_pool_imports: Arc<AtomicUsize>,
2118 max_pending_pool_imports: usize,
2120}
2121
2122impl PendingPoolImportsInfo {
2123 pub fn new(max_pending_pool_imports: usize) -> Self {
2125 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2126 }
2127
2128 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2130 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2131 }
2132}
2133
2134impl Default for PendingPoolImportsInfo {
2135 fn default() -> Self {
2136 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2137 }
2138}
2139
2140#[derive(Debug, Default)]
2141struct TxManagerPollDurations {
2142 acc_network_events: Duration,
2143 acc_pending_imports: Duration,
2144 acc_tx_events: Duration,
2145 acc_imported_txns: Duration,
2146 acc_fetch_events: Duration,
2147 acc_pending_fetch: Duration,
2148 acc_cmds: Duration,
2149}
2150
2151#[cfg(test)]
2152mod tests {
2153 use super::*;
2154 use crate::{
2155 test_utils::{
2156 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2157 Testnet,
2158 },
2159 transactions::config::RelaxedEthAnnouncementFilter,
2160 NetworkConfigBuilder, NetworkManager,
2161 };
2162 use alloy_consensus::{TxEip1559, TxLegacy};
2163 use alloy_primitives::{hex, Signature, TxKind, U256};
2164 use alloy_rlp::Decodable;
2165 use futures::FutureExt;
2166 use reth_chainspec::MIN_TRANSACTION_GAS;
2167 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2168 use reth_network_api::{NetworkInfo, PeerKind};
2169 use reth_network_p2p::{
2170 error::{RequestError, RequestResult},
2171 sync::{NetworkSyncUpdater, SyncState},
2172 };
2173 use reth_storage_api::noop::NoopProvider;
2174 use reth_transaction_pool::test_utils::{
2175 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2176 };
2177 use secp256k1::SecretKey;
2178 use std::{
2179 future::poll_fn,
2180 net::{IpAddr, Ipv4Addr, SocketAddr},
2181 str::FromStr,
2182 };
2183 use tracing::error;
2184
2185 #[tokio::test(flavor = "multi_thread")]
2186 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2187 reth_tracing::init_test_tracing();
2188 let net = Testnet::create(3).await;
2189
2190 let mut handles = net.handles();
2191 let handle0 = handles.next().unwrap();
2192 let handle1 = handles.next().unwrap();
2193
2194 drop(handles);
2195 let handle = net.spawn();
2196
2197 let listener0 = handle0.event_listener();
2198 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2199 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2200
2201 let client = NoopProvider::default();
2202 let pool = testing_pool();
2203 let config = NetworkConfigBuilder::eth(secret_key)
2204 .disable_discovery()
2205 .listener_port(0)
2206 .build(client);
2207 let transactions_manager_config = config.transactions_manager_config.clone();
2208 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2209 .await
2210 .unwrap()
2211 .into_builder()
2212 .transactions(pool.clone(), transactions_manager_config)
2213 .split_with_handle();
2214
2215 tokio::task::spawn(network);
2216
2217 network_handle.update_sync_state(SyncState::Syncing);
2219 assert!(NetworkInfo::is_syncing(&network_handle));
2220 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2221
2222 let mut established = listener0.take(2);
2224 while let Some(ev) = established.next().await {
2225 match ev {
2226 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2227 transactions
2229 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2230 }
2231 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2232 ev => {
2233 error!("unexpected event {ev:?}")
2234 }
2235 }
2236 }
2237 let input = hex!(
2239 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2240 );
2241 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2242 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2243 peer_id: *handle1.peer_id(),
2244 msg: Transactions(vec![signed_tx.clone()]),
2245 });
2246 poll_fn(|cx| {
2247 let _ = transactions.poll_unpin(cx);
2248 Poll::Ready(())
2249 })
2250 .await;
2251 assert!(pool.is_empty());
2252 handle.terminate().await;
2253 }
2254
2255 #[tokio::test(flavor = "multi_thread")]
2256 async fn test_tx_broadcasts_through_two_syncs() {
2257 reth_tracing::init_test_tracing();
2258 let net = Testnet::create(3).await;
2259
2260 let mut handles = net.handles();
2261 let handle0 = handles.next().unwrap();
2262 let handle1 = handles.next().unwrap();
2263
2264 drop(handles);
2265 let handle = net.spawn();
2266
2267 let listener0 = handle0.event_listener();
2268 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2269 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2270
2271 let client = NoopProvider::default();
2272 let pool = testing_pool();
2273 let config = NetworkConfigBuilder::new(secret_key)
2274 .disable_discovery()
2275 .listener_port(0)
2276 .build(client);
2277 let transactions_manager_config = config.transactions_manager_config.clone();
2278 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2279 .await
2280 .unwrap()
2281 .into_builder()
2282 .transactions(pool.clone(), transactions_manager_config)
2283 .split_with_handle();
2284
2285 tokio::task::spawn(network);
2286
2287 network_handle.update_sync_state(SyncState::Syncing);
2289 assert!(NetworkInfo::is_syncing(&network_handle));
2290 network_handle.update_sync_state(SyncState::Idle);
2291 assert!(!NetworkInfo::is_syncing(&network_handle));
2292 network_handle.update_sync_state(SyncState::Syncing);
2293 assert!(NetworkInfo::is_syncing(&network_handle));
2294
2295 let mut established = listener0.take(2);
2297 while let Some(ev) = established.next().await {
2298 match ev {
2299 NetworkEvent::ActivePeerSession { .. } |
2300 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2301 transactions.on_network_event(ev);
2303 }
2304 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2305 _ => {
2306 error!("unexpected event {ev:?}")
2307 }
2308 }
2309 }
2310 let input = hex!(
2312 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2313 );
2314 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2315 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2316 peer_id: *handle1.peer_id(),
2317 msg: Transactions(vec![signed_tx.clone()]),
2318 });
2319 poll_fn(|cx| {
2320 let _ = transactions.poll_unpin(cx);
2321 Poll::Ready(())
2322 })
2323 .await;
2324 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2325 assert!(NetworkInfo::is_syncing(&network_handle));
2326 assert!(!pool.is_empty());
2327 handle.terminate().await;
2328 }
2329
2330 #[tokio::test(flavor = "multi_thread")]
2333 async fn test_handle_incoming_transactions_hashes() {
2334 reth_tracing::init_test_tracing();
2335
2336 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2337 let client = NoopProvider::default();
2338
2339 let config = NetworkConfigBuilder::new(secret_key)
2340 .listener_port(0)
2342 .disable_discovery()
2343 .build(client);
2344
2345 let pool = testing_pool();
2346
2347 let transactions_manager_config = config.transactions_manager_config.clone();
2348 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2349 .await
2350 .unwrap()
2351 .into_builder()
2352 .transactions(pool.clone(), transactions_manager_config)
2353 .split_with_handle();
2354
2355 let peer_id_1 = PeerId::new([1; 64]);
2356 let eth_version = EthVersion::Eth66;
2357
2358 let txs = vec![TransactionSigned::new_unhashed(
2359 Transaction::Legacy(TxLegacy {
2360 chain_id: Some(4),
2361 nonce: 15u64,
2362 gas_price: 2200000000,
2363 gas_limit: 34811,
2364 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2365 value: U256::from(1234u64),
2366 input: Default::default(),
2367 }),
2368 Signature::new(
2369 U256::from_str(
2370 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2371 )
2372 .unwrap(),
2373 U256::from_str(
2374 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2375 )
2376 .unwrap(),
2377 true,
2378 ),
2379 )];
2380
2381 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2382
2383 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2384 tx_manager.peers.insert(peer_id_1, peer_1);
2385
2386 assert!(pool.is_empty());
2387
2388 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2389 peer_id: peer_id_1,
2390 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2391 txs_hashes.clone(),
2392 )),
2393 });
2394
2395 let req = to_mock_session_rx
2397 .recv()
2398 .await
2399 .expect("peer_1 session should receive request with buffered hashes");
2400 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2401 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2402
2403 let message: Vec<PooledTransactionVariant> = txs
2404 .into_iter()
2405 .map(|tx| {
2406 PooledTransactionVariant::try_from(tx)
2407 .expect("Failed to convert MockTransaction to PooledTransaction")
2408 })
2409 .collect();
2410
2411 response
2413 .send(Ok(PooledTransactions(message)))
2414 .expect("should send peer_1 response to tx manager");
2415
2416 poll_fn(|cx| {
2418 let _ = tx_manager.poll_unpin(cx);
2419 Poll::Ready(())
2420 })
2421 .await;
2422
2423 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2426 }
2427
2428 #[tokio::test(flavor = "multi_thread")]
2429 async fn test_handle_incoming_transactions() {
2430 reth_tracing::init_test_tracing();
2431 let net = Testnet::create(3).await;
2432
2433 let mut handles = net.handles();
2434 let handle0 = handles.next().unwrap();
2435 let handle1 = handles.next().unwrap();
2436
2437 drop(handles);
2438 let handle = net.spawn();
2439
2440 let listener0 = handle0.event_listener();
2441
2442 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2443 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2444
2445 let client = NoopProvider::default();
2446 let pool = testing_pool();
2447 let config = NetworkConfigBuilder::new(secret_key)
2448 .disable_discovery()
2449 .listener_port(0)
2450 .build(client);
2451 let transactions_manager_config = config.transactions_manager_config.clone();
2452 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2453 .await
2454 .unwrap()
2455 .into_builder()
2456 .transactions(pool.clone(), transactions_manager_config)
2457 .split_with_handle();
2458 tokio::task::spawn(network);
2459
2460 network_handle.update_sync_state(SyncState::Idle);
2461
2462 assert!(!NetworkInfo::is_syncing(&network_handle));
2463
2464 let mut established = listener0.take(2);
2466 while let Some(ev) = established.next().await {
2467 match ev {
2468 NetworkEvent::ActivePeerSession { .. } |
2469 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2470 transactions.on_network_event(ev);
2472 }
2473 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2474 ev => {
2475 error!("unexpected event {ev:?}")
2476 }
2477 }
2478 }
2479 let input = hex!(
2481 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2482 );
2483 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2484 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2485 peer_id: *handle1.peer_id(),
2486 msg: Transactions(vec![signed_tx.clone()]),
2487 });
2488 assert!(transactions
2489 .transactions_by_peers
2490 .get(signed_tx.tx_hash())
2491 .unwrap()
2492 .contains(handle1.peer_id()));
2493
2494 poll_fn(|cx| {
2496 let _ = transactions.poll_unpin(cx);
2497 Poll::Ready(())
2498 })
2499 .await;
2500
2501 assert!(!pool.is_empty());
2502 assert!(pool.get(signed_tx.tx_hash()).is_some());
2503 handle.terminate().await;
2504 }
2505
2506 #[tokio::test(flavor = "multi_thread")]
2507 async fn test_on_get_pooled_transactions_network() {
2508 reth_tracing::init_test_tracing();
2509 let net = Testnet::create(2).await;
2510
2511 let mut handles = net.handles();
2512 let handle0 = handles.next().unwrap();
2513 let handle1 = handles.next().unwrap();
2514
2515 drop(handles);
2516 let handle = net.spawn();
2517
2518 let listener0 = handle0.event_listener();
2519
2520 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2521 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2522
2523 let client = NoopProvider::default();
2524 let pool = testing_pool();
2525 let config = NetworkConfigBuilder::new(secret_key)
2526 .disable_discovery()
2527 .listener_port(0)
2528 .build(client);
2529 let transactions_manager_config = config.transactions_manager_config.clone();
2530 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2531 .await
2532 .unwrap()
2533 .into_builder()
2534 .transactions(pool.clone(), transactions_manager_config)
2535 .split_with_handle();
2536 tokio::task::spawn(network);
2537
2538 network_handle.update_sync_state(SyncState::Idle);
2539
2540 assert!(!NetworkInfo::is_syncing(&network_handle));
2541
2542 let mut established = listener0.take(2);
2544 while let Some(ev) = established.next().await {
2545 match ev {
2546 NetworkEvent::ActivePeerSession { .. } |
2547 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2548 transactions.on_network_event(ev);
2549 }
2550 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2551 ev => {
2552 error!("unexpected event {ev:?}")
2553 }
2554 }
2555 }
2556 handle.terminate().await;
2557
2558 let tx = MockTransaction::eip1559();
2559 let _ = transactions
2560 .pool
2561 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2562 .await;
2563
2564 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2565
2566 let (send, receive) =
2567 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2568
2569 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2570 peer_id: *handle1.peer_id(),
2571 request,
2572 response: send,
2573 });
2574
2575 match receive.await.unwrap() {
2576 Ok(PooledTransactions(transactions)) => {
2577 assert_eq!(transactions.len(), 1);
2578 }
2579 Err(e) => {
2580 panic!("error: {e:?}");
2581 }
2582 }
2583 }
2584
2585 #[tokio::test]
2589 async fn test_partially_tx_response() {
2590 reth_tracing::init_test_tracing();
2591
2592 let mut tx_manager = new_tx_manager().await.0;
2593 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2594
2595 let peer_id_1 = PeerId::new([1; 64]);
2596 let eth_version = EthVersion::Eth66;
2597
2598 let txs = vec![
2599 TransactionSigned::new_unhashed(
2600 Transaction::Legacy(TxLegacy {
2601 chain_id: Some(4),
2602 nonce: 15u64,
2603 gas_price: 2200000000,
2604 gas_limit: 34811,
2605 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2606 value: U256::from(1234u64),
2607 input: Default::default(),
2608 }),
2609 Signature::new(
2610 U256::from_str(
2611 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2612 )
2613 .unwrap(),
2614 U256::from_str(
2615 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2616 )
2617 .unwrap(),
2618 true,
2619 ),
2620 ),
2621 TransactionSigned::new_unhashed(
2622 Transaction::Eip1559(TxEip1559 {
2623 chain_id: 4,
2624 nonce: 26u64,
2625 max_priority_fee_per_gas: 1500000000,
2626 max_fee_per_gas: 1500000013,
2627 gas_limit: MIN_TRANSACTION_GAS,
2628 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2629 value: U256::from(3000000000000000000u64),
2630 input: Default::default(),
2631 access_list: Default::default(),
2632 }),
2633 Signature::new(
2634 U256::from_str(
2635 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2636 )
2637 .unwrap(),
2638 U256::from_str(
2639 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2640 )
2641 .unwrap(),
2642 true,
2643 ),
2644 ),
2645 ];
2646
2647 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2648
2649 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2650 peer_1.seen_transactions.insert(txs_hashes[0]);
2653 peer_1.seen_transactions.insert(txs_hashes[1]);
2654 tx_manager.peers.insert(peer_id_1, peer_1);
2655
2656 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2657 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2658
2659 assert!(tx_fetcher.is_idle(&peer_id_1));
2661 assert_eq!(tx_fetcher.active_peers.len(), 0);
2662
2663 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2665
2666 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2667 assert!(!tx_fetcher.is_idle(&peer_id_1));
2669 assert_eq!(tx_fetcher.active_peers.len(), 1);
2670
2671 let req = to_mock_session_rx
2673 .recv()
2674 .await
2675 .expect("peer_1 session should receive request with buffered hashes");
2676 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2677
2678 let message: Vec<PooledTransactionVariant> = txs
2679 .into_iter()
2680 .take(1)
2681 .map(|tx| {
2682 PooledTransactionVariant::try_from(tx)
2683 .expect("Failed to convert MockTransaction to PooledTransaction")
2684 })
2685 .collect();
2686 response
2688 .send(Ok(PooledTransactions(message)))
2689 .expect("should send peer_1 response to tx manager");
2690 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2691 unreachable!()
2692 };
2693
2694 assert!(tx_fetcher.is_idle(&peer_id));
2696 assert_eq!(tx_fetcher.active_peers.len(), 0);
2697 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2699 }
2700
2701 #[tokio::test]
2702 async fn test_max_retries_tx_request() {
2703 reth_tracing::init_test_tracing();
2704
2705 let mut tx_manager = new_tx_manager().await.0;
2706 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2707
2708 let peer_id_1 = PeerId::new([1; 64]);
2709 let peer_id_2 = PeerId::new([2; 64]);
2710 let eth_version = EthVersion::Eth66;
2711 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2712
2713 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2714 peer_1.seen_transactions.insert(seen_hashes[0]);
2717 peer_1.seen_transactions.insert(seen_hashes[1]);
2718 tx_manager.peers.insert(peer_id_1, peer_1);
2719
2720 let retries = 1;
2723 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2724 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2725
2726 assert!(tx_fetcher.is_idle(&peer_id_1));
2728 assert_eq!(tx_fetcher.active_peers.len(), 0);
2729
2730 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2732
2733 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2734
2735 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2736 assert!(!tx_fetcher.is_idle(&peer_id_1));
2738 assert_eq!(tx_fetcher.active_peers.len(), 1);
2739
2740 let req = to_mock_session_rx
2742 .recv()
2743 .await
2744 .expect("peer_1 session should receive request with buffered hashes");
2745 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2746 let GetPooledTransactions(hashes) = request;
2747
2748 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2749
2750 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2751
2752 response
2754 .send(Err(RequestError::BadResponse))
2755 .expect("should send peer_1 response to tx manager");
2756 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2757 unreachable!()
2758 };
2759
2760 assert!(tx_fetcher.is_idle(&peer_id));
2762 assert_eq!(tx_fetcher.active_peers.len(), 0);
2763 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2765
2766 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2767 tx_manager.peers.insert(peer_id_2, peer_2);
2768
2769 let msg =
2771 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2772 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2773
2774 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2775
2776 assert_eq!(tx_fetcher.active_peers.len(), 1);
2778
2779 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2781 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2783
2784 let req = to_mock_session_rx
2786 .recv()
2787 .await
2788 .expect("peer_2 session should receive request with buffered hashes");
2789 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2790
2791 response
2793 .send(Err(RequestError::BadResponse))
2794 .expect("should send peer_2 response to tx manager");
2795 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2796
2797 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2800 assert_eq!(tx_fetcher.active_peers.len(), 0);
2801 }
2802
2803 #[test]
2804 fn test_transaction_builder_empty() {
2805 let mut builder =
2806 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2807 assert!(builder.is_empty());
2808
2809 let mut factory = MockTransactionFactory::default();
2810 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2811 builder.push(&tx);
2812 assert!(!builder.is_empty());
2813
2814 let txs = builder.build();
2815 assert!(txs.full.is_none());
2816 let txs = txs.pooled.unwrap();
2817 assert_eq!(txs.len(), 1);
2818 }
2819
2820 #[test]
2821 fn test_transaction_builder_large() {
2822 let mut builder =
2823 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2824 assert!(builder.is_empty());
2825
2826 let mut factory = MockTransactionFactory::default();
2827 let mut tx = factory.create_eip1559();
2828 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2830 let tx = Arc::new(tx);
2831 let tx = PropagateTransaction::pool_tx(tx);
2832 builder.push(&tx);
2833 assert!(!builder.is_empty());
2834
2835 let txs = builder.clone().build();
2836 assert!(txs.pooled.is_none());
2837 let txs = txs.full.unwrap();
2838 assert_eq!(txs.len(), 1);
2839
2840 builder.push(&tx);
2841
2842 let txs = builder.clone().build();
2843 let pooled = txs.pooled.unwrap();
2844 assert_eq!(pooled.len(), 1);
2845 let txs = txs.full.unwrap();
2846 assert_eq!(txs.len(), 1);
2847 }
2848
2849 #[test]
2850 fn test_transaction_builder_eip4844() {
2851 let mut builder =
2852 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2853 assert!(builder.is_empty());
2854
2855 let mut factory = MockTransactionFactory::default();
2856 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2857 builder.push(&tx);
2858 assert!(!builder.is_empty());
2859
2860 let txs = builder.clone().build();
2861 assert!(txs.full.is_none());
2862 let txs = txs.pooled.unwrap();
2863 assert_eq!(txs.len(), 1);
2864
2865 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2866 builder.push(&tx);
2867
2868 let txs = builder.clone().build();
2869 let pooled = txs.pooled.unwrap();
2870 assert_eq!(pooled.len(), 1);
2871 let txs = txs.full.unwrap();
2872 assert_eq!(txs.len(), 1);
2873 }
2874
2875 #[tokio::test]
2876 async fn test_propagate_full() {
2877 reth_tracing::init_test_tracing();
2878
2879 let (mut tx_manager, network) = new_tx_manager().await;
2880 let peer_id = PeerId::random();
2881
2882 network.handle().update_sync_state(SyncState::Idle);
2884
2885 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2887
2888 let session_info = SessionInfo {
2889 peer_id,
2890 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2891 client_version: Arc::from(""),
2892 capabilities: Arc::new(vec![].into()),
2893 status: Arc::new(Default::default()),
2894 version: EthVersion::Eth68,
2895 peer_kind: PeerKind::Basic,
2896 };
2897 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2898 tx_manager
2899 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2900 let mut propagate = vec![];
2901 let mut factory = MockTransactionFactory::default();
2902 let eip1559_tx = Arc::new(factory.create_eip1559());
2903 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2904 let eip4844_tx = Arc::new(factory.create_eip4844());
2905 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2906
2907 let propagated =
2908 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2909 assert_eq!(propagated.0.len(), 2);
2910 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2911 assert_eq!(prop_txs.len(), 1);
2912 assert!(prop_txs[0].is_full());
2913
2914 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2915 assert_eq!(prop_txs.len(), 1);
2916 assert!(prop_txs[0].is_hash());
2917
2918 let peer = tx_manager.peers.get(&peer_id).unwrap();
2919 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2920 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2921 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2922
2923 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2925 assert!(propagated.0.is_empty());
2926 }
2927
2928 #[tokio::test]
2929 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2930 reth_tracing::init_test_tracing();
2931
2932 let transactions_manager_config = TransactionsManagerConfig::default();
2933
2934 let propagation_policy = TransactionPropagationKind::default();
2935 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2936
2937 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2938
2939 let pool = testing_pool();
2940 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2941 let client = NoopProvider::default();
2942
2943 let network_config = NetworkConfigBuilder::new(secret_key)
2944 .listener_port(0)
2945 .disable_discovery()
2946 .build(client.clone());
2947
2948 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2949 let (to_tx_manager_tx, from_network_rx) =
2950 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2951 network_manager.set_transactions(to_tx_manager_tx);
2952 let network_handle = network_manager.handle().clone();
2953 let network_service_handle = tokio::spawn(network_manager);
2954
2955 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2956 network_handle.clone(),
2957 pool.clone(),
2958 from_network_rx,
2959 transactions_manager_config,
2960 policy_bundle,
2961 );
2962
2963 let peer_id = PeerId::random();
2964 let eth_version = EthVersion::Eth68;
2965 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2966 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2967
2968 let mut tx_factory = MockTransactionFactory::default();
2969
2970 let valid_known_tx = tx_factory.create_eip1559();
2971 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2972
2973 let known_tx_hash = *known_tx_signed.hash();
2974 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2975 let known_tx_size = known_tx_signed.encoded_length();
2976
2977 let unknown_tx_hash = B256::random();
2978 let unknown_tx_type_byte = 0xff_u8;
2979 let unknown_tx_size = 150;
2980
2981 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2982 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2983 sizes: vec![known_tx_size, unknown_tx_size],
2984 hashes: vec![known_tx_hash, unknown_tx_hash],
2985 });
2986
2987 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2988
2989 poll_fn(|cx| {
2990 let _ = tx_manager.poll_unpin(cx);
2991 Poll::Ready(())
2992 })
2993 .await;
2994
2995 let mut requested_hashes_in_getpooled = HashSet::new();
2996 let mut unexpected_request_received = false;
2997
2998 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2999 .await
3000 {
3001 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3002 let GetPooledTransactions(hashes) = request;
3003 for hash in hashes {
3004 requested_hashes_in_getpooled.insert(hash);
3005 }
3006 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3007 }
3008 Ok(Some(other_request)) => {
3009 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3010 unexpected_request_received = true;
3011 }
3012 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3013 Err(_timeout_err) => {
3014 tracing::info!("Timeout: No GetPooledTransactions request received.")
3015 }
3016 }
3017
3018 assert!(
3019 requested_hashes_in_getpooled.contains(&known_tx_hash),
3020 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3021 );
3022 assert!(
3023 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3024 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3025 );
3026 assert!(
3027 !unexpected_request_received,
3028 "An unexpected P2P request was received by the mock peer."
3029 );
3030
3031 network_service_handle.abort();
3032 }
3033}