1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5use smallvec::SmallVec;
6
7pub mod config;
9pub mod constants;
11pub mod fetcher;
13pub mod policy;
15
16pub use self::constants::{
17 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31 budget::{
32 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34 },
35 cache::LruCache,
36 duration_metered_exec, metered_poll_nested_stream_with_budget,
37 metrics::{AnnouncedTxTypesMetrics, TransactionsManagerMetrics},
38 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
39 NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{
42 map::{B256Map, B256Set, FbBuildHasher},
43 TxHash, B256,
44};
45use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
46use futures::{stream::FuturesUnordered, Future, StreamExt};
47use reth_eth_wire::{
48 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
49 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
50 NewPooledTransactionHashes66, NewPooledTransactionHashes68, NewPooledTransactionHashes72,
51 PooledTransactions, RequestTxHashes, Transactions, ValidAnnouncementData,
52};
53use reth_ethereum_primitives::{TransactionSigned, TxType};
54use reth_metrics::common::mpsc::MemoryBoundedReceiver;
55use reth_network_api::{
56 events::{PeerEvent, SessionInfo},
57 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
58};
59use reth_network_p2p::{
60 error::{RequestError, RequestResult},
61 sync::SyncStateProvider,
62};
63use reth_network_peers::PeerId;
64use reth_network_types::ReputationChangeKind;
65use reth_primitives_traits::{InMemorySize, SignedTransaction};
66use reth_tokio_util::EventStream;
67use reth_transaction_pool::{
68 error::{PoolError, PoolResult},
69 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
70 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
71};
72use std::{
73 collections::{hash_map::Entry, HashMap, HashSet},
74 pin::Pin,
75 sync::{
76 atomic::{AtomicUsize, Ordering},
77 Arc,
78 },
79 task::{Context, Poll},
80 time::{Duration, Instant},
81};
82use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
83use tokio_stream::wrappers::UnboundedReceiverStream;
84use tracing::{debug, trace};
85
/// Boxed, pinned, `Send + 'static` future representing a batch import into the pool.
///
/// It resolves to a `Vec` of [`PoolResult`]s, each carrying either the
/// [`AddedTransactionOutcome`] of a successful import or the pool error for a failed one.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
91
/// Cloneable handle for sending [`TransactionsCommand`]s to a [`TransactionsManager`].
///
/// A handle is created by [`TransactionsManager::handle`], which clones the manager's command
/// sender.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sending half of the unbounded command channel consumed by the manager.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
104
105impl<N: NetworkPrimitives> TransactionsHandle<N> {
106 fn send(&self, cmd: TransactionsCommand<N>) {
107 let _ = self.manager_tx.send(cmd);
108 }
109
110 async fn peer_handle(
112 &self,
113 peer_id: PeerId,
114 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
115 let (tx, rx) = oneshot::channel();
116 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
117 rx.await
118 }
119
120 pub fn propagate(&self, hash: TxHash) {
122 self.send(TransactionsCommand::PropagateHash(hash))
123 }
124
125 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
129 self.propagate_hashes_to(Some(hash), peer)
130 }
131
132 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
136 let hashes = hash.into_iter().collect::<Vec<_>>();
137 if hashes.is_empty() {
138 return
139 }
140 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
141 }
142
143 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
145 let (tx, rx) = oneshot::channel();
146 self.send(TransactionsCommand::GetActivePeers(tx));
147 rx.await
148 }
149
150 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
154 if transactions.is_empty() {
155 return
156 }
157 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
158 }
159
160 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
165 if transactions.is_empty() {
166 return
167 }
168 self.send(TransactionsCommand::PropagateTransactions(transactions))
169 }
170
171 pub fn broadcast_transactions(
176 &self,
177 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
178 ) {
179 let transactions =
180 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
181 if transactions.is_empty() {
182 return
183 }
184 self.send(TransactionsCommand::BroadcastTransactions(transactions))
185 }
186
187 pub async fn get_transaction_hashes(
189 &self,
190 peers: Vec<PeerId>,
191 ) -> Result<HashMap<PeerId, B256Set>, RecvError> {
192 if peers.is_empty() {
193 return Ok(Default::default())
194 }
195 let (tx, rx) = oneshot::channel();
196 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
197 rx.await
198 }
199
200 pub async fn get_peer_transaction_hashes(&self, peer: PeerId) -> Result<B256Set, RecvError> {
202 let res = self.get_transaction_hashes(vec![peer]).await?;
203 Ok(res.into_values().next().unwrap_or_default())
204 }
205
206 pub async fn get_pooled_transactions_from(
212 &self,
213 peer_id: PeerId,
214 hashes: Vec<B256>,
215 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218 let (tx, rx) = oneshot::channel();
219 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220 peer.try_send(request).ok();
221
222 rx.await?.map(|res| Some(res.0))
223 }
224}
225
/// Manages transaction gossip: handles announcements and requests from peers, propagates pool
/// transactions and executes [`TransactionsCommand`]s sent through [`TransactionsHandle`]s.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Transaction pool; queried for transactions to propagate and used to filter announced
    /// hashes that are already known.
    pool: Pool,
    /// Handle to the network; used for sync-state queries, reputation changes and sending
    /// transaction messages to peers.
    network: NetworkHandle<N>,
    /// Stream of network events, obtained from `network.event_listener()`.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Fetcher that packs, sends and buffers `GetPooledTransactions` requests for announced
    /// hashes.
    transaction_fetcher: TransactionFetcher<N>,
    /// Peer ids associated with a transaction hash. Entries are removed when the import of the
    /// hash resolves, and announced hashes that have an entry here are filtered out.
    // NOTE(review): presumably the peers that sent the transaction; confirm at the insertion site.
    transactions_by_peers: B256Map<SmallVec<[PeerId; 1]>>,
    /// In-flight pool import futures.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Counter and limit used to decide whether there is capacity for more pool imports.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Bounded cache of hashes whose import failed with a bad-transaction error.
    bad_imports: LruCache<TxHash, FbBuildHasher<32>>,
    /// Metadata of the peers this manager currently tracks, keyed by peer id.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Sender half of the command channel; cloned into every [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Receiver half of the command channel, wrapped as a stream.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Receiver obtained from `pool.pending_transactions_listener()`.
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Transaction-related events forwarded from the network.
    transaction_events: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
    /// Configuration of this manager.
    config: TransactionsManagerConfig,
    /// Propagation and announcement-filtering policies.
    policies: NetworkPolicies<N>,
    /// Metrics of the manager.
    metrics: TransactionsManagerMetrics,
    /// Metrics about the transaction types seen in announcements.
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
345
346impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
347 pub fn new(
351 network: NetworkHandle<N>,
352 pool: Pool,
353 from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
354 transactions_manager_config: TransactionsManagerConfig,
355 ) -> Self {
356 Self::with_policy(
357 network,
358 pool,
359 from_network,
360 transactions_manager_config,
361 NetworkPolicies::new(
362 TransactionPropagationKind::default(),
363 StrictEthAnnouncementFilter::default(),
364 ),
365 )
366 }
367}
368
369impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
370 pub fn with_policy(
374 network: NetworkHandle<N>,
375 pool: Pool,
376 from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
377 transactions_manager_config: TransactionsManagerConfig,
378 policies: NetworkPolicies<N>,
379 ) -> Self {
380 let network_events = network.event_listener();
381
382 let (command_tx, command_rx) = mpsc::unbounded_channel();
383
384 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
385 &transactions_manager_config.transaction_fetcher_config,
386 );
387
388 let pending = pool.pending_transactions_listener();
391 let pending_pool_imports_info =
392 PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
393 let metrics = TransactionsManagerMetrics::default();
394 metrics
395 .capacity_pending_pool_imports
396 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398 Self {
399 pool,
400 network,
401 network_events,
402 transaction_fetcher,
403 transactions_by_peers: Default::default(),
404 pool_imports: Default::default(),
405 pending_pool_imports_info,
406 bad_imports: LruCache::with_hasher(DEFAULT_MAX_COUNT_BAD_IMPORTS, Default::default()),
407 peers: Default::default(),
408 command_tx,
409 command_rx: UnboundedReceiverStream::new(command_rx),
410 pending_transactions: pending,
411 transaction_events: from_network,
412 config: transactions_manager_config,
413 policies,
414 metrics,
415 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
416 }
417 }
418
419 pub fn handle(&self) -> TransactionsHandle<N> {
421 TransactionsHandle { manager_tx: self.command_tx.clone() }
422 }
423
424 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
427 self.has_capacity_for_pending_pool_imports() &&
428 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
429 }
430
431 fn has_capacity_for_pending_pool_imports(&self) -> bool {
433 self.remaining_pool_import_capacity() > 0
434 }
435
436 fn remaining_pool_import_capacity(&self) -> usize {
438 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
439 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
440 )
441 }
442
443 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
444 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
445 self.metrics.reported_bad_transactions.increment(1);
446 }
447
448 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
449 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
450 self.network.reputation_change(peer_id, kind);
451 }
452
453 fn report_already_seen(&self, peer_id: PeerId) {
454 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
455 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
456 }
457
458 fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
460 if let Some(mut peer) = self.peers.remove(peer_id) {
461 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
462 }
463 self.transaction_fetcher.remove_peer(peer_id);
464 }
465
466 fn on_good_import(&mut self, hash: TxHash) {
468 self.transactions_by_peers.remove(&hash);
469 }
470
471 fn on_bad_import(&mut self, err: PoolError) {
501 let peers = self.transactions_by_peers.remove(&err.hash);
502
503 if err.is_bad_blob_sidecar() {
504 if let Some(peers) = peers {
508 for peer_id in peers {
509 self.report_peer_bad_transactions(peer_id);
510 }
511 }
512 return
513 }
514
515 if !err.is_bad_transaction() || self.network.is_syncing() {
517 return
518 }
519 if let Some(peers) = peers {
522 for peer_id in peers {
523 self.report_peer_bad_transactions(peer_id);
524 }
525 }
526 self.metrics.bad_imports.increment(1);
527 self.bad_imports.insert(err.hash);
528 }
529
530 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
534 let info = &self.pending_pool_imports_info;
536 let max_pending_pool_imports = info.max_pending_pool_imports;
537 let has_capacity_wrt_pending_pool_imports =
538 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
539
540 self.transaction_fetcher
541 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
542 }
543
544 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
545 let kind = match req_err {
546 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
547 RequestError::Timeout => ReputationChangeKind::Timeout,
548 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
549 return
551 }
552 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
553 };
554 self.report_peer(peer_id, kind);
555 }
556
557 #[inline]
558 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
559 let metrics = &self.metrics;
560
561 let TxManagerPollDurations {
562 acc_network_events,
563 acc_pending_imports,
564 acc_tx_events,
565 acc_imported_txns,
566 acc_fetch_events,
567 acc_pending_fetch,
568 acc_cmds,
569 } = poll_durations;
570
571 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
573 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
575 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
576 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
577 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
578 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
579 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
580 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
581 }
582}
583
584impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
585 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
587 for res in batch_results {
588 match res {
589 Ok(AddedTransactionOutcome { hash, .. }) => {
590 self.on_good_import(hash);
591 }
592 Err(err) => {
593 self.on_bad_import(err);
594 }
595 }
596 }
597 }
598
    /// Processes a pooled-transaction-hashes announcement received from `peer_id`.
    ///
    /// The announcement is dropped while the node is initially syncing, while transaction gossip
    /// is disabled, or when the peer is not tracked. Otherwise the announced hashes are recorded
    /// as seen by the peer and filtered in several passes: duplicates, hashes that have an entry
    /// in `transactions_by_peers`, entries rejected by the announcement filter policy, hashes
    /// already known to the pool, and finally the fetcher's own filter (which also consults
    /// `bad_imports`). The remaining hashes are requested from the peer through the fetcher, or
    /// buffered in the fetcher if the peer is not idle or the request could not be sent.
    ///
    /// The peer's reputation is changed when it announces hashes already marked as seen by it,
    /// when the message is empty or contains duplicates, and when entries have metadata that
    /// does not match the message version or are rejected with a penalty.
    fn on_new_pooled_transaction_hashes(
        &mut self,
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    ) {
        // Ignore announcements during initial sync.
        if self.network.is_initially_syncing() {
            return
        }
        // Ignore announcements when transaction gossip is turned off.
        if self.network.tx_gossip_disabled() {
            return
        }

        // Only announcements from tracked peers are processed.
        let Some(peer) = self.peers.get_mut(&peer_id) else {
            trace!(
                peer_id = format!("{peer_id:#}"),
                ?msg,
                "discarding announcement from inactive peer"
            );

            return
        };
        let client = peer.client_version.clone();

        // Mark every announced hash as seen by the peer and count the ones that were already
        // marked (`insert` returning `false`).
        let mut count_txns_already_seen_by_peer = 0;
        for tx in msg.iter_hashes().copied() {
            if !peer.seen_transactions.insert(tx) {
                count_txns_already_seen_by_peer += 1;
            }
        }
        if count_txns_already_seen_by_peer > 0 {
            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_hash_already_seen_by_peer
                .increment(count_txns_already_seen_by_peer);

            trace!(target: "net::tx",
                %count_txns_already_seen_by_peer,
                peer_id=format!("{peer_id:#}"),
                ?client,
                "Peer sent hashes that have already been marked as seen by peer"
            );

            self.report_already_seen(peer_id);
        }

        // An empty announcement is reported and discarded.
        if msg.is_empty() {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
            return;
        }

        // Deduplicate; a length change means the message contained duplicates, which is
        // reported but does not stop processing.
        let original_len = msg.len();
        let mut partially_valid_msg = msg.dedup();

        if partially_valid_msg.len() != original_len {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // Drop hashes that already have an entry in `transactions_by_peers`.
        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

        // Set by the closure below when any entry warrants a `BadAnnouncement` report.
        let mut should_report_peer = false;
        let mut tx_types_counter = TxTypesCounter::default();

        let has_eth68_metadata = partially_valid_msg
            .msg_version()
            .expect("partially valid announcement should have a version")
            .has_eth68_metadata();

        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
            let (ty_byte, size_val) = match *metadata_ref_mut {
                Some((ty, size)) => {
                    // Metadata present although the message version carries none: report, but
                    // still evaluate the entry.
                    if !has_eth68_metadata {
                        should_report_peer = true;
                    }
                    (ty, size)
                }
                None => {
                    // Metadata missing although the message version requires it: report and
                    // drop the entry.
                    if has_eth68_metadata {
                        should_report_peer = true;
                        return false;
                    }
                    // No metadata expected; the filter is consulted with zeroed type and size.
                    (0u8, 0)
                }
            };

            // Count the announced transaction type; unparsable type bytes are counted as other.
            if has_eth68_metadata && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
                match TxType::try_from(actual_ty_byte) {
                    Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
                    Err(_) => tx_types_counter.increase_other(),
                }
            }

            let decision = self
                .policies
                .announcement_filter()
                .decide_on_announcement(ty_byte, tx_hash, size_val);

            // Keep only accepted entries; a rejection may additionally request a penalty.
            match decision {
                AnnouncementAcceptance::Accept => true,
                AnnouncementAcceptance::Ignore => false,
                AnnouncementAcceptance::Reject { penalize_peer } => {
                    if penalize_peer {
                        should_report_peer = true;
                    }
                    false
                }
            }
        });

        if has_eth68_metadata {
            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
        }

        if should_report_peer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // Let the pool remove hashes it already knows and record how many were removed.
        let hashes_count_pre_pool_filter = partially_valid_msg.len();
        self.pool.retain_unknown(&mut partially_valid_msg);
        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
            let already_known_hashes_count =
                hashes_count_pre_pool_filter - partially_valid_msg.len();
            self.metrics
                .occurrences_hashes_already_in_pool
                .increment(already_known_hashes_count as u64);
        }

        if partially_valid_msg.is_empty() {
            return
        }

        let mut valid_announcement_data =
            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);

        if valid_announcement_data.is_empty() {
            return
        }

        // Let the fetcher filter the announcement, giving it a predicate that tells whether a
        // hash is in `bad_imports`.
        let bad_imports = &self.bad_imports;
        self.transaction_fetcher.filter_unseen_and_pending_hashes(
            &mut valid_announcement_data,
            |hash| bad_imports.contains(hash),
            &peer_id,
            &client,
        );

        if valid_announcement_data.is_empty() {
            return
        }

        trace!(target: "net::tx::propagation",
            peer_id=format!("{peer_id:#}"),
            hashes_len=valid_announcement_data.len(),
            hashes=?valid_announcement_data.keys(),
            msg_version=%valid_announcement_data.msg_version(),
            client_version=%client,
            "received previously unseen and pending hashes in announcement from peer"
        );

        // A peer the fetcher does not consider idle gets no new request; its hashes are
        // buffered instead.
        if !self.transaction_fetcher.is_idle(&peer_id) {
            // Read the version before the announcement data is consumed.
            let msg_version = valid_announcement_data.msg_version();
            let (hashes, _version) = valid_announcement_data.into_request_hashes();

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                hashes=?*hashes,
                %msg_version,
                %client,
                "buffering hashes announced by busy peer"
            );

            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

            return
        }

        // Pack a request; the initial capacity is a quarter of the announced hash count.
        let mut hashes_to_request =
            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
        let surplus_hashes =
            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

        // Hashes that were not packed into the request are buffered.
        if !surplus_hashes.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                surplus_hashes=?*surplus_hashes,
                %client,
                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
            );

            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
        }

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %client,
            "sending hashes in `GetPooledTransactions` request to peer's session"
        );

        // Look the peer up again to obtain a fresh mutable borrow for the request.
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        // If the fetcher hands the hashes back, the request was not sent; buffer them.
        if let Some(failed_to_request_hashes) =
            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
        {
            let conn_eth_version = peer.version;

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                failed_to_request_hashes=?*failed_to_request_hashes,
                %conn_eth_version,
                %client,
                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
            );
            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
851}
852
853impl<Pool, N> TransactionsManager<Pool, N>
854where
855 Pool: TransactionPool + Unpin + 'static,
856 N: NetworkPrimitives<
857 BroadcastedTransaction: SignedTransaction,
858 PooledTransaction: SignedTransaction,
859 > + Unpin,
860 Pool::Transaction:
861 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
862{
863 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
875 if self.network.tx_gossip_disabled() {
879 return
880 }
881
882 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
883
884 self.propagate_all(hashes);
885 }
886
887 fn propagate_full_transactions_to_peer(
891 &mut self,
892 txs: Vec<TxHash>,
893 peer_id: PeerId,
894 propagation_mode: PropagationMode,
895 ) -> Option<PropagatedTransactions> {
896 let peer = self.peers.get_mut(&peer_id)?;
897 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
898 let mut propagated = PropagatedTransactions::default();
899
900 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
902
903 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
904
905 if propagation_mode.is_forced() {
906 full_transactions.extend(to_propagate);
908 } else {
909 for tx in to_propagate {
912 if !peer.seen_transactions.contains(tx.tx_hash()) {
913 full_transactions.push(&tx);
915 }
916 }
917 }
918
919 if full_transactions.is_empty() {
920 return None
922 }
923
924 let PropagateTransactions { pooled, full } = full_transactions.build();
925
926 if let Some(new_pooled_hashes) = pooled {
928 for hash in new_pooled_hashes.iter_hashes().copied() {
929 propagated.record(hash, PropagateKind::Hash(peer_id));
930 peer.seen_transactions.insert(hash);
932 }
933
934 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
936 }
937
938 if let Some(new_full_transactions) = full {
940 for tx in &new_full_transactions {
941 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
942 peer.seen_transactions.insert(*tx.tx_hash());
944 }
945
946 self.network.send_transactions(peer_id, new_full_transactions);
948 }
949
950 self.metrics.propagated_transactions.increment(propagated.len() as u64);
952
953 Some(propagated)
954 }
955
956 fn propagate_hashes_to(
960 &mut self,
961 hashes: Vec<TxHash>,
962 peer_id: PeerId,
963 propagation_mode: PropagationMode,
964 ) {
965 trace!(target: "net::tx", "Start propagating transactions as hashes");
966
967 let propagated = {
970 let Some(peer) = self.peers.get_mut(&peer_id) else {
971 return
973 };
974
975 let to_propagate =
976 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
977
978 let mut propagated = PropagatedTransactions::default();
979
980 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
982
983 if propagation_mode.is_forced() {
984 hashes.extend(to_propagate)
985 } else {
986 for tx in to_propagate {
987 if !peer.seen_transactions.contains(tx.tx_hash()) {
988 hashes.push(&tx);
990 }
991 }
992 }
993
994 let new_pooled_hashes = hashes.build();
995
996 if new_pooled_hashes.is_empty() {
997 return
999 }
1000
1001 for hash in new_pooled_hashes.iter_hashes().copied() {
1002 propagated.record(hash, PropagateKind::Hash(peer_id));
1003 peer.seen_transactions.insert(hash);
1004 }
1005
1006 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1007
1008 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1010
1011 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1013
1014 propagated
1015 };
1016
1017 self.pool.on_propagated(propagated);
1019 }
1020
    /// Propagates the given transactions to all tracked peers permitted by the propagation
    /// policy and returns a record of what was sent to whom.
    ///
    /// Returns an empty record when transaction gossip is disabled. Up to `max_num_full` of the
    /// eligible peers get a "full" builder, all others a "pooled" (hashes) builder. In forced
    /// mode every transaction is pushed to the builder; otherwise only transactions not yet
    /// marked as seen by the peer are pushed. In both modes the transactions are marked as seen
    /// by the peer.
    fn propagate_transactions(
        &mut self,
        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
        propagation_mode: PropagationMode,
    ) -> PropagatedTransactions {
        let mut propagated = PropagatedTransactions::default();
        if self.network.tx_gossip_disabled() {
            return propagated
        }

        // Number of peers that get full transactions, derived from the configured propagation
        // mode and the current peer count.
        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());

        // Counts only peers that pass the policy check below.
        let mut num_full_peers = 0;
        for (peer_id, peer) in &mut self.peers {
            if !self.policies.propagation_policy().can_propagate(peer) {
                continue
            }
            // The first `max_num_full` eligible peers get a full builder, the rest a pooled one.
            let mut builder = if num_full_peers < max_num_full {
                num_full_peers += 1;
                PropagateTransactionsBuilder::full(peer.version, to_propagate.len())
            } else {
                PropagateTransactionsBuilder::pooled(peer.version, to_propagate.len())
            };

            if propagation_mode.is_forced() {
                // Forced: push everything, regardless of what the peer has seen.
                for tx in &to_propagate {
                    peer.seen_transactions.insert(*tx.tx_hash());
                    builder.push(tx);
                }
            } else {
                for tx in &to_propagate {
                    // Push only when the hash was not already marked as seen by the peer.
                    if peer.seen_transactions.insert(*tx.tx_hash()) {
                        builder.push(tx);
                    }
                }
            }

            if builder.is_empty() {
                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
                continue
            }

            let PropagateTransactions { pooled, full } = builder.build();

            if let Some(mut new_pooled_hashes) = pooled {
                // Enforce the soft limit on hashes per announcement: hashes beyond the limit
                // are unmarked as seen for this peer and cut from the message.
                if new_pooled_hashes.len() >
                    SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE
                {
                    for hash in new_pooled_hashes
                        .iter_hashes()
                        .skip(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE)
                    {
                        peer.seen_transactions.remove(hash);
                    }
                    new_pooled_hashes.truncate(
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
                    );
                }

                for hash in new_pooled_hashes.iter_hashes().copied() {
                    propagated.record(hash, PropagateKind::Hash(*peer_id));
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");

                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
            }

            if let Some(new_full_transactions) = full {
                for tx in &new_full_transactions {
                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");

                self.network.send_transactions(*peer_id, new_full_transactions);
            }
        }

        // Record how many entries were propagated across all peers.
        self.metrics.propagated_transactions.increment(propagated.len() as u64);

        propagated
    }
1130
1131 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1136 if self.peers.is_empty() {
1137 return
1139 }
1140 let propagated = self.propagate_transactions(
1141 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1142 PropagationMode::Basic,
1143 );
1144
1145 self.pool.on_propagated(propagated);
1147 }
1148
1149 fn on_get_pooled_transactions(
1151 &mut self,
1152 peer_id: PeerId,
1153 request: GetPooledTransactions,
1154 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1155 ) {
1156 if self.network.tx_gossip_disabled() {
1158 let _ = response.send(Ok(PooledTransactions::default()));
1159 return
1160 }
1161 if let Some(peer) = self.peers.get_mut(&peer_id) {
1162 let transactions = self.pool.get_pooled_transaction_elements(
1163 request.0,
1164 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1165 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1166 ),
1167 );
1168 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1169
1170 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1173
1174 let resp = PooledTransactions(transactions);
1175 let _ = response.send(Ok(resp));
1176 }
1177 }
1178
1179 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1181 match cmd {
1182 TransactionsCommand::PropagateHash(hash) => {
1183 self.on_new_pending_transactions(vec![hash])
1184 }
1185 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1186 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1187 }
1188 TransactionsCommand::GetActivePeers(tx) => {
1189 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1190 tx.send(peers).ok();
1191 }
1192 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1193 if let Some(propagated) =
1194 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1195 {
1196 self.pool.on_propagated(propagated);
1197 }
1198 }
1199 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1200 TransactionsCommand::BroadcastTransactions(txs) => {
1201 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1202 self.pool.on_propagated(propagated);
1203 }
1204 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1205 let mut res = HashMap::with_capacity(peers.len());
1206 for peer_id in peers {
1207 let hashes = self
1208 .peers
1209 .get(&peer_id)
1210 .map(|peer| peer.seen_transactions.iter().copied().collect::<B256Set>())
1211 .unwrap_or_default();
1212 res.insert(peer_id, hashes);
1213 }
1214 tx.send(res).ok();
1215 }
1216 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1217 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1218 peer_request_sender.send(sender).ok();
1219 }
1220 }
1221 }
1222
1223 fn handle_peer_session(
1227 &mut self,
1228 info: SessionInfo,
1229 messages: PeerRequestSender<PeerRequest<N>>,
1230 ) {
1231 let SessionInfo { peer_id, client_version, version, .. } = info;
1232
1233 let peer = PeerMetadata::<N>::new(
1235 messages,
1236 version,
1237 client_version,
1238 self.config.max_transactions_seen_by_peer_history,
1239 info.peer_kind,
1240 );
1241 let peer = match self.peers.entry(peer_id) {
1242 Entry::Occupied(mut entry) => {
1243 entry.insert(peer);
1244 entry.into_mut()
1245 }
1246 Entry::Vacant(entry) => entry.insert(peer),
1247 };
1248
1249 self.policies.propagation_policy_mut().on_session_established(peer);
1250
1251 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1255 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1256 return
1257 }
1258
1259 let pooled_txs = self.pool.pooled_transactions_max(
1261 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1262 );
1263 if pooled_txs.is_empty() {
1264 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1265 return;
1266 }
1267
1268 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1270 for pooled_tx in pooled_txs {
1271 peer.seen_transactions.insert(*pooled_tx.hash());
1272 msg_builder.push_pooled(pooled_tx);
1273 }
1274
1275 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1276 let msg = msg_builder.build();
1277 self.network.send_transactions_hashes(peer_id, msg);
1278 }
1279
1280 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1282 match event_result {
1283 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1284 self.on_peer_session_closed(&peer_id);
1285 }
1286 NetworkEvent::ActivePeerSession { info, messages } => {
1287 self.handle_peer_session(info, messages);
1289 }
1290 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1291 let peer_id = info.peer_id;
1292 let messages = match self.peers.get(&peer_id) {
1294 Some(p) => p.request_tx.clone(),
1295 None => {
1296 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1297 return;
1298 }
1299 };
1300 self.handle_peer_session(info, messages);
1301 }
1302 _ => {}
1303 }
1304 }
1305
1306 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1308 if self.config.ingress_policy.allows_all() {
1309 return true;
1310 }
1311 let Some(peer) = self.peers.get(peer_id) else {
1312 return false;
1313 };
1314 self.config.ingress_policy.allows(peer.peer_kind())
1315 }
1316
1317 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1319 match event {
1320 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1321 if !self.accepts_incoming_from(&peer_id) {
1322 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1323 return;
1324 }
1325
1326 let has_blob_txs = msg.has_eip4844();
1330
1331 let non_blob_txs = msg
1332 .into_iter()
1333 .map(N::PooledTransaction::try_from)
1334 .filter_map(Result::ok)
1335 .collect();
1336
1337 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1338
1339 if has_blob_txs {
1340 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1341 self.report_peer_bad_transactions(peer_id);
1342 }
1343 }
1344 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1345 if !self.accepts_incoming_from(&peer_id) {
1346 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1347 return;
1348 }
1349 self.on_new_pooled_transaction_hashes(peer_id, msg)
1350 }
1351 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1352 self.on_get_pooled_transactions(peer_id, request, response)
1353 }
1354 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1355 let _ = response.send(Some(self.handle()));
1356 }
1357 }
1358 }
1359
1360 fn import_transactions(
1362 &mut self,
1363 peer_id: PeerId,
1364 transactions: PooledTransactions<N::PooledTransaction>,
1365 source: TransactionSource,
1366 ) {
1367 if self.network.is_initially_syncing() {
1369 return
1370 }
1371 if self.network.tx_gossip_disabled() {
1372 return
1373 }
1374
1375 if !self.has_capacity_for_pending_pool_imports() {
1377 return
1378 }
1379
1380 let mut transactions = transactions.0;
1381
1382 let capacity = self.remaining_pool_import_capacity();
1386 if transactions.len() > capacity {
1387 let skipped = transactions.len() - capacity;
1388 transactions.truncate(capacity);
1389 self.metrics
1390 .skipped_transactions_pending_pool_imports_at_capacity
1391 .increment(skipped as u64);
1392 trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1393 }
1394
1395 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1396 let client_version = peer.client_version.clone();
1397
1398 let start = Instant::now();
1399
1400 self.transaction_fetcher
1402 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1403
1404 let mut num_already_seen_by_peer = 0;
1409 for tx in &transactions {
1410 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1411 num_already_seen_by_peer += 1;
1412 }
1413 }
1414
1415 let mut has_bad_transactions = false;
1417
1418 transactions.retain(|tx| {
1421 if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1422 let peers = entry.get_mut();
1423 if !peers.contains(&peer_id) {
1424 peers.push(peer_id);
1425 }
1426 return false
1427 }
1428 if self.bad_imports.contains(tx.tx_hash()) {
1429 trace!(target: "net::tx",
1430 peer_id=format!("{peer_id:#}"),
1431 hash=%tx.tx_hash(),
1432 %client_version,
1433 "received a known bad transaction from peer"
1434 );
1435 has_bad_transactions = true;
1436 return false;
1437 }
1438 true
1439 });
1440
1441 let txns_count_pre_pool_filter = transactions.len();
1443 self.pool.retain_unknown(&mut transactions);
1444 if txns_count_pre_pool_filter > transactions.len() {
1445 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1446 self.metrics
1447 .occurrences_transactions_already_in_pool
1448 .increment(already_known_txns_count as u64);
1449 }
1450
1451 let txs_len = transactions.len();
1452
1453 let new_txs = transactions
1454 .into_par_iter()
1455 .filter_map(|tx| match tx.try_into_recovered() {
1456 Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1457 Err(badtx) => {
1458 trace!(target: "net::tx",
1459 peer_id=format!("{peer_id:#}"),
1460 hash=%badtx.tx_hash(),
1461 client_version=%client_version,
1462 "failed ecrecovery for transaction"
1463 );
1464 None
1465 }
1466 })
1467 .collect::<Vec<_>>();
1468
1469 has_bad_transactions |= new_txs.len() != txs_len;
1470
1471 for tx in &new_txs {
1473 self.transactions_by_peers.insert(*tx.hash(), smallvec::smallvec![peer_id]);
1474 }
1475
1476 if !new_txs.is_empty() {
1479 let pool = self.pool.clone();
1480 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1482 metric_pending_pool_imports.increment(new_txs.len() as f64);
1483
1484 self.pending_pool_imports_info
1486 .pending_pool_imports
1487 .fetch_add(new_txs.len(), Ordering::Relaxed);
1488 let tx_manager_info_pending_pool_imports =
1489 self.pending_pool_imports_info.pending_pool_imports.clone();
1490
1491 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1492 let import = Box::pin(async move {
1493 let added = new_txs.len();
1494 let res = pool.add_external_transactions(new_txs).await;
1495
1496 metric_pending_pool_imports.decrement(added as f64);
1498 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1500
1501 res
1502 });
1503
1504 self.pool_imports.push(import);
1505 }
1506
1507 if num_already_seen_by_peer > 0 {
1508 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1509 self.metrics
1510 .occurrences_of_transaction_already_seen_by_peer
1511 .increment(num_already_seen_by_peer);
1512 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1513 }
1514
1515 if has_bad_transactions {
1516 self.report_peer_bad_transactions(peer_id)
1518 }
1519
1520 if num_already_seen_by_peer > 0 {
1521 self.report_already_seen(peer_id);
1522 }
1523
1524 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1525 }
1526
1527 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1529 match fetch_event {
1530 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1531 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1532 if report_peer {
1533 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1534 }
1535 }
1536 FetchEvent::FetchError { peer_id, error } => {
1537 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1538 self.on_request_error(peer_id, error);
1539 }
1540 FetchEvent::EmptyResponse { peer_id } => {
1541 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1542 }
1543 }
1544 }
1545}
1546
1547impl<
1555 Pool: TransactionPool + Unpin + 'static,
1556 N: NetworkPrimitives<
1557 BroadcastedTransaction: SignedTransaction,
1558 PooledTransaction: SignedTransaction,
1559 > + Unpin,
1560 > Future for TransactionsManager<Pool, N>
1561where
1562 Pool::Transaction:
1563 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1564{
1565 type Output = ();
1566
1567 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1568 let start = Instant::now();
1569 let mut poll_durations = TxManagerPollDurations::default();
1570
1571 let this = self.get_mut();
1572
1573 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1579 poll_durations.acc_network_events,
1580 "net::tx",
1581 "Network events stream",
1582 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1583 this.network_events.poll_next_unpin(cx),
1584 |event| this.on_network_event(event)
1585 );
1586
1587 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1602 poll_durations.acc_tx_events,
1603 "net::tx",
1604 "Network transaction events stream",
1605 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1606 this.transaction_events.poll_next_unpin(cx),
1607 |event: NetworkTransactionEvent<N>| this.on_network_tx_event(event),
1608 );
1609
1610 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1621 poll_durations.acc_fetch_events,
1622 "net::tx",
1623 "Transaction fetch events stream",
1624 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1625 this.transaction_fetcher.poll_next_unpin(cx),
1626 |event| this.on_fetch_event(event),
1627 );
1628
1629 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1644 poll_durations.acc_pending_imports,
1645 "net::tx",
1646 "Batched pool imports stream",
1647 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1648 this.pool_imports.poll_next_unpin(cx),
1649 |batch_results| this.on_batch_import_result(batch_results)
1650 );
1651
1652 let mut new_txs = Vec::new();
1665 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1666 cx,
1667 &mut new_txs,
1668 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1669 ) {
1670 Poll::Ready(count) => {
1671 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1672 true
1675 } else {
1676 let limit =
1680 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1681 new_txs.len();
1682 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1683 }
1684 }
1685 Poll::Pending => false,
1686 };
1687 if !new_txs.is_empty() {
1688 this.on_new_pending_transactions(new_txs);
1689 }
1690
1691 duration_metered_exec!(
1696 {
1697 if this.has_capacity_for_fetching_pending_hashes() &&
1698 this.on_fetch_hashes_pending_fetch()
1699 {
1700 maybe_more_tx_fetch_events = true;
1701 }
1702 },
1703 poll_durations.acc_pending_fetch
1704 );
1705
1706 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1708 poll_durations.acc_cmds,
1709 "net::tx",
1710 "Commands channel",
1711 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1712 this.command_rx.poll_next_unpin(cx),
1713 |cmd| this.on_command(cmd)
1714 );
1715
1716 this.transaction_fetcher.update_metrics();
1717
1718 if maybe_more_network_events ||
1720 maybe_more_commands ||
1721 maybe_more_tx_events ||
1722 maybe_more_tx_fetch_events ||
1723 maybe_more_pool_imports ||
1724 maybe_more_pending_txns
1725 {
1726 cx.waker().wake_by_ref();
1728 return Poll::Pending
1729 }
1730
1731 this.update_poll_metrics(start, poll_durations);
1732
1733 Poll::Pending
1734 }
1735}
1736
/// Selects how transactions are propagated.
///
/// NOTE(review): the code that consumes this mode is outside this section; the exact difference
/// between the two variants should be confirmed against the propagation routines.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The regular propagation mode.
    Basic,
    /// The forced propagation mode; see [`PropagationMode::is_forced`].
    Forced,
}

impl PropagationMode {
    /// Returns `true` only for [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1759
1760#[derive(Debug, Clone)]
1762struct PropagateTransaction<T = TransactionSigned> {
1763 size: usize,
1764 transaction: Arc<T>,
1765}
1766
1767impl<T: SignedTransaction> PropagateTransaction<T> {
1768 pub fn new(transaction: T) -> Self {
1770 let size = transaction.length();
1771 Self { size, transaction: Arc::new(transaction) }
1772 }
1773
1774 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1776 where
1777 P: PoolTransaction<Consensus = T>,
1778 {
1779 let size = tx.encoded_length();
1780 let transaction = tx.transaction.clone_into_consensus();
1781 let transaction = Arc::new(transaction.into_inner());
1782 Self { size, transaction }
1783 }
1784
1785 fn tx_hash(&self) -> &TxHash {
1786 self.transaction.tx_hash()
1787 }
1788}
1789
1790#[derive(Debug, Clone)]
1793enum PropagateTransactionsBuilder<T> {
1794 Pooled(PooledTransactionsHashesBuilder),
1795 Full(FullTransactionsBuilder<T>),
1796}
1797
1798impl<T> PropagateTransactionsBuilder<T> {
1799 fn pooled(version: EthVersion, capacity: usize) -> Self {
1802 Self::Pooled(PooledTransactionsHashesBuilder::with_capacity(version, capacity))
1803 }
1804
1805 fn full(version: EthVersion, capacity: usize) -> Self {
1808 Self::Full(FullTransactionsBuilder::with_capacity(version, capacity))
1809 }
1810
1811 fn is_empty(&self) -> bool {
1813 match self {
1814 Self::Pooled(builder) => builder.is_empty(),
1815 Self::Full(builder) => builder.is_empty(),
1816 }
1817 }
1818
1819 fn build(self) -> PropagateTransactions<T> {
1821 match self {
1822 Self::Pooled(pooled) => {
1823 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1824 }
1825 Self::Full(full) => full.build(),
1826 }
1827 }
1828}
1829
1830impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1831 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1833 match self {
1834 Self::Pooled(builder) => builder.push(transaction),
1835 Self::Full(builder) => builder.push(transaction),
1836 }
1837 }
1838}
1839
/// The finished output of a [`PropagateTransactionsBuilder`].
///
/// Either field may be `None`; [`FullTransactionsBuilder::build`] leaves a field unset when the
/// corresponding collection is empty.
struct PropagateTransactions<T> {
    /// Hash announcement to send, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// Full transactions to send, if any.
    full: Option<Vec<Arc<T>>>,
}
1847
1848#[derive(Debug, Clone)]
1853struct FullTransactionsBuilder<T> {
1854 total_size: usize,
1856 transactions: Vec<Arc<T>>,
1858 pooled: PooledTransactionsHashesBuilder,
1860}
1861
1862impl<T> FullTransactionsBuilder<T> {
1863 fn new(version: EthVersion) -> Self {
1865 Self {
1866 total_size: 0,
1867 pooled: PooledTransactionsHashesBuilder::new(version),
1868 transactions: vec![],
1869 }
1870 }
1871
1872 fn with_capacity(version: EthVersion, capacity: usize) -> Self {
1877 Self {
1878 total_size: 0,
1879 pooled: PooledTransactionsHashesBuilder::new(version),
1880 transactions: Vec::with_capacity(capacity),
1881 }
1882 }
1883
1884 fn is_empty(&self) -> bool {
1886 self.transactions.is_empty() && self.pooled.is_empty()
1887 }
1888
1889 fn build(self) -> PropagateTransactions<T> {
1891 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1892 let full = Some(self.transactions).filter(|full| !full.is_empty());
1893 PropagateTransactions { pooled, full }
1894 }
1895}
1896
1897impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1898 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1900 for tx in txs {
1901 self.push(&tx)
1902 }
1903 }
1904
1905 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1915 if !transaction.transaction.is_broadcastable_in_full() {
1924 self.pooled.push(transaction);
1925 return
1926 }
1927
1928 let new_size = self.total_size + transaction.size;
1929 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1930 self.total_size > 0
1931 {
1932 self.pooled.push(transaction);
1934 return
1935 }
1936
1937 self.total_size = new_size;
1938 self.transactions.push(Arc::clone(&transaction.transaction));
1939 }
1940}
1941
1942#[derive(Debug, Clone)]
1945enum PooledTransactionsHashesBuilder {
1946 Eth66(NewPooledTransactionHashes66),
1947 Eth68(NewPooledTransactionHashes68),
1948 Eth72(NewPooledTransactionHashes72),
1949}
1950
1951impl PooledTransactionsHashesBuilder {
1954 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1956 match self {
1957 Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1958 Self::Eth68(msg) => {
1959 msg.hashes.push(*pooled_tx.hash());
1960 msg.sizes.push(pooled_tx.encoded_length());
1961 msg.types.push(pooled_tx.transaction.ty());
1962 }
1963 Self::Eth72(msg) => {
1964 msg.hashes.push(*pooled_tx.hash());
1965 msg.sizes.push(pooled_tx.encoded_length());
1966 msg.types.push(pooled_tx.transaction.ty());
1967 }
1968 }
1969 }
1970
1971 fn is_empty(&self) -> bool {
1973 match self {
1974 Self::Eth66(hashes) => hashes.is_empty(),
1975 Self::Eth68(hashes) => hashes.is_empty(),
1976 Self::Eth72(hashes) => hashes.is_empty(),
1977 }
1978 }
1979
1980 fn len(&self) -> usize {
1982 match self {
1983 Self::Eth66(hashes) => hashes.len(),
1984 Self::Eth68(hashes) => hashes.len(),
1985 Self::Eth72(hashes) => hashes.len(),
1986 }
1987 }
1988
1989 fn extend<T: SignedTransaction>(
1991 &mut self,
1992 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1993 ) {
1994 for tx in txs {
1995 self.push(&tx);
1996 }
1997 }
1998
1999 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
2000 match self {
2001 Self::Eth66(msg) => msg.push(*tx.tx_hash()),
2002 Self::Eth68(msg) => {
2003 msg.hashes.push(*tx.tx_hash());
2004 msg.sizes.push(tx.size);
2005 msg.types.push(tx.transaction.ty());
2006 }
2007 Self::Eth72(msg) => {
2008 msg.hashes.push(*tx.tx_hash());
2009 msg.sizes.push(tx.size);
2010 msg.types.push(tx.transaction.ty());
2011 }
2012 }
2013 }
2014
2015 fn new(version: EthVersion) -> Self {
2017 match version {
2018 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
2019 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
2020 Self::Eth68(Default::default())
2021 }
2022 EthVersion::Eth72 => Self::Eth72(Default::default()),
2023 }
2024 }
2025
2026 fn with_capacity(version: EthVersion, capacity: usize) -> Self {
2029 match version {
2030 EthVersion::Eth66 | EthVersion::Eth67 => {
2031 Self::Eth66(NewPooledTransactionHashes66::with_capacity(capacity))
2032 }
2033 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
2034 Self::Eth68(NewPooledTransactionHashes68::with_capacity(capacity))
2035 }
2036 EthVersion::Eth72 => Self::Eth72(NewPooledTransactionHashes72::with_capacity(capacity)),
2037 }
2038 }
2039
2040 fn build(self) -> NewPooledTransactionHashes {
2041 match self {
2042 Self::Eth66(mut msg) => {
2043 msg.shrink_to_fit();
2044 msg.into()
2045 }
2046 Self::Eth68(mut msg) => {
2047 msg.shrink_to_fit();
2048 msg.into()
2049 }
2050 Self::Eth72(mut msg) => {
2051 msg.shrink_to_fit();
2052 msg.into()
2053 }
2054 }
2055 }
2056}
2057
/// Describes how a batch of transactions reached the manager.
enum TransactionSource {
    /// The peer sent the full transactions in a broadcast message.
    Broadcast,
    /// The transactions were returned by the transaction fetcher.
    Response,
}

impl TransactionSource {
    /// Returns `true` only for [`TransactionSource::Broadcast`].
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2074
2075#[derive(Debug)]
2077pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2078 seen_transactions: LruCache<TxHash, FbBuildHasher<32>>,
2082 request_tx: PeerRequestSender<PeerRequest<N>>,
2084 version: EthVersion,
2086 client_version: Arc<str>,
2088 peer_kind: PeerKind,
2090}
2091
2092impl<N: NetworkPrimitives> PeerMetadata<N> {
2093 pub fn new(
2095 request_tx: PeerRequestSender<PeerRequest<N>>,
2096 version: EthVersion,
2097 client_version: Arc<str>,
2098 max_transactions_seen_by_peer: u32,
2099 peer_kind: PeerKind,
2100 ) -> Self {
2101 Self {
2102 seen_transactions: LruCache::with_hasher(
2103 max_transactions_seen_by_peer,
2104 Default::default(),
2105 ),
2106 request_tx,
2107 version,
2108 client_version,
2109 peer_kind,
2110 }
2111 }
2112
2113 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2115 &self.request_tx
2116 }
2117
2118 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash, FbBuildHasher<32>> {
2120 &mut self.seen_transactions
2121 }
2122
2123 pub const fn version(&self) -> EthVersion {
2125 self.version
2126 }
2127
2128 pub fn client_version(&self) -> &str {
2130 &self.client_version
2131 }
2132
2133 pub const fn peer_kind(&self) -> PeerKind {
2135 self.peer_kind
2136 }
2137}
2138
/// Commands understood by the transactions manager.
///
/// NOTE(review): presumably these are the values received on `command_rx` and dispatched by
/// `on_command`; neither the sending side nor the handler is visible in this section, so the
/// variant descriptions below only restate what each payload carries — confirm against the
/// handler.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Carries a single hash to propagate.
    PropagateHash(B256),
    /// Carries a list of hashes together with the one peer they are addressed to.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Carries a oneshot sender that expects a set of peer ids.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Carries a list of transaction hashes together with the one peer they are addressed to.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Carries a list of transaction hashes to propagate.
    PropagateTransactions(Vec<TxHash>),
    /// Carries already prepared transactions to broadcast.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Carries a list of peers and a oneshot sender that expects a hash set per peer.
    GetTransactionHashes { peers: Vec<PeerId>, tx: oneshot::Sender<HashMap<PeerId, B256Set>> },
    /// Asks for the request sender of a peer.
    GetPeerSender {
        /// The peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the answer; `None` is a valid answer.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2162
/// Transaction-related events delivered to the transactions manager and handled by
/// `on_network_tx_event`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A peer broadcast full transactions.
    ///
    /// Subject to the ingress policy; imported with `TransactionSource::Broadcast`.
    IncomingTransactions {
        /// The peer that sent the transactions.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// A peer announced transaction hashes.
    ///
    /// Subject to the ingress policy.
    IncomingPooledTransactionHashes {
        /// The peer that sent the announcement.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer requested pooled transactions from us.
    GetPooledTransactions {
        /// The requesting peer.
        peer_id: PeerId,
        /// The requested hashes.
        request: GetPooledTransactions,
        /// Channel on which the result is sent back.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Asks for a handle to the transactions manager; answered with `Some(handle)`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2194
/// Bookkeeping for transactions that are queued for import into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions whose import has been queued but not yet finished.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Configured upper bound for `pending_pool_imports`.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the bookkeeping with a zeroed counter and the given upper bound.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` while the number of pending imports is strictly below
    /// `max_pending_pool_imports`.
    ///
    /// The limit is taken from the argument, not from the field of the same name.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2215
2216impl Default for PendingPoolImportsInfo {
2217 fn default() -> Self {
2218 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2219 }
2220}
2221
/// Time accumulated in each stage of a single `poll` of the transactions manager.
///
/// A fresh, zeroed value is created at the start of every poll and handed to
/// `update_poll_metrics` at the end.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time spent on the network events stream.
    acc_network_events: Duration,
    /// Time spent on the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time spent on the network transaction events stream.
    acc_tx_events: Duration,
    // NOTE(review): not written by the `poll` implementation visible in this section —
    // presumably time spent handling newly imported transactions; confirm.
    acc_imported_txns: Duration,
    /// Time spent on the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time spent scheduling fetches for hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent on the commands channel.
    acc_cmds: Duration,
}
2232
2233impl<N: NetworkPrimitives> InMemorySize for NetworkTransactionEvent<N> {
2234 fn size(&self) -> usize {
2237 match self {
2238 Self::IncomingTransactions { peer_id, msg } => {
2239 core::mem::size_of_val(peer_id) +
2240 msg.0.iter().map(InMemorySize::size).sum::<usize>()
2241 }
2242 Self::IncomingPooledTransactionHashes { peer_id, msg } => {
2243 core::mem::size_of_val(peer_id) + msg.size()
2244 }
2245 Self::GetPooledTransactions { peer_id, request, response } => {
2246 core::mem::size_of_val(peer_id) +
2247 request.0.len() * core::mem::size_of::<TxHash>() +
2248 core::mem::size_of_val(response)
2249 }
2250 Self::GetTransactionsHandle(_) => 0,
2251 }
2252 }
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257 use super::*;
2258 use crate::{
2259 test_utils::{
2260 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2261 Testnet,
2262 },
2263 transactions::config::RelaxedEthAnnouncementFilter,
2264 NetworkConfigBuilder, NetworkManager,
2265 };
2266 use alloy_consensus::{TxEip1559, TxLegacy};
2267 use alloy_eips::eip4844::BlobTransactionValidationError;
2268 use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2269 use alloy_rlp::Decodable;
2270 use futures::FutureExt;
2271 use reth_chainspec::MIN_TRANSACTION_GAS;
2272 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2273 use reth_network_api::{NetworkInfo, PeerKind};
2274 use reth_network_p2p::{
2275 error::{RequestError, RequestResult},
2276 sync::{NetworkSyncUpdater, SyncState},
2277 };
2278 use reth_storage_api::noop::NoopProvider;
2279 use reth_tasks::Runtime;
2280 use reth_transaction_pool::{
2281 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2282 test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2283 };
2284 use secp256k1::SecretKey;
2285 use std::{
2286 future::poll_fn,
2287 net::{IpAddr, Ipv4Addr, SocketAddr},
2288 str::FromStr,
2289 };
2290 use tracing::error;
2291
/// A full-transaction broadcast received while the node is initially syncing must not end up in
/// the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Stand-alone network with its own transactions manager, which is the component under test.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    tokio::task::spawn(network);

    // The first transition into `Syncing` counts as the initial sync.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    assert!(NetworkInfo::is_initially_syncing(&network_handle));

    // Feed the session events observed on `handle0` into the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                transactions
                    .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // Poll the manager exactly once.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    // Nothing may have been imported.
    assert!(pool.is_empty());
    handle.terminate().await;
}
2361
/// After the node has been idle once, a second `Syncing` phase is no longer the initial sync, so
/// a broadcast transaction is imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_tx_broadcasts_through_two_syncs() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();
    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Stand-alone network with its own transactions manager, which is the component under test.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    tokio::task::spawn(network);

    // Syncing -> Idle -> Syncing: the node ends up syncing, but not for the first time.
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Idle);
    assert!(!NetworkInfo::is_syncing(&network_handle));
    network_handle.update_sync_state(SyncState::Syncing);
    assert!(NetworkInfo::is_syncing(&network_handle));

    // Feed the session events observed on `handle0` into the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            _ => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // Poll the manager exactly once.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;
    assert!(!NetworkInfo::is_initially_syncing(&network_handle));
    assert!(NetworkInfo::is_syncing(&network_handle));
    // The broadcast was accepted despite the ongoing (non-initial) sync.
    assert!(!pool.is_empty());
    handle.terminate().await;
}
2436
/// A hash announcement from a peer triggers a `GetPooledTransactions` request to that peer, and
/// the transactions returned in the response are imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions_hashes() {
    reth_tracing::init_test_tracing();

    let secret_key = SecretKey::new(&mut rand_08::thread_rng());
    let client = NoopProvider::default();

    let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
        .listener_port(0)
        .disable_discovery()
        .build(client);

    let pool = testing_pool();

    let transactions_manager_config = config.transactions_manager_config.clone();
    let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();

    let peer_id_1 = PeerId::new([1; 64]);
    let eth_version = EthVersion::Eth66;

    // A single signed legacy transaction that the mock peer will announce and then serve.
    let txs = vec![TransactionSigned::new_unhashed(
        Transaction::Legacy(TxLegacy {
            chain_id: Some(4),
            nonce: 15u64,
            gas_price: 2200000000,
            gas_limit: 34811,
            to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
            value: U256::from(1234u64),
            input: Default::default(),
        }),
        Signature::new(
            U256::from_str(
                "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
            )
            .unwrap(),
            U256::from_str(
                "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
            )
            .unwrap(),
            true,
        ),
    )];

    let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

    // Register a mock session directly in the manager's peer map.
    let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
    tx_manager.peers.insert(peer_id_1, peer_1);

    assert!(pool.is_empty());

    // The peer announces the hashes.
    tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
        peer_id: peer_id_1,
        msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
            txs_hashes.clone(),
        )),
    });

    // The manager must request exactly the announced hashes from the mock session.
    let req = to_mock_session_rx
        .recv()
        .await
        .expect("peer_1 session should receive request with buffered hashes");
    let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
    assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));

    let message: Vec<PooledTransactionVariant> = txs
        .into_iter()
        .map(|tx| {
            PooledTransactionVariant::try_from(tx)
                .expect("Failed to convert MockTransaction to PooledTransaction")
        })
        .collect();

    // Answer the request on behalf of the peer.
    response
        .send(Ok(PooledTransactions(message)))
        .expect("should send peer_1 response to tx manager");

    // Poll the manager exactly once.
    poll_fn(|cx| {
        let _ = tx_manager.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;

    // Every announced transaction is now in the pool.
    assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
}
2534
/// A full-transaction broadcast received while the node is idle is tracked for the sending peer
/// and imported into the pool.
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_incoming_transactions() {
    reth_tracing::init_test_tracing();
    let net = Testnet::create(3).await;

    let mut handles = net.handles();
    let handle0 = handles.next().unwrap();
    let handle1 = handles.next().unwrap();

    drop(handles);
    let handle = net.spawn();

    let listener0 = handle0.event_listener();

    handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
    let secret_key = SecretKey::new(&mut rand_08::thread_rng());

    // Stand-alone network with its own transactions manager, which is the component under test.
    let client = NoopProvider::default();
    let pool = testing_pool();
    let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
        .disable_discovery()
        .listener_port(0)
        .build(client);
    let transactions_manager_config = config.transactions_manager_config.clone();
    let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
        .await
        .unwrap()
        .into_builder()
        .transactions(pool.clone(), transactions_manager_config)
        .split_with_handle();
    tokio::task::spawn(network);

    network_handle.update_sync_state(SyncState::Idle);

    assert!(!NetworkInfo::is_syncing(&network_handle));

    // Feed the session events observed on `handle0` into the manager under test.
    let mut established = listener0.take(2);
    while let Some(ev) = established.next().await {
        match ev {
            NetworkEvent::ActivePeerSession { .. } |
            NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                transactions.on_network_event(ev);
            }
            NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
            ev => {
                error!("unexpected event {ev:?}")
            }
        }
    }
    // Encoded signed transaction used as the broadcast payload.
    let input = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
    transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
        peer_id: *handle1.peer_id(),
        msg: Transactions(vec![signed_tx.clone()]),
    });
    // The sender is recorded for the hash as soon as the event has been handled.
    assert!(transactions
        .transactions_by_peers
        .get(signed_tx.tx_hash())
        .unwrap()
        .contains(handle1.peer_id()));

    // Poll the manager exactly once.
    poll_fn(|cx| {
        let _ = transactions.poll_unpin(cx);
        Poll::Ready(())
    })
    .await;

    assert!(!pool.is_empty());
    assert!(pool.get(signed_tx.tx_hash()).is_some());
    handle.terminate().await;
}
2612
2613 #[tokio::test(flavor = "multi_thread")]
2614 async fn test_session_closed_cleans_transaction_peer_state() {
2615 let (mut tx_manager, _network) = new_tx_manager().await;
2616 let peer_id = PeerId::new([1; 64]);
2617 let fallback_peer = PeerId::new([2; 64]);
2618 let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2619 let hash_shared = B256::from_slice(&[1; 32]);
2620
2621 tx_manager.peers.insert(peer_id, peer);
2622 buffer_hash_to_tx_fetcher(
2623 &mut tx_manager.transaction_fetcher,
2624 hash_shared,
2625 peer_id,
2626 0,
2627 None,
2628 );
2629 buffer_hash_to_tx_fetcher(
2630 &mut tx_manager.transaction_fetcher,
2631 hash_shared,
2632 fallback_peer,
2633 0,
2634 None,
2635 );
2636 tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2637
2638 tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2639 peer_id,
2640 reason: None,
2641 }));
2642
2643 assert!(!tx_manager.peers.contains_key(&peer_id));
2645 assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2646 assert_eq!(
2648 tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2649 Some(&fallback_peer)
2650 );
2651 }
2652
2653 #[tokio::test(flavor = "multi_thread")]
2654 async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2655 let (mut tx_manager, _network) = new_tx_manager().await;
2656 let peer_id = PeerId::new([1; 64]);
2657 let hash = B256::from_slice(&[1; 32]);
2658
2659 tx_manager.network.update_sync_state(SyncState::Idle);
2660 tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2661
2662 let err = PoolError::new(
2663 hash,
2664 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2665 BlobTransactionValidationError::InvalidProof,
2666 )),
2667 );
2668
2669 tx_manager.on_bad_import(err);
2670
2671 assert!(!tx_manager.bad_imports.contains(&hash));
2672 }
2673
2674 #[tokio::test(flavor = "multi_thread")]
2675 async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2676 let (mut tx_manager, _network) = new_tx_manager().await;
2677 let peer_id = PeerId::new([1; 64]);
2678 let hash = B256::from_slice(&[3; 32]);
2679
2680 tx_manager.network.update_sync_state(SyncState::Idle);
2681 tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2682
2683 let err = PoolError::new(
2684 hash,
2685 InvalidPoolTransactionError::Eip4844(
2686 Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2687 ),
2688 );
2689
2690 tx_manager.on_bad_import(err);
2691
2692 assert!(!tx_manager.bad_imports.contains(&hash));
2693 }
2694
2695 #[tokio::test(flavor = "multi_thread")]
2696 async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2697 let (mut tx_manager, _network) = new_tx_manager().await;
2698 let peer_id = PeerId::new([1; 64]);
2699 let hash = B256::from_slice(&[2; 32]);
2700
2701 tx_manager.network.update_sync_state(SyncState::Idle);
2702 tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2703
2704 let err = PoolError::new(
2705 hash,
2706 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2707 );
2708
2709 tx_manager.on_bad_import(err);
2710
2711 assert!(tx_manager.bad_imports.contains(&hash));
2712 }
2713
2714 #[tokio::test(flavor = "multi_thread")]
2715 async fn test_on_get_pooled_transactions_network() {
2716 reth_tracing::init_test_tracing();
2717 let net = Testnet::create(2).await;
2718
2719 let mut handles = net.handles();
2720 let handle0 = handles.next().unwrap();
2721 let handle1 = handles.next().unwrap();
2722
2723 drop(handles);
2724 let handle = net.spawn();
2725
2726 let listener0 = handle0.event_listener();
2727
2728 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2729 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2730
2731 let client = NoopProvider::default();
2732 let pool = testing_pool();
2733 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2734 .disable_discovery()
2735 .listener_port(0)
2736 .build(client);
2737 let transactions_manager_config = config.transactions_manager_config.clone();
2738 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2739 .await
2740 .unwrap()
2741 .into_builder()
2742 .transactions(pool.clone(), transactions_manager_config)
2743 .split_with_handle();
2744 tokio::task::spawn(network);
2745
2746 network_handle.update_sync_state(SyncState::Idle);
2747
2748 assert!(!NetworkInfo::is_syncing(&network_handle));
2749
2750 let mut established = listener0.take(2);
2752 while let Some(ev) = established.next().await {
2753 match ev {
2754 NetworkEvent::ActivePeerSession { .. } |
2755 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2756 transactions.on_network_event(ev);
2757 }
2758 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2759 ev => {
2760 error!("unexpected event {ev:?}")
2761 }
2762 }
2763 }
2764 handle.terminate().await;
2765
2766 let tx = MockTransaction::eip1559();
2767 let _ = transactions
2768 .pool
2769 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2770 .await;
2771
2772 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2773
2774 let (send, receive) =
2775 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2776
2777 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2778 peer_id: *handle1.peer_id(),
2779 request,
2780 response: send,
2781 });
2782
2783 match receive.await.unwrap() {
2784 Ok(PooledTransactions(transactions)) => {
2785 assert_eq!(transactions.len(), 1);
2786 }
2787 Err(e) => {
2788 panic!("error: {e:?}");
2789 }
2790 }
2791 }
2792
2793 #[tokio::test]
2797 async fn test_partially_tx_response() {
2798 reth_tracing::init_test_tracing();
2799
2800 let mut tx_manager = new_tx_manager().await.0;
2801 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2802
2803 let peer_id_1 = PeerId::new([1; 64]);
2804 let eth_version = EthVersion::Eth66;
2805
2806 let txs = vec![
2807 TransactionSigned::new_unhashed(
2808 Transaction::Legacy(TxLegacy {
2809 chain_id: Some(4),
2810 nonce: 15u64,
2811 gas_price: 2200000000,
2812 gas_limit: 34811,
2813 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2814 value: U256::from(1234u64),
2815 input: Default::default(),
2816 }),
2817 Signature::new(
2818 U256::from_str(
2819 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2820 )
2821 .unwrap(),
2822 U256::from_str(
2823 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2824 )
2825 .unwrap(),
2826 true,
2827 ),
2828 ),
2829 TransactionSigned::new_unhashed(
2830 Transaction::Eip1559(TxEip1559 {
2831 chain_id: 4,
2832 nonce: 26u64,
2833 max_priority_fee_per_gas: 1500000000,
2834 max_fee_per_gas: 1500000013,
2835 gas_limit: MIN_TRANSACTION_GAS,
2836 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2837 value: U256::from(3000000000000000000u64),
2838 input: Default::default(),
2839 access_list: Default::default(),
2840 }),
2841 Signature::new(
2842 U256::from_str(
2843 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2844 )
2845 .unwrap(),
2846 U256::from_str(
2847 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2848 )
2849 .unwrap(),
2850 true,
2851 ),
2852 ),
2853 ];
2854
2855 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2856
2857 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2858 peer_1.seen_transactions.insert(txs_hashes[0]);
2861 peer_1.seen_transactions.insert(txs_hashes[1]);
2862 tx_manager.peers.insert(peer_id_1, peer_1);
2863
2864 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2865 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2866
2867 assert!(tx_fetcher.is_idle(&peer_id_1));
2869 assert_eq!(tx_fetcher.active_peers.len(), 0);
2870
2871 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2873
2874 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2875 assert!(!tx_fetcher.is_idle(&peer_id_1));
2877 assert_eq!(tx_fetcher.active_peers.len(), 1);
2878
2879 let req = to_mock_session_rx
2881 .recv()
2882 .await
2883 .expect("peer_1 session should receive request with buffered hashes");
2884 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2885
2886 let message: Vec<PooledTransactionVariant> = txs
2887 .into_iter()
2888 .take(1)
2889 .map(|tx| {
2890 PooledTransactionVariant::try_from(tx)
2891 .expect("Failed to convert MockTransaction to PooledTransaction")
2892 })
2893 .collect();
2894 response
2896 .send(Ok(PooledTransactions(message)))
2897 .expect("should send peer_1 response to tx manager");
2898 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2899 unreachable!()
2900 };
2901
2902 assert!(tx_fetcher.is_idle(&peer_id));
2904 assert_eq!(tx_fetcher.active_peers.len(), 0);
2905 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2907 }
2908
2909 #[tokio::test]
2910 async fn test_max_retries_tx_request() {
2911 reth_tracing::init_test_tracing();
2912
2913 let mut tx_manager = new_tx_manager().await.0;
2914 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2915
2916 let peer_id_1 = PeerId::new([1; 64]);
2917 let peer_id_2 = PeerId::new([2; 64]);
2918 let eth_version = EthVersion::Eth66;
2919 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2920
2921 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2922 peer_1.seen_transactions.insert(seen_hashes[0]);
2925 peer_1.seen_transactions.insert(seen_hashes[1]);
2926 tx_manager.peers.insert(peer_id_1, peer_1);
2927
2928 let retries = 1;
2931 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2932 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2933
2934 assert!(tx_fetcher.is_idle(&peer_id_1));
2936 assert_eq!(tx_fetcher.active_peers.len(), 0);
2937
2938 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2940
2941 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2942
2943 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2944 assert!(!tx_fetcher.is_idle(&peer_id_1));
2946 assert_eq!(tx_fetcher.active_peers.len(), 1);
2947
2948 let req = to_mock_session_rx
2950 .recv()
2951 .await
2952 .expect("peer_1 session should receive request with buffered hashes");
2953 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2954 let GetPooledTransactions(hashes) = request;
2955
2956 let hashes = hashes.into_iter().collect::<B256Set>();
2957
2958 assert_eq!(hashes, seen_hashes.into_iter().collect::<B256Set>());
2959
2960 response
2962 .send(Err(RequestError::BadResponse))
2963 .expect("should send peer_1 response to tx manager");
2964 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2965 unreachable!()
2966 };
2967
2968 assert!(tx_fetcher.is_idle(&peer_id));
2970 assert_eq!(tx_fetcher.active_peers.len(), 0);
2971 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2973
2974 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2975 tx_manager.peers.insert(peer_id_2, peer_2);
2976
2977 let msg =
2979 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2980 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2981
2982 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2983
2984 assert_eq!(tx_fetcher.active_peers.len(), 1);
2986
2987 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2989 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2991
2992 let req = to_mock_session_rx
2994 .recv()
2995 .await
2996 .expect("peer_2 session should receive request with buffered hashes");
2997 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2998
2999 response
3001 .send(Err(RequestError::BadResponse))
3002 .expect("should send peer_2 response to tx manager");
3003 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
3004
3005 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
3008 assert_eq!(tx_fetcher.active_peers.len(), 0);
3009 }
3010
3011 #[test]
3012 fn test_transaction_builder_empty() {
3013 let mut builder =
3014 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68, 0);
3015 assert!(builder.is_empty());
3016
3017 let mut factory = MockTransactionFactory::default();
3018 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3019 builder.push(&tx);
3020 assert!(!builder.is_empty());
3021
3022 let txs = builder.build();
3023 assert!(txs.full.is_none());
3024 let txs = txs.pooled.unwrap();
3025 assert_eq!(txs.len(), 1);
3026 }
3027
3028 #[test]
3029 fn test_transaction_builder_large() {
3030 let mut builder =
3031 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68, 0);
3032 assert!(builder.is_empty());
3033
3034 let mut factory = MockTransactionFactory::default();
3035 let mut tx = factory.create_eip1559();
3036 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
3038 let tx = Arc::new(tx);
3039 let tx = PropagateTransaction::pool_tx(tx);
3040 builder.push(&tx);
3041 assert!(!builder.is_empty());
3042
3043 let txs = builder.clone().build();
3044 assert!(txs.pooled.is_none());
3045 let txs = txs.full.unwrap();
3046 assert_eq!(txs.len(), 1);
3047
3048 builder.push(&tx);
3049
3050 let txs = builder.clone().build();
3051 let pooled = txs.pooled.unwrap();
3052 assert_eq!(pooled.len(), 1);
3053 let txs = txs.full.unwrap();
3054 assert_eq!(txs.len(), 1);
3055 }
3056
3057 #[test]
3058 fn test_transaction_builder_eip4844() {
3059 let mut builder =
3060 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68, 0);
3061 assert!(builder.is_empty());
3062
3063 let mut factory = MockTransactionFactory::default();
3064 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
3065 builder.push(&tx);
3066 assert!(!builder.is_empty());
3067
3068 let txs = builder.clone().build();
3069 assert!(txs.full.is_none());
3070 let txs = txs.pooled.unwrap();
3071 assert_eq!(txs.len(), 1);
3072
3073 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3074 builder.push(&tx);
3075
3076 let txs = builder.clone().build();
3077 let pooled = txs.pooled.unwrap();
3078 assert_eq!(pooled.len(), 1);
3079 let txs = txs.full.unwrap();
3080 assert_eq!(txs.len(), 1);
3081 }
3082
3083 #[tokio::test]
3084 async fn test_propagate_full() {
3085 reth_tracing::init_test_tracing();
3086
3087 let (mut tx_manager, network) = new_tx_manager().await;
3088 let peer_id = PeerId::random();
3089
3090 network.handle().update_sync_state(SyncState::Idle);
3092
3093 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3095
3096 let session_info = SessionInfo {
3097 peer_id,
3098 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3099 client_version: Arc::from(""),
3100 capabilities: Arc::new(vec![].into()),
3101 status: Arc::new(Default::default()),
3102 version: EthVersion::Eth68,
3103 peer_kind: PeerKind::Basic,
3104 };
3105 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3106 tx_manager
3107 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3108 let mut propagate = vec![];
3109 let mut factory = MockTransactionFactory::default();
3110 let eip1559_tx = Arc::new(factory.create_eip1559());
3111 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3112 let eip4844_tx = Arc::new(factory.create_eip4844());
3113 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3114
3115 let propagated =
3116 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3117 assert_eq!(propagated.len(), 2);
3118 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3119 assert_eq!(prop_txs.len(), 1);
3120 assert!(prop_txs[0].is_full());
3121
3122 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3123 assert_eq!(prop_txs.len(), 1);
3124 assert!(prop_txs[0].is_hash());
3125
3126 let peer = tx_manager.peers.get(&peer_id).unwrap();
3127 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3128 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3129 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3130
3131 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3133 assert!(propagated.is_empty());
3134 }
3135
3136 #[tokio::test]
3137 async fn test_truncated_hash_announcement_not_marked_seen() {
3138 reth_tracing::init_test_tracing();
3139
3140 let (mut tx_manager, network) = new_tx_manager().await;
3141 tx_manager.config.propagation_mode = TransactionPropagationMode::Max(0);
3143
3144 network.handle().update_sync_state(SyncState::Idle);
3146
3147 let peer_id = PeerId::random();
3148 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3149 tx_manager.peers.insert(peer_id, peer);
3150
3151 let mut factory = MockTransactionFactory::default();
3153 let txs = (0..=SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE)
3154 .map(|_| PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559())))
3155 .collect::<Vec<_>>();
3156 let last_sent = *txs[txs.len() - 2].tx_hash();
3157 let truncated = *txs[txs.len() - 1].tx_hash();
3158
3159 let propagated = tx_manager.propagate_transactions(txs, PropagationMode::Basic);
3160
3161 assert!(propagated.get(&truncated).is_none());
3163 let peer = tx_manager.peers.get(&peer_id).unwrap();
3164 assert!(!peer.seen_transactions.contains(&truncated));
3165 assert!(peer.seen_transactions.contains(&last_sent));
3166 }
3167
3168 #[tokio::test]
3169 async fn test_propagate_pending_txs_while_initially_syncing() {
3170 reth_tracing::init_test_tracing();
3171
3172 let (mut tx_manager, network) = new_tx_manager().await;
3173 let peer_id = PeerId::random();
3174
3175 network.handle().update_sync_state(SyncState::Syncing);
3177 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3178
3179 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3181 tx_manager.peers.insert(peer_id, peer);
3182
3183 let tx = MockTransaction::eip1559();
3184 tx_manager
3185 .pool
3186 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3187 .await
3188 .expect("transaction should be accepted into the pool");
3189
3190 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3191
3192 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3193 assert!(peer.seen_transactions.contains(tx.get_hash()));
3194 }
3195
3196 #[tokio::test]
3197 async fn test_relaxed_filter_ignores_unknown_tx_types() {
3198 reth_tracing::init_test_tracing();
3199
3200 let transactions_manager_config = TransactionsManagerConfig::default();
3201
3202 let propagation_policy = TransactionPropagationKind::default();
3203 let announcement_policy = RelaxedEthAnnouncementFilter::default();
3204
3205 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3206
3207 let pool = testing_pool();
3208 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3209 let client = NoopProvider::default();
3210
3211 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3212 .listener_port(0)
3213 .disable_discovery()
3214 .build(client.clone());
3215
3216 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3217 let (to_tx_manager_tx, from_network_rx) =
3218 reth_metrics::common::mpsc::memory_bounded_channel::<
3219 NetworkTransactionEvent<EthNetworkPrimitives>,
3220 >(
3221 crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
3222 "test_tx_channel",
3223 );
3224 network_manager.set_transactions(to_tx_manager_tx);
3225 let network_handle = network_manager.handle().clone();
3226 let network_service_handle = tokio::spawn(network_manager);
3227
3228 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3229 network_handle.clone(),
3230 pool.clone(),
3231 from_network_rx,
3232 transactions_manager_config,
3233 policy_bundle,
3234 );
3235
3236 let peer_id = PeerId::random();
3237 let eth_version = EthVersion::Eth68;
3238 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3239 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3240
3241 let mut tx_factory = MockTransactionFactory::default();
3242
3243 let valid_known_tx = tx_factory.create_eip1559();
3244 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3245
3246 let known_tx_hash = *known_tx_signed.hash();
3247 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3248 let known_tx_size = known_tx_signed.encoded_length();
3249
3250 let unknown_tx_hash = B256::random();
3251 let unknown_tx_type_byte = 0xff_u8;
3252 let unknown_tx_size = 150;
3253
3254 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3255 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3256 sizes: vec![known_tx_size, unknown_tx_size],
3257 hashes: vec![known_tx_hash, unknown_tx_hash],
3258 });
3259
3260 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3261
3262 poll_fn(|cx| {
3263 let _ = tx_manager.poll_unpin(cx);
3264 Poll::Ready(())
3265 })
3266 .await;
3267
3268 let mut requested_hashes_in_getpooled = B256Set::default();
3269 let mut unexpected_request_received = false;
3270
3271 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3272 .await
3273 {
3274 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3275 let GetPooledTransactions(hashes) = request;
3276 for hash in hashes {
3277 requested_hashes_in_getpooled.insert(hash);
3278 }
3279 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3280 }
3281 Ok(Some(other_request)) => {
3282 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3283 unexpected_request_received = true;
3284 }
3285 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3286 Err(_timeout_err) => {
3287 tracing::info!("Timeout: No GetPooledTransactions request received.")
3288 }
3289 }
3290
3291 assert!(
3292 requested_hashes_in_getpooled.contains(&known_tx_hash),
3293 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3294 );
3295 assert!(
3296 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3297 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3298 );
3299 assert!(
3300 !unexpected_request_received,
3301 "An unexpected P2P request was received by the mock peer."
3302 );
3303
3304 network_service_handle.abort();
3305 }
3306}