1use alloy_consensus::transaction::TxHashRef;
4use itertools::Itertools;
5use rayon::iter::{IntoParallelIterator, ParallelIterator};
6
7pub mod config;
9pub mod constants;
11pub mod fetcher;
13pub mod policy;
15
16pub use self::constants::{
17 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23 TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31 budget::{
32 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34 },
35 cache::LruCache,
36 duration_metered_exec, metered_poll_nested_stream_with_budget,
37 metrics::{
38 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
39 },
40 transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
41 NetworkHandle, TxTypesCounter,
42};
43use alloy_primitives::{TxHash, B256};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
50 RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
54use reth_network_api::{
55 events::{PeerEvent, SessionInfo},
56 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59 error::{RequestError, RequestResult},
60 sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::SignedTransaction;
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67 error::{PoolError, PoolResult},
68 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72 collections::{hash_map::Entry, HashMap, HashSet},
73 pin::Pin,
74 sync::{
75 atomic::{AtomicUsize, Ordering},
76 Arc,
77 },
78 task::{Context, Poll},
79 time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
85pub type PoolImportFuture =
89 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
91#[derive(Debug, Clone)]
99pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
100 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
102}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105 fn send(&self, cmd: TransactionsCommand<N>) {
106 let _ = self.manager_tx.send(cmd);
107 }
108
109 async fn peer_handle(
111 &self,
112 peer_id: PeerId,
113 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114 let (tx, rx) = oneshot::channel();
115 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116 rx.await
117 }
118
119 pub fn propagate(&self, hash: TxHash) {
121 self.send(TransactionsCommand::PropagateHash(hash))
122 }
123
124 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128 self.propagate_hashes_to(Some(hash), peer)
129 }
130
131 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135 let hashes = hash.into_iter().collect::<Vec<_>>();
136 if hashes.is_empty() {
137 return
138 }
139 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140 }
141
142 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144 let (tx, rx) = oneshot::channel();
145 self.send(TransactionsCommand::GetActivePeers(tx));
146 rx.await
147 }
148
149 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153 if transactions.is_empty() {
154 return
155 }
156 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157 }
158
159 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164 if transactions.is_empty() {
165 return
166 }
167 self.send(TransactionsCommand::PropagateTransactions(transactions))
168 }
169
170 pub fn broadcast_transactions(
175 &self,
176 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177 ) {
178 let transactions =
179 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180 if transactions.is_empty() {
181 return
182 }
183 self.send(TransactionsCommand::BroadcastTransactions(transactions))
184 }
185
186 pub async fn get_transaction_hashes(
188 &self,
189 peers: Vec<PeerId>,
190 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191 if peers.is_empty() {
192 return Ok(Default::default())
193 }
194 let (tx, rx) = oneshot::channel();
195 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196 rx.await
197 }
198
199 pub async fn get_peer_transaction_hashes(
201 &self,
202 peer: PeerId,
203 ) -> Result<HashSet<TxHash>, RecvError> {
204 let res = self.get_transaction_hashes(vec![peer]).await?;
205 Ok(res.into_values().next().unwrap_or_default())
206 }
207
208 pub async fn get_pooled_transactions_from(
214 &self,
215 peer_id: PeerId,
216 hashes: Vec<B256>,
217 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220 let (tx, rx) = oneshot::channel();
221 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222 peer.try_send(request).ok();
223
224 rx.await?.map(|res| Some(res.0))
225 }
226}
227
228#[derive(Debug)]
283#[must_use = "Manager does nothing unless polled."]
284pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
285 pool: Pool,
287 network: NetworkHandle<N>,
289 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
293 transaction_fetcher: TransactionFetcher<N>,
295 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
300 pool_imports: FuturesUnordered<PoolImportFuture>,
312 pending_pool_imports_info: PendingPoolImportsInfo,
314 bad_imports: LruCache<TxHash>,
316 peers: HashMap<PeerId, PeerMetadata<N>>,
318 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
322 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
327 pending_transactions: mpsc::Receiver<TxHash>,
336 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
338 config: TransactionsManagerConfig,
340 policies: NetworkPolicies<N>,
342 metrics: TransactionsManagerMetrics,
344 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
346}
347
348impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
349 pub fn new(
353 network: NetworkHandle<N>,
354 pool: Pool,
355 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
356 transactions_manager_config: TransactionsManagerConfig,
357 ) -> Self {
358 Self::with_policy(
359 network,
360 pool,
361 from_network,
362 transactions_manager_config,
363 NetworkPolicies::new(
364 TransactionPropagationKind::default(),
365 StrictEthAnnouncementFilter::default(),
366 ),
367 )
368 }
369}
370
371impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
372 pub fn with_policy(
376 network: NetworkHandle<N>,
377 pool: Pool,
378 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
379 transactions_manager_config: TransactionsManagerConfig,
380 policies: NetworkPolicies<N>,
381 ) -> Self {
382 let network_events = network.event_listener();
383
384 let (command_tx, command_rx) = mpsc::unbounded_channel();
385
386 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
387 &transactions_manager_config.transaction_fetcher_config,
388 );
389
390 let pending = pool.pending_transactions_listener();
393 let pending_pool_imports_info =
394 PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
395 let metrics = TransactionsManagerMetrics::default();
396 metrics
397 .capacity_pending_pool_imports
398 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
399
400 Self {
401 pool,
402 network,
403 network_events,
404 transaction_fetcher,
405 transactions_by_peers: Default::default(),
406 pool_imports: Default::default(),
407 pending_pool_imports_info,
408 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409 peers: Default::default(),
410 command_tx,
411 command_rx: UnboundedReceiverStream::new(command_rx),
412 pending_transactions: pending,
413 transaction_events: UnboundedMeteredReceiver::new(
414 from_network,
415 NETWORK_POOL_TRANSACTIONS_SCOPE,
416 ),
417 config: transactions_manager_config,
418 policies,
419 metrics,
420 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421 }
422 }
423
424 pub fn handle(&self) -> TransactionsHandle<N> {
426 TransactionsHandle { manager_tx: self.command_tx.clone() }
427 }
428
429 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432 self.has_capacity_for_pending_pool_imports() &&
433 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434 }
435
436 fn has_capacity_for_pending_pool_imports(&self) -> bool {
438 self.remaining_pool_import_capacity() > 0
439 }
440
441 fn remaining_pool_import_capacity(&self) -> usize {
443 self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444 self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445 )
446 }
447
448 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450 self.metrics.reported_bad_transactions.increment(1);
451 }
452
453 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455 self.network.reputation_change(peer_id, kind);
456 }
457
458 fn report_already_seen(&self, peer_id: PeerId) {
459 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461 }
462
463 fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
465 if let Some(mut peer) = self.peers.remove(peer_id) {
466 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
467 }
468 self.transaction_fetcher.remove_peer(peer_id);
469 }
470
471 fn on_good_import(&mut self, hash: TxHash) {
473 self.transactions_by_peers.remove(&hash);
474 }
475
476 fn on_bad_import(&mut self, err: PoolError) {
506 let peers = self.transactions_by_peers.remove(&err.hash);
507
508 if err.is_bad_blob_sidecar() {
509 if let Some(peers) = peers {
513 for peer_id in peers {
514 self.report_peer_bad_transactions(peer_id);
515 }
516 }
517 return
518 }
519
520 if !err.is_bad_transaction() || self.network.is_syncing() {
522 return
523 }
524 if let Some(peers) = peers {
527 for peer_id in peers {
528 self.report_peer_bad_transactions(peer_id);
529 }
530 }
531 self.metrics.bad_imports.increment(1);
532 self.bad_imports.insert(err.hash);
533 }
534
535 fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
539 let info = &self.pending_pool_imports_info;
541 let max_pending_pool_imports = info.max_pending_pool_imports;
542 let has_capacity_wrt_pending_pool_imports =
543 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
544
545 self.transaction_fetcher
546 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
547 }
548
549 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
550 let kind = match req_err {
551 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
552 RequestError::Timeout => ReputationChangeKind::Timeout,
553 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
554 return
556 }
557 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
558 };
559 self.report_peer(peer_id, kind);
560 }
561
562 #[inline]
563 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
564 let metrics = &self.metrics;
565
566 let TxManagerPollDurations {
567 acc_network_events,
568 acc_pending_imports,
569 acc_tx_events,
570 acc_imported_txns,
571 acc_fetch_events,
572 acc_pending_fetch,
573 acc_cmds,
574 } = poll_durations;
575
576 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
578 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
580 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
581 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
582 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
583 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
584 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
585 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
586 }
587}
588
589impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
590 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
592 for res in batch_results {
593 match res {
594 Ok(AddedTransactionOutcome { hash, .. }) => {
595 self.on_good_import(hash);
596 }
597 Err(err) => {
598 self.on_bad_import(err);
599 }
600 }
601 }
602 }
603
604 fn on_new_pooled_transaction_hashes(
606 &mut self,
607 peer_id: PeerId,
608 msg: NewPooledTransactionHashes,
609 ) {
610 if self.network.is_initially_syncing() {
612 return
613 }
614 if self.network.tx_gossip_disabled() {
615 return
616 }
617
618 let Some(peer) = self.peers.get_mut(&peer_id) else {
620 trace!(
621 peer_id = format!("{peer_id:#}"),
622 ?msg,
623 "discarding announcement from inactive peer"
624 );
625
626 return
627 };
628 let client = peer.client_version.clone();
629
630 let mut count_txns_already_seen_by_peer = 0;
632 for tx in msg.iter_hashes().copied() {
633 if !peer.seen_transactions.insert(tx) {
634 count_txns_already_seen_by_peer += 1;
635 }
636 }
637 if count_txns_already_seen_by_peer > 0 {
638 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
643 self.metrics
644 .occurrences_hash_already_seen_by_peer
645 .increment(count_txns_already_seen_by_peer);
646
647 trace!(target: "net::tx",
648 %count_txns_already_seen_by_peer,
649 peer_id=format!("{peer_id:#}"),
650 ?client,
651 "Peer sent hashes that have already been marked as seen by peer"
652 );
653
654 self.report_already_seen(peer_id);
655 }
656
657 if msg.is_empty() {
659 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
660 return;
661 }
662
663 let original_len = msg.len();
664 let mut partially_valid_msg = msg.dedup();
665
666 if partially_valid_msg.len() != original_len {
667 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
668 }
669
670 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
672
673 let mut should_report_peer = false;
680 let mut tx_types_counter = TxTypesCounter::default();
681
682 let is_eth68_message = partially_valid_msg
683 .msg_version()
684 .expect("partially valid announcement should have a version")
685 .is_eth68();
686
687 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
688 let (ty_byte, size_val) = match *metadata_ref_mut {
689 Some((ty, size)) => {
690 if !is_eth68_message {
691 should_report_peer = true;
692 }
693 (ty, size)
694 }
695 None => {
696 if is_eth68_message {
697 should_report_peer = true;
698 return false;
699 }
700 (0u8, 0)
701 }
702 };
703
704 if is_eth68_message &&
705 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
706 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
707 {
708 tx_types_counter.increase_by_tx_type(parsed_tx_type);
709 }
710
711 let decision = self
712 .policies
713 .announcement_filter()
714 .decide_on_announcement(ty_byte, tx_hash, size_val);
715
716 match decision {
717 AnnouncementAcceptance::Accept => true,
718 AnnouncementAcceptance::Ignore => false,
719 AnnouncementAcceptance::Reject { penalize_peer } => {
720 if penalize_peer {
721 should_report_peer = true;
722 }
723 false
724 }
725 }
726 });
727
728 if is_eth68_message {
729 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
730 }
731
732 if should_report_peer {
733 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
734 }
735
736 let hashes_count_pre_pool_filter = partially_valid_msg.len();
744 self.pool.retain_unknown(&mut partially_valid_msg);
745 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
746 let already_known_hashes_count =
747 hashes_count_pre_pool_filter - partially_valid_msg.len();
748 self.metrics
749 .occurrences_hashes_already_in_pool
750 .increment(already_known_hashes_count as u64);
751 }
752
753 if partially_valid_msg.is_empty() {
754 return
756 }
757
758 let mut valid_announcement_data =
759 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
760
761 if valid_announcement_data.is_empty() {
762 return
764 }
765
766 let bad_imports = &self.bad_imports;
773 self.transaction_fetcher.filter_unseen_and_pending_hashes(
774 &mut valid_announcement_data,
775 |hash| bad_imports.contains(hash),
776 &peer_id,
777 &client,
778 );
779
780 if valid_announcement_data.is_empty() {
781 return
783 }
784
785 trace!(target: "net::tx::propagation",
786 peer_id=format!("{peer_id:#}"),
787 hashes_len=valid_announcement_data.len(),
788 hashes=%valid_announcement_data.keys().format(", "),
789 msg_version=%valid_announcement_data.msg_version(),
790 client_version=%client,
791 "received previously unseen and pending hashes in announcement from peer"
792 );
793
794 if !self.transaction_fetcher.is_idle(&peer_id) {
797 let msg_version = valid_announcement_data.msg_version();
799 let (hashes, _version) = valid_announcement_data.into_request_hashes();
800
801 trace!(target: "net::tx",
802 peer_id=format!("{peer_id:#}"),
803 hashes=?*hashes,
804 %msg_version,
805 %client,
806 "buffering hashes announced by busy peer"
807 );
808
809 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
810
811 return
812 }
813
814 let mut hashes_to_request =
815 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
816 let surplus_hashes =
817 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
818
819 if !surplus_hashes.is_empty() {
820 trace!(target: "net::tx",
821 peer_id=format!("{peer_id:#}"),
822 surplus_hashes=?*surplus_hashes,
823 %client,
824 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
825 );
826
827 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
828 }
829
830 trace!(target: "net::tx",
831 peer_id=format!("{peer_id:#}"),
832 hashes=?*hashes_to_request,
833 %client,
834 "sending hashes in `GetPooledTransactions` request to peer's session"
835 );
836
837 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
841 if let Some(failed_to_request_hashes) =
842 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
843 {
844 let conn_eth_version = peer.version;
845
846 trace!(target: "net::tx",
847 peer_id=format!("{peer_id:#}"),
848 failed_to_request_hashes=?*failed_to_request_hashes,
849 %conn_eth_version,
850 %client,
851 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
852 );
853 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
854 }
855 }
856}
857
858impl<Pool, N> TransactionsManager<Pool, N>
859where
860 Pool: TransactionPool + Unpin + 'static,
861 N: NetworkPrimitives<
862 BroadcastedTransaction: SignedTransaction,
863 PooledTransaction: SignedTransaction,
864 > + Unpin,
865 Pool::Transaction:
866 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
867{
868 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
880 if self.network.tx_gossip_disabled() {
884 return
885 }
886
887 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
888
889 self.propagate_all(hashes);
890 }
891
892 fn propagate_full_transactions_to_peer(
896 &mut self,
897 txs: Vec<TxHash>,
898 peer_id: PeerId,
899 propagation_mode: PropagationMode,
900 ) -> Option<PropagatedTransactions> {
901 let peer = self.peers.get_mut(&peer_id)?;
902 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
903 let mut propagated = PropagatedTransactions::default();
904
905 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
907
908 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
909
910 if propagation_mode.is_forced() {
911 full_transactions.extend(to_propagate);
913 } else {
914 for tx in to_propagate {
917 if !peer.seen_transactions.contains(tx.tx_hash()) {
918 full_transactions.push(&tx);
920 }
921 }
922 }
923
924 if full_transactions.is_empty() {
925 return None
927 }
928
929 let PropagateTransactions { pooled, full } = full_transactions.build();
930
931 if let Some(new_pooled_hashes) = pooled {
933 for hash in new_pooled_hashes.iter_hashes().copied() {
934 propagated.record(hash, PropagateKind::Hash(peer_id));
935 peer.seen_transactions.insert(hash);
937 }
938
939 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
941 }
942
943 if let Some(new_full_transactions) = full {
945 for tx in &new_full_transactions {
946 propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
947 peer.seen_transactions.insert(*tx.tx_hash());
949 }
950
951 self.network.send_transactions(peer_id, new_full_transactions);
953 }
954
955 self.metrics.propagated_transactions.increment(propagated.len() as u64);
957
958 Some(propagated)
959 }
960
961 fn propagate_hashes_to(
965 &mut self,
966 hashes: Vec<TxHash>,
967 peer_id: PeerId,
968 propagation_mode: PropagationMode,
969 ) {
970 trace!(target: "net::tx", "Start propagating transactions as hashes");
971
972 let propagated = {
975 let Some(peer) = self.peers.get_mut(&peer_id) else {
976 return
978 };
979
980 let to_propagate =
981 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
982
983 let mut propagated = PropagatedTransactions::default();
984
985 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
987
988 if propagation_mode.is_forced() {
989 hashes.extend(to_propagate)
990 } else {
991 for tx in to_propagate {
992 if !peer.seen_transactions.contains(tx.tx_hash()) {
993 hashes.push(&tx);
995 }
996 }
997 }
998
999 let new_pooled_hashes = hashes.build();
1000
1001 if new_pooled_hashes.is_empty() {
1002 return
1004 }
1005
1006 if let Some(peer) = self.peers.get_mut(&peer_id) {
1007 for hash in new_pooled_hashes.iter_hashes().copied() {
1008 propagated.record(hash, PropagateKind::Hash(peer_id));
1009 peer.seen_transactions.insert(hash);
1010 }
1011 }
1012
1013 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1014
1015 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1017
1018 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1020
1021 propagated
1022 };
1023
1024 self.pool.on_propagated(propagated);
1026 }
1027
1028 fn propagate_transactions(
1035 &mut self,
1036 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1037 propagation_mode: PropagationMode,
1038 ) -> PropagatedTransactions {
1039 let mut propagated = PropagatedTransactions::default();
1040 if self.network.tx_gossip_disabled() {
1041 return propagated
1042 }
1043
1044 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1046
1047 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1049 if !self.policies.propagation_policy().can_propagate(peer) {
1050 continue
1052 }
1053 let mut builder = if peer_idx > max_num_full {
1055 PropagateTransactionsBuilder::pooled(peer.version)
1056 } else {
1057 PropagateTransactionsBuilder::full(peer.version)
1058 };
1059
1060 if propagation_mode.is_forced() {
1061 builder.extend(to_propagate.iter());
1062 } else {
1063 for tx in &to_propagate {
1067 if !peer.seen_transactions.contains(tx.tx_hash()) {
1070 builder.push(tx);
1071 }
1072 }
1073 }
1074
1075 if builder.is_empty() {
1076 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1077 continue
1078 }
1079
1080 let PropagateTransactions { pooled, full } = builder.build();
1081
1082 if let Some(mut new_pooled_hashes) = pooled {
1084 new_pooled_hashes
1087 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1088
1089 for hash in new_pooled_hashes.iter_hashes().copied() {
1090 propagated.record(hash, PropagateKind::Hash(*peer_id));
1091 peer.seen_transactions.insert(hash);
1093 }
1094
1095 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1096
1097 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1099 }
1100
1101 if let Some(new_full_transactions) = full {
1103 for tx in &new_full_transactions {
1104 propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1105 peer.seen_transactions.insert(*tx.tx_hash());
1107 }
1108
1109 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1110
1111 self.network.send_transactions(*peer_id, new_full_transactions);
1113 }
1114 }
1115
1116 self.metrics.propagated_transactions.increment(propagated.len() as u64);
1118
1119 propagated
1120 }
1121
1122 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1127 if self.peers.is_empty() {
1128 return
1130 }
1131 let propagated = self.propagate_transactions(
1132 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1133 PropagationMode::Basic,
1134 );
1135
1136 self.pool.on_propagated(propagated);
1138 }
1139
1140 fn on_get_pooled_transactions(
1142 &mut self,
1143 peer_id: PeerId,
1144 request: GetPooledTransactions,
1145 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1146 ) {
1147 if self.network.tx_gossip_disabled() {
1149 let _ = response.send(Ok(PooledTransactions::default()));
1150 return
1151 }
1152 if let Some(peer) = self.peers.get_mut(&peer_id) {
1153 let transactions = self.pool.get_pooled_transaction_elements(
1154 request.0,
1155 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1156 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1157 ),
1158 );
1159 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1160
1161 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1164
1165 let resp = PooledTransactions(transactions);
1166 let _ = response.send(Ok(resp));
1167 }
1168 }
1169
1170 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1172 match cmd {
1173 TransactionsCommand::PropagateHash(hash) => {
1174 self.on_new_pending_transactions(vec![hash])
1175 }
1176 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1177 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1178 }
1179 TransactionsCommand::GetActivePeers(tx) => {
1180 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1181 tx.send(peers).ok();
1182 }
1183 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1184 if let Some(propagated) =
1185 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1186 {
1187 self.pool.on_propagated(propagated);
1188 }
1189 }
1190 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1191 TransactionsCommand::BroadcastTransactions(txs) => {
1192 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1193 self.pool.on_propagated(propagated);
1194 }
1195 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1196 let mut res = HashMap::with_capacity(peers.len());
1197 for peer_id in peers {
1198 let hashes = self
1199 .peers
1200 .get(&peer_id)
1201 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1202 .unwrap_or_default();
1203 res.insert(peer_id, hashes);
1204 }
1205 tx.send(res).ok();
1206 }
1207 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1208 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1209 peer_request_sender.send(sender).ok();
1210 }
1211 }
1212 }
1213
1214 fn handle_peer_session(
1218 &mut self,
1219 info: SessionInfo,
1220 messages: PeerRequestSender<PeerRequest<N>>,
1221 ) {
1222 let SessionInfo { peer_id, client_version, version, .. } = info;
1223
1224 let peer = PeerMetadata::<N>::new(
1226 messages,
1227 version,
1228 client_version,
1229 self.config.max_transactions_seen_by_peer_history,
1230 info.peer_kind,
1231 );
1232 let peer = match self.peers.entry(peer_id) {
1233 Entry::Occupied(mut entry) => {
1234 entry.insert(peer);
1235 entry.into_mut()
1236 }
1237 Entry::Vacant(entry) => entry.insert(peer),
1238 };
1239
1240 self.policies.propagation_policy_mut().on_session_established(peer);
1241
1242 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1246 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1247 return
1248 }
1249
1250 let pooled_txs = self.pool.pooled_transactions_max(
1252 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1253 );
1254 if pooled_txs.is_empty() {
1255 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1256 return;
1257 }
1258
1259 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1261 for pooled_tx in pooled_txs {
1262 peer.seen_transactions.insert(*pooled_tx.hash());
1263 msg_builder.push_pooled(pooled_tx);
1264 }
1265
1266 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1267 let msg = msg_builder.build();
1268 self.network.send_transactions_hashes(peer_id, msg);
1269 }
1270
1271 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1273 match event_result {
1274 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1275 self.on_peer_session_closed(&peer_id);
1276 }
1277 NetworkEvent::ActivePeerSession { info, messages } => {
1278 self.handle_peer_session(info, messages);
1280 }
1281 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1282 let peer_id = info.peer_id;
1283 let messages = match self.peers.get(&peer_id) {
1285 Some(p) => p.request_tx.clone(),
1286 None => {
1287 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1288 return;
1289 }
1290 };
1291 self.handle_peer_session(info, messages);
1292 }
1293 _ => {}
1294 }
1295 }
1296
1297 fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1299 if self.config.ingress_policy.allows_all() {
1300 return true;
1301 }
1302 let Some(peer) = self.peers.get(peer_id) else {
1303 return false;
1304 };
1305 self.config.ingress_policy.allows(peer.peer_kind())
1306 }
1307
1308 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1310 match event {
1311 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1312 if !self.accepts_incoming_from(&peer_id) {
1313 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1314 return;
1315 }
1316
1317 let has_blob_txs = msg.has_eip4844();
1321
1322 let non_blob_txs = msg
1323 .into_iter()
1324 .map(N::PooledTransaction::try_from)
1325 .filter_map(Result::ok)
1326 .collect();
1327
1328 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1329
1330 if has_blob_txs {
1331 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1332 self.report_peer_bad_transactions(peer_id);
1333 }
1334 }
1335 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1336 if !self.accepts_incoming_from(&peer_id) {
1337 trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1338 return;
1339 }
1340 self.on_new_pooled_transaction_hashes(peer_id, msg)
1341 }
1342 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1343 self.on_get_pooled_transactions(peer_id, request, response)
1344 }
1345 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1346 let _ = response.send(Some(self.handle()));
1347 }
1348 }
1349 }
1350
1351 fn import_transactions(
1353 &mut self,
1354 peer_id: PeerId,
1355 transactions: PooledTransactions<N::PooledTransaction>,
1356 source: TransactionSource,
1357 ) {
1358 if self.network.is_initially_syncing() {
1360 return
1361 }
1362 if self.network.tx_gossip_disabled() {
1363 return
1364 }
1365
1366 if !self.has_capacity_for_pending_pool_imports() {
1368 return
1369 }
1370
1371 let mut transactions = transactions.0;
1372
1373 let capacity = self.remaining_pool_import_capacity();
1377 if transactions.len() > capacity {
1378 let skipped = transactions.len() - capacity;
1379 transactions.truncate(capacity);
1380 self.metrics
1381 .skipped_transactions_pending_pool_imports_at_capacity
1382 .increment(skipped as u64);
1383 trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1384 }
1385
1386 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1387 let client_version = peer.client_version.clone();
1388
1389 let start = Instant::now();
1390
1391 self.transaction_fetcher
1393 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1394
1395 let mut num_already_seen_by_peer = 0;
1400 for tx in &transactions {
1401 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1402 num_already_seen_by_peer += 1;
1403 }
1404 }
1405
1406 let mut has_bad_transactions = false;
1408
1409 transactions.retain(|tx| {
1412 if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1413 entry.get_mut().insert(peer_id);
1414 return false
1415 }
1416 if self.bad_imports.contains(tx.tx_hash()) {
1417 trace!(target: "net::tx",
1418 peer_id=format!("{peer_id:#}"),
1419 hash=%tx.tx_hash(),
1420 %client_version,
1421 "received a known bad transaction from peer"
1422 );
1423 has_bad_transactions = true;
1424 return false;
1425 }
1426 true
1427 });
1428
1429 let txns_count_pre_pool_filter = transactions.len();
1431 self.pool.retain_unknown(&mut transactions);
1432 if txns_count_pre_pool_filter > transactions.len() {
1433 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1434 self.metrics
1435 .occurrences_transactions_already_in_pool
1436 .increment(already_known_txns_count as u64);
1437 }
1438
1439 let txs_len = transactions.len();
1440
1441 let new_txs = transactions
1442 .into_par_iter()
1443 .filter_map(|tx| match tx.try_into_recovered() {
1444 Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1445 Err(badtx) => {
1446 trace!(target: "net::tx",
1447 peer_id=format!("{peer_id:#}"),
1448 hash=%badtx.tx_hash(),
1449 client_version=%client_version,
1450 "failed ecrecovery for transaction"
1451 );
1452 None
1453 }
1454 })
1455 .collect::<Vec<_>>();
1456
1457 has_bad_transactions |= new_txs.len() != txs_len;
1458
1459 for tx in &new_txs {
1461 self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1462 }
1463
1464 if !new_txs.is_empty() {
1467 let pool = self.pool.clone();
1468 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1470 metric_pending_pool_imports.increment(new_txs.len() as f64);
1471
1472 self.pending_pool_imports_info
1474 .pending_pool_imports
1475 .fetch_add(new_txs.len(), Ordering::Relaxed);
1476 let tx_manager_info_pending_pool_imports =
1477 self.pending_pool_imports_info.pending_pool_imports.clone();
1478
1479 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1480 let import = Box::pin(async move {
1481 let added = new_txs.len();
1482 let res = pool.add_external_transactions(new_txs).await;
1483
1484 metric_pending_pool_imports.decrement(added as f64);
1486 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1488
1489 res
1490 });
1491
1492 self.pool_imports.push(import);
1493 }
1494
1495 if num_already_seen_by_peer > 0 {
1496 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1497 self.metrics
1498 .occurrences_of_transaction_already_seen_by_peer
1499 .increment(num_already_seen_by_peer);
1500 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1501 }
1502
1503 if has_bad_transactions {
1504 self.report_peer_bad_transactions(peer_id)
1506 }
1507
1508 if num_already_seen_by_peer > 0 {
1509 self.report_already_seen(peer_id);
1510 }
1511
1512 self.metrics.pool_import_prepare_duration.record(start.elapsed());
1513 }
1514
1515 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1517 match fetch_event {
1518 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1519 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1520 if report_peer {
1521 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1522 }
1523 }
1524 FetchEvent::FetchError { peer_id, error } => {
1525 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1526 self.on_request_error(peer_id, error);
1527 }
1528 FetchEvent::EmptyResponse { peer_id } => {
1529 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1530 }
1531 }
1532 }
1533}
1534
1535impl<
1543 Pool: TransactionPool + Unpin + 'static,
1544 N: NetworkPrimitives<
1545 BroadcastedTransaction: SignedTransaction,
1546 PooledTransaction: SignedTransaction,
1547 > + Unpin,
1548 > Future for TransactionsManager<Pool, N>
1549where
1550 Pool::Transaction:
1551 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1552{
1553 type Output = ();
1554
1555 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1556 let start = Instant::now();
1557 let mut poll_durations = TxManagerPollDurations::default();
1558
1559 let this = self.get_mut();
1560
1561 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1567 poll_durations.acc_network_events,
1568 "net::tx",
1569 "Network events stream",
1570 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1571 this.network_events.poll_next_unpin(cx),
1572 |event| this.on_network_event(event)
1573 );
1574
1575 let mut new_txs = Vec::new();
1584 let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1585 cx,
1586 &mut new_txs,
1587 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1588 ) {
1589 Poll::Ready(count) => {
1590 if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1591 true
1594 } else {
1595 let limit =
1599 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1600 new_txs.len();
1601 this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1602 }
1603 }
1604 Poll::Pending => false,
1605 };
1606 if !new_txs.is_empty() {
1607 this.on_new_pending_transactions(new_txs);
1608 }
1609
1610 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1625 poll_durations.acc_tx_events,
1626 "net::tx",
1627 "Network transaction events stream",
1628 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1629 this.transaction_events.poll_next_unpin(cx),
1630 |event| this.on_network_tx_event(event),
1631 );
1632
1633 let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1644 poll_durations.acc_fetch_events,
1645 "net::tx",
1646 "Transaction fetch events stream",
1647 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1648 this.transaction_fetcher.poll_next_unpin(cx),
1649 |event| this.on_fetch_event(event),
1650 );
1651
1652 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1667 poll_durations.acc_pending_imports,
1668 "net::tx",
1669 "Batched pool imports stream",
1670 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1671 this.pool_imports.poll_next_unpin(cx),
1672 |batch_results| this.on_batch_import_result(batch_results)
1673 );
1674
1675 duration_metered_exec!(
1680 {
1681 if this.has_capacity_for_fetching_pending_hashes() &&
1682 this.on_fetch_hashes_pending_fetch()
1683 {
1684 maybe_more_tx_fetch_events = true;
1685 }
1686 },
1687 poll_durations.acc_pending_fetch
1688 );
1689
1690 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1692 poll_durations.acc_cmds,
1693 "net::tx",
1694 "Commands channel",
1695 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1696 this.command_rx.poll_next_unpin(cx),
1697 |cmd| this.on_command(cmd)
1698 );
1699
1700 this.transaction_fetcher.update_metrics();
1701
1702 if maybe_more_network_events ||
1704 maybe_more_commands ||
1705 maybe_more_tx_events ||
1706 maybe_more_tx_fetch_events ||
1707 maybe_more_pool_imports ||
1708 maybe_more_pending_txns
1709 {
1710 cx.waker().wake_by_ref();
1712 return Poll::Pending
1713 }
1714
1715 this.update_poll_metrics(start, poll_durations);
1716
1717 Poll::Pending
1718 }
1719}
1720
1721#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1725enum PropagationMode {
1726 Basic,
1730 Forced,
1735}
1736
1737impl PropagationMode {
1738 const fn is_forced(self) -> bool {
1740 matches!(self, Self::Forced)
1741 }
1742}
1743
1744#[derive(Debug, Clone)]
1746struct PropagateTransaction<T = TransactionSigned> {
1747 size: usize,
1748 transaction: Arc<T>,
1749}
1750
1751impl<T: SignedTransaction> PropagateTransaction<T> {
1752 pub fn new(transaction: T) -> Self {
1754 let size = transaction.length();
1755 Self { size, transaction: Arc::new(transaction) }
1756 }
1757
1758 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1760 where
1761 P: PoolTransaction<Consensus = T>,
1762 {
1763 let size = tx.encoded_length();
1764 let transaction = tx.transaction.clone_into_consensus();
1765 let transaction = Arc::new(transaction.into_inner());
1766 Self { size, transaction }
1767 }
1768
1769 fn tx_hash(&self) -> &TxHash {
1770 self.transaction.tx_hash()
1771 }
1772}
1773
1774#[derive(Debug, Clone)]
1777enum PropagateTransactionsBuilder<T> {
1778 Pooled(PooledTransactionsHashesBuilder),
1779 Full(FullTransactionsBuilder<T>),
1780}
1781
1782impl<T> PropagateTransactionsBuilder<T> {
1783 fn pooled(version: EthVersion) -> Self {
1785 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1786 }
1787
1788 fn full(version: EthVersion) -> Self {
1790 Self::Full(FullTransactionsBuilder::new(version))
1791 }
1792
1793 fn is_empty(&self) -> bool {
1795 match self {
1796 Self::Pooled(builder) => builder.is_empty(),
1797 Self::Full(builder) => builder.is_empty(),
1798 }
1799 }
1800
1801 fn build(self) -> PropagateTransactions<T> {
1803 match self {
1804 Self::Pooled(pooled) => {
1805 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1806 }
1807 Self::Full(full) => full.build(),
1808 }
1809 }
1810}
1811
1812impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1813 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1815 for tx in txs {
1816 self.push(tx);
1817 }
1818 }
1819
1820 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1822 match self {
1823 Self::Pooled(builder) => builder.push(transaction),
1824 Self::Full(builder) => builder.push(transaction),
1825 }
1826 }
1827}
1828
1829struct PropagateTransactions<T> {
1831 pooled: Option<NewPooledTransactionHashes>,
1833 full: Option<Vec<Arc<T>>>,
1835}
1836
1837#[derive(Debug, Clone)]
1842struct FullTransactionsBuilder<T> {
1843 total_size: usize,
1845 transactions: Vec<Arc<T>>,
1847 pooled: PooledTransactionsHashesBuilder,
1849}
1850
1851impl<T> FullTransactionsBuilder<T> {
1852 fn new(version: EthVersion) -> Self {
1854 Self {
1855 total_size: 0,
1856 pooled: PooledTransactionsHashesBuilder::new(version),
1857 transactions: vec![],
1858 }
1859 }
1860
1861 fn is_empty(&self) -> bool {
1863 self.transactions.is_empty() && self.pooled.is_empty()
1864 }
1865
1866 fn build(self) -> PropagateTransactions<T> {
1868 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1869 let full = Some(self.transactions).filter(|full| !full.is_empty());
1870 PropagateTransactions { pooled, full }
1871 }
1872}
1873
1874impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1875 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1877 for tx in txs {
1878 self.push(&tx)
1879 }
1880 }
1881
1882 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1892 if !transaction.transaction.is_broadcastable_in_full() {
1901 self.pooled.push(transaction);
1902 return
1903 }
1904
1905 let new_size = self.total_size + transaction.size;
1906 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1907 self.total_size > 0
1908 {
1909 self.pooled.push(transaction);
1911 return
1912 }
1913
1914 self.total_size = new_size;
1915 self.transactions.push(Arc::clone(&transaction.transaction));
1916 }
1917}
1918
1919#[derive(Debug, Clone)]
1922enum PooledTransactionsHashesBuilder {
1923 Eth66(NewPooledTransactionHashes66),
1924 Eth68(NewPooledTransactionHashes68),
1925}
1926
1927impl PooledTransactionsHashesBuilder {
1930 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1932 match self {
1933 Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1934 Self::Eth68(msg) => {
1935 msg.hashes.push(*pooled_tx.hash());
1936 msg.sizes.push(pooled_tx.encoded_length());
1937 msg.types.push(pooled_tx.transaction.ty());
1938 }
1939 }
1940 }
1941
1942 fn is_empty(&self) -> bool {
1944 match self {
1945 Self::Eth66(hashes) => hashes.is_empty(),
1946 Self::Eth68(hashes) => hashes.is_empty(),
1947 }
1948 }
1949
1950 fn len(&self) -> usize {
1952 match self {
1953 Self::Eth66(hashes) => hashes.len(),
1954 Self::Eth68(hashes) => hashes.len(),
1955 }
1956 }
1957
1958 fn extend<T: SignedTransaction>(
1960 &mut self,
1961 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1962 ) {
1963 for tx in txs {
1964 self.push(&tx);
1965 }
1966 }
1967
1968 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1969 match self {
1970 Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1971 Self::Eth68(msg) => {
1972 msg.hashes.push(*tx.tx_hash());
1973 msg.sizes.push(tx.size);
1974 msg.types.push(tx.transaction.ty());
1975 }
1976 }
1977 }
1978
1979 fn new(version: EthVersion) -> Self {
1981 match version {
1982 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1983 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1984 Self::Eth68(Default::default())
1985 }
1986 }
1987 }
1988
1989 fn build(self) -> NewPooledTransactionHashes {
1990 match self {
1991 Self::Eth66(mut msg) => {
1992 msg.shrink_to_fit();
1993 msg.into()
1994 }
1995 Self::Eth68(mut msg) => {
1996 msg.shrink_to_fit();
1997 msg.into()
1998 }
1999 }
2000 }
2001}
2002
2003enum TransactionSource {
2005 Broadcast,
2007 Response,
2009}
2010
2011impl TransactionSource {
2014 const fn is_broadcast(&self) -> bool {
2016 matches!(self, Self::Broadcast)
2017 }
2018}
2019
2020#[derive(Debug)]
2022pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2023 seen_transactions: LruCache<TxHash>,
2027 request_tx: PeerRequestSender<PeerRequest<N>>,
2029 version: EthVersion,
2031 client_version: Arc<str>,
2033 peer_kind: PeerKind,
2035}
2036
2037impl<N: NetworkPrimitives> PeerMetadata<N> {
2038 pub fn new(
2040 request_tx: PeerRequestSender<PeerRequest<N>>,
2041 version: EthVersion,
2042 client_version: Arc<str>,
2043 max_transactions_seen_by_peer: u32,
2044 peer_kind: PeerKind,
2045 ) -> Self {
2046 Self {
2047 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2048 request_tx,
2049 version,
2050 client_version,
2051 peer_kind,
2052 }
2053 }
2054
2055 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2057 &self.request_tx
2058 }
2059
2060 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2062 &mut self.seen_transactions
2063 }
2064
2065 pub const fn version(&self) -> EthVersion {
2067 self.version
2068 }
2069
2070 pub fn client_version(&self) -> &str {
2072 &self.client_version
2073 }
2074
2075 pub const fn peer_kind(&self) -> PeerKind {
2077 self.peer_kind
2078 }
2079}
2080
2081#[derive(Debug)]
2083enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2084 PropagateHash(B256),
2086 PropagateHashesTo(Vec<B256>, PeerId),
2088 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2090 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2092 PropagateTransactions(Vec<TxHash>),
2094 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2096 GetTransactionHashes {
2098 peers: Vec<PeerId>,
2099 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2100 },
2101 GetPeerSender {
2103 peer_id: PeerId,
2104 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2105 },
2106}
2107
2108#[derive(Debug)]
2110pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2111 IncomingTransactions {
2115 peer_id: PeerId,
2117 msg: Transactions<N::BroadcastedTransaction>,
2119 },
2120 IncomingPooledTransactionHashes {
2122 peer_id: PeerId,
2124 msg: NewPooledTransactionHashes,
2126 },
2127 GetPooledTransactions {
2129 peer_id: PeerId,
2131 request: GetPooledTransactions,
2133 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2135 },
2136 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2138}
2139
2140#[derive(Debug)]
2142pub struct PendingPoolImportsInfo {
2143 pending_pool_imports: Arc<AtomicUsize>,
2145 max_pending_pool_imports: usize,
2147}
2148
2149impl PendingPoolImportsInfo {
2150 pub fn new(max_pending_pool_imports: usize) -> Self {
2152 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2153 }
2154
2155 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2157 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2158 }
2159}
2160
2161impl Default for PendingPoolImportsInfo {
2162 fn default() -> Self {
2163 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2164 }
2165}
2166
2167#[derive(Debug, Default)]
2168struct TxManagerPollDurations {
2169 acc_network_events: Duration,
2170 acc_pending_imports: Duration,
2171 acc_tx_events: Duration,
2172 acc_imported_txns: Duration,
2173 acc_fetch_events: Duration,
2174 acc_pending_fetch: Duration,
2175 acc_cmds: Duration,
2176}
2177
2178#[cfg(test)]
2179mod tests {
2180 use super::*;
2181 use crate::{
2182 test_utils::{
2183 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2184 Testnet,
2185 },
2186 transactions::config::RelaxedEthAnnouncementFilter,
2187 NetworkConfigBuilder, NetworkManager,
2188 };
2189 use alloy_consensus::{TxEip1559, TxLegacy};
2190 use alloy_eips::eip4844::BlobTransactionValidationError;
2191 use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2192 use alloy_rlp::Decodable;
2193 use futures::FutureExt;
2194 use reth_chainspec::MIN_TRANSACTION_GAS;
2195 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2196 use reth_network_api::{NetworkInfo, PeerKind};
2197 use reth_network_p2p::{
2198 error::{RequestError, RequestResult},
2199 sync::{NetworkSyncUpdater, SyncState},
2200 };
2201 use reth_storage_api::noop::NoopProvider;
2202 use reth_tasks::Runtime;
2203 use reth_transaction_pool::{
2204 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2205 test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2206 };
2207 use secp256k1::SecretKey;
2208 use std::{
2209 collections::HashSet,
2210 future::poll_fn,
2211 net::{IpAddr, Ipv4Addr, SocketAddr},
2212 str::FromStr,
2213 };
2214 use tracing::error;
2215
2216 #[tokio::test(flavor = "multi_thread")]
2217 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2218 reth_tracing::init_test_tracing();
2219 let net = Testnet::create(3).await;
2220
2221 let mut handles = net.handles();
2222 let handle0 = handles.next().unwrap();
2223 let handle1 = handles.next().unwrap();
2224
2225 drop(handles);
2226 let handle = net.spawn();
2227
2228 let listener0 = handle0.event_listener();
2229 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2230 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2231
2232 let client = NoopProvider::default();
2233 let pool = testing_pool();
2234 let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2235 .disable_discovery()
2236 .listener_port(0)
2237 .build(client);
2238 let transactions_manager_config = config.transactions_manager_config.clone();
2239 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2240 .await
2241 .unwrap()
2242 .into_builder()
2243 .transactions(pool.clone(), transactions_manager_config)
2244 .split_with_handle();
2245
2246 tokio::task::spawn(network);
2247
2248 network_handle.update_sync_state(SyncState::Syncing);
2250 assert!(NetworkInfo::is_syncing(&network_handle));
2251 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2252
2253 let mut established = listener0.take(2);
2255 while let Some(ev) = established.next().await {
2256 match ev {
2257 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2258 transactions
2260 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2261 }
2262 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2263 ev => {
2264 error!("unexpected event {ev:?}")
2265 }
2266 }
2267 }
2268 let input = hex!(
2270 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2271 );
2272 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2273 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2274 peer_id: *handle1.peer_id(),
2275 msg: Transactions(vec![signed_tx.clone()]),
2276 });
2277 poll_fn(|cx| {
2278 let _ = transactions.poll_unpin(cx);
2279 Poll::Ready(())
2280 })
2281 .await;
2282 assert!(pool.is_empty());
2283 handle.terminate().await;
2284 }
2285
2286 #[tokio::test(flavor = "multi_thread")]
2287 async fn test_tx_broadcasts_through_two_syncs() {
2288 reth_tracing::init_test_tracing();
2289 let net = Testnet::create(3).await;
2290
2291 let mut handles = net.handles();
2292 let handle0 = handles.next().unwrap();
2293 let handle1 = handles.next().unwrap();
2294
2295 drop(handles);
2296 let handle = net.spawn();
2297
2298 let listener0 = handle0.event_listener();
2299 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2300 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2301
2302 let client = NoopProvider::default();
2303 let pool = testing_pool();
2304 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2305 .disable_discovery()
2306 .listener_port(0)
2307 .build(client);
2308 let transactions_manager_config = config.transactions_manager_config.clone();
2309 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2310 .await
2311 .unwrap()
2312 .into_builder()
2313 .transactions(pool.clone(), transactions_manager_config)
2314 .split_with_handle();
2315
2316 tokio::task::spawn(network);
2317
2318 network_handle.update_sync_state(SyncState::Syncing);
2320 assert!(NetworkInfo::is_syncing(&network_handle));
2321 network_handle.update_sync_state(SyncState::Idle);
2322 assert!(!NetworkInfo::is_syncing(&network_handle));
2323 network_handle.update_sync_state(SyncState::Syncing);
2324 assert!(NetworkInfo::is_syncing(&network_handle));
2325
2326 let mut established = listener0.take(2);
2328 while let Some(ev) = established.next().await {
2329 match ev {
2330 NetworkEvent::ActivePeerSession { .. } |
2331 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2332 transactions.on_network_event(ev);
2334 }
2335 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2336 _ => {
2337 error!("unexpected event {ev:?}")
2338 }
2339 }
2340 }
2341 let input = hex!(
2343 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2344 );
2345 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2346 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2347 peer_id: *handle1.peer_id(),
2348 msg: Transactions(vec![signed_tx.clone()]),
2349 });
2350 poll_fn(|cx| {
2351 let _ = transactions.poll_unpin(cx);
2352 Poll::Ready(())
2353 })
2354 .await;
2355 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2356 assert!(NetworkInfo::is_syncing(&network_handle));
2357 assert!(!pool.is_empty());
2358 handle.terminate().await;
2359 }
2360
2361 #[tokio::test(flavor = "multi_thread")]
2364 async fn test_handle_incoming_transactions_hashes() {
2365 reth_tracing::init_test_tracing();
2366
2367 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2368 let client = NoopProvider::default();
2369
2370 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2371 .listener_port(0)
2373 .disable_discovery()
2374 .build(client);
2375
2376 let pool = testing_pool();
2377
2378 let transactions_manager_config = config.transactions_manager_config.clone();
2379 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2380 .await
2381 .unwrap()
2382 .into_builder()
2383 .transactions(pool.clone(), transactions_manager_config)
2384 .split_with_handle();
2385
2386 let peer_id_1 = PeerId::new([1; 64]);
2387 let eth_version = EthVersion::Eth66;
2388
2389 let txs = vec![TransactionSigned::new_unhashed(
2390 Transaction::Legacy(TxLegacy {
2391 chain_id: Some(4),
2392 nonce: 15u64,
2393 gas_price: 2200000000,
2394 gas_limit: 34811,
2395 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2396 value: U256::from(1234u64),
2397 input: Default::default(),
2398 }),
2399 Signature::new(
2400 U256::from_str(
2401 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2402 )
2403 .unwrap(),
2404 U256::from_str(
2405 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2406 )
2407 .unwrap(),
2408 true,
2409 ),
2410 )];
2411
2412 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2413
2414 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2415 tx_manager.peers.insert(peer_id_1, peer_1);
2416
2417 assert!(pool.is_empty());
2418
2419 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2420 peer_id: peer_id_1,
2421 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2422 txs_hashes.clone(),
2423 )),
2424 });
2425
2426 let req = to_mock_session_rx
2428 .recv()
2429 .await
2430 .expect("peer_1 session should receive request with buffered hashes");
2431 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2432 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2433
2434 let message: Vec<PooledTransactionVariant> = txs
2435 .into_iter()
2436 .map(|tx| {
2437 PooledTransactionVariant::try_from(tx)
2438 .expect("Failed to convert MockTransaction to PooledTransaction")
2439 })
2440 .collect();
2441
2442 response
2444 .send(Ok(PooledTransactions(message)))
2445 .expect("should send peer_1 response to tx manager");
2446
2447 poll_fn(|cx| {
2449 let _ = tx_manager.poll_unpin(cx);
2450 Poll::Ready(())
2451 })
2452 .await;
2453
2454 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2457 }
2458
2459 #[tokio::test(flavor = "multi_thread")]
2460 async fn test_handle_incoming_transactions() {
2461 reth_tracing::init_test_tracing();
2462 let net = Testnet::create(3).await;
2463
2464 let mut handles = net.handles();
2465 let handle0 = handles.next().unwrap();
2466 let handle1 = handles.next().unwrap();
2467
2468 drop(handles);
2469 let handle = net.spawn();
2470
2471 let listener0 = handle0.event_listener();
2472
2473 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2474 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2475
2476 let client = NoopProvider::default();
2477 let pool = testing_pool();
2478 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2479 .disable_discovery()
2480 .listener_port(0)
2481 .build(client);
2482 let transactions_manager_config = config.transactions_manager_config.clone();
2483 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2484 .await
2485 .unwrap()
2486 .into_builder()
2487 .transactions(pool.clone(), transactions_manager_config)
2488 .split_with_handle();
2489 tokio::task::spawn(network);
2490
2491 network_handle.update_sync_state(SyncState::Idle);
2492
2493 assert!(!NetworkInfo::is_syncing(&network_handle));
2494
2495 let mut established = listener0.take(2);
2497 while let Some(ev) = established.next().await {
2498 match ev {
2499 NetworkEvent::ActivePeerSession { .. } |
2500 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2501 transactions.on_network_event(ev);
2503 }
2504 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2505 ev => {
2506 error!("unexpected event {ev:?}")
2507 }
2508 }
2509 }
2510 let input = hex!(
2512 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2513 );
2514 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2515 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2516 peer_id: *handle1.peer_id(),
2517 msg: Transactions(vec![signed_tx.clone()]),
2518 });
2519 assert!(transactions
2520 .transactions_by_peers
2521 .get(signed_tx.tx_hash())
2522 .unwrap()
2523 .contains(handle1.peer_id()));
2524
2525 poll_fn(|cx| {
2527 let _ = transactions.poll_unpin(cx);
2528 Poll::Ready(())
2529 })
2530 .await;
2531
2532 assert!(!pool.is_empty());
2533 assert!(pool.get(signed_tx.tx_hash()).is_some());
2534 handle.terminate().await;
2535 }
2536
2537 #[tokio::test(flavor = "multi_thread")]
2538 async fn test_session_closed_cleans_transaction_peer_state() {
2539 let (mut tx_manager, _network) = new_tx_manager().await;
2540 let peer_id = PeerId::new([1; 64]);
2541 let fallback_peer = PeerId::new([2; 64]);
2542 let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2543 let hash_shared = B256::from_slice(&[1; 32]);
2544
2545 tx_manager.peers.insert(peer_id, peer);
2546 buffer_hash_to_tx_fetcher(
2547 &mut tx_manager.transaction_fetcher,
2548 hash_shared,
2549 peer_id,
2550 0,
2551 None,
2552 );
2553 buffer_hash_to_tx_fetcher(
2554 &mut tx_manager.transaction_fetcher,
2555 hash_shared,
2556 fallback_peer,
2557 0,
2558 None,
2559 );
2560 tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2561
2562 tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2563 peer_id,
2564 reason: None,
2565 }));
2566
2567 assert!(!tx_manager.peers.contains_key(&peer_id));
2569 assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2570 assert_eq!(
2572 tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2573 Some(&fallback_peer)
2574 );
2575 }
2576
2577 #[tokio::test(flavor = "multi_thread")]
2578 async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2579 let (mut tx_manager, _network) = new_tx_manager().await;
2580 let peer_id = PeerId::new([1; 64]);
2581 let hash = B256::from_slice(&[1; 32]);
2582
2583 tx_manager.network.update_sync_state(SyncState::Idle);
2584 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2585
2586 let err = PoolError::new(
2587 hash,
2588 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2589 BlobTransactionValidationError::InvalidProof,
2590 )),
2591 );
2592
2593 tx_manager.on_bad_import(err);
2594
2595 assert!(!tx_manager.bad_imports.contains(&hash));
2596 }
2597
2598 #[tokio::test(flavor = "multi_thread")]
2599 async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2600 let (mut tx_manager, _network) = new_tx_manager().await;
2601 let peer_id = PeerId::new([1; 64]);
2602 let hash = B256::from_slice(&[3; 32]);
2603
2604 tx_manager.network.update_sync_state(SyncState::Idle);
2605 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2606
2607 let err = PoolError::new(
2608 hash,
2609 InvalidPoolTransactionError::Eip4844(
2610 Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2611 ),
2612 );
2613
2614 tx_manager.on_bad_import(err);
2615
2616 assert!(!tx_manager.bad_imports.contains(&hash));
2617 }
2618
2619 #[tokio::test(flavor = "multi_thread")]
2620 async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2621 let (mut tx_manager, _network) = new_tx_manager().await;
2622 let peer_id = PeerId::new([1; 64]);
2623 let hash = B256::from_slice(&[2; 32]);
2624
2625 tx_manager.network.update_sync_state(SyncState::Idle);
2626 tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2627
2628 let err = PoolError::new(
2629 hash,
2630 InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2631 );
2632
2633 tx_manager.on_bad_import(err);
2634
2635 assert!(tx_manager.bad_imports.contains(&hash));
2636 }
2637
2638 #[tokio::test(flavor = "multi_thread")]
2639 async fn test_on_get_pooled_transactions_network() {
2640 reth_tracing::init_test_tracing();
2641 let net = Testnet::create(2).await;
2642
2643 let mut handles = net.handles();
2644 let handle0 = handles.next().unwrap();
2645 let handle1 = handles.next().unwrap();
2646
2647 drop(handles);
2648 let handle = net.spawn();
2649
2650 let listener0 = handle0.event_listener();
2651
2652 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2653 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2654
2655 let client = NoopProvider::default();
2656 let pool = testing_pool();
2657 let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2658 .disable_discovery()
2659 .listener_port(0)
2660 .build(client);
2661 let transactions_manager_config = config.transactions_manager_config.clone();
2662 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2663 .await
2664 .unwrap()
2665 .into_builder()
2666 .transactions(pool.clone(), transactions_manager_config)
2667 .split_with_handle();
2668 tokio::task::spawn(network);
2669
2670 network_handle.update_sync_state(SyncState::Idle);
2671
2672 assert!(!NetworkInfo::is_syncing(&network_handle));
2673
2674 let mut established = listener0.take(2);
2676 while let Some(ev) = established.next().await {
2677 match ev {
2678 NetworkEvent::ActivePeerSession { .. } |
2679 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2680 transactions.on_network_event(ev);
2681 }
2682 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2683 ev => {
2684 error!("unexpected event {ev:?}")
2685 }
2686 }
2687 }
2688 handle.terminate().await;
2689
2690 let tx = MockTransaction::eip1559();
2691 let _ = transactions
2692 .pool
2693 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2694 .await;
2695
2696 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2697
2698 let (send, receive) =
2699 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2700
2701 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2702 peer_id: *handle1.peer_id(),
2703 request,
2704 response: send,
2705 });
2706
2707 match receive.await.unwrap() {
2708 Ok(PooledTransactions(transactions)) => {
2709 assert_eq!(transactions.len(), 1);
2710 }
2711 Err(e) => {
2712 panic!("error: {e:?}");
2713 }
2714 }
2715 }
2716
2717 #[tokio::test]
2721 async fn test_partially_tx_response() {
2722 reth_tracing::init_test_tracing();
2723
2724 let mut tx_manager = new_tx_manager().await.0;
2725 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2726
2727 let peer_id_1 = PeerId::new([1; 64]);
2728 let eth_version = EthVersion::Eth66;
2729
2730 let txs = vec![
2731 TransactionSigned::new_unhashed(
2732 Transaction::Legacy(TxLegacy {
2733 chain_id: Some(4),
2734 nonce: 15u64,
2735 gas_price: 2200000000,
2736 gas_limit: 34811,
2737 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2738 value: U256::from(1234u64),
2739 input: Default::default(),
2740 }),
2741 Signature::new(
2742 U256::from_str(
2743 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2744 )
2745 .unwrap(),
2746 U256::from_str(
2747 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2748 )
2749 .unwrap(),
2750 true,
2751 ),
2752 ),
2753 TransactionSigned::new_unhashed(
2754 Transaction::Eip1559(TxEip1559 {
2755 chain_id: 4,
2756 nonce: 26u64,
2757 max_priority_fee_per_gas: 1500000000,
2758 max_fee_per_gas: 1500000013,
2759 gas_limit: MIN_TRANSACTION_GAS,
2760 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2761 value: U256::from(3000000000000000000u64),
2762 input: Default::default(),
2763 access_list: Default::default(),
2764 }),
2765 Signature::new(
2766 U256::from_str(
2767 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2768 )
2769 .unwrap(),
2770 U256::from_str(
2771 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2772 )
2773 .unwrap(),
2774 true,
2775 ),
2776 ),
2777 ];
2778
2779 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2780
2781 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2782 peer_1.seen_transactions.insert(txs_hashes[0]);
2785 peer_1.seen_transactions.insert(txs_hashes[1]);
2786 tx_manager.peers.insert(peer_id_1, peer_1);
2787
2788 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2789 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2790
2791 assert!(tx_fetcher.is_idle(&peer_id_1));
2793 assert_eq!(tx_fetcher.active_peers.len(), 0);
2794
2795 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2797
2798 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2799 assert!(!tx_fetcher.is_idle(&peer_id_1));
2801 assert_eq!(tx_fetcher.active_peers.len(), 1);
2802
2803 let req = to_mock_session_rx
2805 .recv()
2806 .await
2807 .expect("peer_1 session should receive request with buffered hashes");
2808 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2809
2810 let message: Vec<PooledTransactionVariant> = txs
2811 .into_iter()
2812 .take(1)
2813 .map(|tx| {
2814 PooledTransactionVariant::try_from(tx)
2815 .expect("Failed to convert MockTransaction to PooledTransaction")
2816 })
2817 .collect();
2818 response
2820 .send(Ok(PooledTransactions(message)))
2821 .expect("should send peer_1 response to tx manager");
2822 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2823 unreachable!()
2824 };
2825
2826 assert!(tx_fetcher.is_idle(&peer_id));
2828 assert_eq!(tx_fetcher.active_peers.len(), 0);
2829 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2831 }
2832
2833 #[tokio::test]
2834 async fn test_max_retries_tx_request() {
2835 reth_tracing::init_test_tracing();
2836
2837 let mut tx_manager = new_tx_manager().await.0;
2838 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2839
2840 let peer_id_1 = PeerId::new([1; 64]);
2841 let peer_id_2 = PeerId::new([2; 64]);
2842 let eth_version = EthVersion::Eth66;
2843 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2844
2845 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2846 peer_1.seen_transactions.insert(seen_hashes[0]);
2849 peer_1.seen_transactions.insert(seen_hashes[1]);
2850 tx_manager.peers.insert(peer_id_1, peer_1);
2851
2852 let retries = 1;
2855 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2856 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2857
2858 assert!(tx_fetcher.is_idle(&peer_id_1));
2860 assert_eq!(tx_fetcher.active_peers.len(), 0);
2861
2862 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2864
2865 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2866
2867 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2868 assert!(!tx_fetcher.is_idle(&peer_id_1));
2870 assert_eq!(tx_fetcher.active_peers.len(), 1);
2871
2872 let req = to_mock_session_rx
2874 .recv()
2875 .await
2876 .expect("peer_1 session should receive request with buffered hashes");
2877 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2878 let GetPooledTransactions(hashes) = request;
2879
2880 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2881
2882 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2883
2884 response
2886 .send(Err(RequestError::BadResponse))
2887 .expect("should send peer_1 response to tx manager");
2888 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2889 unreachable!()
2890 };
2891
2892 assert!(tx_fetcher.is_idle(&peer_id));
2894 assert_eq!(tx_fetcher.active_peers.len(), 0);
2895 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2897
2898 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2899 tx_manager.peers.insert(peer_id_2, peer_2);
2900
2901 let msg =
2903 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2904 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2905
2906 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2907
2908 assert_eq!(tx_fetcher.active_peers.len(), 1);
2910
2911 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2913 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2915
2916 let req = to_mock_session_rx
2918 .recv()
2919 .await
2920 .expect("peer_2 session should receive request with buffered hashes");
2921 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2922
2923 response
2925 .send(Err(RequestError::BadResponse))
2926 .expect("should send peer_2 response to tx manager");
2927 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2928
2929 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2932 assert_eq!(tx_fetcher.active_peers.len(), 0);
2933 }
2934
2935 #[test]
2936 fn test_transaction_builder_empty() {
2937 let mut builder =
2938 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2939 assert!(builder.is_empty());
2940
2941 let mut factory = MockTransactionFactory::default();
2942 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2943 builder.push(&tx);
2944 assert!(!builder.is_empty());
2945
2946 let txs = builder.build();
2947 assert!(txs.full.is_none());
2948 let txs = txs.pooled.unwrap();
2949 assert_eq!(txs.len(), 1);
2950 }
2951
2952 #[test]
2953 fn test_transaction_builder_large() {
2954 let mut builder =
2955 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2956 assert!(builder.is_empty());
2957
2958 let mut factory = MockTransactionFactory::default();
2959 let mut tx = factory.create_eip1559();
2960 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2962 let tx = Arc::new(tx);
2963 let tx = PropagateTransaction::pool_tx(tx);
2964 builder.push(&tx);
2965 assert!(!builder.is_empty());
2966
2967 let txs = builder.clone().build();
2968 assert!(txs.pooled.is_none());
2969 let txs = txs.full.unwrap();
2970 assert_eq!(txs.len(), 1);
2971
2972 builder.push(&tx);
2973
2974 let txs = builder.clone().build();
2975 let pooled = txs.pooled.unwrap();
2976 assert_eq!(pooled.len(), 1);
2977 let txs = txs.full.unwrap();
2978 assert_eq!(txs.len(), 1);
2979 }
2980
2981 #[test]
2982 fn test_transaction_builder_eip4844() {
2983 let mut builder =
2984 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2985 assert!(builder.is_empty());
2986
2987 let mut factory = MockTransactionFactory::default();
2988 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2989 builder.push(&tx);
2990 assert!(!builder.is_empty());
2991
2992 let txs = builder.clone().build();
2993 assert!(txs.full.is_none());
2994 let txs = txs.pooled.unwrap();
2995 assert_eq!(txs.len(), 1);
2996
2997 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2998 builder.push(&tx);
2999
3000 let txs = builder.clone().build();
3001 let pooled = txs.pooled.unwrap();
3002 assert_eq!(pooled.len(), 1);
3003 let txs = txs.full.unwrap();
3004 assert_eq!(txs.len(), 1);
3005 }
3006
3007 #[tokio::test]
3008 async fn test_propagate_full() {
3009 reth_tracing::init_test_tracing();
3010
3011 let (mut tx_manager, network) = new_tx_manager().await;
3012 let peer_id = PeerId::random();
3013
3014 network.handle().update_sync_state(SyncState::Idle);
3016
3017 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3019
3020 let session_info = SessionInfo {
3021 peer_id,
3022 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3023 client_version: Arc::from(""),
3024 capabilities: Arc::new(vec![].into()),
3025 status: Arc::new(Default::default()),
3026 version: EthVersion::Eth68,
3027 peer_kind: PeerKind::Basic,
3028 };
3029 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3030 tx_manager
3031 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3032 let mut propagate = vec![];
3033 let mut factory = MockTransactionFactory::default();
3034 let eip1559_tx = Arc::new(factory.create_eip1559());
3035 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3036 let eip4844_tx = Arc::new(factory.create_eip4844());
3037 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3038
3039 let propagated =
3040 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3041 assert_eq!(propagated.len(), 2);
3042 let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3043 assert_eq!(prop_txs.len(), 1);
3044 assert!(prop_txs[0].is_full());
3045
3046 let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3047 assert_eq!(prop_txs.len(), 1);
3048 assert!(prop_txs[0].is_hash());
3049
3050 let peer = tx_manager.peers.get(&peer_id).unwrap();
3051 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3052 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3053 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3054
3055 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3057 assert!(propagated.is_empty());
3058 }
3059
3060 #[tokio::test]
3061 async fn test_propagate_pending_txs_while_initially_syncing() {
3062 reth_tracing::init_test_tracing();
3063
3064 let (mut tx_manager, network) = new_tx_manager().await;
3065 let peer_id = PeerId::random();
3066
3067 network.handle().update_sync_state(SyncState::Syncing);
3069 assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3070
3071 let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3073 tx_manager.peers.insert(peer_id, peer);
3074
3075 let tx = MockTransaction::eip1559();
3076 tx_manager
3077 .pool
3078 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3079 .await
3080 .expect("transaction should be accepted into the pool");
3081
3082 tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3083
3084 let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3085 assert!(peer.seen_transactions.contains(tx.get_hash()));
3086 }
3087
3088 #[tokio::test]
3089 async fn test_relaxed_filter_ignores_unknown_tx_types() {
3090 reth_tracing::init_test_tracing();
3091
3092 let transactions_manager_config = TransactionsManagerConfig::default();
3093
3094 let propagation_policy = TransactionPropagationKind::default();
3095 let announcement_policy = RelaxedEthAnnouncementFilter::default();
3096
3097 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3098
3099 let pool = testing_pool();
3100 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3101 let client = NoopProvider::default();
3102
3103 let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3104 .listener_port(0)
3105 .disable_discovery()
3106 .build(client.clone());
3107
3108 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3109 let (to_tx_manager_tx, from_network_rx) =
3110 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3111 network_manager.set_transactions(to_tx_manager_tx);
3112 let network_handle = network_manager.handle().clone();
3113 let network_service_handle = tokio::spawn(network_manager);
3114
3115 let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3116 network_handle.clone(),
3117 pool.clone(),
3118 from_network_rx,
3119 transactions_manager_config,
3120 policy_bundle,
3121 );
3122
3123 let peer_id = PeerId::random();
3124 let eth_version = EthVersion::Eth68;
3125 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3126 tx_manager.peers.insert(peer_id, mock_peer_metadata);
3127
3128 let mut tx_factory = MockTransactionFactory::default();
3129
3130 let valid_known_tx = tx_factory.create_eip1559();
3131 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3132
3133 let known_tx_hash = *known_tx_signed.hash();
3134 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3135 let known_tx_size = known_tx_signed.encoded_length();
3136
3137 let unknown_tx_hash = B256::random();
3138 let unknown_tx_type_byte = 0xff_u8;
3139 let unknown_tx_size = 150;
3140
3141 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3142 types: vec![known_tx_type_byte, unknown_tx_type_byte],
3143 sizes: vec![known_tx_size, unknown_tx_size],
3144 hashes: vec![known_tx_hash, unknown_tx_hash],
3145 });
3146
3147 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3148
3149 poll_fn(|cx| {
3150 let _ = tx_manager.poll_unpin(cx);
3151 Poll::Ready(())
3152 })
3153 .await;
3154
3155 let mut requested_hashes_in_getpooled = HashSet::new();
3156 let mut unexpected_request_received = false;
3157
3158 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3159 .await
3160 {
3161 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3162 let GetPooledTransactions(hashes) = request;
3163 for hash in hashes {
3164 requested_hashes_in_getpooled.insert(hash);
3165 }
3166 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3167 }
3168 Ok(Some(other_request)) => {
3169 tracing::error!(?other_request, "Received unexpected PeerRequest type");
3170 unexpected_request_received = true;
3171 }
3172 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3173 Err(_timeout_err) => {
3174 tracing::info!("Timeout: No GetPooledTransactions request received.")
3175 }
3176 }
3177
3178 assert!(
3179 requested_hashes_in_getpooled.contains(&known_tx_hash),
3180 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3181 );
3182 assert!(
3183 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3184 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3185 );
3186 assert!(
3187 !unexpected_request_received,
3188 "An unexpected P2P request was received by the mock peer."
3189 );
3190
3191 network_service_handle.abort();
3192 }
3193}