1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6pub mod config;
8pub mod constants;
10pub mod fetcher;
12pub mod policy;
14
15pub use self::constants::{
16 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30 budget::{
31 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{
37 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38 },
39 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40 NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49 RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54 events::{PeerEvent, SessionInfo},
55 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58 error::{RequestError, RequestResult},
59 sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66 error::{PoolError, PoolResult},
67 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71 collections::{hash_map::Entry, HashMap, HashSet},
72 pin::Pin,
73 sync::{
74 atomic::{AtomicUsize, Ordering},
75 Arc,
76 },
77 task::{Context, Poll},
78 time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
/// Boxed, `Send + 'static` future kept in `TransactionsManager::pool_imports`.
///
/// It resolves to a list of pool results; each entry is either an
/// [`AddedTransactionOutcome`] or a pool error.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
/// Cloneable handle that talks to a [`TransactionsManager`] by sending it
/// [`TransactionsCommand`]s over an unbounded channel.
///
/// Instances are created with [`TransactionsManager::handle`].
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sending half of the manager's command channel.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104 fn send(&self, cmd: TransactionsCommand<N>) {
105 let _ = self.manager_tx.send(cmd);
106 }
107
108 async fn peer_handle(
110 &self,
111 peer_id: PeerId,
112 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113 let (tx, rx) = oneshot::channel();
114 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115 rx.await
116 }
117
118 pub fn propagate(&self, hash: TxHash) {
120 self.send(TransactionsCommand::PropagateHash(hash))
121 }
122
123 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127 self.propagate_hashes_to(Some(hash), peer)
128 }
129
130 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134 let hashes = hash.into_iter().collect::<Vec<_>>();
135 if hashes.is_empty() {
136 return
137 }
138 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139 }
140
141 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143 let (tx, rx) = oneshot::channel();
144 self.send(TransactionsCommand::GetActivePeers(tx));
145 rx.await
146 }
147
148 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152 if transactions.is_empty() {
153 return
154 }
155 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156 }
157
158 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163 if transactions.is_empty() {
164 return
165 }
166 self.send(TransactionsCommand::PropagateTransactions(transactions))
167 }
168
169 pub fn broadcast_transactions(
174 &self,
175 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176 ) {
177 let transactions =
178 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179 if transactions.is_empty() {
180 return
181 }
182 self.send(TransactionsCommand::BroadcastTransactions(transactions))
183 }
184
185 pub async fn get_transaction_hashes(
187 &self,
188 peers: Vec<PeerId>,
189 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190 if peers.is_empty() {
191 return Ok(Default::default())
192 }
193 let (tx, rx) = oneshot::channel();
194 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195 rx.await
196 }
197
198 pub async fn get_peer_transaction_hashes(
200 &self,
201 peer: PeerId,
202 ) -> Result<HashSet<TxHash>, RecvError> {
203 let res = self.get_transaction_hashes(vec![peer]).await?;
204 Ok(res.into_values().next().unwrap_or_default())
205 }
206
207 pub async fn get_pooled_transactions_from(
213 &self,
214 peer_id: PeerId,
215 hashes: Vec<B256>,
216 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219 let (tx, rx) = oneshot::channel();
220 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221 peer.try_send(request).ok();
222
223 rx.await?.map(|res| Some(res.0))
224 }
225}
226
/// Manages transaction exchange between the network and the transaction pool.
///
/// Handles announcements and requests from peers, propagates pool transactions to peers and
/// executes [`TransactionsCommand`]s sent through [`TransactionsHandle`]s.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Transaction pool; queried for transactions to propagate and notified via
    /// `on_propagated`.
    pool: Pool,
    /// Network handle used to send messages to peers, report reputation changes and query
    /// sync / gossip state.
    network: NetworkHandle<N>,
    /// Event stream obtained from `network.event_listener()`.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Fetcher that packs, buffers and issues `GetPooledTransactions` requests for announced
    /// hashes.
    transaction_fetcher: TransactionFetcher<N>,
    /// Peers associated with a transaction hash. Entries are removed once an import result
    /// for the hash arrives, and announced hashes present here are filtered out.
    // NOTE(review): presumably populated when an import is started; that code is outside
    // this chunk.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// In-flight pool import futures.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Counter and upper bound for pending pool imports; drives the capacity checks.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// LRU cache of hashes whose import failed with a bad-transaction error.
    bad_imports: LruCache<TxHash>,
    /// Metadata of peers with a tracked session, keyed by peer id.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Command sender that is cloned into every [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Receiving end of the command channel.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Receiver obtained from `pool.pending_transactions_listener()`.
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Metered receiver wrapping the `from_network` channel of transaction events.
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// Manager configuration (propagation mode, ingress policy, per-peer history size, ...).
    config: TransactionsManagerConfig,
    /// Propagation policy and announcement filter.
    policies: NetworkPolicies<N>,
    /// Manager metrics.
    metrics: TransactionsManagerMetrics,
    /// Metrics about transaction types seen in eth68 announcements.
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348 pub fn new(
352 network: NetworkHandle<N>,
353 pool: Pool,
354 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355 transactions_manager_config: TransactionsManagerConfig,
356 ) -> Self {
357 Self::with_policy(
358 network,
359 pool,
360 from_network,
361 transactions_manager_config,
362 NetworkPolicies::new(
363 TransactionPropagationKind::default(),
364 StrictEthAnnouncementFilter::default(),
365 ),
366 )
367 }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371 pub fn with_policy(
375 network: NetworkHandle<N>,
376 pool: Pool,
377 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378 transactions_manager_config: TransactionsManagerConfig,
379 policies: NetworkPolicies<N>,
380 ) -> Self {
381 let network_events = network.event_listener();
382
383 let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386 &transactions_manager_config.transaction_fetcher_config,
387 );
388
389 let pending = pool.pending_transactions_listener();
392 let pending_pool_imports_info = PendingPoolImportsInfo::default();
393 let metrics = TransactionsManagerMetrics::default();
394 metrics
395 .capacity_pending_pool_imports
396 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398 Self {
399 pool,
400 network,
401 network_events,
402 transaction_fetcher,
403 transactions_by_peers: Default::default(),
404 pool_imports: Default::default(),
405 pending_pool_imports_info: PendingPoolImportsInfo::new(
406 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
407 ),
408 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409 peers: Default::default(),
410 command_tx,
411 command_rx: UnboundedReceiverStream::new(command_rx),
412 pending_transactions: pending,
413 transaction_events: UnboundedMeteredReceiver::new(
414 from_network,
415 NETWORK_POOL_TRANSACTIONS_SCOPE,
416 ),
417 config: transactions_manager_config,
418 policies,
419 metrics,
420 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421 }
422 }
423
424 pub fn handle(&self) -> TransactionsHandle<N> {
426 TransactionsHandle { manager_tx: self.command_tx.clone() }
427 }
428
429 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432 self.has_capacity_for_pending_pool_imports() &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn has_capacity_for_pending_pool_imports(&self) -> bool {
438 self.remaining_pool_import_capacity() > 0
439 }
440
441 fn remaining_pool_import_capacity(&self) -> usize {
443 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445 )
446 }
447
448 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450 self.metrics.reported_bad_transactions.increment(1);
451 }
452
453 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455 self.network.reputation_change(peer_id, kind);
456 }
457
458 fn report_already_seen(&self, peer_id: PeerId) {
459 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461 }
462
463 fn on_good_import(&mut self, hash: TxHash) {
465 self.transactions_by_peers.remove(&hash);
466 }
467
468 fn on_bad_import(&mut self, err: PoolError) {
492 let peers = self.transactions_by_peers.remove(&err.hash);
493
494 if !err.is_bad_transaction() || self.network.is_syncing() {
496 return
497 }
498 if let Some(peers) = peers {
501 for peer_id in peers {
502 self.report_peer_bad_transactions(peer_id);
503 }
504 }
505 self.metrics.bad_imports.increment(1);
506 self.bad_imports.insert(err.hash);
507 }
508
509 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
513 let info = &self.pending_pool_imports_info;
515 let max_pending_pool_imports = info.max_pending_pool_imports;
516 let has_capacity_wrt_pending_pool_imports =
517 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
518
519 self.transaction_fetcher
520 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
521 }
522
523 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
524 let kind = match req_err {
525 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
526 RequestError::Timeout => ReputationChangeKind::Timeout,
527 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
528 return
530 }
531 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
532 };
533 self.report_peer(peer_id, kind);
534 }
535
536 #[inline]
537 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
538 let metrics = &self.metrics;
539
540 let TxManagerPollDurations {
541 acc_network_events,
542 acc_pending_imports,
543 acc_tx_events,
544 acc_imported_txns,
545 acc_fetch_events,
546 acc_pending_fetch,
547 acc_cmds,
548 } = poll_durations;
549
550 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
552 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
554 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
555 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
556 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
557 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
558 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
559 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
560 }
561}
562
563impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
564 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
566 for res in batch_results {
567 match res {
568 Ok(AddedTransactionOutcome { hash, .. }) => {
569 self.on_good_import(hash);
570 }
571 Err(err) => {
572 self.on_bad_import(err);
573 }
574 }
575 }
576 }
577
    /// Handles a `NewPooledTransactionHashes` announcement from `peer_id`.
    ///
    /// The announcement is dropped while the node is initially syncing, when transaction
    /// gossip is disabled, or when the peer is unknown. Otherwise the announced hashes are
    /// marked as seen by the peer, filtered in several stages (duplicates, hashes pending
    /// import, announcement policy, hashes already in the pool, hashes known to the fetcher
    /// or recorded as bad imports) and whatever remains is either requested from the peer or
    /// buffered in the fetcher.
    fn on_new_pooled_transaction_hashes(
        &mut self,
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    ) {
        // Ignore announcements during initial sync.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        // Only peers we track are considered.
        let Some(peer) = self.peers.get_mut(&peer_id) else {
            trace!(
                peer_id = format!("{peer_id:#}"),
                ?msg,
                "discarding announcement from inactive peer"
            );

            return
        };
        let client = peer.client_version.clone();

        // Mark every announced hash as seen by this peer and count the ones for which
        // `insert` returned `false`.
        let mut count_txns_already_seen_by_peer = 0;
        for tx in msg.iter_hashes().copied() {
            if !peer.seen_transactions.insert(tx) {
                count_txns_already_seen_by_peer += 1;
            }
        }
        if count_txns_already_seen_by_peer > 0 {
            // One increment per message ...
            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
            // ... and one per re-announced hash.
            self.metrics
                .occurrences_hash_already_seen_by_peer
                .increment(count_txns_already_seen_by_peer);

            trace!(target: "net::tx",
                %count_txns_already_seen_by_peer,
                peer_id=format!("{peer_id:#}"),
                ?client,
                "Peer sent hashes that have already been marked as seen by peer"
            );

            self.report_already_seen(peer_id);
        }

        // An empty announcement is penalized and dropped.
        if msg.is_empty() {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
            return;
        }

        let original_len = msg.len();
        let mut partially_valid_msg = msg.dedup();

        // A length change after `dedup` means the message contained duplicates.
        if partially_valid_msg.len() != original_len {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // Drop hashes that are still tracked in `transactions_by_peers`.
        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

        // Set by the filter closure below when the peer should be penalized.
        let mut should_report_peer = false;
        let mut tx_types_counter = TxTypesCounter::default();

        let is_eth68_message = partially_valid_msg
            .msg_version()
            .expect("partially valid announcement should have a version")
            .is_eth68();

        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
            // Metadata (type, size) is expected exactly for eth68 messages: its presence on a
            // non-eth68 message or its absence on an eth68 message flags the peer.
            let (ty_byte, size_val) = match *metadata_ref_mut {
                Some((ty, size)) => {
                    if !is_eth68_message {
                        should_report_peer = true;
                    }
                    (ty, size)
                }
                None => {
                    if is_eth68_message {
                        should_report_peer = true;
                        return false;
                    }
                    // Without metadata the policy is queried with zeroed type and size.
                    (0u8, 0)
                }
            };

            // Count announced transaction types for eth68 messages whose type byte parses.
            if is_eth68_message &&
                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
            {
                tx_types_counter.increase_by_tx_type(parsed_tx_type);
            }

            let decision = self
                .policies
                .announcement_filter()
                .decide_on_announcement(ty_byte, tx_hash, size_val);

            // Only accepted entries are retained; a rejection may additionally flag the peer.
            match decision {
                AnnouncementAcceptance::Accept => true,
                AnnouncementAcceptance::Ignore => false,
                AnnouncementAcceptance::Reject { penalize_peer } => {
                    if penalize_peer {
                        should_report_peer = true;
                    }
                    false
                }
            }
        });

        if is_eth68_message {
            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
        }

        if should_report_peer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // Let the pool remove hashes it already knows and record how many were removed.
        let hashes_count_pre_pool_filter = partially_valid_msg.len();
        self.pool.retain_unknown(&mut partially_valid_msg);
        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
            let already_known_hashes_count =
                hashes_count_pre_pool_filter - partially_valid_msg.len();
            self.metrics
                .occurrences_hashes_already_in_pool
                .increment(already_known_hashes_count as u64);
        }

        if partially_valid_msg.is_empty() {
            // Nothing left to fetch.
            return
        }

        let mut valid_announcement_data =
            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);

        if valid_announcement_data.is_empty() {
            // Nothing left to fetch.
            return
        }

        // Let the fetcher filter the announcement down to unseen and pending hashes; the
        // closure tells it which hashes are recorded as bad imports.
        let bad_imports = &self.bad_imports;
        self.transaction_fetcher.filter_unseen_and_pending_hashes(
            &mut valid_announcement_data,
            |hash| bad_imports.contains(hash),
            &peer_id,
            &client,
        );

        if valid_announcement_data.is_empty() {
            // Nothing left to fetch.
            return
        }

        trace!(target: "net::tx::propagation",
            peer_id=format!("{peer_id:#}"),
            hashes_len=valid_announcement_data.len(),
            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
            msg_version=%valid_announcement_data.msg_version(),
            client_version=%client,
            "received previously unseen and pending hashes in announcement from peer"
        );

        // If the fetcher does not consider the peer idle, buffer the hashes instead of
        // requesting them now.
        if !self.transaction_fetcher.is_idle(&peer_id) {
            let msg_version = valid_announcement_data.msg_version();
            let (hashes, _version) = valid_announcement_data.into_request_hashes();

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                hashes=?*hashes,
                %msg_version,
                %client,
                "buffering hashes announced by busy peer"
            );

            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

            return
        }

        // Pack as many hashes as the fetcher allows into one request; the rest is returned
        // as surplus and buffered.
        let mut hashes_to_request =
            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
        let surplus_hashes =
            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

        if !surplus_hashes.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                surplus_hashes=?*surplus_hashes,
                %client,
                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
            );

            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
        }

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %client,
            "sending hashes in `GetPooledTransactions` request to peer's session"
        );

        // Look the peer up again; the earlier mutable borrow is not carried across the calls
        // above.
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        // If the fetcher hands the hashes back, the request was not sent; buffer them.
        if let Some(failed_to_request_hashes) =
            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
        {
            let conn_eth_version = peer.version;

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                failed_to_request_hashes=?*failed_to_request_hashes,
                %conn_eth_version,
                %client,
                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
            );
            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
830}
831
832impl<Pool, N> TransactionsManager<Pool, N>
833where
834 Pool: TransactionPool + Unpin + 'static,
835 N: NetworkPrimitives<
836 BroadcastedTransaction: SignedTransaction,
837 PooledTransaction: SignedTransaction,
838 > + Unpin,
839 Pool::Transaction:
840 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
841{
842 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
854 if self.network.tx_gossip_disabled() {
858 return
859 }
860
861 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
862
863 self.propagate_all(hashes);
864 }
865
866 fn propagate_full_transactions_to_peer(
870 &mut self,
871 txs: Vec<TxHash>,
872 peer_id: PeerId,
873 propagation_mode: PropagationMode,
874 ) -> Option<PropagatedTransactions> {
875 let peer = self.peers.get_mut(&peer_id)?;
876 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
877 let mut propagated = PropagatedTransactions::default();
878
879 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
881
882 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
883
884 if propagation_mode.is_forced() {
885 full_transactions.extend(to_propagate);
887 } else {
888 for tx in to_propagate {
891 if !peer.seen_transactions.contains(tx.tx_hash()) {
892 full_transactions.push(&tx);
894 }
895 }
896 }
897
898 if full_transactions.is_empty() {
899 return None
901 }
902
903 let PropagateTransactions { pooled, full } = full_transactions.build();
904
905 if let Some(new_pooled_hashes) = pooled {
907 for hash in new_pooled_hashes.iter_hashes().copied() {
908 propagated.record(hash, PropagateKind::Hash(peer_id));
909 peer.seen_transactions.insert(hash);
911 }
912
913 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
915 }
916
917 if let Some(new_full_transactions) = full {
919 for tx in &new_full_transactions {
920 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
921 peer.seen_transactions.insert(*tx.tx_hash());
923 }
924
925 self.network.send_transactions(peer_id, new_full_transactions);
927 }
928
929 self.metrics.propagated_transactions.increment(propagated.len() as u64);
931
932 Some(propagated)
933 }
934
935 fn propagate_hashes_to(
939 &mut self,
940 hashes: Vec<TxHash>,
941 peer_id: PeerId,
942 propagation_mode: PropagationMode,
943 ) {
944 trace!(target: "net::tx", "Start propagating transactions as hashes");
945
946 let propagated = {
949 let Some(peer) = self.peers.get_mut(&peer_id) else {
950 return
952 };
953
954 let to_propagate =
955 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
956
957 let mut propagated = PropagatedTransactions::default();
958
959 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
961
962 if propagation_mode.is_forced() {
963 hashes.extend(to_propagate)
964 } else {
965 for tx in to_propagate {
966 if !peer.seen_transactions.contains(tx.tx_hash()) {
967 hashes.push(&tx);
969 }
970 }
971 }
972
973 let new_pooled_hashes = hashes.build();
974
975 if new_pooled_hashes.is_empty() {
976 return
978 }
979
980 if let Some(peer) = self.peers.get_mut(&peer_id) {
981 for hash in new_pooled_hashes.iter_hashes().copied() {
982 propagated.record(hash, PropagateKind::Hash(peer_id));
983 peer.seen_transactions.insert(hash);
984 }
985 }
986
987 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
988
989 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
991
992 self.metrics.propagated_transactions.increment(propagated.len() as u64);
994
995 propagated
996 };
997
998 self.pool.on_propagated(propagated);
1000 }
1001
    /// Propagates the given transactions to all tracked peers that the propagation policy
    /// allows.
    ///
    /// Depending on its position in the iteration, a peer gets either a full-transaction
    /// builder or a hashes-only builder. Unless the mode is forced, transactions a peer is
    /// already recorded to have seen are skipped for that peer.
    ///
    /// Returns what was propagated to whom; the result is empty when transaction gossip is
    /// disabled.
    fn propagate_transactions(
        &mut self,
        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
        propagation_mode: PropagationMode,
    ) -> PropagatedTransactions {
        let mut propagated = PropagatedTransactions::default();
        if self.network.tx_gossip_disabled() {
            return propagated
        }

        // Number of peers that should receive full transactions, derived from the configured
        // propagation mode and the current peer count.
        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());

        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
            if !self.policies.propagation_policy().can_propagate(peer) {
                // The policy forbids propagating to this peer.
                continue
            }
            // NOTE(review): with `>` the indices `0..=max_num_full` take the full branch,
            // i.e. up to `max_num_full + 1` peers, and `peer_idx` also counts peers skipped
            // above — confirm this is intended.
            let mut builder = if peer_idx > max_num_full {
                PropagateTransactionsBuilder::pooled(peer.version)
            } else {
                PropagateTransactionsBuilder::full(peer.version)
            };

            if propagation_mode.is_forced() {
                // Forced mode bypasses the per-peer seen cache.
                builder.extend(to_propagate.iter());
            } else {
                for tx in &to_propagate {
                    // Only include transactions the peer has not been recorded to have seen.
                    if !peer.seen_transactions.contains(tx.tx_hash()) {
                        builder.push(tx);
                    }
                }
            }

            if builder.is_empty() {
                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
                continue
            }

            let PropagateTransactions { pooled, full } = builder.build();

            // Part of the batch that goes out as hashes.
            if let Some(mut new_pooled_hashes) = pooled {
                // Cap the announcement at the soft limit for hash broadcast messages.
                new_pooled_hashes
                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);

                for hash in new_pooled_hashes.iter_hashes().copied() {
                    propagated.record(hash, PropagateKind::Hash(*peer_id));
                    // Mark the hash as seen by the peer.
                    peer.seen_transactions.insert(hash);
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");

                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
            }

            // Part of the batch that goes out as full transaction objects.
            if let Some(new_full_transactions) = full {
                for tx in &new_full_transactions {
                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
                    // Mark the transaction as seen by the peer.
                    peer.seen_transactions.insert(*tx.tx_hash());
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");

                self.network.send_transactions(*peer_id, new_full_transactions);
            }
        }

        // Update the metric once for the whole round.
        self.metrics.propagated_transactions.increment(propagated.len() as u64);

        propagated
    }
1095
1096 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1101 if self.peers.is_empty() {
1102 return
1104 }
1105 let propagated = self.propagate_transactions(
1106 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1107 PropagationMode::Basic,
1108 );
1109
1110 self.pool.on_propagated(propagated);
1112 }
1113
1114 fn on_get_pooled_transactions(
1116 &mut self,
1117 peer_id: PeerId,
1118 request: GetPooledTransactions,
1119 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1120 ) {
1121 if self.network.tx_gossip_disabled() {
1123 let _ = response.send(Ok(PooledTransactions::default()));
1124 return
1125 }
1126 if let Some(peer) = self.peers.get_mut(&peer_id) {
1127 let transactions = self.pool.get_pooled_transaction_elements(
1128 request.0,
1129 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1130 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1131 ),
1132 );
1133 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1134
1135 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1138
1139 let resp = PooledTransactions(transactions);
1140 let _ = response.send(Ok(resp));
1141 }
1142 }
1143
1144 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1146 match cmd {
1147 TransactionsCommand::PropagateHash(hash) => {
1148 self.on_new_pending_transactions(vec![hash])
1149 }
1150 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1151 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1152 }
1153 TransactionsCommand::GetActivePeers(tx) => {
1154 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1155 tx.send(peers).ok();
1156 }
1157 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1158 if let Some(propagated) =
1159 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1160 {
1161 self.pool.on_propagated(propagated);
1162 }
1163 }
1164 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1165 TransactionsCommand::BroadcastTransactions(txs) => {
1166 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1167 self.pool.on_propagated(propagated);
1168 }
1169 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1170 let mut res = HashMap::with_capacity(peers.len());
1171 for peer_id in peers {
1172 let hashes = self
1173 .peers
1174 .get(&peer_id)
1175 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1176 .unwrap_or_default();
1177 res.insert(peer_id, hashes);
1178 }
1179 tx.send(res).ok();
1180 }
1181 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1182 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1183 peer_request_sender.send(sender).ok();
1184 }
1185 }
1186 }
1187
1188 fn handle_peer_session(
1192 &mut self,
1193 info: SessionInfo,
1194 messages: PeerRequestSender<PeerRequest<N>>,
1195 ) {
1196 let SessionInfo { peer_id, client_version, version, .. } = info;
1197
1198 let peer = PeerMetadata::<N>::new(
1200 messages,
1201 version,
1202 client_version,
1203 self.config.max_transactions_seen_by_peer_history,
1204 info.peer_kind,
1205 );
1206 let peer = match self.peers.entry(peer_id) {
1207 Entry::Occupied(mut entry) => {
1208 entry.insert(peer);
1209 entry.into_mut()
1210 }
1211 Entry::Vacant(entry) => entry.insert(peer),
1212 };
1213
1214 self.policies.propagation_policy_mut().on_session_established(peer);
1215
1216 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1220 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1221 return
1222 }
1223
1224 let pooled_txs = self.pool.pooled_transactions_max(
1226 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1227 );
1228 if pooled_txs.is_empty() {
1229 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1230 return;
1231 }
1232
1233 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1235 for pooled_tx in pooled_txs {
1236 peer.seen_transactions.insert(*pooled_tx.hash());
1237 msg_builder.push_pooled(pooled_tx);
1238 }
1239
1240 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1241 let msg = msg_builder.build();
1242 self.network.send_transactions_hashes(peer_id, msg);
1243 }
1244
1245 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1247 match event_result {
1248 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1249 let peer = self.peers.remove(&peer_id);
1252 if let Some(mut peer) = peer {
1253 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1254 }
1255 self.transaction_fetcher.remove_peer(&peer_id);
1256 }
1257 NetworkEvent::ActivePeerSession { info, messages } => {
1258 self.handle_peer_session(info, messages);
1260 }
1261 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1262 let peer_id = info.peer_id;
1263 let messages = match self.peers.get(&peer_id) {
1265 Some(p) => p.request_tx.clone(),
1266 None => {
1267 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1268 return;
1269 }
1270 };
1271 self.handle_peer_session(info, messages);
1272 }
1273 _ => {}
1274 }
1275 }
1276
1277 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1279 if self.config.ingress_policy.allows_all() {
1280 return true;
1281 }
1282 let Some(peer) = self.peers.get(peer_id) else {
1283 return false;
1284 };
1285 self.config.ingress_policy.allows(peer.peer_kind())
1286 }
1287
1288 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1290 match event {
1291 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1292 if !self.accepts_incoming_from(&peer_id) {
1293 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1294 return;
1295 }
1296
1297 let has_blob_txs = msg.has_eip4844();
1301
1302 let non_blob_txs = msg
1303 .into_iter()
1304 .map(N::PooledTransaction::try_from)
1305 .filter_map(Result::ok)
1306 .collect();
1307
1308 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1309
1310 if has_blob_txs {
1311 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1312 self.report_peer_bad_transactions(peer_id);
1313 }
1314 }
1315 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1316 if !self.accepts_incoming_from(&peer_id) {
1317 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1318 return;
1319 }
1320 self.on_new_pooled_transaction_hashes(peer_id, msg)
1321 }
1322 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1323 self.on_get_pooled_transactions(peer_id, request, response)
1324 }
1325 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1326 let _ = response.send(Some(self.handle()));
1327 }
1328 }
1329 }
1330
    /// Prepares transactions received from `peer_id` and schedules the new ones for import
    /// into the pool.
    ///
    /// Nothing happens while the network reports initial sync, while transaction gossip is
    /// disabled, when there is no capacity left for pending pool imports, or when the peer is
    /// not tracked.
    ///
    /// Otherwise the batch is truncated to the remaining import capacity, filtered against
    /// hashes that are already tracked, known bad or already in the pool, signer-recovered in
    /// parallel and finally queued as a single batch import future in `pool_imports`.
    ///
    /// `source` states how the batch arrived; only broadcasts are recorded in the peer's
    /// seen-transactions cache by this function.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // Ignore incoming transactions during the initial sync.
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        // Bail out early if no further imports can be queued at all.
        if !self.has_capacity_for_pending_pool_imports() {
            return
        }

        let mut transactions = transactions.0;

        // Cut the batch down to the remaining capacity before doing any per-transaction work
        // and account for everything that was dropped.
        let capacity = self.remaining_pool_import_capacity();
        if transactions.len() > capacity {
            let skipped = transactions.len() - capacity;
            transactions.truncate(capacity);
            self.metrics
                .skipped_transactions_pending_pool_imports_at_capacity
                .increment(skipped as u64);
            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let client_version = peer.client_version.clone();

        // Only the preparation below is timed, the early returns above are not measured.
        let start = Instant::now();

        // These hashes have arrived, so the fetcher no longer needs to track them.
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));

        // For broadcasts, record every hash in the peer's seen cache. An `insert` that returns
        // `false` is counted as a transaction the peer had already been associated with.
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // Set when the batch contains known bad or unrecoverable transactions.
        let mut has_bad_transactions = false;

        // Step 1: drop transactions that are already tracked (only remembering this peer as an
        // additional sender) and transactions that are known to be bad.
        transactions.retain(|tx| {
            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
                entry.get_mut().insert(peer_id);
                return false
            }
            if self.bad_imports.contains(tx.tx_hash()) {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    hash=%tx.tx_hash(),
                    %client_version,
                    "received a known bad transaction from peer"
                );
                has_bad_transactions = true;
                return false;
            }
            true
        });

        // Step 2: drop transactions the pool already knows and count them.
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        // Remembered so that failed recoveries can be detected by comparing lengths below.
        let txs_len = transactions.len();

        // Recover the remaining transactions in parallel; failures are logged and dropped.
        let new_txs = transactions
            .into_par_iter()
            .filter_map(|tx| match tx.try_into_recovered() {
                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%client_version,
                        "failed ecrecovery for transaction"
                    );
                    None
                }
            })
            .collect::<Vec<_>>();

        // Any transaction lost during recovery marks the batch as bad.
        has_bad_transactions |= new_txs.len() != txs_len;

        // Start tracking this peer as the (so far only) sender of each new transaction.
        for tx in &new_txs {
            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
        }

        // Step 3: import all new transactions as one batch.
        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            // The shared counter is bumped now and decremented by the import future once the
            // pool has processed the batch.
            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                // Release the capacity that was reserved for this batch.
                metric_pending_pool_imports.decrement(added as f64);
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            self.pool_imports.push(import);
        }

        if num_already_seen_by_peer > 0 {
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
        }

        if has_bad_transactions {
            // The peer sent at least one known bad or unrecoverable transaction.
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }

        self.metrics.pool_import_prepare_duration.record(start.elapsed());
    }
1494
1495 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1497 match fetch_event {
1498 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1499 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1500 if report_peer {
1501 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1502 }
1503 }
1504 FetchEvent::FetchError { peer_id, error } => {
1505 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1506 self.on_request_error(peer_id, error);
1507 }
1508 FetchEvent::EmptyResponse { peer_id } => {
1509 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1510 }
1511 }
1512 }
1513}
1514
/// Drives the transactions manager.
///
/// The future never resolves: every exit of [`Future::poll`] below returns `Poll::Pending`.
/// Each poll works through the manager's event sources in a fixed order and requests an
/// immediate re-poll if any of them may still have work left.
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // NOTE(review): `metered_poll_nested_stream_with_budget!` is defined elsewhere; from its
        // use here it presumably drains the stream up to the given budget, accumulates the time
        // spent into the first argument and evaluates to `true` if more items may be ready.

        // Session events from the network.
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Pending transactions, received in batches of at most the soft limit for hashes in a
        // single announcement.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // The buffer was filled completely, so more items may be waiting.
                    true
                } else {
                    // Poll once more for the remaining room. If that is ready again there may
                    // be even more to receive on the next poll.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Transaction related messages and requests from peers.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Results of requests issued by the transaction fetcher. Mutable because fetching
        // pending hashes further down can schedule additional work.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Completed batch imports into the pool.
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Try to request hashes that are still pending fetch, if there is capacity for it.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Commands sent to the manager through its command channel.
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // Some source may still have work: wake ourselves so we get polled again right
            // away. Poll metrics are not recorded on this path.
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1700
/// Selects how transactions are propagated.
///
/// NOTE(review): the propagation code that consumes this mode is outside this chunk; the
/// variant descriptions below are derived from the names only.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular propagation.
    Basic,
    /// Forced propagation.
    Forced,
}

impl PropagationMode {
    /// Returns `true` if the mode is [`Self::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1723
/// A transaction that is ready to be propagated, bundled with its size.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction as computed by the constructors (`length()` or
    /// `encoded_length()` of the source transaction).
    size: usize,
    /// The shared transaction.
    transaction: Arc<T>,
}
1730
1731impl<T: SignedTransaction> PropagateTransaction<T> {
1732 pub fn new(transaction: T) -> Self {
1734 let size = transaction.length();
1735 Self { size, transaction: Arc::new(transaction) }
1736 }
1737
1738 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1740 where
1741 P: PoolTransaction<Consensus = T>,
1742 {
1743 let size = tx.encoded_length();
1744 let transaction = tx.transaction.clone_into_consensus();
1745 let transaction = Arc::new(transaction.into_inner());
1746 Self { size, transaction }
1747 }
1748
1749 fn tx_hash(&self) -> &TxHash {
1750 self.transaction.tx_hash()
1751 }
1752}
1753
/// Collects the transactions that are going to be propagated to a peer, either as hash
/// announcements only or as full transactions.
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Only transaction hashes are collected.
    Pooled(PooledTransactionsHashesBuilder),
    /// Full transactions are collected; see [`FullTransactionsBuilder`] for when an entry ends
    /// up as a hash instead.
    Full(FullTransactionsBuilder<T>),
}
1761
1762impl<T> PropagateTransactionsBuilder<T> {
1763 fn pooled(version: EthVersion) -> Self {
1765 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1766 }
1767
1768 fn full(version: EthVersion) -> Self {
1770 Self::Full(FullTransactionsBuilder::new(version))
1771 }
1772
1773 fn is_empty(&self) -> bool {
1775 match self {
1776 Self::Pooled(builder) => builder.is_empty(),
1777 Self::Full(builder) => builder.is_empty(),
1778 }
1779 }
1780
1781 fn build(self) -> PropagateTransactions<T> {
1783 match self {
1784 Self::Pooled(pooled) => {
1785 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1786 }
1787 Self::Full(full) => full.build(),
1788 }
1789 }
1790}
1791
1792impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1793 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1795 for tx in txs {
1796 self.push(tx);
1797 }
1798 }
1799
1800 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1802 match self {
1803 Self::Pooled(builder) => builder.push(transaction),
1804 Self::Full(builder) => builder.push(transaction),
1805 }
1806 }
1807}
1808
/// The result of a propagation builder: what is going to be sent to a peer.
struct PropagateTransactions<T> {
    /// Hash announcement, if there is one to send.
    pooled: Option<NewPooledTransactionHashes>,
    /// Full transactions, if there are any to send.
    full: Option<Vec<Arc<T>>>,
}
1816
/// Collects full transactions for a broadcast while keeping track of their accumulated size.
///
/// Transactions that are not added in full are recorded in the nested hashes builder instead.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Sum of the sizes of all entries in `transactions`.
    total_size: usize,
    /// Transactions that will be sent in full.
    transactions: Vec<Arc<T>>,
    /// Hashes of the transactions that are announced rather than sent in full.
    pooled: PooledTransactionsHashesBuilder,
}
1830
1831impl<T> FullTransactionsBuilder<T> {
1832 fn new(version: EthVersion) -> Self {
1834 Self {
1835 total_size: 0,
1836 pooled: PooledTransactionsHashesBuilder::new(version),
1837 transactions: vec![],
1838 }
1839 }
1840
1841 fn is_empty(&self) -> bool {
1843 self.transactions.is_empty() && self.pooled.is_empty()
1844 }
1845
1846 fn build(self) -> PropagateTransactions<T> {
1848 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1849 let full = Some(self.transactions).filter(|full| !full.is_empty());
1850 PropagateTransactions { pooled, full }
1851 }
1852}
1853
1854impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1855 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1857 for tx in txs {
1858 self.push(&tx)
1859 }
1860 }
1861
1862 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1872 if !transaction.transaction.is_broadcastable_in_full() {
1881 self.pooled.push(transaction);
1882 return
1883 }
1884
1885 let new_size = self.total_size + transaction.size;
1886 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1887 self.total_size > 0
1888 {
1889 self.pooled.push(transaction);
1891 return
1892 }
1893
1894 self.total_size = new_size;
1895 self.transactions.push(Arc::clone(&transaction.transaction));
1896 }
1897}
1898
/// Builds a pooled transaction hashes announcement in the format matching the peer's protocol
/// version.
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Announcement that carries hashes only.
    Eth66(NewPooledTransactionHashes66),
    /// Announcement that carries hashes together with sizes and types.
    Eth68(NewPooledTransactionHashes68),
}
1906
1907impl PooledTransactionsHashesBuilder {
1910 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1912 match self {
1913 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1914 Self::Eth68(msg) => {
1915 msg.hashes.push(*pooled_tx.hash());
1916 msg.sizes.push(pooled_tx.encoded_length());
1917 msg.types.push(pooled_tx.transaction.ty());
1918 }
1919 }
1920 }
1921
1922 fn is_empty(&self) -> bool {
1924 match self {
1925 Self::Eth66(hashes) => hashes.is_empty(),
1926 Self::Eth68(hashes) => hashes.is_empty(),
1927 }
1928 }
1929
1930 fn len(&self) -> usize {
1932 match self {
1933 Self::Eth66(hashes) => hashes.len(),
1934 Self::Eth68(hashes) => hashes.len(),
1935 }
1936 }
1937
1938 fn extend<T: SignedTransaction>(
1940 &mut self,
1941 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1942 ) {
1943 for tx in txs {
1944 self.push(&tx);
1945 }
1946 }
1947
1948 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1949 match self {
1950 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1951 Self::Eth68(msg) => {
1952 msg.hashes.push(*tx.tx_hash());
1953 msg.sizes.push(tx.size);
1954 msg.types.push(tx.transaction.ty());
1955 }
1956 }
1957 }
1958
1959 fn new(version: EthVersion) -> Self {
1961 match version {
1962 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1963 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1964 Self::Eth68(Default::default())
1965 }
1966 }
1967 }
1968
1969 fn build(self) -> NewPooledTransactionHashes {
1970 match self {
1971 Self::Eth66(mut msg) => {
1972 msg.0.shrink_to_fit();
1973 msg.into()
1974 }
1975 Self::Eth68(mut msg) => {
1976 msg.shrink_to_fit();
1977 msg.into()
1978 }
1979 }
1980 }
1981}
1982
/// Describes how a batch of transactions reached the manager.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TransactionSource {
    /// The transactions arrived as a full transaction broadcast from a peer.
    Broadcast,
    /// The transactions arrived as the result of a fetch issued by the transaction fetcher.
    Response,
}

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast.
    const fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
1999
/// State the transactions manager keeps for a connected peer.
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Bounded cache of transaction hashes associated with this peer, e.g. hashes of
    /// transactions the peer broadcast to us.
    seen_transactions: LruCache<TxHash>,
    /// Channel for sending requests to the peer's session.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// Protocol version of the session.
    version: EthVersion,
    /// Client version string of the peer, used in log output.
    client_version: Arc<str>,
    /// Kind of the peer, consulted by the ingress policy.
    peer_kind: PeerKind,
}
2016
2017impl<N: NetworkPrimitives> PeerMetadata<N> {
2018 pub fn new(
2020 request_tx: PeerRequestSender<PeerRequest<N>>,
2021 version: EthVersion,
2022 client_version: Arc<str>,
2023 max_transactions_seen_by_peer: u32,
2024 peer_kind: PeerKind,
2025 ) -> Self {
2026 Self {
2027 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2028 request_tx,
2029 version,
2030 client_version,
2031 peer_kind,
2032 }
2033 }
2034
2035 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2037 &self.request_tx
2038 }
2039
2040 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2042 &mut self.seen_transactions
2043 }
2044
2045 pub const fn version(&self) -> EthVersion {
2047 self.version
2048 }
2049
2050 pub fn client_version(&self) -> &str {
2052 &self.client_version
2053 }
2054
2055 pub const fn peer_kind(&self) -> PeerKind {
2057 self.peer_kind
2058 }
2059}
2060
/// Commands that can be sent to the transactions manager through its command channel.
///
/// NOTE(review): the command handler is outside this chunk; the variant descriptions are
/// derived from the variant names and payload types and should be confirmed against it.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a single transaction hash.
    PropagateHash(B256),
    /// Propagate the given hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the ids of the active peers; the answer is sent through the given channel.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate the transactions with the given hashes to a specific peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate the transactions with the given hashes.
    PropagateTransactions(Vec<TxHash>),
    /// Broadcast the given, already prepared transactions.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes per peer.
    GetTransactionHashes {
        /// Peers to report hashes for.
        peers: Vec<PeerId>,
        /// Channel the per-peer hashes are sent through.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Request the request sender of a peer.
    GetPeerSender {
        /// Peer whose sender is requested.
        peer_id: PeerId,
        /// Channel the sender (or `None`) is sent through.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2087
/// Transaction related events that the network hands to the transactions manager.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A peer broadcast full transactions.
    ///
    /// The manager imports the transactions that convert into the pooled representation and
    /// reports the peer if the message contained blob transactions.
    IncomingTransactions {
        /// Peer that sent the message.
        peer_id: PeerId,
        /// The broadcast transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// A peer announced transaction hashes.
    IncomingPooledTransactionHashes {
        /// Peer that sent the announcement.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer requested pooled transactions.
    GetPooledTransactions {
        /// Peer that sent the request.
        peer_id: PeerId,
        /// The requested hashes.
        request: GetPooledTransactions,
        /// Channel the result of the request is sent through.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Request for a handle to the transactions manager; the manager answers with `Some`
    /// handle.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2119
/// Bookkeeping for transactions that are queued for import into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions whose pool import has not completed yet.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Limit for pending pool imports this instance was created with.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the bookkeeping with a zeroed counter and the given limit.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pending pool imports is below
    /// `max_pending_pool_imports`.
    ///
    /// The comparison uses the limit passed as argument, not the one stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2140
impl Default for PendingPoolImportsInfo {
    /// Creates the bookkeeping with `DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS` as limit.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2146
/// Accumulated time spent in the individual stages of one poll of the transactions manager.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time spent on the network events stream.
    acc_network_events: Duration,
    /// Time spent on the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time spent on the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the poll implementation in this chunk; presumably time
    /// spent on transactions imported by the pool, confirm against the metrics code.
    acc_imported_txns: Duration,
    /// Time spent on the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time spent on requesting hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent on the commands channel.
    acc_cmds: Duration,
}
2157
2158#[cfg(test)]
2159mod tests {
2160 use super::*;
2161 use crate::{
2162 test_utils::{
2163 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2164 Testnet,
2165 },
2166 transactions::config::RelaxedEthAnnouncementFilter,
2167 NetworkConfigBuilder, NetworkManager,
2168 };
2169 use alloy_consensus::{TxEip1559, TxLegacy};
2170 use alloy_primitives::{hex, Signature, TxKind, U256};
2171 use alloy_rlp::Decodable;
2172 use futures::FutureExt;
2173 use reth_chainspec::MIN_TRANSACTION_GAS;
2174 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2175 use reth_network_api::{NetworkInfo, PeerKind};
2176 use reth_network_p2p::{
2177 error::{RequestError, RequestResult},
2178 sync::{NetworkSyncUpdater, SyncState},
2179 };
2180 use reth_storage_api::noop::NoopProvider;
2181 use reth_tasks::Runtime;
2182 use reth_transaction_pool::test_utils::{
2183 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2184 };
2185 use secp256k1::SecretKey;
2186 use std::{
2187 future::poll_fn,
2188 net::{IpAddr, Ipv4Addr, SocketAddr},
2189 str::FromStr,
2190 };
2191 use tracing::error;
2192
    /// A transaction broadcast received while the node is initially syncing must not end up
    /// in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network with its own transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Entering sync for the first time also counts as initially syncing.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // Feed the first two events of `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // RLP encoded signed transaction used as broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2262
    /// Once the node has been idle, syncing again no longer counts as initially syncing, so a
    /// transaction broadcast received during the second sync is imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network with its own transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // Sync, become idle, then sync a second time.
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events of `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // RLP encoded signed transaction used as broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2337
    /// An announcement of unknown hashes must result in a `GetPooledTransactions` request to
    /// the announcing peer, and the transactions returned for it must end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions_hashes() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
        let client = NoopProvider::default();

        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .listener_port(0)
            .disable_discovery()
            .build(client);

        let pool = testing_pool();

        let transactions_manager_config = config.transactions_manager_config.clone();
        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        let peer_id_1 = PeerId::new([1; 64]);
        let eth_version = EthVersion::Eth66;

        // The single transaction that is announced and later returned by the mock peer.
        let txs = vec![TransactionSigned::new_unhashed(
            Transaction::Legacy(TxLegacy {
                chain_id: Some(4),
                nonce: 15u64,
                gas_price: 2200000000,
                gas_limit: 34811,
                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
                value: U256::from(1234u64),
                input: Default::default(),
            }),
            Signature::new(
                U256::from_str(
                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
                )
                .unwrap(),
                U256::from_str(
                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
                )
                .unwrap(),
                true,
            ),
        )];

        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

        // Register a mock session for the peer directly in the manager.
        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        tx_manager.peers.insert(peer_id_1, peer_1);

        assert!(pool.is_empty());

        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
            peer_id: peer_id_1,
            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
                txs_hashes.clone(),
            )),
        });

        // The manager must ask the announcing peer for exactly the announced hashes.
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

        let message: Vec<PooledTransactionVariant> = txs
            .into_iter()
            .map(|tx| {
                PooledTransactionVariant::try_from(tx)
                    .expect("Failed to convert MockTransaction to PooledTransaction")
            })
            .collect();

        // Answer the request on behalf of the mock peer.
        response
            .send(Ok(PooledTransactions(message)))
            .expect("should send peer_1 response to tx manager");

        // Poll the manager exactly once so it can process the response.
        poll_fn(|cx| {
            let _ = tx_manager.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
    }
2435
    /// A full transaction broadcast received while the node is idle is tracked for the sending
    /// peer and imported into the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_handle_incoming_transactions() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network with its own transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events of `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // RLP encoded signed transaction used as broadcast payload.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // The sender is tracked for the transaction before the pool import has completed.
        assert!(transactions
            .transactions_by_peers
            .get(signed_tx.tx_hash())
            .unwrap()
            .contains(handle1.peer_id()));

        // Poll the manager exactly once so the queued import can run.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;

        assert!(!pool.is_empty());
        assert!(pool.get(signed_tx.tx_hash()).is_some());
        handle.terminate().await;
    }
2513
    /// A `GetPooledTransactions` request for a transaction that is in the pool is answered
    /// with that transaction.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_on_get_pooled_transactions_network() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(2).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();

        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network with its own transactions manager under test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();
        tokio::task::spawn(network);

        network_handle.update_sync_state(SyncState::Idle);

        assert!(!NetworkInfo::is_syncing(&network_handle));

        // Feed the first two events of `handle0` into the manager under test.
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        handle.terminate().await;

        // Put a transaction into the pool that the request below asks for.
        let tx = MockTransaction::eip1559();
        let _ = transactions
            .pool
            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
            .await;

        let request = GetPooledTransactions(vec![*tx.get_hash()]);

        let (send, receive) =
            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();

        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
            peer_id: *handle1.peer_id(),
            request,
            response: send,
        });

        // The response must contain exactly the requested transaction.
        match receive.await.unwrap() {
            Ok(PooledTransactions(transactions)) => {
                assert_eq!(transactions.len(), 1);
            }
            Err(e) => {
                panic!("error: {e:?}");
            }
        }
    }
2592
2593 #[tokio::test]
2597 async fn test_partially_tx_response() {
2598 reth_tracing::init_test_tracing();
2599
2600 let mut tx_manager = new_tx_manager().await.0;
2601 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2602
2603 let peer_id_1 = PeerId::new([1; 64]);
2604 let eth_version = EthVersion::Eth66;
2605
2606 let txs = vec![
2607 TransactionSigned::new_unhashed(
2608 Transaction::Legacy(TxLegacy {
2609 chain_id: Some(4),
2610 nonce: 15u64,
2611 gas_price: 2200000000,
2612 gas_limit: 34811,
2613 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2614 value: U256::from(1234u64),
2615 input: Default::default(),
2616 }),
2617 Signature::new(
2618 U256::from_str(
2619 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2620 )
2621 .unwrap(),
2622 U256::from_str(
2623 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2624 )
2625 .unwrap(),
2626 true,
2627 ),
2628 ),
2629 TransactionSigned::new_unhashed(
2630 Transaction::Eip1559(TxEip1559 {
2631 chain_id: 4,
2632 nonce: 26u64,
2633 max_priority_fee_per_gas: 1500000000,
2634 max_fee_per_gas: 1500000013,
2635 gas_limit: MIN_TRANSACTION_GAS,
2636 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2637 value: U256::from(3000000000000000000u64),
2638 input: Default::default(),
2639 access_list: Default::default(),
2640 }),
2641 Signature::new(
2642 U256::from_str(
2643 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2644 )
2645 .unwrap(),
2646 U256::from_str(
2647 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2648 )
2649 .unwrap(),
2650 true,
2651 ),
2652 ),
2653 ];
2654
2655 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2656
2657 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2658 peer_1.seen_transactions.insert(txs_hashes[0]);
2661 peer_1.seen_transactions.insert(txs_hashes[1]);
2662 tx_manager.peers.insert(peer_id_1, peer_1);
2663
2664 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2665 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2666
2667 assert!(tx_fetcher.is_idle(&peer_id_1));
2669 assert_eq!(tx_fetcher.active_peers.len(), 0);
2670
2671 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2673
2674 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2675 assert!(!tx_fetcher.is_idle(&peer_id_1));
2677 assert_eq!(tx_fetcher.active_peers.len(), 1);
2678
2679 let req = to_mock_session_rx
2681 .recv()
2682 .await
2683 .expect("peer_1 session should receive request with buffered hashes");
2684 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2685
2686 let message: Vec<PooledTransactionVariant> = txs
2687 .into_iter()
2688 .take(1)
2689 .map(|tx| {
2690 PooledTransactionVariant::try_from(tx)
2691 .expect("Failed to convert MockTransaction to PooledTransaction")
2692 })
2693 .collect();
2694 response
2696 .send(Ok(PooledTransactions(message)))
2697 .expect("should send peer_1 response to tx manager");
2698 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2699 unreachable!()
2700 };
2701
2702 assert!(tx_fetcher.is_idle(&peer_id));
2704 assert_eq!(tx_fetcher.active_peers.len(), 0);
2705 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2707 }
2708
2709 #[tokio::test]
2710 async fn test_max_retries_tx_request() {
2711 reth_tracing::init_test_tracing();
2712
2713 let mut tx_manager = new_tx_manager().await.0;
2714 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2715
2716 let peer_id_1 = PeerId::new([1; 64]);
2717 let peer_id_2 = PeerId::new([2; 64]);
2718 let eth_version = EthVersion::Eth66;
2719 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2720
2721 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2722 peer_1.seen_transactions.insert(seen_hashes[0]);
2725 peer_1.seen_transactions.insert(seen_hashes[1]);
2726 tx_manager.peers.insert(peer_id_1, peer_1);
2727
2728 let retries = 1;
2731 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2732 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2733
2734 assert!(tx_fetcher.is_idle(&peer_id_1));
2736 assert_eq!(tx_fetcher.active_peers.len(), 0);
2737
2738 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2740
2741 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2742
2743 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2744 assert!(!tx_fetcher.is_idle(&peer_id_1));
2746 assert_eq!(tx_fetcher.active_peers.len(), 1);
2747
2748 let req = to_mock_session_rx
2750 .recv()
2751 .await
2752 .expect("peer_1 session should receive request with buffered hashes");
2753 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2754 let GetPooledTransactions(hashes) = request;
2755
2756 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2757
2758 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2759
2760 response
2762 .send(Err(RequestError::BadResponse))
2763 .expect("should send peer_1 response to tx manager");
2764 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2765 unreachable!()
2766 };
2767
2768 assert!(tx_fetcher.is_idle(&peer_id));
2770 assert_eq!(tx_fetcher.active_peers.len(), 0);
2771 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2773
2774 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2775 tx_manager.peers.insert(peer_id_2, peer_2);
2776
2777 let msg =
2779 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2780 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2781
2782 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2783
2784 assert_eq!(tx_fetcher.active_peers.len(), 1);
2786
2787 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2789 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2791
2792 let req = to_mock_session_rx
2794 .recv()
2795 .await
2796 .expect("peer_2 session should receive request with buffered hashes");
2797 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2798
2799 response
2801 .send(Err(RequestError::BadResponse))
2802 .expect("should send peer_2 response to tx manager");
2803 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2804
2805 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2808 assert_eq!(tx_fetcher.active_peers.len(), 0);
2809 }
2810
// A builder created via `pooled` starts out empty; after pushing one EIP-1559 transaction the
// built output has no `full` part and a `pooled` part of length one.
#[test]
fn test_transaction_builder_empty() {
    let mut tx_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(tx_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();
    let pool_tx = Arc::new(tx_factory.create_eip1559());
    let propagate_tx = PropagateTransaction::pool_tx(pool_tx);
    tx_builder.push(&propagate_tx);
    assert!(!tx_builder.is_empty());

    let built = tx_builder.build();
    assert!(built.full.is_none());
    let announced = built.pooled.unwrap();
    assert_eq!(announced.len(), 1);
}
2827
// Pushes a transaction whose reported size is one byte above the broadcast soft limit into a
// builder created via `full`, once and then a second time, checking the built output each time.
#[test]
fn test_transaction_builder_large() {
    let mut tx_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(tx_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();
    let mut oversized = tx_factory.create_eip1559();
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let propagate_tx = PropagateTransaction::pool_tx(Arc::new(oversized));
    tx_builder.push(&propagate_tx);
    assert!(!tx_builder.is_empty());

    // First push: only the `full` part is populated.
    let first_build = tx_builder.clone().build();
    assert!(first_build.pooled.is_none());
    let full_txs = first_build.full.unwrap();
    assert_eq!(full_txs.len(), 1);

    tx_builder.push(&propagate_tx);

    // Second push of the same transaction: one entry in `pooled` and one in `full`.
    let second_build = tx_builder.clone().build();
    let pooled_hashes = second_build.pooled.unwrap();
    assert_eq!(pooled_hashes.len(), 1);
    let full_txs = second_build.full.unwrap();
    assert_eq!(full_txs.len(), 1);
}
2856
// Pushes an EIP-4844 transaction and then an EIP-1559 transaction into a builder created via
// `full`, checking after each push which parts of the built output are populated.
#[test]
fn test_transaction_builder_eip4844() {
    let mut tx_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(tx_builder.is_empty());

    let mut tx_factory = MockTransactionFactory::default();
    let blob_tx = PropagateTransaction::pool_tx(Arc::new(tx_factory.create_eip4844()));
    tx_builder.push(&blob_tx);
    assert!(!tx_builder.is_empty());

    // With only the EIP-4844 transaction pushed, nothing ends up in `full`.
    let first_build = tx_builder.clone().build();
    assert!(first_build.full.is_none());
    let pooled_hashes = first_build.pooled.unwrap();
    assert_eq!(pooled_hashes.len(), 1);

    let dynamic_fee_tx = PropagateTransaction::pool_tx(Arc::new(tx_factory.create_eip1559()));
    tx_builder.push(&dynamic_fee_tx);

    // After the EIP-1559 transaction is added, each part holds exactly one entry.
    let second_build = tx_builder.clone().build();
    let pooled_hashes = second_build.pooled.unwrap();
    assert_eq!(pooled_hashes.len(), 1);
    let full_txs = second_build.full.unwrap();
    assert_eq!(full_txs.len(), 1);
}
2882
2883 #[tokio::test]
2884 async fn test_propagate_full() {
2885 reth_tracing::init_test_tracing();
2886
2887 let (mut tx_manager, network) = new_tx_manager().await;
2888 let peer_id = PeerId::random();
2889
2890 network.handle().update_sync_state(SyncState::Idle);
2892
2893 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2895
2896 let session_info = SessionInfo {
2897 peer_id,
2898 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2899 client_version: Arc::from(""),
2900 capabilities: Arc::new(vec![].into()),
2901 status: Arc::new(Default::default()),
2902 version: EthVersion::Eth68,
2903 peer_kind: PeerKind::Basic,
2904 };
2905 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2906 tx_manager
2907 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2908 let mut propagate = vec![];
2909 let mut factory = MockTransactionFactory::default();
2910 let eip1559_tx = Arc::new(factory.create_eip1559());
2911 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2912 let eip4844_tx = Arc::new(factory.create_eip4844());
2913 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2914
2915 let propagated =
2916 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2917 assert_eq!(propagated.len(), 2);
2918 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
2919 assert_eq!(prop_txs.len(), 1);
2920 assert!(prop_txs[0].is_full());
2921
2922 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
2923 assert_eq!(prop_txs.len(), 1);
2924 assert!(prop_txs[0].is_hash());
2925
2926 let peer = tx_manager.peers.get(&peer_id).unwrap();
2927 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2928 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2929 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2930
2931 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2933 assert!(propagated.is_empty());
2934 }
2935
2936 #[tokio::test]
2937 async fn test_propagate_pending_txs_while_initially_syncing() {
2938 reth_tracing::init_test_tracing();
2939
2940 let (mut tx_manager, network) = new_tx_manager().await;
2941 let peer_id = PeerId::random();
2942
2943 network.handle().update_sync_state(SyncState::Syncing);
2945 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
2946
2947 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
2949 tx_manager.peers.insert(peer_id, peer);
2950
2951 let tx = MockTransaction::eip1559();
2952 tx_manager
2953 .pool
2954 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2955 .await
2956 .expect("transaction should be accepted into the pool");
2957
2958 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
2959
2960 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
2961 assert!(peer.seen_transactions.contains(tx.get_hash()));
2962 }
2963
2964 #[tokio::test]
2965 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2966 reth_tracing::init_test_tracing();
2967
2968 let transactions_manager_config = TransactionsManagerConfig::default();
2969
2970 let propagation_policy = TransactionPropagationKind::default();
2971 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2972
2973 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2974
2975 let pool = testing_pool();
2976 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2977 let client = NoopProvider::default();
2978
2979 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2980 .listener_port(0)
2981 .disable_discovery()
2982 .build(client.clone());
2983
2984 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2985 let (to_tx_manager_tx, from_network_rx) =
2986 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2987 network_manager.set_transactions(to_tx_manager_tx);
2988 let network_handle = network_manager.handle().clone();
2989 let network_service_handle = tokio::spawn(network_manager);
2990
2991 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2992 network_handle.clone(),
2993 pool.clone(),
2994 from_network_rx,
2995 transactions_manager_config,
2996 policy_bundle,
2997 );
2998
2999 let peer_id = PeerId::random();
3000 let eth_version = EthVersion::Eth68;
3001 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3002 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3003
3004 let mut tx_factory = MockTransactionFactory::default();
3005
3006 let valid_known_tx = tx_factory.create_eip1559();
3007 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3008
3009 let known_tx_hash = *known_tx_signed.hash();
3010 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3011 let known_tx_size = known_tx_signed.encoded_length();
3012
3013 let unknown_tx_hash = B256::random();
3014 let unknown_tx_type_byte = 0xff_u8;
3015 let unknown_tx_size = 150;
3016
3017 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3018 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3019 sizes: vec![known_tx_size, unknown_tx_size],
3020 hashes: vec![known_tx_hash, unknown_tx_hash],
3021 });
3022
3023 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3024
3025 poll_fn(|cx| {
3026 let _ = tx_manager.poll_unpin(cx);
3027 Poll::Ready(())
3028 })
3029 .await;
3030
3031 let mut requested_hashes_in_getpooled = HashSet::new();
3032 let mut unexpected_request_received = false;
3033
3034 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3035 .await
3036 {
3037 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3038 let GetPooledTransactions(hashes) = request;
3039 for hash in hashes {
3040 requested_hashes_in_getpooled.insert(hash);
3041 }
3042 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3043 }
3044 Ok(Some(other_request)) => {
3045 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3046 unexpected_request_received = true;
3047 }
3048 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3049 Err(_timeout_err) => {
3050 tracing::info!("Timeout: No GetPooledTransactions request received.")
3051 }
3052 }
3053
3054 assert!(
3055 requested_hashes_in_getpooled.contains(&known_tx_hash),
3056 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3057 );
3058 assert!(
3059 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3060 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3061 );
3062 assert!(
3063 !unexpected_request_received,
3064 "An unexpected P2P request was received by the mock peer."
3065 );
3066
3067 network_service_handle.abort();
3068 }
3069}