1use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6pub mod config;
8pub mod constants;
10pub mod fetcher;
12pub mod policy;
14
15pub use self::constants::{
16 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30 budget::{
31 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{AnnouncedTxTypesMetrics, TransactionsManagerMetrics},
37 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
38 NetworkHandle, TxTypesCounter,
39};
40use alloy_primitives::{
41 map::{B256Map, B256Set, FbBuildHasher},
42 TxHash, B256,
43};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49 NewPooledTransactionHashes66, NewPooledTransactionHashes68, NewPooledTransactionHashes72,
50 PooledTransactions, RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::MemoryBoundedReceiver;
54use reth_network_api::{
55 events::{PeerEvent, SessionInfo},
56 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59 error::{RequestError, RequestResult},
60 sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::{InMemorySize, SignedTransaction};
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67 error::{PoolError, PoolResult},
68 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72 collections::{hash_map::Entry, HashMap, HashSet},
73 pin::Pin,
74 sync::{
75 atomic::{AtomicUsize, Ordering},
76 Arc,
77 },
78 task::{Context, Poll},
79 time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
85pub type PoolImportFuture =
89 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
91#[derive(Debug, Clone)]
99pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
100 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
102}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105 fn send(&self, cmd: TransactionsCommand<N>) {
106 let _ = self.manager_tx.send(cmd);
107 }
108
109 async fn peer_handle(
111 &self,
112 peer_id: PeerId,
113 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114 let (tx, rx) = oneshot::channel();
115 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116 rx.await
117 }
118
119 pub fn propagate(&self, hash: TxHash) {
121 self.send(TransactionsCommand::PropagateHash(hash))
122 }
123
124 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128 self.propagate_hashes_to(Some(hash), peer)
129 }
130
131 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135 let hashes = hash.into_iter().collect::<Vec<_>>();
136 if hashes.is_empty() {
137 return
138 }
139 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140 }
141
142 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144 let (tx, rx) = oneshot::channel();
145 self.send(TransactionsCommand::GetActivePeers(tx));
146 rx.await
147 }
148
149 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153 if transactions.is_empty() {
154 return
155 }
156 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157 }
158
159 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164 if transactions.is_empty() {
165 return
166 }
167 self.send(TransactionsCommand::PropagateTransactions(transactions))
168 }
169
170 pub fn broadcast_transactions(
175 &self,
176 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177 ) {
178 let transactions =
179 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180 if transactions.is_empty() {
181 return
182 }
183 self.send(TransactionsCommand::BroadcastTransactions(transactions))
184 }
185
186 pub async fn get_transaction_hashes(
188 &self,
189 peers: Vec<PeerId>,
190 ) -> Result<HashMap<PeerId, B256Set>, RecvError> {
191 if peers.is_empty() {
192 return Ok(Default::default())
193 }
194 let (tx, rx) = oneshot::channel();
195 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196 rx.await
197 }
198
199 pub async fn get_peer_transaction_hashes(&self, peer: PeerId) -> Result<B256Set, RecvError> {
201 let res = self.get_transaction_hashes(vec![peer]).await?;
202 Ok(res.into_values().next().unwrap_or_default())
203 }
204
205 pub async fn get_pooled_transactions_from(
211 &self,
212 peer_id: PeerId,
213 hashes: Vec<B256>,
214 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
215 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
216
217 let (tx, rx) = oneshot::channel();
218 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
219 peer.try_send(request).ok();
220
221 rx.await?.map(|res| Some(res.0))
222 }
223}
224
225#[derive(Debug)]
280#[must_use = "Manager does nothing unless polled."]
281pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
282 pool: Pool,
284 network: NetworkHandle<N>,
286 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
290 transaction_fetcher: TransactionFetcher<N>,
292 transactions_by_peers: B256Map<HashSet<PeerId>>,
297 pool_imports: FuturesUnordered<PoolImportFuture>,
309 pending_pool_imports_info: PendingPoolImportsInfo,
311 bad_imports: LruCache<TxHash, FbBuildHasher<32>>,
313 peers: HashMap<PeerId, PeerMetadata<N>>,
315 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
319 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
324 pending_transactions: mpsc::Receiver<TxHash>,
333 transaction_events: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
335 config: TransactionsManagerConfig,
337 policies: NetworkPolicies<N>,
339 metrics: TransactionsManagerMetrics,
341 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
343}
344
345impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
346 pub fn new(
350 network: NetworkHandle<N>,
351 pool: Pool,
352 from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
353 transactions_manager_config: TransactionsManagerConfig,
354 ) -> Self {
355 Self::with_policy(
356 network,
357 pool,
358 from_network,
359 transactions_manager_config,
360 NetworkPolicies::new(
361 TransactionPropagationKind::default(),
362 StrictEthAnnouncementFilter::default(),
363 ),
364 )
365 }
366}
367
368impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
369 pub fn with_policy(
373 network: NetworkHandle<N>,
374 pool: Pool,
375 from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
376 transactions_manager_config: TransactionsManagerConfig,
377 policies: NetworkPolicies<N>,
378 ) -> Self {
379 let network_events = network.event_listener();
380
381 let (command_tx, command_rx) = mpsc::unbounded_channel();
382
383 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
384 &transactions_manager_config.transaction_fetcher_config,
385 );
386
387 let pending = pool.pending_transactions_listener();
390 let pending_pool_imports_info =
391 PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
392 let metrics = TransactionsManagerMetrics::default();
393 metrics
394 .capacity_pending_pool_imports
395 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
396
397 Self {
398 pool,
399 network,
400 network_events,
401 transaction_fetcher,
402 transactions_by_peers: Default::default(),
403 pool_imports: Default::default(),
404 pending_pool_imports_info,
405 bad_imports: LruCache::with_hasher(DEFAULT_MAX_COUNT_BAD_IMPORTS, Default::default()),
406 peers: Default::default(),
407 command_tx,
408 command_rx: UnboundedReceiverStream::new(command_rx),
409 pending_transactions: pending,
410 transaction_events: from_network,
411 config: transactions_manager_config,
412 policies,
413 metrics,
414 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
415 }
416 }
417
418 pub fn handle(&self) -> TransactionsHandle<N> {
420 TransactionsHandle { manager_tx: self.command_tx.clone() }
421 }
422
423 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
426 self.has_capacity_for_pending_pool_imports() &&
427 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
428 }
429
430 fn has_capacity_for_pending_pool_imports(&self) -> bool {
432 self.remaining_pool_import_capacity() > 0
433 }
434
435 fn remaining_pool_import_capacity(&self) -> usize {
437 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
438 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
439 )
440 }
441
442 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
443 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
444 self.metrics.reported_bad_transactions.increment(1);
445 }
446
447 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
448 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
449 self.network.reputation_change(peer_id, kind);
450 }
451
452 fn report_already_seen(&self, peer_id: PeerId) {
453 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
454 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
455 }
456
457 fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
459 if let Some(mut peer) = self.peers.remove(peer_id) {
460 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
461 }
462 self.transaction_fetcher.remove_peer(peer_id);
463 }
464
465 fn on_good_import(&mut self, hash: TxHash) {
467 self.transactions_by_peers.remove(&hash);
468 }
469
470 fn on_bad_import(&mut self, err: PoolError) {
500 let peers = self.transactions_by_peers.remove(&err.hash);
501
502 if err.is_bad_blob_sidecar() {
503 if let Some(peers) = peers {
507 for peer_id in peers {
508 self.report_peer_bad_transactions(peer_id);
509 }
510 }
511 return
512 }
513
514 if !err.is_bad_transaction() || self.network.is_syncing() {
516 return
517 }
518 if let Some(peers) = peers {
521 for peer_id in peers {
522 self.report_peer_bad_transactions(peer_id);
523 }
524 }
525 self.metrics.bad_imports.increment(1);
526 self.bad_imports.insert(err.hash);
527 }
528
529 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
533 let info = &self.pending_pool_imports_info;
535 let max_pending_pool_imports = info.max_pending_pool_imports;
536 let has_capacity_wrt_pending_pool_imports =
537 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
538
539 self.transaction_fetcher
540 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
541 }
542
543 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
544 let kind = match req_err {
545 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
546 RequestError::Timeout => ReputationChangeKind::Timeout,
547 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
548 return
550 }
551 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
552 };
553 self.report_peer(peer_id, kind);
554 }
555
556 #[inline]
557 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
558 let metrics = &self.metrics;
559
560 let TxManagerPollDurations {
561 acc_network_events,
562 acc_pending_imports,
563 acc_tx_events,
564 acc_imported_txns,
565 acc_fetch_events,
566 acc_pending_fetch,
567 acc_cmds,
568 } = poll_durations;
569
570 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
572 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
574 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
575 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
576 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
577 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
578 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
579 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
580 }
581}
582
583impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
584 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
586 for res in batch_results {
587 match res {
588 Ok(AddedTransactionOutcome { hash, .. }) => {
589 self.on_good_import(hash);
590 }
591 Err(err) => {
592 self.on_bad_import(err);
593 }
594 }
595 }
596 }
597
598 fn on_new_pooled_transaction_hashes(
600 &mut self,
601 peer_id: PeerId,
602 msg: NewPooledTransactionHashes,
603 ) {
604 if self.network.is_initially_syncing() {
606 return
607 }
608 if self.network.tx_gossip_disabled() {
609 return
610 }
611
612 let Some(peer) = self.peers.get_mut(&peer_id) else {
614 trace!(
615 peer_id = format!("{peer_id:#}"),
616 ?msg,
617 "discarding announcement from inactive peer"
618 );
619
620 return
621 };
622 let client = peer.client_version.clone();
623
624 let mut count_txns_already_seen_by_peer = 0;
626 for tx in msg.iter_hashes().copied() {
627 if !peer.seen_transactions.insert(tx) {
628 count_txns_already_seen_by_peer += 1;
629 }
630 }
631 if count_txns_already_seen_by_peer > 0 {
632 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
637 self.metrics
638 .occurrences_hash_already_seen_by_peer
639 .increment(count_txns_already_seen_by_peer);
640
641 trace!(target: "net::tx",
642 %count_txns_already_seen_by_peer,
643 peer_id=format!("{peer_id:#}"),
644 ?client,
645 "Peer sent hashes that have already been marked as seen by peer"
646 );
647
648 self.report_already_seen(peer_id);
649 }
650
651 if msg.is_empty() {
653 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
654 return;
655 }
656
657 let original_len = msg.len();
658 let mut partially_valid_msg = msg.dedup();
659
660 if partially_valid_msg.len() != original_len {
661 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
662 }
663
664 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
666
667 let mut should_report_peer = false;
674 let mut tx_types_counter = TxTypesCounter::default();
675
676 let has_eth68_metadata = partially_valid_msg
677 .msg_version()
678 .expect("partially valid announcement should have a version")
679 .has_eth68_metadata();
680
681 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
682 let (ty_byte, size_val) = match *metadata_ref_mut {
683 Some((ty, size)) => {
684 if !has_eth68_metadata {
685 should_report_peer = true;
686 }
687 (ty, size)
688 }
689 None => {
690 if has_eth68_metadata {
691 should_report_peer = true;
692 return false;
693 }
694 (0u8, 0)
695 }
696 };
697
698 if has_eth68_metadata && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
699 match TxType::try_from(actual_ty_byte) {
700 Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
701 Err(_) => tx_types_counter.increase_other(),
702 }
703 }
704
705 let decision = self
706 .policies
707 .announcement_filter()
708 .decide_on_announcement(ty_byte, tx_hash, size_val);
709
710 match decision {
711 AnnouncementAcceptance::Accept => true,
712 AnnouncementAcceptance::Ignore => false,
713 AnnouncementAcceptance::Reject { penalize_peer } => {
714 if penalize_peer {
715 should_report_peer = true;
716 }
717 false
718 }
719 }
720 });
721
722 if has_eth68_metadata {
723 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
724 }
725
726 if should_report_peer {
727 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
728 }
729
730 let hashes_count_pre_pool_filter = partially_valid_msg.len();
738 self.pool.retain_unknown(&mut partially_valid_msg);
739 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
740 let already_known_hashes_count =
741 hashes_count_pre_pool_filter - partially_valid_msg.len();
742 self.metrics
743 .occurrences_hashes_already_in_pool
744 .increment(already_known_hashes_count as u64);
745 }
746
747 if partially_valid_msg.is_empty() {
748 return
750 }
751
752 let mut valid_announcement_data =
753 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
754
755 if valid_announcement_data.is_empty() {
756 return
758 }
759
760 let bad_imports = &self.bad_imports;
767 self.transaction_fetcher.filter_unseen_and_pending_hashes(
768 &mut valid_announcement_data,
769 |hash| bad_imports.contains(hash),
770 &peer_id,
771 &client,
772 );
773
774 if valid_announcement_data.is_empty() {
775 return
777 }
778
779 trace!(target: "net::tx::propagation",
780 peer_id=format!("{peer_id:#}"),
781 hashes_len=valid_announcement_data.len(),
782 hashes=?valid_announcement_data.keys(),
783 msg_version=%valid_announcement_data.msg_version(),
784 client_version=%client,
785 "received previously unseen and pending hashes in announcement from peer"
786 );
787
788 if !self.transaction_fetcher.is_idle(&peer_id) {
791 let msg_version = valid_announcement_data.msg_version();
793 let (hashes, _version) = valid_announcement_data.into_request_hashes();
794
795 trace!(target: "net::tx",
796 peer_id=format!("{peer_id:#}"),
797 hashes=?*hashes,
798 %msg_version,
799 %client,
800 "buffering hashes announced by busy peer"
801 );
802
803 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
804
805 return
806 }
807
808 let mut hashes_to_request =
809 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
810 let surplus_hashes =
811 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
812
813 if !surplus_hashes.is_empty() {
814 trace!(target: "net::tx",
815 peer_id=format!("{peer_id:#}"),
816 surplus_hashes=?*surplus_hashes,
817 %client,
818 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
819 );
820
821 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
822 }
823
824 trace!(target: "net::tx",
825 peer_id=format!("{peer_id:#}"),
826 hashes=?*hashes_to_request,
827 %client,
828 "sending hashes in `GetPooledTransactions` request to peer's session"
829 );
830
831 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
835 if let Some(failed_to_request_hashes) =
836 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
837 {
838 let conn_eth_version = peer.version;
839
840 trace!(target: "net::tx",
841 peer_id=format!("{peer_id:#}"),
842 failed_to_request_hashes=?*failed_to_request_hashes,
843 %conn_eth_version,
844 %client,
845 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
846 );
847 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
848 }
849 }
850}
851
852impl<Pool, N> TransactionsManager<Pool, N>
853where
854 Pool: TransactionPool + Unpin + 'static,
855 N: NetworkPrimitives<
856 BroadcastedTransaction: SignedTransaction,
857 PooledTransaction: SignedTransaction,
858 > + Unpin,
859 Pool::Transaction:
860 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
861{
862 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
874 if self.network.tx_gossip_disabled() {
878 return
879 }
880
881 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
882
883 self.propagate_all(hashes);
884 }
885
886 fn propagate_full_transactions_to_peer(
890 &mut self,
891 txs: Vec<TxHash>,
892 peer_id: PeerId,
893 propagation_mode: PropagationMode,
894 ) -> Option<PropagatedTransactions> {
895 let peer = self.peers.get_mut(&peer_id)?;
896 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
897 let mut propagated = PropagatedTransactions::default();
898
899 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
901
902 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
903
904 if propagation_mode.is_forced() {
905 full_transactions.extend(to_propagate);
907 } else {
908 for tx in to_propagate {
911 if !peer.seen_transactions.contains(tx.tx_hash()) {
912 full_transactions.push(&tx);
914 }
915 }
916 }
917
918 if full_transactions.is_empty() {
919 return None
921 }
922
923 let PropagateTransactions { pooled, full } = full_transactions.build();
924
925 if let Some(new_pooled_hashes) = pooled {
927 for hash in new_pooled_hashes.iter_hashes().copied() {
928 propagated.record(hash, PropagateKind::Hash(peer_id));
929 peer.seen_transactions.insert(hash);
931 }
932
933 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
935 }
936
937 if let Some(new_full_transactions) = full {
939 for tx in &new_full_transactions {
940 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
941 peer.seen_transactions.insert(*tx.tx_hash());
943 }
944
945 self.network.send_transactions(peer_id, new_full_transactions);
947 }
948
949 self.metrics.propagated_transactions.increment(propagated.len() as u64);
951
952 Some(propagated)
953 }
954
955 fn propagate_hashes_to(
959 &mut self,
960 hashes: Vec<TxHash>,
961 peer_id: PeerId,
962 propagation_mode: PropagationMode,
963 ) {
964 trace!(target: "net::tx", "Start propagating transactions as hashes");
965
966 let propagated = {
969 let Some(peer) = self.peers.get_mut(&peer_id) else {
970 return
972 };
973
974 let to_propagate =
975 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
976
977 let mut propagated = PropagatedTransactions::default();
978
979 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
981
982 if propagation_mode.is_forced() {
983 hashes.extend(to_propagate)
984 } else {
985 for tx in to_propagate {
986 if !peer.seen_transactions.contains(tx.tx_hash()) {
987 hashes.push(&tx);
989 }
990 }
991 }
992
993 let new_pooled_hashes = hashes.build();
994
995 if new_pooled_hashes.is_empty() {
996 return
998 }
999
1000 if let Some(peer) = self.peers.get_mut(&peer_id) {
1001 for hash in new_pooled_hashes.iter_hashes().copied() {
1002 propagated.record(hash, PropagateKind::Hash(peer_id));
1003 peer.seen_transactions.insert(hash);
1004 }
1005 }
1006
1007 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1008
1009 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1011
1012 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1014
1015 propagated
1016 };
1017
1018 self.pool.on_propagated(propagated);
1020 }
1021
1022 fn propagate_transactions(
1029 &mut self,
1030 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1031 propagation_mode: PropagationMode,
1032 ) -> PropagatedTransactions {
1033 let mut propagated = PropagatedTransactions::default();
1034 if self.network.tx_gossip_disabled() {
1035 return propagated
1036 }
1037
1038 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1040
1041 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1043 if !self.policies.propagation_policy().can_propagate(peer) {
1044 continue
1046 }
1047 let mut builder = if peer_idx > max_num_full {
1049 PropagateTransactionsBuilder::pooled(peer.version)
1050 } else {
1051 PropagateTransactionsBuilder::full(peer.version)
1052 };
1053
1054 if propagation_mode.is_forced() {
1055 builder.extend(to_propagate.iter());
1056 } else {
1057 for tx in &to_propagate {
1061 if !peer.seen_transactions.contains(tx.tx_hash()) {
1064 builder.push(tx);
1065 }
1066 }
1067 }
1068
1069 if builder.is_empty() {
1070 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1071 continue
1072 }
1073
1074 let PropagateTransactions { pooled, full } = builder.build();
1075
1076 if let Some(mut new_pooled_hashes) = pooled {
1078 new_pooled_hashes
1081 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1082
1083 for hash in new_pooled_hashes.iter_hashes().copied() {
1084 propagated.record(hash, PropagateKind::Hash(*peer_id));
1085 peer.seen_transactions.insert(hash);
1087 }
1088
1089 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1090
1091 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1093 }
1094
1095 if let Some(new_full_transactions) = full {
1097 for tx in &new_full_transactions {
1098 propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1099 peer.seen_transactions.insert(*tx.tx_hash());
1101 }
1102
1103 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1104
1105 self.network.send_transactions(*peer_id, new_full_transactions);
1107 }
1108 }
1109
1110 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1112
1113 propagated
1114 }
1115
1116 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1121 if self.peers.is_empty() {
1122 return
1124 }
1125 let propagated = self.propagate_transactions(
1126 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1127 PropagationMode::Basic,
1128 );
1129
1130 self.pool.on_propagated(propagated);
1132 }
1133
1134 fn on_get_pooled_transactions(
1136 &mut self,
1137 peer_id: PeerId,
1138 request: GetPooledTransactions,
1139 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1140 ) {
1141 if self.network.tx_gossip_disabled() {
1143 let _ = response.send(Ok(PooledTransactions::default()));
1144 return
1145 }
1146 if let Some(peer) = self.peers.get_mut(&peer_id) {
1147 let transactions = self.pool.get_pooled_transaction_elements(
1148 request.0,
1149 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1150 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1151 ),
1152 );
1153 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1154
1155 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1158
1159 let resp = PooledTransactions(transactions);
1160 let _ = response.send(Ok(resp));
1161 }
1162 }
1163
1164 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1166 match cmd {
1167 TransactionsCommand::PropagateHash(hash) => {
1168 self.on_new_pending_transactions(vec![hash])
1169 }
1170 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1171 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1172 }
1173 TransactionsCommand::GetActivePeers(tx) => {
1174 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1175 tx.send(peers).ok();
1176 }
1177 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1178 if let Some(propagated) =
1179 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1180 {
1181 self.pool.on_propagated(propagated);
1182 }
1183 }
1184 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1185 TransactionsCommand::BroadcastTransactions(txs) => {
1186 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1187 self.pool.on_propagated(propagated);
1188 }
1189 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1190 let mut res = HashMap::with_capacity(peers.len());
1191 for peer_id in peers {
1192 let hashes = self
1193 .peers
1194 .get(&peer_id)
1195 .map(|peer| peer.seen_transactions.iter().copied().collect::<B256Set>())
1196 .unwrap_or_default();
1197 res.insert(peer_id, hashes);
1198 }
1199 tx.send(res).ok();
1200 }
1201 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1202 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1203 peer_request_sender.send(sender).ok();
1204 }
1205 }
1206 }
1207
1208 fn handle_peer_session(
1212 &mut self,
1213 info: SessionInfo,
1214 messages: PeerRequestSender<PeerRequest<N>>,
1215 ) {
1216 let SessionInfo { peer_id, client_version, version, .. } = info;
1217
1218 let peer = PeerMetadata::<N>::new(
1220 messages,
1221 version,
1222 client_version,
1223 self.config.max_transactions_seen_by_peer_history,
1224 info.peer_kind,
1225 );
1226 let peer = match self.peers.entry(peer_id) {
1227 Entry::Occupied(mut entry) => {
1228 entry.insert(peer);
1229 entry.into_mut()
1230 }
1231 Entry::Vacant(entry) => entry.insert(peer),
1232 };
1233
1234 self.policies.propagation_policy_mut().on_session_established(peer);
1235
1236 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1240 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1241 return
1242 }
1243
1244 let pooled_txs = self.pool.pooled_transactions_max(
1246 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1247 );
1248 if pooled_txs.is_empty() {
1249 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1250 return;
1251 }
1252
1253 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1255 for pooled_tx in pooled_txs {
1256 peer.seen_transactions.insert(*pooled_tx.hash());
1257 msg_builder.push_pooled(pooled_tx);
1258 }
1259
1260 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1261 let msg = msg_builder.build();
1262 self.network.send_transactions_hashes(peer_id, msg);
1263 }
1264
1265 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1267 match event_result {
1268 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1269 self.on_peer_session_closed(&peer_id);
1270 }
1271 NetworkEvent::ActivePeerSession { info, messages } => {
1272 self.handle_peer_session(info, messages);
1274 }
1275 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1276 let peer_id = info.peer_id;
1277 let messages = match self.peers.get(&peer_id) {
1279 Some(p) => p.request_tx.clone(),
1280 None => {
1281 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1282 return;
1283 }
1284 };
1285 self.handle_peer_session(info, messages);
1286 }
1287 _ => {}
1288 }
1289 }
1290
1291 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1293 if self.config.ingress_policy.allows_all() {
1294 return true;
1295 }
1296 let Some(peer) = self.peers.get(peer_id) else {
1297 return false;
1298 };
1299 self.config.ingress_policy.allows(peer.peer_kind())
1300 }
1301
1302 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1304 match event {
1305 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1306 if !self.accepts_incoming_from(&peer_id) {
1307 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1308 return;
1309 }
1310
1311 let has_blob_txs = msg.has_eip4844();
1315
1316 let non_blob_txs = msg
1317 .into_iter()
1318 .map(N::PooledTransaction::try_from)
1319 .filter_map(Result::ok)
1320 .collect();
1321
1322 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1323
1324 if has_blob_txs {
1325 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1326 self.report_peer_bad_transactions(peer_id);
1327 }
1328 }
1329 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1330 if !self.accepts_incoming_from(&peer_id) {
1331 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1332 return;
1333 }
1334 self.on_new_pooled_transaction_hashes(peer_id, msg)
1335 }
1336 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1337 self.on_get_pooled_transactions(peer_id, request, response)
1338 }
1339 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1340 let _ = response.send(Some(self.handle()));
1341 }
1342 }
1343 }
1344
1345 fn import_transactions(
1347 &mut self,
1348 peer_id: PeerId,
1349 transactions: PooledTransactions<N::PooledTransaction>,
1350 source: TransactionSource,
1351 ) {
1352 if self.network.is_initially_syncing() {
1354 return
1355 }
1356 if self.network.tx_gossip_disabled() {
1357 return
1358 }
1359
1360 if !self.has_capacity_for_pending_pool_imports() {
1362 return
1363 }
1364
1365 let mut transactions = transactions.0;
1366
1367 let capacity = self.remaining_pool_import_capacity();
1371 if transactions.len() > capacity {
1372 let skipped = transactions.len() - capacity;
1373 transactions.truncate(capacity);
1374 self.metrics
1375 .skipped_transactions_pending_pool_imports_at_capacity
1376 .increment(skipped as u64);
1377 trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1378 }
1379
1380 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1381 let client_version = peer.client_version.clone();
1382
1383 let start = Instant::now();
1384
1385 self.transaction_fetcher
1387 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1388
1389 let mut num_already_seen_by_peer = 0;
1394 for tx in &transactions {
1395 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1396 num_already_seen_by_peer += 1;
1397 }
1398 }
1399
1400 let mut has_bad_transactions = false;
1402
1403 transactions.retain(|tx| {
1406 if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1407 entry.get_mut().insert(peer_id);
1408 return false
1409 }
1410 if self.bad_imports.contains(tx.tx_hash()) {
1411 trace!(target: "net::tx",
1412 peer_id=format!("{peer_id:#}"),
1413 hash=%tx.tx_hash(),
1414 %client_version,
1415 "received a known bad transaction from peer"
1416 );
1417 has_bad_transactions = true;
1418 return false;
1419 }
1420 true
1421 });
1422
1423 let txns_count_pre_pool_filter = transactions.len();
1425 self.pool.retain_unknown(&mut transactions);
1426 if txns_count_pre_pool_filter > transactions.len() {
1427 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1428 self.metrics
1429 .occurrences_transactions_already_in_pool
1430 .increment(already_known_txns_count as u64);
1431 }
1432
1433 let txs_len = transactions.len();
1434
1435 let new_txs = transactions
1436 .into_par_iter()
1437 .filter_map(|tx| match tx.try_into_recovered() {
1438 Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1439 Err(badtx) => {
1440 trace!(target: "net::tx",
1441 peer_id=format!("{peer_id:#}"),
1442 hash=%badtx.tx_hash(),
1443 client_version=%client_version,
1444 "failed ecrecovery for transaction"
1445 );
1446 None
1447 }
1448 })
1449 .collect::<Vec<_>>();
1450
1451 has_bad_transactions |= new_txs.len() != txs_len;
1452
1453 for tx in &new_txs {
1455 self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1456 }
1457
1458 if !new_txs.is_empty() {
1461 let pool = self.pool.clone();
1462 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1464 metric_pending_pool_imports.increment(new_txs.len() as f64);
1465
1466 self.pending_pool_imports_info
1468 .pending_pool_imports
1469 .fetch_add(new_txs.len(), Ordering::Relaxed);
1470 let tx_manager_info_pending_pool_imports =
1471 self.pending_pool_imports_info.pending_pool_imports.clone();
1472
1473 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1474 let import = Box::pin(async move {
1475 let added = new_txs.len();
1476 let res = pool.add_external_transactions(new_txs).await;
1477
1478 metric_pending_pool_imports.decrement(added as f64);
1480 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1482
1483 res
1484 });
1485
1486 self.pool_imports.push(import);
1487 }
1488
1489 if num_already_seen_by_peer > 0 {
1490 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1491 self.metrics
1492 .occurrences_of_transaction_already_seen_by_peer
1493 .increment(num_already_seen_by_peer);
1494 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1495 }
1496
1497 if has_bad_transactions {
1498 self.report_peer_bad_transactions(peer_id)
1500 }
1501
1502 if num_already_seen_by_peer > 0 {
1503 self.report_already_seen(peer_id);
1504 }
1505
1506 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1507 }
1508
1509 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1511 match fetch_event {
1512 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1513 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1514 if report_peer {
1515 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1516 }
1517 }
1518 FetchEvent::FetchError { peer_id, error } => {
1519 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1520 self.on_request_error(peer_id, error);
1521 }
1522 FetchEvent::EmptyResponse { peer_id } => {
1523 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1524 }
1525 }
1526 }
1527}
1528
1529impl<
1537 Pool: TransactionPool + Unpin + 'static,
1538 N: NetworkPrimitives<
1539 BroadcastedTransaction: SignedTransaction,
1540 PooledTransaction: SignedTransaction,
1541 > + Unpin,
1542 > Future for TransactionsManager<Pool, N>
1543where
1544 Pool::Transaction:
1545 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1546{
1547 type Output = ();
1548
1549 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1550 let start = Instant::now();
1551 let mut poll_durations = TxManagerPollDurations::default();
1552
1553 let this = self.get_mut();
1554
1555 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1561 poll_durations.acc_network_events,
1562 "net::tx",
1563 "Network events stream",
1564 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1565 this.network_events.poll_next_unpin(cx),
1566 |event| this.on_network_event(event)
1567 );
1568
1569 let mut new_txs = Vec::new();
1578 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1579 cx,
1580 &mut new_txs,
1581 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1582 ) {
1583 Poll::Ready(count) => {
1584 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1585 true
1588 } else {
1589 let limit =
1593 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1594 new_txs.len();
1595 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1596 }
1597 }
1598 Poll::Pending => false,
1599 };
1600 if !new_txs.is_empty() {
1601 this.on_new_pending_transactions(new_txs);
1602 }
1603
1604 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1619 poll_durations.acc_tx_events,
1620 "net::tx",
1621 "Network transaction events stream",
1622 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1623 this.transaction_events.poll_next_unpin(cx),
1624 |event: NetworkTransactionEvent<N>| this.on_network_tx_event(event),
1625 );
1626
1627 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1638 poll_durations.acc_fetch_events,
1639 "net::tx",
1640 "Transaction fetch events stream",
1641 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1642 this.transaction_fetcher.poll_next_unpin(cx),
1643 |event| this.on_fetch_event(event),
1644 );
1645
1646 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1661 poll_durations.acc_pending_imports,
1662 "net::tx",
1663 "Batched pool imports stream",
1664 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1665 this.pool_imports.poll_next_unpin(cx),
1666 |batch_results| this.on_batch_import_result(batch_results)
1667 );
1668
1669 duration_metered_exec!(
1674 {
1675 if this.has_capacity_for_fetching_pending_hashes() &&
1676 this.on_fetch_hashes_pending_fetch()
1677 {
1678 maybe_more_tx_fetch_events = true;
1679 }
1680 },
1681 poll_durations.acc_pending_fetch
1682 );
1683
1684 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1686 poll_durations.acc_cmds,
1687 "net::tx",
1688 "Commands channel",
1689 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1690 this.command_rx.poll_next_unpin(cx),
1691 |cmd| this.on_command(cmd)
1692 );
1693
1694 this.transaction_fetcher.update_metrics();
1695
1696 if maybe_more_network_events ||
1698 maybe_more_commands ||
1699 maybe_more_tx_events ||
1700 maybe_more_tx_fetch_events ||
1701 maybe_more_pool_imports ||
1702 maybe_more_pending_txns
1703 {
1704 cx.waker().wake_by_ref();
1706 return Poll::Pending
1707 }
1708
1709 this.update_poll_metrics(start, poll_durations);
1710
1711 Poll::Pending
1712 }
1713}
1714
1715#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1719enum PropagationMode {
1720 Basic,
1724 Forced,
1729}
1730
1731impl PropagationMode {
1732 const fn is_forced(self) -> bool {
1734 matches!(self, Self::Forced)
1735 }
1736}
1737
1738#[derive(Debug, Clone)]
1740struct PropagateTransaction<T = TransactionSigned> {
1741 size: usize,
1742 transaction: Arc<T>,
1743}
1744
1745impl<T: SignedTransaction> PropagateTransaction<T> {
1746 pub fn new(transaction: T) -> Self {
1748 let size = transaction.length();
1749 Self { size, transaction: Arc::new(transaction) }
1750 }
1751
1752 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1754 where
1755 P: PoolTransaction<Consensus = T>,
1756 {
1757 let size = tx.encoded_length();
1758 let transaction = tx.transaction.clone_into_consensus();
1759 let transaction = Arc::new(transaction.into_inner());
1760 Self { size, transaction }
1761 }
1762
1763 fn tx_hash(&self) -> &TxHash {
1764 self.transaction.tx_hash()
1765 }
1766}
1767
1768#[derive(Debug, Clone)]
1771enum PropagateTransactionsBuilder<T> {
1772 Pooled(PooledTransactionsHashesBuilder),
1773 Full(FullTransactionsBuilder<T>),
1774}
1775
1776impl<T> PropagateTransactionsBuilder<T> {
1777 fn pooled(version: EthVersion) -> Self {
1779 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1780 }
1781
1782 fn full(version: EthVersion) -> Self {
1784 Self::Full(FullTransactionsBuilder::new(version))
1785 }
1786
1787 fn is_empty(&self) -> bool {
1789 match self {
1790 Self::Pooled(builder) => builder.is_empty(),
1791 Self::Full(builder) => builder.is_empty(),
1792 }
1793 }
1794
1795 fn build(self) -> PropagateTransactions<T> {
1797 match self {
1798 Self::Pooled(pooled) => {
1799 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1800 }
1801 Self::Full(full) => full.build(),
1802 }
1803 }
1804}
1805
1806impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1807 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1809 for tx in txs {
1810 self.push(tx);
1811 }
1812 }
1813
1814 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1816 match self {
1817 Self::Pooled(builder) => builder.push(transaction),
1818 Self::Full(builder) => builder.push(transaction),
1819 }
1820 }
1821}
1822
1823struct PropagateTransactions<T> {
1825 pooled: Option<NewPooledTransactionHashes>,
1827 full: Option<Vec<Arc<T>>>,
1829}
1830
1831#[derive(Debug, Clone)]
1836struct FullTransactionsBuilder<T> {
1837 total_size: usize,
1839 transactions: Vec<Arc<T>>,
1841 pooled: PooledTransactionsHashesBuilder,
1843}
1844
1845impl<T> FullTransactionsBuilder<T> {
1846 fn new(version: EthVersion) -> Self {
1848 Self {
1849 total_size: 0,
1850 pooled: PooledTransactionsHashesBuilder::new(version),
1851 transactions: vec![],
1852 }
1853 }
1854
1855 fn is_empty(&self) -> bool {
1857 self.transactions.is_empty() && self.pooled.is_empty()
1858 }
1859
1860 fn build(self) -> PropagateTransactions<T> {
1862 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1863 let full = Some(self.transactions).filter(|full| !full.is_empty());
1864 PropagateTransactions { pooled, full }
1865 }
1866}
1867
1868impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1869 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1871 for tx in txs {
1872 self.push(&tx)
1873 }
1874 }
1875
1876 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1886 if !transaction.transaction.is_broadcastable_in_full() {
1895 self.pooled.push(transaction);
1896 return
1897 }
1898
1899 let new_size = self.total_size + transaction.size;
1900 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1901 self.total_size > 0
1902 {
1903 self.pooled.push(transaction);
1905 return
1906 }
1907
1908 self.total_size = new_size;
1909 self.transactions.push(Arc::clone(&transaction.transaction));
1910 }
1911}
1912
1913#[derive(Debug, Clone)]
1916enum PooledTransactionsHashesBuilder {
1917 Eth66(NewPooledTransactionHashes66),
1918 Eth68(NewPooledTransactionHashes68),
1919 Eth72(NewPooledTransactionHashes72),
1920}
1921
1922impl PooledTransactionsHashesBuilder {
1925 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1927 match self {
1928 Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1929 Self::Eth68(msg) => {
1930 msg.hashes.push(*pooled_tx.hash());
1931 msg.sizes.push(pooled_tx.encoded_length());
1932 msg.types.push(pooled_tx.transaction.ty());
1933 }
1934 Self::Eth72(msg) => {
1935 msg.hashes.push(*pooled_tx.hash());
1936 msg.sizes.push(pooled_tx.encoded_length());
1937 msg.types.push(pooled_tx.transaction.ty());
1938 }
1939 }
1940 }
1941
1942 fn is_empty(&self) -> bool {
1944 match self {
1945 Self::Eth66(hashes) => hashes.is_empty(),
1946 Self::Eth68(hashes) => hashes.is_empty(),
1947 Self::Eth72(hashes) => hashes.is_empty(),
1948 }
1949 }
1950
1951 fn len(&self) -> usize {
1953 match self {
1954 Self::Eth66(hashes) => hashes.len(),
1955 Self::Eth68(hashes) => hashes.len(),
1956 Self::Eth72(hashes) => hashes.len(),
1957 }
1958 }
1959
1960 fn extend<T: SignedTransaction>(
1962 &mut self,
1963 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1964 ) {
1965 for tx in txs {
1966 self.push(&tx);
1967 }
1968 }
1969
1970 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1971 match self {
1972 Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1973 Self::Eth68(msg) => {
1974 msg.hashes.push(*tx.tx_hash());
1975 msg.sizes.push(tx.size);
1976 msg.types.push(tx.transaction.ty());
1977 }
1978 Self::Eth72(msg) => {
1979 msg.hashes.push(*tx.tx_hash());
1980 msg.sizes.push(tx.size);
1981 msg.types.push(tx.transaction.ty());
1982 }
1983 }
1984 }
1985
1986 fn new(version: EthVersion) -> Self {
1988 match version {
1989 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1990 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1991 Self::Eth68(Default::default())
1992 }
1993 EthVersion::Eth72 => Self::Eth72(Default::default()),
1994 }
1995 }
1996
1997 fn build(self) -> NewPooledTransactionHashes {
1998 match self {
1999 Self::Eth66(mut msg) => {
2000 msg.shrink_to_fit();
2001 msg.into()
2002 }
2003 Self::Eth68(mut msg) => {
2004 msg.shrink_to_fit();
2005 msg.into()
2006 }
2007 Self::Eth72(mut msg) => {
2008 msg.shrink_to_fit();
2009 msg.into()
2010 }
2011 }
2012 }
2013}
2014
2015enum TransactionSource {
2017 Broadcast,
2019 Response,
2021}
2022
2023impl TransactionSource {
2026 const fn is_broadcast(&self) -> bool {
2028 matches!(self, Self::Broadcast)
2029 }
2030}
2031
2032#[derive(Debug)]
2034pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2035 seen_transactions: LruCache<TxHash, FbBuildHasher<32>>,
2039 request_tx: PeerRequestSender<PeerRequest<N>>,
2041 version: EthVersion,
2043 client_version: Arc<str>,
2045 peer_kind: PeerKind,
2047}
2048
2049impl<N: NetworkPrimitives> PeerMetadata<N> {
2050 pub fn new(
2052 request_tx: PeerRequestSender<PeerRequest<N>>,
2053 version: EthVersion,
2054 client_version: Arc<str>,
2055 max_transactions_seen_by_peer: u32,
2056 peer_kind: PeerKind,
2057 ) -> Self {
2058 Self {
2059 seen_transactions: LruCache::with_hasher(
2060 max_transactions_seen_by_peer,
2061 Default::default(),
2062 ),
2063 request_tx,
2064 version,
2065 client_version,
2066 peer_kind,
2067 }
2068 }
2069
2070 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2072 &self.request_tx
2073 }
2074
2075 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash, FbBuildHasher<32>> {
2077 &mut self.seen_transactions
2078 }
2079
2080 pub const fn version(&self) -> EthVersion {
2082 self.version
2083 }
2084
2085 pub fn client_version(&self) -> &str {
2087 &self.client_version
2088 }
2089
2090 pub const fn peer_kind(&self) -> PeerKind {
2092 self.peer_kind
2093 }
2094}
2095
2096#[derive(Debug)]
2098enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2099 PropagateHash(B256),
2101 PropagateHashesTo(Vec<B256>, PeerId),
2103 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2105 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2107 PropagateTransactions(Vec<TxHash>),
2109 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2111 GetTransactionHashes { peers: Vec<PeerId>, tx: oneshot::Sender<HashMap<PeerId, B256Set>> },
2113 GetPeerSender {
2115 peer_id: PeerId,
2116 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2117 },
2118}
2119
2120#[derive(Debug)]
2122pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2123 IncomingTransactions {
2127 peer_id: PeerId,
2129 msg: Transactions<N::BroadcastedTransaction>,
2131 },
2132 IncomingPooledTransactionHashes {
2134 peer_id: PeerId,
2136 msg: NewPooledTransactionHashes,
2138 },
2139 GetPooledTransactions {
2141 peer_id: PeerId,
2143 request: GetPooledTransactions,
2145 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2147 },
2148 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2150}
2151
2152#[derive(Debug)]
2154pub struct PendingPoolImportsInfo {
2155 pending_pool_imports: Arc<AtomicUsize>,
2157 max_pending_pool_imports: usize,
2159}
2160
2161impl PendingPoolImportsInfo {
2162 pub fn new(max_pending_pool_imports: usize) -> Self {
2164 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2165 }
2166
2167 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2169 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2170 }
2171}
2172
2173impl Default for PendingPoolImportsInfo {
2174 fn default() -> Self {
2175 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2176 }
2177}
2178
2179#[derive(Debug, Default)]
2180struct TxManagerPollDurations {
2181 acc_network_events: Duration,
2182 acc_pending_imports: Duration,
2183 acc_tx_events: Duration,
2184 acc_imported_txns: Duration,
2185 acc_fetch_events: Duration,
2186 acc_pending_fetch: Duration,
2187 acc_cmds: Duration,
2188}
2189
2190impl<N: NetworkPrimitives> InMemorySize for NetworkTransactionEvent<N> {
2191 fn size(&self) -> usize {
2194 match self {
2195 Self::IncomingTransactions { peer_id, msg } => {
2196 core::mem::size_of_val(peer_id) +
2197 msg.0.iter().map(InMemorySize::size).sum::<usize>()
2198 }
2199 Self::IncomingPooledTransactionHashes { peer_id, msg } => {
2200 core::mem::size_of_val(peer_id) + msg.size()
2201 }
2202 Self::GetPooledTransactions { peer_id, request, response } => {
2203 core::mem::size_of_val(peer_id) +
2204 request.0.len() * core::mem::size_of::<TxHash>() +
2205 core::mem::size_of_val(response)
2206 }
2207 Self::GetTransactionsHandle(_) => 0,
2208 }
2209 }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214 use super::*;
2215 use crate::{
2216 test_utils::{
2217 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2218 Testnet,
2219 },
2220 transactions::config::RelaxedEthAnnouncementFilter,
2221 NetworkConfigBuilder, NetworkManager,
2222 };
2223 use alloy_consensus::{TxEip1559, TxLegacy};
2224 use alloy_eips::eip4844::BlobTransactionValidationError;
2225 use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2226 use alloy_rlp::Decodable;
2227 use futures::FutureExt;
2228 use reth_chainspec::MIN_TRANSACTION_GAS;
2229 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2230 use reth_network_api::{NetworkInfo, PeerKind};
2231 use reth_network_p2p::{
2232 error::{RequestError, RequestResult},
2233 sync::{NetworkSyncUpdater, SyncState},
2234 };
2235 use reth_storage_api::noop::NoopProvider;
2236 use reth_tasks::Runtime;
2237 use reth_transaction_pool::{
2238 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2239 test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2240 };
2241 use secp256k1::SecretKey;
2242 use std::{
2243 collections::HashSet,
2244 future::poll_fn,
2245 net::{IpAddr, Ipv4Addr, SocketAddr},
2246 str::FromStr,
2247 };
2248 use tracing::error;
2249
2250 #[tokio::test(flavor = "multi_thread")]
2251 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2252 reth_tracing::init_test_tracing();
2253 let net = Testnet::create(3).await;
2254
2255 let mut handles = net.handles();
2256 let handle0 = handles.next().unwrap();
2257 let handle1 = handles.next().unwrap();
2258
2259 drop(handles);
2260 let handle = net.spawn();
2261
2262 let listener0 = handle0.event_listener();
2263 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2264 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2265
2266 let client = NoopProvider::default();
2267 let pool = testing_pool();
2268 let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2269 .disable_discovery()
2270 .listener_port(0)
2271 .build(client);
2272 let transactions_manager_config = config.transactions_manager_config.clone();
2273 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2274 .await
2275 .unwrap()
2276 .into_builder()
2277 .transactions(pool.clone(), transactions_manager_config)
2278 .split_with_handle();
2279
2280 tokio::task::spawn(network);
2281
2282 network_handle.update_sync_state(SyncState::Syncing);
2284 assert!(NetworkInfo::is_syncing(&network_handle));
2285 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2286
2287 let mut established = listener0.take(2);
2289 while let Some(ev) = established.next().await {
2290 match ev {
2291 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2292 transactions
2294 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2295 }
2296 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2297 ev => {
2298 error!("unexpected event {ev:?}")
2299 }
2300 }
2301 }
2302 let input = hex!(
2304 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2305 );
2306 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2307 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2308 peer_id: *handle1.peer_id(),
2309 msg: Transactions(vec![signed_tx.clone()]),
2310 });
2311 poll_fn(|cx| {
2312 let _ = transactions.poll_unpin(cx);
2313 Poll::Ready(())
2314 })
2315 .await;
2316 assert!(pool.is_empty());
2317 handle.terminate().await;
2318 }
2319
2320 #[tokio::test(flavor = "multi_thread")]
2321 async fn test_tx_broadcasts_through_two_syncs() {
2322 reth_tracing::init_test_tracing();
2323 let net = Testnet::create(3).await;
2324
2325 let mut handles = net.handles();
2326 let handle0 = handles.next().unwrap();
2327 let handle1 = handles.next().unwrap();
2328
2329 drop(handles);
2330 let handle = net.spawn();
2331
2332 let listener0 = handle0.event_listener();
2333 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2334 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2335
2336 let client = NoopProvider::default();
2337 let pool = testing_pool();
2338 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2339 .disable_discovery()
2340 .listener_port(0)
2341 .build(client);
2342 let transactions_manager_config = config.transactions_manager_config.clone();
2343 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2344 .await
2345 .unwrap()
2346 .into_builder()
2347 .transactions(pool.clone(), transactions_manager_config)
2348 .split_with_handle();
2349
2350 tokio::task::spawn(network);
2351
2352 network_handle.update_sync_state(SyncState::Syncing);
2354 assert!(NetworkInfo::is_syncing(&network_handle));
2355 network_handle.update_sync_state(SyncState::Idle);
2356 assert!(!NetworkInfo::is_syncing(&network_handle));
2357 network_handle.update_sync_state(SyncState::Syncing);
2358 assert!(NetworkInfo::is_syncing(&network_handle));
2359
2360 let mut established = listener0.take(2);
2362 while let Some(ev) = established.next().await {
2363 match ev {
2364 NetworkEvent::ActivePeerSession { .. } |
2365 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2366 transactions.on_network_event(ev);
2368 }
2369 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2370 _ => {
2371 error!("unexpected event {ev:?}")
2372 }
2373 }
2374 }
2375 let input = hex!(
2377 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2378 );
2379 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2380 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2381 peer_id: *handle1.peer_id(),
2382 msg: Transactions(vec![signed_tx.clone()]),
2383 });
2384 poll_fn(|cx| {
2385 let _ = transactions.poll_unpin(cx);
2386 Poll::Ready(())
2387 })
2388 .await;
2389 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2390 assert!(NetworkInfo::is_syncing(&network_handle));
2391 assert!(!pool.is_empty());
2392 handle.terminate().await;
2393 }
2394
2395 #[tokio::test(flavor = "multi_thread")]
2398 async fn test_handle_incoming_transactions_hashes() {
2399 reth_tracing::init_test_tracing();
2400
2401 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2402 let client = NoopProvider::default();
2403
2404 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2405 .listener_port(0)
2407 .disable_discovery()
2408 .build(client);
2409
2410 let pool = testing_pool();
2411
2412 let transactions_manager_config = config.transactions_manager_config.clone();
2413 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2414 .await
2415 .unwrap()
2416 .into_builder()
2417 .transactions(pool.clone(), transactions_manager_config)
2418 .split_with_handle();
2419
2420 let peer_id_1 = PeerId::new([1; 64]);
2421 let eth_version = EthVersion::Eth66;
2422
2423 let txs = vec![TransactionSigned::new_unhashed(
2424 Transaction::Legacy(TxLegacy {
2425 chain_id: Some(4),
2426 nonce: 15u64,
2427 gas_price: 2200000000,
2428 gas_limit: 34811,
2429 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2430 value: U256::from(1234u64),
2431 input: Default::default(),
2432 }),
2433 Signature::new(
2434 U256::from_str(
2435 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2436 )
2437 .unwrap(),
2438 U256::from_str(
2439 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2440 )
2441 .unwrap(),
2442 true,
2443 ),
2444 )];
2445
2446 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2447
2448 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2449 tx_manager.peers.insert(peer_id_1, peer_1);
2450
2451 assert!(pool.is_empty());
2452
2453 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2454 peer_id: peer_id_1,
2455 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2456 txs_hashes.clone(),
2457 )),
2458 });
2459
2460 let req = to_mock_session_rx
2462 .recv()
2463 .await
2464 .expect("peer_1 session should receive request with buffered hashes");
2465 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2466 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2467
2468 let message: Vec<PooledTransactionVariant> = txs
2469 .into_iter()
2470 .map(|tx| {
2471 PooledTransactionVariant::try_from(tx)
2472 .expect("Failed to convert MockTransaction to PooledTransaction")
2473 })
2474 .collect();
2475
2476 response
2478 .send(Ok(PooledTransactions(message)))
2479 .expect("should send peer_1 response to tx manager");
2480
2481 poll_fn(|cx| {
2483 let _ = tx_manager.poll_unpin(cx);
2484 Poll::Ready(())
2485 })
2486 .await;
2487
2488 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2491 }
2492
2493 #[tokio::test(flavor = "multi_thread")]
2494 async fn test_handle_incoming_transactions() {
2495 reth_tracing::init_test_tracing();
2496 let net = Testnet::create(3).await;
2497
2498 let mut handles = net.handles();
2499 let handle0 = handles.next().unwrap();
2500 let handle1 = handles.next().unwrap();
2501
2502 drop(handles);
2503 let handle = net.spawn();
2504
2505 let listener0 = handle0.event_listener();
2506
2507 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2508 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2509
2510 let client = NoopProvider::default();
2511 let pool = testing_pool();
2512 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2513 .disable_discovery()
2514 .listener_port(0)
2515 .build(client);
2516 let transactions_manager_config = config.transactions_manager_config.clone();
2517 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2518 .await
2519 .unwrap()
2520 .into_builder()
2521 .transactions(pool.clone(), transactions_manager_config)
2522 .split_with_handle();
2523 tokio::task::spawn(network);
2524
2525 network_handle.update_sync_state(SyncState::Idle);
2526
2527 assert!(!NetworkInfo::is_syncing(&network_handle));
2528
2529 let mut established = listener0.take(2);
2531 while let Some(ev) = established.next().await {
2532 match ev {
2533 NetworkEvent::ActivePeerSession { .. } |
2534 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2535 transactions.on_network_event(ev);
2537 }
2538 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2539 ev => {
2540 error!("unexpected event {ev:?}")
2541 }
2542 }
2543 }
2544 let input = hex!(
2546 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2547 );
2548 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2549 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2550 peer_id: *handle1.peer_id(),
2551 msg: Transactions(vec![signed_tx.clone()]),
2552 });
2553 assert!(transactions
2554 .transactions_by_peers
2555 .get(signed_tx.tx_hash())
2556 .unwrap()
2557 .contains(handle1.peer_id()));
2558
2559 poll_fn(|cx| {
2561 let _ = transactions.poll_unpin(cx);
2562 Poll::Ready(())
2563 })
2564 .await;
2565
2566 assert!(!pool.is_empty());
2567 assert!(pool.get(signed_tx.tx_hash()).is_some());
2568 handle.terminate().await;
2569 }
2570
2571 #[tokio::test(flavor = "multi_thread")]
2572 async fn test_session_closed_cleans_transaction_peer_state() {
2573 let (mut tx_manager, _network) = new_tx_manager().await;
2574 let peer_id = PeerId::new([1; 64]);
2575 let fallback_peer = PeerId::new([2; 64]);
2576 let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2577 let hash_shared = B256::from_slice(&[1; 32]);
2578
2579 tx_manager.peers.insert(peer_id, peer);
2580 buffer_hash_to_tx_fetcher(
2581 &mut tx_manager.transaction_fetcher,
2582 hash_shared,
2583 peer_id,
2584 0,
2585 None,
2586 );
2587 buffer_hash_to_tx_fetcher(
2588 &mut tx_manager.transaction_fetcher,
2589 hash_shared,
2590 fallback_peer,
2591 0,
2592 None,
2593 );
2594 tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2595
2596 tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2597 peer_id,
2598 reason: None,
2599 }));
2600
2601 assert!(!tx_manager.peers.contains_key(&peer_id));
2603 assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2604 assert_eq!(
2606 tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2607 Some(&fallback_peer)
2608 );
2609 }
2610
2611 #[tokio::test(flavor = "multi_thread")]
2612 async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2613 let (mut tx_manager, _network) = new_tx_manager().await;
2614 let peer_id = PeerId::new([1; 64]);
2615 let hash = B256::from_slice(&[1; 32]);
2616
2617 tx_manager.network.update_sync_state(SyncState::Idle);
2618 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2619
2620 let err = PoolError::new(
2621 hash,
2622 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2623 BlobTransactionValidationError::InvalidProof,
2624 )),
2625 );
2626
2627 tx_manager.on_bad_import(err);
2628
2629 assert!(!tx_manager.bad_imports.contains(&hash));
2630 }
2631
2632 #[tokio::test(flavor = "multi_thread")]
2633 async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2634 let (mut tx_manager, _network) = new_tx_manager().await;
2635 let peer_id = PeerId::new([1; 64]);
2636 let hash = B256::from_slice(&[3; 32]);
2637
2638 tx_manager.network.update_sync_state(SyncState::Idle);
2639 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2640
2641 let err = PoolError::new(
2642 hash,
2643 InvalidPoolTransactionError::Eip4844(
2644 Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2645 ),
2646 );
2647
2648 tx_manager.on_bad_import(err);
2649
2650 assert!(!tx_manager.bad_imports.contains(&hash));
2651 }
2652
2653 #[tokio::test(flavor = "multi_thread")]
2654 async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2655 let (mut tx_manager, _network) = new_tx_manager().await;
2656 let peer_id = PeerId::new([1; 64]);
2657 let hash = B256::from_slice(&[2; 32]);
2658
2659 tx_manager.network.update_sync_state(SyncState::Idle);
2660 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2661
2662 let err = PoolError::new(
2663 hash,
2664 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2665 );
2666
2667 tx_manager.on_bad_import(err);
2668
2669 assert!(tx_manager.bad_imports.contains(&hash));
2670 }
2671
2672 #[tokio::test(flavor = "multi_thread")]
2673 async fn test_on_get_pooled_transactions_network() {
2674 reth_tracing::init_test_tracing();
2675 let net = Testnet::create(2).await;
2676
2677 let mut handles = net.handles();
2678 let handle0 = handles.next().unwrap();
2679 let handle1 = handles.next().unwrap();
2680
2681 drop(handles);
2682 let handle = net.spawn();
2683
2684 let listener0 = handle0.event_listener();
2685
2686 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2687 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2688
2689 let client = NoopProvider::default();
2690 let pool = testing_pool();
2691 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2692 .disable_discovery()
2693 .listener_port(0)
2694 .build(client);
2695 let transactions_manager_config = config.transactions_manager_config.clone();
2696 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2697 .await
2698 .unwrap()
2699 .into_builder()
2700 .transactions(pool.clone(), transactions_manager_config)
2701 .split_with_handle();
2702 tokio::task::spawn(network);
2703
2704 network_handle.update_sync_state(SyncState::Idle);
2705
2706 assert!(!NetworkInfo::is_syncing(&network_handle));
2707
2708 let mut established = listener0.take(2);
2710 while let Some(ev) = established.next().await {
2711 match ev {
2712 NetworkEvent::ActivePeerSession { .. } |
2713 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2714 transactions.on_network_event(ev);
2715 }
2716 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2717 ev => {
2718 error!("unexpected event {ev:?}")
2719 }
2720 }
2721 }
2722 handle.terminate().await;
2723
2724 let tx = MockTransaction::eip1559();
2725 let _ = transactions
2726 .pool
2727 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2728 .await;
2729
2730 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2731
2732 let (send, receive) =
2733 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2734
2735 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2736 peer_id: *handle1.peer_id(),
2737 request,
2738 response: send,
2739 });
2740
2741 match receive.await.unwrap() {
2742 Ok(PooledTransactions(transactions)) => {
2743 assert_eq!(transactions.len(), 1);
2744 }
2745 Err(e) => {
2746 panic!("error: {e:?}");
2747 }
2748 }
2749 }
2750
2751 #[tokio::test]
2755 async fn test_partially_tx_response() {
2756 reth_tracing::init_test_tracing();
2757
2758 let mut tx_manager = new_tx_manager().await.0;
2759 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2760
2761 let peer_id_1 = PeerId::new([1; 64]);
2762 let eth_version = EthVersion::Eth66;
2763
2764 let txs = vec![
2765 TransactionSigned::new_unhashed(
2766 Transaction::Legacy(TxLegacy {
2767 chain_id: Some(4),
2768 nonce: 15u64,
2769 gas_price: 2200000000,
2770 gas_limit: 34811,
2771 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2772 value: U256::from(1234u64),
2773 input: Default::default(),
2774 }),
2775 Signature::new(
2776 U256::from_str(
2777 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2778 )
2779 .unwrap(),
2780 U256::from_str(
2781 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2782 )
2783 .unwrap(),
2784 true,
2785 ),
2786 ),
2787 TransactionSigned::new_unhashed(
2788 Transaction::Eip1559(TxEip1559 {
2789 chain_id: 4,
2790 nonce: 26u64,
2791 max_priority_fee_per_gas: 1500000000,
2792 max_fee_per_gas: 1500000013,
2793 gas_limit: MIN_TRANSACTION_GAS,
2794 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2795 value: U256::from(3000000000000000000u64),
2796 input: Default::default(),
2797 access_list: Default::default(),
2798 }),
2799 Signature::new(
2800 U256::from_str(
2801 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2802 )
2803 .unwrap(),
2804 U256::from_str(
2805 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2806 )
2807 .unwrap(),
2808 true,
2809 ),
2810 ),
2811 ];
2812
2813 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2814
2815 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2816 peer_1.seen_transactions.insert(txs_hashes[0]);
2819 peer_1.seen_transactions.insert(txs_hashes[1]);
2820 tx_manager.peers.insert(peer_id_1, peer_1);
2821
2822 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2823 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2824
2825 assert!(tx_fetcher.is_idle(&peer_id_1));
2827 assert_eq!(tx_fetcher.active_peers.len(), 0);
2828
2829 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2831
2832 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2833 assert!(!tx_fetcher.is_idle(&peer_id_1));
2835 assert_eq!(tx_fetcher.active_peers.len(), 1);
2836
2837 let req = to_mock_session_rx
2839 .recv()
2840 .await
2841 .expect("peer_1 session should receive request with buffered hashes");
2842 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2843
2844 let message: Vec<PooledTransactionVariant> = txs
2845 .into_iter()
2846 .take(1)
2847 .map(|tx| {
2848 PooledTransactionVariant::try_from(tx)
2849 .expect("Failed to convert MockTransaction to PooledTransaction")
2850 })
2851 .collect();
2852 response
2854 .send(Ok(PooledTransactions(message)))
2855 .expect("should send peer_1 response to tx manager");
2856 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2857 unreachable!()
2858 };
2859
2860 assert!(tx_fetcher.is_idle(&peer_id));
2862 assert_eq!(tx_fetcher.active_peers.len(), 0);
2863 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2865 }
2866
2867 #[tokio::test]
2868 async fn test_max_retries_tx_request() {
2869 reth_tracing::init_test_tracing();
2870
2871 let mut tx_manager = new_tx_manager().await.0;
2872 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2873
2874 let peer_id_1 = PeerId::new([1; 64]);
2875 let peer_id_2 = PeerId::new([2; 64]);
2876 let eth_version = EthVersion::Eth66;
2877 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2878
2879 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2880 peer_1.seen_transactions.insert(seen_hashes[0]);
2883 peer_1.seen_transactions.insert(seen_hashes[1]);
2884 tx_manager.peers.insert(peer_id_1, peer_1);
2885
2886 let retries = 1;
2889 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2890 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2891
2892 assert!(tx_fetcher.is_idle(&peer_id_1));
2894 assert_eq!(tx_fetcher.active_peers.len(), 0);
2895
2896 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2898
2899 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2900
2901 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2902 assert!(!tx_fetcher.is_idle(&peer_id_1));
2904 assert_eq!(tx_fetcher.active_peers.len(), 1);
2905
2906 let req = to_mock_session_rx
2908 .recv()
2909 .await
2910 .expect("peer_1 session should receive request with buffered hashes");
2911 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2912 let GetPooledTransactions(hashes) = request;
2913
2914 let hashes = hashes.into_iter().collect::<B256Set>();
2915
2916 assert_eq!(hashes, seen_hashes.into_iter().collect::<B256Set>());
2917
2918 response
2920 .send(Err(RequestError::BadResponse))
2921 .expect("should send peer_1 response to tx manager");
2922 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2923 unreachable!()
2924 };
2925
2926 assert!(tx_fetcher.is_idle(&peer_id));
2928 assert_eq!(tx_fetcher.active_peers.len(), 0);
2929 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2931
2932 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2933 tx_manager.peers.insert(peer_id_2, peer_2);
2934
2935 let msg =
2937 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2938 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2939
2940 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2941
2942 assert_eq!(tx_fetcher.active_peers.len(), 1);
2944
2945 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2947 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2949
2950 let req = to_mock_session_rx
2952 .recv()
2953 .await
2954 .expect("peer_2 session should receive request with buffered hashes");
2955 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2956
2957 response
2959 .send(Err(RequestError::BadResponse))
2960 .expect("should send peer_2 response to tx manager");
2961 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2962
2963 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2966 assert_eq!(tx_fetcher.active_peers.len(), 0);
2967 }
2968
2969 #[test]
2970 fn test_transaction_builder_empty() {
2971 let mut builder =
2972 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2973 assert!(builder.is_empty());
2974
2975 let mut factory = MockTransactionFactory::default();
2976 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2977 builder.push(&tx);
2978 assert!(!builder.is_empty());
2979
2980 let txs = builder.build();
2981 assert!(txs.full.is_none());
2982 let txs = txs.pooled.unwrap();
2983 assert_eq!(txs.len(), 1);
2984 }
2985
2986 #[test]
2987 fn test_transaction_builder_large() {
2988 let mut builder =
2989 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2990 assert!(builder.is_empty());
2991
2992 let mut factory = MockTransactionFactory::default();
2993 let mut tx = factory.create_eip1559();
2994 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2996 let tx = Arc::new(tx);
2997 let tx = PropagateTransaction::pool_tx(tx);
2998 builder.push(&tx);
2999 assert!(!builder.is_empty());
3000
3001 let txs = builder.clone().build();
3002 assert!(txs.pooled.is_none());
3003 let txs = txs.full.unwrap();
3004 assert_eq!(txs.len(), 1);
3005
3006 builder.push(&tx);
3007
3008 let txs = builder.clone().build();
3009 let pooled = txs.pooled.unwrap();
3010 assert_eq!(pooled.len(), 1);
3011 let txs = txs.full.unwrap();
3012 assert_eq!(txs.len(), 1);
3013 }
3014
3015 #[test]
3016 fn test_transaction_builder_eip4844() {
3017 let mut builder =
3018 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
3019 assert!(builder.is_empty());
3020
3021 let mut factory = MockTransactionFactory::default();
3022 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
3023 builder.push(&tx);
3024 assert!(!builder.is_empty());
3025
3026 let txs = builder.clone().build();
3027 assert!(txs.full.is_none());
3028 let txs = txs.pooled.unwrap();
3029 assert_eq!(txs.len(), 1);
3030
3031 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3032 builder.push(&tx);
3033
3034 let txs = builder.clone().build();
3035 let pooled = txs.pooled.unwrap();
3036 assert_eq!(pooled.len(), 1);
3037 let txs = txs.full.unwrap();
3038 assert_eq!(txs.len(), 1);
3039 }
3040
3041 #[tokio::test]
3042 async fn test_propagate_full() {
3043 reth_tracing::init_test_tracing();
3044
3045 let (mut tx_manager, network) = new_tx_manager().await;
3046 let peer_id = PeerId::random();
3047
3048 network.handle().update_sync_state(SyncState::Idle);
3050
3051 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3053
3054 let session_info = SessionInfo {
3055 peer_id,
3056 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3057 client_version: Arc::from(""),
3058 capabilities: Arc::new(vec![].into()),
3059 status: Arc::new(Default::default()),
3060 version: EthVersion::Eth68,
3061 peer_kind: PeerKind::Basic,
3062 };
3063 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3064 tx_manager
3065 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3066 let mut propagate = vec![];
3067 let mut factory = MockTransactionFactory::default();
3068 let eip1559_tx = Arc::new(factory.create_eip1559());
3069 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3070 let eip4844_tx = Arc::new(factory.create_eip4844());
3071 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3072
3073 let propagated =
3074 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3075 assert_eq!(propagated.len(), 2);
3076 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3077 assert_eq!(prop_txs.len(), 1);
3078 assert!(prop_txs[0].is_full());
3079
3080 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3081 assert_eq!(prop_txs.len(), 1);
3082 assert!(prop_txs[0].is_hash());
3083
3084 let peer = tx_manager.peers.get(&peer_id).unwrap();
3085 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3086 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3087 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3088
3089 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3091 assert!(propagated.is_empty());
3092 }
3093
3094 #[tokio::test]
3095 async fn test_propagate_pending_txs_while_initially_syncing() {
3096 reth_tracing::init_test_tracing();
3097
3098 let (mut tx_manager, network) = new_tx_manager().await;
3099 let peer_id = PeerId::random();
3100
3101 network.handle().update_sync_state(SyncState::Syncing);
3103 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3104
3105 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3107 tx_manager.peers.insert(peer_id, peer);
3108
3109 let tx = MockTransaction::eip1559();
3110 tx_manager
3111 .pool
3112 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3113 .await
3114 .expect("transaction should be accepted into the pool");
3115
3116 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3117
3118 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3119 assert!(peer.seen_transactions.contains(tx.get_hash()));
3120 }
3121
3122 #[tokio::test]
3123 async fn test_relaxed_filter_ignores_unknown_tx_types() {
3124 reth_tracing::init_test_tracing();
3125
3126 let transactions_manager_config = TransactionsManagerConfig::default();
3127
3128 let propagation_policy = TransactionPropagationKind::default();
3129 let announcement_policy = RelaxedEthAnnouncementFilter::default();
3130
3131 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3132
3133 let pool = testing_pool();
3134 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3135 let client = NoopProvider::default();
3136
3137 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3138 .listener_port(0)
3139 .disable_discovery()
3140 .build(client.clone());
3141
3142 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3143 let (to_tx_manager_tx, from_network_rx) =
3144 reth_metrics::common::mpsc::memory_bounded_channel::<
3145 NetworkTransactionEvent<EthNetworkPrimitives>,
3146 >(
3147 crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
3148 "test_tx_channel",
3149 );
3150 network_manager.set_transactions(to_tx_manager_tx);
3151 let network_handle = network_manager.handle().clone();
3152 let network_service_handle = tokio::spawn(network_manager);
3153
3154 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3155 network_handle.clone(),
3156 pool.clone(),
3157 from_network_rx,
3158 transactions_manager_config,
3159 policy_bundle,
3160 );
3161
3162 let peer_id = PeerId::random();
3163 let eth_version = EthVersion::Eth68;
3164 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3165 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3166
3167 let mut tx_factory = MockTransactionFactory::default();
3168
3169 let valid_known_tx = tx_factory.create_eip1559();
3170 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3171
3172 let known_tx_hash = *known_tx_signed.hash();
3173 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3174 let known_tx_size = known_tx_signed.encoded_length();
3175
3176 let unknown_tx_hash = B256::random();
3177 let unknown_tx_type_byte = 0xff_u8;
3178 let unknown_tx_size = 150;
3179
3180 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3181 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3182 sizes: vec![known_tx_size, unknown_tx_size],
3183 hashes: vec![known_tx_hash, unknown_tx_hash],
3184 });
3185
3186 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3187
3188 poll_fn(|cx| {
3189 let _ = tx_manager.poll_unpin(cx);
3190 Poll::Ready(())
3191 })
3192 .await;
3193
3194 let mut requested_hashes_in_getpooled = B256Set::default();
3195 let mut unexpected_request_received = false;
3196
3197 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3198 .await
3199 {
3200 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3201 let GetPooledTransactions(hashes) = request;
3202 for hash in hashes {
3203 requested_hashes_in_getpooled.insert(hash);
3204 }
3205 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3206 }
3207 Ok(Some(other_request)) => {
3208 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3209 unexpected_request_received = true;
3210 }
3211 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3212 Err(_timeout_err) => {
3213 tracing::info!("Timeout: No GetPooledTransactions request received.")
3214 }
3215 }
3216
3217 assert!(
3218 requested_hashes_in_getpooled.contains(&known_tx_hash),
3219 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3220 );
3221 assert!(
3222 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3223 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3224 );
3225 assert!(
3226 !unexpected_request_received,
3227 "An unexpected P2P request was received by the mock peer."
3228 );
3229
3230 network_service_handle.abort();
3231 }
3232}