1use alloy_consensus::transaction::TxHashRef;
4
5pub mod config;
7pub mod constants;
9pub mod fetcher;
11pub mod policy;
13
14pub use self::constants::{
15 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::{AnnouncementAcceptance, StrictEthAnnouncementFilter, TransactionPropagationKind};
19pub use config::{
20 AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionPropagationMode,
21 TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::{NetworkPolicies, TransactionPolicies};
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29 budget::{
30 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
32 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33 },
34 cache::LruCache,
35 duration_metered_exec, metered_poll_nested_stream_with_budget,
36 metrics::{
37 AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38 },
39 NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{TxHash, B256};
42use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
43use futures::{stream::FuturesUnordered, Future, StreamExt};
44use reth_eth_wire::{
45 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
46 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
47 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
48 RequestTxHashes, Transactions, ValidAnnouncementData,
49};
50use reth_ethereum_primitives::{TransactionSigned, TxType};
51use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
52use reth_network_api::{
53 events::{PeerEvent, SessionInfo},
54 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
55};
56use reth_network_p2p::{
57 error::{RequestError, RequestResult},
58 sync::SyncStateProvider,
59};
60use reth_network_peers::PeerId;
61use reth_network_types::ReputationChangeKind;
62use reth_primitives_traits::SignedTransaction;
63use reth_tokio_util::EventStream;
64use reth_transaction_pool::{
65 error::{PoolError, PoolResult},
66 AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
67 PropagatedTransactions, TransactionPool, ValidPoolTransaction,
68};
69use std::{
70 collections::{hash_map::Entry, HashMap, HashSet},
71 pin::Pin,
72 sync::{
73 atomic::{AtomicUsize, Ordering},
74 Arc,
75 },
76 task::{Context, Poll},
77 time::{Duration, Instant},
78};
79use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
80use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
81use tracing::{debug, trace};
82
83pub type PoolImportFuture =
87 Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
88
89#[derive(Debug, Clone)]
97pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
98 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
100}
101
102impl<N: NetworkPrimitives> TransactionsHandle<N> {
105 fn send(&self, cmd: TransactionsCommand<N>) {
106 let _ = self.manager_tx.send(cmd);
107 }
108
109 async fn peer_handle(
111 &self,
112 peer_id: PeerId,
113 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114 let (tx, rx) = oneshot::channel();
115 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116 rx.await
117 }
118
119 pub fn propagate(&self, hash: TxHash) {
121 self.send(TransactionsCommand::PropagateHash(hash))
122 }
123
124 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128 self.propagate_hashes_to(Some(hash), peer)
129 }
130
131 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135 let hashes = hash.into_iter().collect::<Vec<_>>();
136 if hashes.is_empty() {
137 return
138 }
139 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140 }
141
142 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144 let (tx, rx) = oneshot::channel();
145 self.send(TransactionsCommand::GetActivePeers(tx));
146 rx.await
147 }
148
149 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153 if transactions.is_empty() {
154 return
155 }
156 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157 }
158
159 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164 if transactions.is_empty() {
165 return
166 }
167 self.send(TransactionsCommand::PropagateTransactions(transactions))
168 }
169
170 pub fn broadcast_transactions(
175 &self,
176 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177 ) {
178 let transactions =
179 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180 if transactions.is_empty() {
181 return
182 }
183 self.send(TransactionsCommand::BroadcastTransactions(transactions))
184 }
185
186 pub async fn get_transaction_hashes(
188 &self,
189 peers: Vec<PeerId>,
190 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191 if peers.is_empty() {
192 return Ok(Default::default())
193 }
194 let (tx, rx) = oneshot::channel();
195 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196 rx.await
197 }
198
199 pub async fn get_peer_transaction_hashes(
201 &self,
202 peer: PeerId,
203 ) -> Result<HashSet<TxHash>, RecvError> {
204 let res = self.get_transaction_hashes(vec![peer]).await?;
205 Ok(res.into_values().next().unwrap_or_default())
206 }
207
208 pub async fn get_pooled_transactions_from(
214 &self,
215 peer_id: PeerId,
216 hashes: Vec<B256>,
217 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220 let (tx, rx) = oneshot::channel();
221 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222 peer.try_send(request).ok();
223
224 rx.await?.map(|res| Some(res.0))
225 }
226}
227
228#[derive(Debug)]
283#[must_use = "Manager does nothing unless polled."]
284pub struct TransactionsManager<
285 Pool,
286 N: NetworkPrimitives = EthNetworkPrimitives,
287 PBundle: TransactionPolicies = NetworkPolicies<
288 TransactionPropagationKind,
289 StrictEthAnnouncementFilter,
290 >,
291> {
292 pool: Pool,
294 network: NetworkHandle<N>,
296 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
300 transaction_fetcher: TransactionFetcher<N>,
302 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
307 pool_imports: FuturesUnordered<PoolImportFuture>,
319 pending_pool_imports_info: PendingPoolImportsInfo,
321 bad_imports: LruCache<TxHash>,
323 peers: HashMap<PeerId, PeerMetadata<N>>,
325 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
329 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
334 pending_transactions: ReceiverStream<TxHash>,
343 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
345 config: TransactionsManagerConfig,
347 policies: PBundle,
349 metrics: TransactionsManagerMetrics,
351 announced_tx_types_metrics: AnnouncedTxTypesMetrics,
353}
354
355impl<Pool: TransactionPool, N: NetworkPrimitives>
356 TransactionsManager<
357 Pool,
358 N,
359 NetworkPolicies<TransactionPropagationKind, StrictEthAnnouncementFilter>,
360 >
361{
362 pub fn new(
366 network: NetworkHandle<N>,
367 pool: Pool,
368 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
369 transactions_manager_config: TransactionsManagerConfig,
370 ) -> Self {
371 Self::with_policy(
372 network,
373 pool,
374 from_network,
375 transactions_manager_config,
376 NetworkPolicies::default(),
377 )
378 }
379}
380
381impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
382 TransactionsManager<Pool, N, PBundle>
383{
384 pub fn with_policy(
388 network: NetworkHandle<N>,
389 pool: Pool,
390 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
391 transactions_manager_config: TransactionsManagerConfig,
392 policies: PBundle,
393 ) -> Self {
394 let network_events = network.event_listener();
395
396 let (command_tx, command_rx) = mpsc::unbounded_channel();
397
398 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
399 &transactions_manager_config.transaction_fetcher_config,
400 );
401
402 let pending = pool.pending_transactions_listener();
405 let pending_pool_imports_info = PendingPoolImportsInfo::default();
406 let metrics = TransactionsManagerMetrics::default();
407 metrics
408 .capacity_pending_pool_imports
409 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
410
411 Self {
412 pool,
413 network,
414 network_events,
415 transaction_fetcher,
416 transactions_by_peers: Default::default(),
417 pool_imports: Default::default(),
418 pending_pool_imports_info: PendingPoolImportsInfo::new(
419 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
420 ),
421 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
422 peers: Default::default(),
423 command_tx,
424 command_rx: UnboundedReceiverStream::new(command_rx),
425 pending_transactions: ReceiverStream::new(pending),
426 transaction_events: UnboundedMeteredReceiver::new(
427 from_network,
428 NETWORK_POOL_TRANSACTIONS_SCOPE,
429 ),
430 config: transactions_manager_config,
431 policies,
432 metrics,
433 announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
434 }
435 }
436
437 pub fn handle(&self) -> TransactionsHandle<N> {
439 TransactionsHandle { manager_tx: self.command_tx.clone() }
440 }
441
442 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
445 self.pending_pool_imports_info
446 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
447 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
448 }
449
450 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
451 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
452 self.metrics.reported_bad_transactions.increment(1);
453 }
454
455 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
456 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
457 self.network.reputation_change(peer_id, kind);
458 }
459
460 fn report_already_seen(&self, peer_id: PeerId) {
461 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
462 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
463 }
464
465 fn on_good_import(&mut self, hash: TxHash) {
467 self.transactions_by_peers.remove(&hash);
468 }
469
470 fn on_bad_import(&mut self, err: PoolError) {
494 let peers = self.transactions_by_peers.remove(&err.hash);
495
496 if !err.is_bad_transaction() || self.network.is_syncing() {
498 return
499 }
500 if let Some(peers) = peers {
503 for peer_id in peers {
504 self.report_peer_bad_transactions(peer_id);
505 }
506 }
507 self.metrics.bad_imports.increment(1);
508 self.bad_imports.insert(err.hash);
509 }
510
511 fn on_fetch_hashes_pending_fetch(&mut self) {
513 let info = &self.pending_pool_imports_info;
515 let max_pending_pool_imports = info.max_pending_pool_imports;
516 let has_capacity_wrt_pending_pool_imports =
517 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
518
519 self.transaction_fetcher
520 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
521 }
522
523 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
524 let kind = match req_err {
525 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
526 RequestError::Timeout => ReputationChangeKind::Timeout,
527 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
528 return
530 }
531 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
532 };
533 self.report_peer(peer_id, kind);
534 }
535
536 #[inline]
537 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
538 let metrics = &self.metrics;
539
540 let TxManagerPollDurations {
541 acc_network_events,
542 acc_pending_imports,
543 acc_tx_events,
544 acc_imported_txns,
545 acc_fetch_events,
546 acc_pending_fetch,
547 acc_cmds,
548 } = poll_durations;
549
550 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
552 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
554 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
555 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
556 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
557 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
558 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
559 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
560 }
561}
562
563impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
564 TransactionsManager<Pool, N, PBundle>
565{
566 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
568 for res in batch_results {
569 match res {
570 Ok(AddedTransactionOutcome { hash, .. }) => {
571 self.on_good_import(hash);
572 }
573 Err(err) => {
574 self.on_bad_import(err);
575 }
576 }
577 }
578 }
579
580 fn on_new_pooled_transaction_hashes(
582 &mut self,
583 peer_id: PeerId,
584 msg: NewPooledTransactionHashes,
585 ) {
586 if self.network.is_initially_syncing() {
588 return
589 }
590 if self.network.tx_gossip_disabled() {
591 return
592 }
593
594 let Some(peer) = self.peers.get_mut(&peer_id) else {
596 trace!(
597 peer_id = format!("{peer_id:#}"),
598 ?msg,
599 "discarding announcement from inactive peer"
600 );
601
602 return
603 };
604 let client = peer.client_version.clone();
605
606 let mut count_txns_already_seen_by_peer = 0;
608 for tx in msg.iter_hashes().copied() {
609 if !peer.seen_transactions.insert(tx) {
610 count_txns_already_seen_by_peer += 1;
611 }
612 }
613 if count_txns_already_seen_by_peer > 0 {
614 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
619 self.metrics
620 .occurrences_hash_already_seen_by_peer
621 .increment(count_txns_already_seen_by_peer);
622
623 trace!(target: "net::tx",
624 %count_txns_already_seen_by_peer,
625 peer_id=format!("{peer_id:#}"),
626 ?client,
627 "Peer sent hashes that have already been marked as seen by peer"
628 );
629
630 self.report_already_seen(peer_id);
631 }
632
633 if msg.is_empty() {
635 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
636 return;
637 }
638
639 let original_len = msg.len();
640 let mut partially_valid_msg = msg.dedup();
641
642 if partially_valid_msg.len() != original_len {
643 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
644 }
645
646 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
648
649 let hashes_count_pre_pool_filter = partially_valid_msg.len();
657 self.pool.retain_unknown(&mut partially_valid_msg);
658 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
659 let already_known_hashes_count =
660 hashes_count_pre_pool_filter - partially_valid_msg.len();
661 self.metrics
662 .occurrences_hashes_already_in_pool
663 .increment(already_known_hashes_count as u64);
664 }
665
666 if partially_valid_msg.is_empty() {
667 return
669 }
670
671 let mut should_report_peer = false;
676 let mut tx_types_counter = TxTypesCounter::default();
677
678 let is_eth68_message = partially_valid_msg
679 .msg_version()
680 .expect("partially valid announcement should have a version")
681 .is_eth68();
682
683 partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
684 let (ty_byte, size_val) = match *metadata_ref_mut {
685 Some((ty, size)) => {
686 if !is_eth68_message {
687 should_report_peer = true;
688 }
689 (ty, size)
690 }
691 None => {
692 if is_eth68_message {
693 should_report_peer = true;
694 return false;
695 }
696 (0u8, 0)
697 }
698 };
699
700 if is_eth68_message &&
701 let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
702 let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
703 {
704 tx_types_counter.increase_by_tx_type(parsed_tx_type);
705 }
706
707 let decision = self
708 .policies
709 .announcement_filter()
710 .decide_on_announcement(ty_byte, tx_hash, size_val);
711
712 match decision {
713 AnnouncementAcceptance::Accept => true,
714 AnnouncementAcceptance::Ignore => false,
715 AnnouncementAcceptance::Reject { penalize_peer } => {
716 if penalize_peer {
717 should_report_peer = true;
718 }
719 false
720 }
721 }
722 });
723
724 if is_eth68_message {
725 self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
726 }
727
728 if should_report_peer {
729 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
730 }
731
732 let mut valid_announcement_data =
733 ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
734
735 if valid_announcement_data.is_empty() {
736 return
738 }
739
740 let bad_imports = &self.bad_imports;
747 self.transaction_fetcher.filter_unseen_and_pending_hashes(
748 &mut valid_announcement_data,
749 |hash| bad_imports.contains(hash),
750 &peer_id,
751 &client,
752 );
753
754 if valid_announcement_data.is_empty() {
755 return
757 }
758
759 trace!(target: "net::tx::propagation",
760 peer_id=format!("{peer_id:#}"),
761 hashes_len=valid_announcement_data.len(),
762 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
763 msg_version=%valid_announcement_data.msg_version(),
764 client_version=%client,
765 "received previously unseen and pending hashes in announcement from peer"
766 );
767
768 if !self.transaction_fetcher.is_idle(&peer_id) {
771 let msg_version = valid_announcement_data.msg_version();
773 let (hashes, _version) = valid_announcement_data.into_request_hashes();
774
775 trace!(target: "net::tx",
776 peer_id=format!("{peer_id:#}"),
777 hashes=?*hashes,
778 %msg_version,
779 %client,
780 "buffering hashes announced by busy peer"
781 );
782
783 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
784
785 return
786 }
787
788 let mut hashes_to_request =
789 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
790 let surplus_hashes =
791 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
792
793 if !surplus_hashes.is_empty() {
794 trace!(target: "net::tx",
795 peer_id=format!("{peer_id:#}"),
796 surplus_hashes=?*surplus_hashes,
797 %client,
798 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
799 );
800
801 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
802 }
803
804 trace!(target: "net::tx",
805 peer_id=format!("{peer_id:#}"),
806 hashes=?*hashes_to_request,
807 %client,
808 "sending hashes in `GetPooledTransactions` request to peer's session"
809 );
810
811 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
815 if let Some(failed_to_request_hashes) =
816 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
817 {
818 let conn_eth_version = peer.version;
819
820 trace!(target: "net::tx",
821 peer_id=format!("{peer_id:#}"),
822 failed_to_request_hashes=?*failed_to_request_hashes,
823 %conn_eth_version,
824 %client,
825 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
826 );
827 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
828 }
829 }
830}
831
832impl<Pool, N, PBundle> TransactionsManager<Pool, N, PBundle>
833where
834 Pool: TransactionPool + Unpin + 'static,
835
836 N: NetworkPrimitives<
837 BroadcastedTransaction: SignedTransaction,
838 PooledTransaction: SignedTransaction,
839 > + Unpin,
840
841 PBundle: TransactionPolicies,
842 Pool::Transaction:
843 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
844{
845 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
857 if self.network.is_initially_syncing() {
859 return
860 }
861 if self.network.tx_gossip_disabled() {
862 return
863 }
864
865 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
866
867 self.propagate_all(hashes);
868 }
869
870 fn propagate_full_transactions_to_peer(
874 &mut self,
875 txs: Vec<TxHash>,
876 peer_id: PeerId,
877 propagation_mode: PropagationMode,
878 ) -> Option<PropagatedTransactions> {
879 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
880
881 let peer = self.peers.get_mut(&peer_id)?;
882 let mut propagated = PropagatedTransactions::default();
883
884 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
886
887 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
888
889 if propagation_mode.is_forced() {
890 full_transactions.extend(to_propagate);
892 } else {
893 for tx in to_propagate {
896 if !peer.seen_transactions.contains(tx.tx_hash()) {
897 full_transactions.push(&tx);
899 }
900 }
901 }
902
903 if full_transactions.is_empty() {
904 return None
906 }
907
908 let PropagateTransactions { pooled, full } = full_transactions.build();
909
910 if let Some(new_pooled_hashes) = pooled {
912 for hash in new_pooled_hashes.iter_hashes().copied() {
913 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
914 peer.seen_transactions.insert(hash);
916 }
917
918 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
920 }
921
922 if let Some(new_full_transactions) = full {
924 for tx in &new_full_transactions {
925 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
926 peer.seen_transactions.insert(*tx.tx_hash());
928 }
929
930 self.network.send_transactions(peer_id, new_full_transactions);
932 }
933
934 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
936
937 Some(propagated)
938 }
939
940 fn propagate_hashes_to(
944 &mut self,
945 hashes: Vec<TxHash>,
946 peer_id: PeerId,
947 propagation_mode: PropagationMode,
948 ) {
949 trace!(target: "net::tx", "Start propagating transactions as hashes");
950
951 let propagated = {
954 let Some(peer) = self.peers.get_mut(&peer_id) else {
955 return
957 };
958
959 let to_propagate = self
960 .pool
961 .get_all(hashes)
962 .into_iter()
963 .map(PropagateTransaction::pool_tx)
964 .collect::<Vec<_>>();
965
966 let mut propagated = PropagatedTransactions::default();
967
968 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
970
971 if propagation_mode.is_forced() {
972 hashes.extend(to_propagate)
973 } else {
974 for tx in to_propagate {
975 if !peer.seen_transactions.contains(tx.tx_hash()) {
976 hashes.push(&tx);
978 }
979 }
980 }
981
982 let new_pooled_hashes = hashes.build();
983
984 if new_pooled_hashes.is_empty() {
985 return
987 }
988
989 for hash in new_pooled_hashes.iter_hashes().copied() {
990 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
991 }
992
993 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
994
995 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
997
998 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1000
1001 propagated
1002 };
1003
1004 self.pool.on_propagated(propagated);
1006 }
1007
1008 fn propagate_transactions(
1015 &mut self,
1016 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1017 propagation_mode: PropagationMode,
1018 ) -> PropagatedTransactions {
1019 let mut propagated = PropagatedTransactions::default();
1020 if self.network.tx_gossip_disabled() {
1021 return propagated
1022 }
1023
1024 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1026
1027 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1029 if !self.policies.propagation_policy().can_propagate(peer) {
1030 continue
1032 }
1033 let mut builder = if peer_idx > max_num_full {
1035 PropagateTransactionsBuilder::pooled(peer.version)
1036 } else {
1037 PropagateTransactionsBuilder::full(peer.version)
1038 };
1039
1040 if propagation_mode.is_forced() {
1041 builder.extend(to_propagate.iter());
1042 } else {
1043 for tx in &to_propagate {
1047 if !peer.seen_transactions.contains(tx.tx_hash()) {
1050 builder.push(tx);
1051 }
1052 }
1053 }
1054
1055 if builder.is_empty() {
1056 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1057 continue
1058 }
1059
1060 let PropagateTransactions { pooled, full } = builder.build();
1061
1062 if let Some(mut new_pooled_hashes) = pooled {
1064 new_pooled_hashes
1067 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1068
1069 for hash in new_pooled_hashes.iter_hashes().copied() {
1070 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1071 peer.seen_transactions.insert(hash);
1073 }
1074
1075 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1076
1077 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1079 }
1080
1081 if let Some(new_full_transactions) = full {
1083 for tx in &new_full_transactions {
1084 propagated
1085 .0
1086 .entry(*tx.tx_hash())
1087 .or_default()
1088 .push(PropagateKind::Full(*peer_id));
1089 peer.seen_transactions.insert(*tx.tx_hash());
1091 }
1092
1093 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1094
1095 self.network.send_transactions(*peer_id, new_full_transactions);
1097 }
1098 }
1099
1100 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1102
1103 propagated
1104 }
1105
1106 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1111 if self.peers.is_empty() {
1112 return
1114 }
1115 let propagated = self.propagate_transactions(
1116 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1117 PropagationMode::Basic,
1118 );
1119
1120 self.pool.on_propagated(propagated);
1122 }
1123
1124 fn on_get_pooled_transactions(
1126 &mut self,
1127 peer_id: PeerId,
1128 request: GetPooledTransactions,
1129 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1130 ) {
1131 if let Some(peer) = self.peers.get_mut(&peer_id) {
1132 if self.network.tx_gossip_disabled() {
1133 let _ = response.send(Ok(PooledTransactions::default()));
1134 return
1135 }
1136 let transactions = self.pool.get_pooled_transaction_elements(
1137 request.0,
1138 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1139 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1140 ),
1141 );
1142 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1143
1144 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1147
1148 let resp = PooledTransactions(transactions);
1149 let _ = response.send(Ok(resp));
1150 }
1151 }
1152
1153 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1155 match cmd {
1156 TransactionsCommand::PropagateHash(hash) => {
1157 self.on_new_pending_transactions(vec![hash])
1158 }
1159 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1160 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1161 }
1162 TransactionsCommand::GetActivePeers(tx) => {
1163 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1164 tx.send(peers).ok();
1165 }
1166 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1167 if let Some(propagated) =
1168 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1169 {
1170 self.pool.on_propagated(propagated);
1171 }
1172 }
1173 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1174 TransactionsCommand::BroadcastTransactions(txs) => {
1175 let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1176 self.pool.on_propagated(propagated);
1177 }
1178 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1179 let mut res = HashMap::with_capacity(peers.len());
1180 for peer_id in peers {
1181 let hashes = self
1182 .peers
1183 .get(&peer_id)
1184 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1185 .unwrap_or_default();
1186 res.insert(peer_id, hashes);
1187 }
1188 tx.send(res).ok();
1189 }
1190 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1191 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1192 peer_request_sender.send(sender).ok();
1193 }
1194 }
1195 }
1196
1197 fn handle_peer_session(
1201 &mut self,
1202 info: SessionInfo,
1203 messages: PeerRequestSender<PeerRequest<N>>,
1204 ) {
1205 let SessionInfo { peer_id, client_version, version, .. } = info;
1206
1207 let peer = PeerMetadata::<N>::new(
1209 messages,
1210 version,
1211 client_version,
1212 self.config.max_transactions_seen_by_peer_history,
1213 info.peer_kind,
1214 );
1215 let peer = match self.peers.entry(peer_id) {
1216 Entry::Occupied(mut entry) => {
1217 entry.insert(peer);
1218 entry.into_mut()
1219 }
1220 Entry::Vacant(entry) => entry.insert(peer),
1221 };
1222
1223 self.policies.propagation_policy_mut().on_session_established(peer);
1224
1225 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1229 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1230 return
1231 }
1232
1233 let pooled_txs = self.pool.pooled_transactions_max(
1235 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1236 );
1237 if pooled_txs.is_empty() {
1238 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1239 return;
1240 }
1241
1242 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1244 for pooled_tx in pooled_txs {
1245 peer.seen_transactions.insert(*pooled_tx.hash());
1246 msg_builder.push_pooled(pooled_tx);
1247 }
1248
1249 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1250 let msg = msg_builder.build();
1251 self.network.send_transactions_hashes(peer_id, msg);
1252 }
1253
1254 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1256 match event_result {
1257 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1258 let peer = self.peers.remove(&peer_id);
1261 if let Some(mut peer) = peer {
1262 self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1263 }
1264 self.transaction_fetcher.remove_peer(&peer_id);
1265 }
1266 NetworkEvent::ActivePeerSession { info, messages } => {
1267 self.handle_peer_session(info, messages);
1269 }
1270 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1271 let peer_id = info.peer_id;
1272 let messages = match self.peers.get(&peer_id) {
1274 Some(p) => p.request_tx.clone(),
1275 None => {
1276 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1277 return;
1278 }
1279 };
1280 self.handle_peer_session(info, messages);
1281 }
1282 _ => {}
1283 }
1284 }
1285
1286 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1288 match event {
1289 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1290 let has_blob_txs = msg.has_eip4844();
1294
1295 let non_blob_txs = msg
1296 .0
1297 .into_iter()
1298 .map(N::PooledTransaction::try_from)
1299 .filter_map(Result::ok)
1300 .collect();
1301
1302 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1303
1304 if has_blob_txs {
1305 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1306 self.report_peer_bad_transactions(peer_id);
1307 }
1308 }
1309 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1310 self.on_new_pooled_transaction_hashes(peer_id, msg)
1311 }
1312 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1313 self.on_get_pooled_transactions(peer_id, request, response)
1314 }
1315 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1316 let _ = response.send(Some(self.handle()));
1317 }
1318 }
1319 }
1320
1321 fn import_transactions(
1323 &mut self,
1324 peer_id: PeerId,
1325 transactions: PooledTransactions<N::PooledTransaction>,
1326 source: TransactionSource,
1327 ) {
1328 if self.network.is_initially_syncing() {
1330 return
1331 }
1332 if self.network.tx_gossip_disabled() {
1333 return
1334 }
1335
1336 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1337 let mut transactions = transactions.0;
1338
1339 self.transaction_fetcher
1341 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1342
1343 let mut num_already_seen_by_peer = 0;
1348 for tx in &transactions {
1349 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1350 num_already_seen_by_peer += 1;
1351 }
1352 }
1353
1354 let txns_count_pre_pool_filter = transactions.len();
1356 self.pool.retain_unknown(&mut transactions);
1357 if txns_count_pre_pool_filter > transactions.len() {
1358 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1359 self.metrics
1360 .occurrences_transactions_already_in_pool
1361 .increment(already_known_txns_count as u64);
1362 }
1363
1364 let mut has_bad_transactions = false;
1366
1367 let mut new_txs = Vec::with_capacity(transactions.len());
1370 for tx in transactions {
1371 let tx = match tx.try_into_recovered() {
1373 Ok(tx) => tx,
1374 Err(badtx) => {
1375 trace!(target: "net::tx",
1376 peer_id=format!("{peer_id:#}"),
1377 hash=%badtx.tx_hash(),
1378 client_version=%peer.client_version,
1379 "failed ecrecovery for transaction"
1380 );
1381 has_bad_transactions = true;
1382 continue
1383 }
1384 };
1385
1386 match self.transactions_by_peers.entry(*tx.tx_hash()) {
1387 Entry::Occupied(mut entry) => {
1388 entry.get_mut().insert(peer_id);
1390 }
1391 Entry::Vacant(entry) => {
1392 if self.bad_imports.contains(tx.tx_hash()) {
1393 trace!(target: "net::tx",
1394 peer_id=format!("{peer_id:#}"),
1395 hash=%tx.tx_hash(),
1396 client_version=%peer.client_version,
1397 "received a known bad transaction from peer"
1398 );
1399 has_bad_transactions = true;
1400 } else {
1401 let pool_transaction = Pool::Transaction::from_pooled(tx);
1404 new_txs.push(pool_transaction);
1405
1406 entry.insert(HashSet::from([peer_id]));
1407 }
1408 }
1409 }
1410 }
1411 new_txs.shrink_to_fit();
1412
1413 if !new_txs.is_empty() {
1416 let pool = self.pool.clone();
1417 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1419 metric_pending_pool_imports.increment(new_txs.len() as f64);
1420
1421 self.pending_pool_imports_info
1423 .pending_pool_imports
1424 .fetch_add(new_txs.len(), Ordering::Relaxed);
1425 let tx_manager_info_pending_pool_imports =
1426 self.pending_pool_imports_info.pending_pool_imports.clone();
1427
1428 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1429 let import = Box::pin(async move {
1430 let added = new_txs.len();
1431 let res = pool.add_external_transactions(new_txs).await;
1432
1433 metric_pending_pool_imports.decrement(added as f64);
1435 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1437
1438 res
1439 });
1440
1441 self.pool_imports.push(import);
1442 }
1443
1444 if num_already_seen_by_peer > 0 {
1445 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1446 self.metrics
1447 .occurrences_of_transaction_already_seen_by_peer
1448 .increment(num_already_seen_by_peer);
1449 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1450 }
1451
1452 if has_bad_transactions {
1453 self.report_peer_bad_transactions(peer_id)
1455 }
1456
1457 if num_already_seen_by_peer > 0 {
1458 self.report_already_seen(peer_id);
1459 }
1460 }
1461
1462 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1464 match fetch_event {
1465 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1466 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1467 if report_peer {
1468 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1469 }
1470 }
1471 FetchEvent::FetchError { peer_id, error } => {
1472 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1473 self.on_request_error(peer_id, error);
1474 }
1475 FetchEvent::EmptyResponse { peer_id } => {
1476 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1477 }
1478 }
1479 }
1480}
1481
1482impl<
1490 Pool: TransactionPool + Unpin + 'static,
1491 N: NetworkPrimitives<
1492 BroadcastedTransaction: SignedTransaction,
1493 PooledTransaction: SignedTransaction,
1494 > + Unpin,
1495 PBundle: TransactionPolicies + Unpin,
1496 > Future for TransactionsManager<Pool, N, PBundle>
1497where
1498 Pool::Transaction:
1499 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1500{
1501 type Output = ();
1502
1503 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1504 let start = Instant::now();
1505 let mut poll_durations = TxManagerPollDurations::default();
1506
1507 let this = self.get_mut();
1508
1509 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1515 poll_durations.acc_network_events,
1516 "net::tx",
1517 "Network events stream",
1518 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1519 this.network_events.poll_next_unpin(cx),
1520 |event| this.on_network_event(event)
1521 );
1522
1523 let mut new_txs = Vec::new();
1532 let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1533 poll_durations.acc_imported_txns,
1534 "net::tx",
1535 "Pending transactions stream",
1536 DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1537 this.pending_transactions.poll_next_unpin(cx),
1538 |hash| new_txs.push(hash)
1539 );
1540 if !new_txs.is_empty() {
1541 this.on_new_pending_transactions(new_txs);
1542 }
1543
1544 let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1555 poll_durations.acc_fetch_events,
1556 "net::tx",
1557 "Transaction fetch events stream",
1558 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1559 this.transaction_fetcher.poll_next_unpin(cx),
1560 |event| this.on_fetch_event(event),
1561 );
1562
1563 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1578 poll_durations.acc_tx_events,
1579 "net::tx",
1580 "Network transaction events stream",
1581 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1582 this.transaction_events.poll_next_unpin(cx),
1583 |event| this.on_network_tx_event(event),
1584 );
1585
1586 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1601 poll_durations.acc_pending_imports,
1602 "net::tx",
1603 "Batched pool imports stream",
1604 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1605 this.pool_imports.poll_next_unpin(cx),
1606 |batch_results| this.on_batch_import_result(batch_results)
1607 );
1608
1609 duration_metered_exec!(
1614 {
1615 if this.has_capacity_for_fetching_pending_hashes() {
1616 this.on_fetch_hashes_pending_fetch();
1617 }
1618 },
1619 poll_durations.acc_pending_fetch
1620 );
1621
1622 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1624 poll_durations.acc_cmds,
1625 "net::tx",
1626 "Commands channel",
1627 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1628 this.command_rx.poll_next_unpin(cx),
1629 |cmd| this.on_command(cmd)
1630 );
1631
1632 this.transaction_fetcher.update_metrics();
1633
1634 if maybe_more_network_events ||
1636 maybe_more_commands ||
1637 maybe_more_tx_events ||
1638 maybe_more_tx_fetch_events ||
1639 maybe_more_pool_imports ||
1640 maybe_more_pending_txns
1641 {
1642 cx.waker().wake_by_ref();
1644 return Poll::Pending
1645 }
1646
1647 this.update_poll_metrics(start, poll_durations);
1648
1649 Poll::Pending
1650 }
1651}
1652
1653#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1657enum PropagationMode {
1658 Basic,
1662 Forced,
1667}
1668
1669impl PropagationMode {
1670 const fn is_forced(self) -> bool {
1672 matches!(self, Self::Forced)
1673 }
1674}
1675
1676#[derive(Debug, Clone)]
1678struct PropagateTransaction<T = TransactionSigned> {
1679 size: usize,
1680 transaction: Arc<T>,
1681}
1682
1683impl<T: SignedTransaction> PropagateTransaction<T> {
1684 pub fn new(transaction: T) -> Self {
1686 let size = transaction.length();
1687 Self { size, transaction: Arc::new(transaction) }
1688 }
1689
1690 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1692 where
1693 P: PoolTransaction<Consensus = T>,
1694 {
1695 let size = tx.encoded_length();
1696 let transaction = tx.transaction.clone_into_consensus();
1697 let transaction = Arc::new(transaction.into_inner());
1698 Self { size, transaction }
1699 }
1700
1701 fn tx_hash(&self) -> &TxHash {
1702 self.transaction.tx_hash()
1703 }
1704}
1705
1706#[derive(Debug, Clone)]
1709enum PropagateTransactionsBuilder<T> {
1710 Pooled(PooledTransactionsHashesBuilder),
1711 Full(FullTransactionsBuilder<T>),
1712}
1713
1714impl<T> PropagateTransactionsBuilder<T> {
1715 fn pooled(version: EthVersion) -> Self {
1717 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1718 }
1719
1720 fn full(version: EthVersion) -> Self {
1722 Self::Full(FullTransactionsBuilder::new(version))
1723 }
1724
1725 fn is_empty(&self) -> bool {
1727 match self {
1728 Self::Pooled(builder) => builder.is_empty(),
1729 Self::Full(builder) => builder.is_empty(),
1730 }
1731 }
1732
1733 fn build(self) -> PropagateTransactions<T> {
1735 match self {
1736 Self::Pooled(pooled) => {
1737 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1738 }
1739 Self::Full(full) => full.build(),
1740 }
1741 }
1742}
1743
1744impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1745 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1747 for tx in txs {
1748 self.push(tx);
1749 }
1750 }
1751
1752 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1754 match self {
1755 Self::Pooled(builder) => builder.push(transaction),
1756 Self::Full(builder) => builder.push(transaction),
1757 }
1758 }
1759}
1760
1761struct PropagateTransactions<T> {
1763 pooled: Option<NewPooledTransactionHashes>,
1765 full: Option<Vec<Arc<T>>>,
1767}
1768
1769#[derive(Debug, Clone)]
1774struct FullTransactionsBuilder<T> {
1775 total_size: usize,
1777 transactions: Vec<Arc<T>>,
1779 pooled: PooledTransactionsHashesBuilder,
1781}
1782
1783impl<T> FullTransactionsBuilder<T> {
1784 fn new(version: EthVersion) -> Self {
1786 Self {
1787 total_size: 0,
1788 pooled: PooledTransactionsHashesBuilder::new(version),
1789 transactions: vec![],
1790 }
1791 }
1792
1793 fn is_empty(&self) -> bool {
1795 self.transactions.is_empty() && self.pooled.is_empty()
1796 }
1797
1798 fn build(self) -> PropagateTransactions<T> {
1800 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1801 let full = Some(self.transactions).filter(|full| !full.is_empty());
1802 PropagateTransactions { pooled, full }
1803 }
1804}
1805
1806impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1807 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1809 for tx in txs {
1810 self.push(&tx)
1811 }
1812 }
1813
1814 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1824 if !transaction.transaction.is_broadcastable_in_full() {
1833 self.pooled.push(transaction);
1834 return
1835 }
1836
1837 let new_size = self.total_size + transaction.size;
1838 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1839 self.total_size > 0
1840 {
1841 self.pooled.push(transaction);
1843 return
1844 }
1845
1846 self.total_size = new_size;
1847 self.transactions.push(Arc::clone(&transaction.transaction));
1848 }
1849}
1850
1851#[derive(Debug, Clone)]
1854enum PooledTransactionsHashesBuilder {
1855 Eth66(NewPooledTransactionHashes66),
1856 Eth68(NewPooledTransactionHashes68),
1857}
1858
1859impl PooledTransactionsHashesBuilder {
1862 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1864 match self {
1865 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1866 Self::Eth68(msg) => {
1867 msg.hashes.push(*pooled_tx.hash());
1868 msg.sizes.push(pooled_tx.encoded_length());
1869 msg.types.push(pooled_tx.transaction.ty());
1870 }
1871 }
1872 }
1873
1874 fn is_empty(&self) -> bool {
1876 match self {
1877 Self::Eth66(hashes) => hashes.is_empty(),
1878 Self::Eth68(hashes) => hashes.is_empty(),
1879 }
1880 }
1881
1882 fn extend<T: SignedTransaction>(
1884 &mut self,
1885 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1886 ) {
1887 for tx in txs {
1888 self.push(&tx);
1889 }
1890 }
1891
1892 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1893 match self {
1894 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1895 Self::Eth68(msg) => {
1896 msg.hashes.push(*tx.tx_hash());
1897 msg.sizes.push(tx.size);
1898 msg.types.push(tx.transaction.ty());
1899 }
1900 }
1901 }
1902
1903 fn new(version: EthVersion) -> Self {
1905 match version {
1906 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1907 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1908 }
1909 }
1910
1911 fn build(self) -> NewPooledTransactionHashes {
1912 match self {
1913 Self::Eth66(msg) => msg.into(),
1914 Self::Eth68(msg) => msg.into(),
1915 }
1916 }
1917}
1918
1919enum TransactionSource {
1921 Broadcast,
1923 Response,
1925}
1926
1927impl TransactionSource {
1930 const fn is_broadcast(&self) -> bool {
1932 matches!(self, Self::Broadcast)
1933 }
1934}
1935
1936#[derive(Debug)]
1938pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1939 seen_transactions: LruCache<TxHash>,
1943 request_tx: PeerRequestSender<PeerRequest<N>>,
1945 version: EthVersion,
1947 client_version: Arc<str>,
1949 peer_kind: PeerKind,
1951}
1952
1953impl<N: NetworkPrimitives> PeerMetadata<N> {
1954 pub fn new(
1956 request_tx: PeerRequestSender<PeerRequest<N>>,
1957 version: EthVersion,
1958 client_version: Arc<str>,
1959 max_transactions_seen_by_peer: u32,
1960 peer_kind: PeerKind,
1961 ) -> Self {
1962 Self {
1963 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1964 request_tx,
1965 version,
1966 client_version,
1967 peer_kind,
1968 }
1969 }
1970
1971 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1973 &self.request_tx
1974 }
1975
1976 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1978 &mut self.seen_transactions
1979 }
1980
1981 pub const fn version(&self) -> EthVersion {
1983 self.version
1984 }
1985
1986 pub fn client_version(&self) -> &str {
1988 &self.client_version
1989 }
1990
1991 pub const fn peer_kind(&self) -> PeerKind {
1993 self.peer_kind
1994 }
1995}
1996
1997#[derive(Debug)]
1999enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2000 PropagateHash(B256),
2002 PropagateHashesTo(Vec<B256>, PeerId),
2004 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2006 PropagateTransactionsTo(Vec<TxHash>, PeerId),
2008 PropagateTransactions(Vec<TxHash>),
2010 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2012 GetTransactionHashes {
2014 peers: Vec<PeerId>,
2015 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2016 },
2017 GetPeerSender {
2019 peer_id: PeerId,
2020 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2021 },
2022}
2023
2024#[derive(Debug)]
2026pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2027 IncomingTransactions {
2031 peer_id: PeerId,
2033 msg: Transactions<N::BroadcastedTransaction>,
2035 },
2036 IncomingPooledTransactionHashes {
2038 peer_id: PeerId,
2040 msg: NewPooledTransactionHashes,
2042 },
2043 GetPooledTransactions {
2045 peer_id: PeerId,
2047 request: GetPooledTransactions,
2049 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2051 },
2052 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2054}
2055
2056#[derive(Debug)]
2058pub struct PendingPoolImportsInfo {
2059 pending_pool_imports: Arc<AtomicUsize>,
2061 max_pending_pool_imports: usize,
2063}
2064
2065impl PendingPoolImportsInfo {
2066 pub fn new(max_pending_pool_imports: usize) -> Self {
2068 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2069 }
2070
2071 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2073 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2074 }
2075}
2076
2077impl Default for PendingPoolImportsInfo {
2078 fn default() -> Self {
2079 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2080 }
2081}
2082
2083#[derive(Debug, Default)]
2084struct TxManagerPollDurations {
2085 acc_network_events: Duration,
2086 acc_pending_imports: Duration,
2087 acc_tx_events: Duration,
2088 acc_imported_txns: Duration,
2089 acc_fetch_events: Duration,
2090 acc_pending_fetch: Duration,
2091 acc_cmds: Duration,
2092}
2093
2094#[cfg(test)]
2095mod tests {
2096 use super::*;
2097 use crate::{
2098 test_utils::{
2099 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2100 Testnet,
2101 },
2102 transactions::config::RelaxedEthAnnouncementFilter,
2103 NetworkConfigBuilder, NetworkManager,
2104 };
2105 use alloy_consensus::{TxEip1559, TxLegacy};
2106 use alloy_primitives::{hex, Signature, TxKind, U256};
2107 use alloy_rlp::Decodable;
2108 use futures::FutureExt;
2109 use reth_chainspec::MIN_TRANSACTION_GAS;
2110 use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2111 use reth_network_api::{NetworkInfo, PeerKind};
2112 use reth_network_p2p::{
2113 error::{RequestError, RequestResult},
2114 sync::{NetworkSyncUpdater, SyncState},
2115 };
2116 use reth_storage_api::noop::NoopProvider;
2117 use reth_transaction_pool::test_utils::{
2118 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2119 };
2120 use secp256k1::SecretKey;
2121 use std::{
2122 future::poll_fn,
2123 net::{IpAddr, Ipv4Addr, SocketAddr},
2124 str::FromStr,
2125 };
2126 use tracing::error;
2127
2128 #[tokio::test(flavor = "multi_thread")]
2129 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2130 reth_tracing::init_test_tracing();
2131 let net = Testnet::create(3).await;
2132
2133 let mut handles = net.handles();
2134 let handle0 = handles.next().unwrap();
2135 let handle1 = handles.next().unwrap();
2136
2137 drop(handles);
2138 let handle = net.spawn();
2139
2140 let listener0 = handle0.event_listener();
2141 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2142 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2143
2144 let client = NoopProvider::default();
2145 let pool = testing_pool();
2146 let config = NetworkConfigBuilder::eth(secret_key)
2147 .disable_discovery()
2148 .listener_port(0)
2149 .build(client);
2150 let transactions_manager_config = config.transactions_manager_config.clone();
2151 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2152 .await
2153 .unwrap()
2154 .into_builder()
2155 .transactions(pool.clone(), transactions_manager_config)
2156 .split_with_handle();
2157
2158 tokio::task::spawn(network);
2159
2160 network_handle.update_sync_state(SyncState::Syncing);
2162 assert!(NetworkInfo::is_syncing(&network_handle));
2163 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2164
2165 let mut established = listener0.take(2);
2167 while let Some(ev) = established.next().await {
2168 match ev {
2169 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2170 transactions
2172 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2173 }
2174 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2175 ev => {
2176 error!("unexpected event {ev:?}")
2177 }
2178 }
2179 }
2180 let input = hex!(
2182 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2183 );
2184 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2185 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2186 peer_id: *handle1.peer_id(),
2187 msg: Transactions(vec![signed_tx.clone()]),
2188 });
2189 poll_fn(|cx| {
2190 let _ = transactions.poll_unpin(cx);
2191 Poll::Ready(())
2192 })
2193 .await;
2194 assert!(pool.is_empty());
2195 handle.terminate().await;
2196 }
2197
2198 #[tokio::test(flavor = "multi_thread")]
2199 async fn test_tx_broadcasts_through_two_syncs() {
2200 reth_tracing::init_test_tracing();
2201 let net = Testnet::create(3).await;
2202
2203 let mut handles = net.handles();
2204 let handle0 = handles.next().unwrap();
2205 let handle1 = handles.next().unwrap();
2206
2207 drop(handles);
2208 let handle = net.spawn();
2209
2210 let listener0 = handle0.event_listener();
2211 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2212 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2213
2214 let client = NoopProvider::default();
2215 let pool = testing_pool();
2216 let config = NetworkConfigBuilder::new(secret_key)
2217 .disable_discovery()
2218 .listener_port(0)
2219 .build(client);
2220 let transactions_manager_config = config.transactions_manager_config.clone();
2221 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2222 .await
2223 .unwrap()
2224 .into_builder()
2225 .transactions(pool.clone(), transactions_manager_config)
2226 .split_with_handle();
2227
2228 tokio::task::spawn(network);
2229
2230 network_handle.update_sync_state(SyncState::Syncing);
2232 assert!(NetworkInfo::is_syncing(&network_handle));
2233 network_handle.update_sync_state(SyncState::Idle);
2234 assert!(!NetworkInfo::is_syncing(&network_handle));
2235 network_handle.update_sync_state(SyncState::Syncing);
2236 assert!(NetworkInfo::is_syncing(&network_handle));
2237
2238 let mut established = listener0.take(2);
2240 while let Some(ev) = established.next().await {
2241 match ev {
2242 NetworkEvent::ActivePeerSession { .. } |
2243 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2244 transactions.on_network_event(ev);
2246 }
2247 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2248 _ => {
2249 error!("unexpected event {ev:?}")
2250 }
2251 }
2252 }
2253 let input = hex!(
2255 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2256 );
2257 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2258 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2259 peer_id: *handle1.peer_id(),
2260 msg: Transactions(vec![signed_tx.clone()]),
2261 });
2262 poll_fn(|cx| {
2263 let _ = transactions.poll_unpin(cx);
2264 Poll::Ready(())
2265 })
2266 .await;
2267 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2268 assert!(NetworkInfo::is_syncing(&network_handle));
2269 assert!(!pool.is_empty());
2270 handle.terminate().await;
2271 }
2272
2273 #[tokio::test(flavor = "multi_thread")]
2276 async fn test_handle_incoming_transactions_hashes() {
2277 reth_tracing::init_test_tracing();
2278
2279 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2280 let client = NoopProvider::default();
2281
2282 let config = NetworkConfigBuilder::new(secret_key)
2283 .listener_port(0)
2285 .disable_discovery()
2286 .build(client);
2287
2288 let pool = testing_pool();
2289
2290 let transactions_manager_config = config.transactions_manager_config.clone();
2291 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2292 .await
2293 .unwrap()
2294 .into_builder()
2295 .transactions(pool.clone(), transactions_manager_config)
2296 .split_with_handle();
2297
2298 let peer_id_1 = PeerId::new([1; 64]);
2299 let eth_version = EthVersion::Eth66;
2300
2301 let txs = vec![TransactionSigned::new_unhashed(
2302 Transaction::Legacy(TxLegacy {
2303 chain_id: Some(4),
2304 nonce: 15u64,
2305 gas_price: 2200000000,
2306 gas_limit: 34811,
2307 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2308 value: U256::from(1234u64),
2309 input: Default::default(),
2310 }),
2311 Signature::new(
2312 U256::from_str(
2313 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2314 )
2315 .unwrap(),
2316 U256::from_str(
2317 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2318 )
2319 .unwrap(),
2320 true,
2321 ),
2322 )];
2323
2324 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2325
2326 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2327 tx_manager.peers.insert(peer_id_1, peer_1);
2328
2329 assert!(pool.is_empty());
2330
2331 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2332 peer_id: peer_id_1,
2333 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2334 txs_hashes.clone(),
2335 )),
2336 });
2337
2338 let req = to_mock_session_rx
2340 .recv()
2341 .await
2342 .expect("peer_1 session should receive request with buffered hashes");
2343 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2344 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2345
2346 let message: Vec<PooledTransactionVariant> = txs
2347 .into_iter()
2348 .map(|tx| {
2349 PooledTransactionVariant::try_from(tx)
2350 .expect("Failed to convert MockTransaction to PooledTransaction")
2351 })
2352 .collect();
2353
2354 response
2356 .send(Ok(PooledTransactions(message)))
2357 .expect("should send peer_1 response to tx manager");
2358
2359 poll_fn(|cx| {
2361 let _ = tx_manager.poll_unpin(cx);
2362 Poll::Ready(())
2363 })
2364 .await;
2365
2366 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2369 }
2370
2371 #[tokio::test(flavor = "multi_thread")]
2372 async fn test_handle_incoming_transactions() {
2373 reth_tracing::init_test_tracing();
2374 let net = Testnet::create(3).await;
2375
2376 let mut handles = net.handles();
2377 let handle0 = handles.next().unwrap();
2378 let handle1 = handles.next().unwrap();
2379
2380 drop(handles);
2381 let handle = net.spawn();
2382
2383 let listener0 = handle0.event_listener();
2384
2385 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2386 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2387
2388 let client = NoopProvider::default();
2389 let pool = testing_pool();
2390 let config = NetworkConfigBuilder::new(secret_key)
2391 .disable_discovery()
2392 .listener_port(0)
2393 .build(client);
2394 let transactions_manager_config = config.transactions_manager_config.clone();
2395 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2396 .await
2397 .unwrap()
2398 .into_builder()
2399 .transactions(pool.clone(), transactions_manager_config)
2400 .split_with_handle();
2401 tokio::task::spawn(network);
2402
2403 network_handle.update_sync_state(SyncState::Idle);
2404
2405 assert!(!NetworkInfo::is_syncing(&network_handle));
2406
2407 let mut established = listener0.take(2);
2409 while let Some(ev) = established.next().await {
2410 match ev {
2411 NetworkEvent::ActivePeerSession { .. } |
2412 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2413 transactions.on_network_event(ev);
2415 }
2416 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2417 ev => {
2418 error!("unexpected event {ev:?}")
2419 }
2420 }
2421 }
2422 let input = hex!(
2424 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2425 );
2426 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2427 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2428 peer_id: *handle1.peer_id(),
2429 msg: Transactions(vec![signed_tx.clone()]),
2430 });
2431 assert!(transactions
2432 .transactions_by_peers
2433 .get(signed_tx.tx_hash())
2434 .unwrap()
2435 .contains(handle1.peer_id()));
2436
2437 poll_fn(|cx| {
2439 let _ = transactions.poll_unpin(cx);
2440 Poll::Ready(())
2441 })
2442 .await;
2443
2444 assert!(!pool.is_empty());
2445 assert!(pool.get(signed_tx.tx_hash()).is_some());
2446 handle.terminate().await;
2447 }
2448
2449 #[tokio::test(flavor = "multi_thread")]
2450 async fn test_on_get_pooled_transactions_network() {
2451 reth_tracing::init_test_tracing();
2452 let net = Testnet::create(2).await;
2453
2454 let mut handles = net.handles();
2455 let handle0 = handles.next().unwrap();
2456 let handle1 = handles.next().unwrap();
2457
2458 drop(handles);
2459 let handle = net.spawn();
2460
2461 let listener0 = handle0.event_listener();
2462
2463 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2464 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2465
2466 let client = NoopProvider::default();
2467 let pool = testing_pool();
2468 let config = NetworkConfigBuilder::new(secret_key)
2469 .disable_discovery()
2470 .listener_port(0)
2471 .build(client);
2472 let transactions_manager_config = config.transactions_manager_config.clone();
2473 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2474 .await
2475 .unwrap()
2476 .into_builder()
2477 .transactions(pool.clone(), transactions_manager_config)
2478 .split_with_handle();
2479 tokio::task::spawn(network);
2480
2481 network_handle.update_sync_state(SyncState::Idle);
2482
2483 assert!(!NetworkInfo::is_syncing(&network_handle));
2484
2485 let mut established = listener0.take(2);
2487 while let Some(ev) = established.next().await {
2488 match ev {
2489 NetworkEvent::ActivePeerSession { .. } |
2490 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2491 transactions.on_network_event(ev);
2492 }
2493 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2494 ev => {
2495 error!("unexpected event {ev:?}")
2496 }
2497 }
2498 }
2499 handle.terminate().await;
2500
2501 let tx = MockTransaction::eip1559();
2502 let _ = transactions
2503 .pool
2504 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2505 .await;
2506
2507 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2508
2509 let (send, receive) =
2510 oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2511
2512 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2513 peer_id: *handle1.peer_id(),
2514 request,
2515 response: send,
2516 });
2517
2518 match receive.await.unwrap() {
2519 Ok(PooledTransactions(transactions)) => {
2520 assert_eq!(transactions.len(), 1);
2521 }
2522 Err(e) => {
2523 panic!("error: {e:?}");
2524 }
2525 }
2526 }
2527
2528 #[tokio::test]
2532 async fn test_partially_tx_response() {
2533 reth_tracing::init_test_tracing();
2534
2535 let mut tx_manager = new_tx_manager().await.0;
2536 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2537
2538 let peer_id_1 = PeerId::new([1; 64]);
2539 let eth_version = EthVersion::Eth66;
2540
2541 let txs = vec![
2542 TransactionSigned::new_unhashed(
2543 Transaction::Legacy(TxLegacy {
2544 chain_id: Some(4),
2545 nonce: 15u64,
2546 gas_price: 2200000000,
2547 gas_limit: 34811,
2548 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2549 value: U256::from(1234u64),
2550 input: Default::default(),
2551 }),
2552 Signature::new(
2553 U256::from_str(
2554 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2555 )
2556 .unwrap(),
2557 U256::from_str(
2558 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2559 )
2560 .unwrap(),
2561 true,
2562 ),
2563 ),
2564 TransactionSigned::new_unhashed(
2565 Transaction::Eip1559(TxEip1559 {
2566 chain_id: 4,
2567 nonce: 26u64,
2568 max_priority_fee_per_gas: 1500000000,
2569 max_fee_per_gas: 1500000013,
2570 gas_limit: MIN_TRANSACTION_GAS,
2571 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2572 value: U256::from(3000000000000000000u64),
2573 input: Default::default(),
2574 access_list: Default::default(),
2575 }),
2576 Signature::new(
2577 U256::from_str(
2578 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2579 )
2580 .unwrap(),
2581 U256::from_str(
2582 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2583 )
2584 .unwrap(),
2585 true,
2586 ),
2587 ),
2588 ];
2589
2590 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2591
2592 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2593 peer_1.seen_transactions.insert(txs_hashes[0]);
2596 peer_1.seen_transactions.insert(txs_hashes[1]);
2597 tx_manager.peers.insert(peer_id_1, peer_1);
2598
2599 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2600 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2601
2602 assert!(tx_fetcher.is_idle(&peer_id_1));
2604 assert_eq!(tx_fetcher.active_peers.len(), 0);
2605
2606 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2608
2609 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2610 assert!(!tx_fetcher.is_idle(&peer_id_1));
2612 assert_eq!(tx_fetcher.active_peers.len(), 1);
2613
2614 let req = to_mock_session_rx
2616 .recv()
2617 .await
2618 .expect("peer_1 session should receive request with buffered hashes");
2619 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2620
2621 let message: Vec<PooledTransactionVariant> = txs
2622 .into_iter()
2623 .take(1)
2624 .map(|tx| {
2625 PooledTransactionVariant::try_from(tx)
2626 .expect("Failed to convert MockTransaction to PooledTransaction")
2627 })
2628 .collect();
2629 response
2631 .send(Ok(PooledTransactions(message)))
2632 .expect("should send peer_1 response to tx manager");
2633 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2634 unreachable!()
2635 };
2636
2637 assert!(tx_fetcher.is_idle(&peer_id));
2639 assert_eq!(tx_fetcher.active_peers.len(), 0);
2640 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2642 }
2643
2644 #[tokio::test]
2645 async fn test_max_retries_tx_request() {
2646 reth_tracing::init_test_tracing();
2647
2648 let mut tx_manager = new_tx_manager().await.0;
2649 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2650
2651 let peer_id_1 = PeerId::new([1; 64]);
2652 let peer_id_2 = PeerId::new([2; 64]);
2653 let eth_version = EthVersion::Eth66;
2654 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2655
2656 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2657 peer_1.seen_transactions.insert(seen_hashes[0]);
2660 peer_1.seen_transactions.insert(seen_hashes[1]);
2661 tx_manager.peers.insert(peer_id_1, peer_1);
2662
2663 let retries = 1;
2666 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2667 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2668
2669 assert!(tx_fetcher.is_idle(&peer_id_1));
2671 assert_eq!(tx_fetcher.active_peers.len(), 0);
2672
2673 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2675
2676 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2677
2678 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2679 assert!(!tx_fetcher.is_idle(&peer_id_1));
2681 assert_eq!(tx_fetcher.active_peers.len(), 1);
2682
2683 let req = to_mock_session_rx
2685 .recv()
2686 .await
2687 .expect("peer_1 session should receive request with buffered hashes");
2688 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2689 let GetPooledTransactions(hashes) = request;
2690
2691 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2692
2693 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2694
2695 response
2697 .send(Err(RequestError::BadResponse))
2698 .expect("should send peer_1 response to tx manager");
2699 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2700 unreachable!()
2701 };
2702
2703 assert!(tx_fetcher.is_idle(&peer_id));
2705 assert_eq!(tx_fetcher.active_peers.len(), 0);
2706 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2708
2709 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2710 tx_manager.peers.insert(peer_id_2, peer_2);
2711
2712 let msg =
2714 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2715 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2716
2717 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2718
2719 assert_eq!(tx_fetcher.active_peers.len(), 1);
2721
2722 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2724 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2726
2727 let req = to_mock_session_rx
2729 .recv()
2730 .await
2731 .expect("peer_2 session should receive request with buffered hashes");
2732 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2733
2734 response
2736 .send(Err(RequestError::BadResponse))
2737 .expect("should send peer_2 response to tx manager");
2738 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2739
2740 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2743 assert_eq!(tx_fetcher.active_peers.len(), 0);
2744 }
2745
2746 #[test]
2747 fn test_transaction_builder_empty() {
2748 let mut builder =
2749 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2750 assert!(builder.is_empty());
2751
2752 let mut factory = MockTransactionFactory::default();
2753 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2754 builder.push(&tx);
2755 assert!(!builder.is_empty());
2756
2757 let txs = builder.build();
2758 assert!(txs.full.is_none());
2759 let txs = txs.pooled.unwrap();
2760 assert_eq!(txs.len(), 1);
2761 }
2762
2763 #[test]
2764 fn test_transaction_builder_large() {
2765 let mut builder =
2766 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2767 assert!(builder.is_empty());
2768
2769 let mut factory = MockTransactionFactory::default();
2770 let mut tx = factory.create_eip1559();
2771 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2773 let tx = Arc::new(tx);
2774 let tx = PropagateTransaction::pool_tx(tx);
2775 builder.push(&tx);
2776 assert!(!builder.is_empty());
2777
2778 let txs = builder.clone().build();
2779 assert!(txs.pooled.is_none());
2780 let txs = txs.full.unwrap();
2781 assert_eq!(txs.len(), 1);
2782
2783 builder.push(&tx);
2784
2785 let txs = builder.clone().build();
2786 let pooled = txs.pooled.unwrap();
2787 assert_eq!(pooled.len(), 1);
2788 let txs = txs.full.unwrap();
2789 assert_eq!(txs.len(), 1);
2790 }
2791
2792 #[test]
2793 fn test_transaction_builder_eip4844() {
2794 let mut builder =
2795 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2796 assert!(builder.is_empty());
2797
2798 let mut factory = MockTransactionFactory::default();
2799 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2800 builder.push(&tx);
2801 assert!(!builder.is_empty());
2802
2803 let txs = builder.clone().build();
2804 assert!(txs.full.is_none());
2805 let txs = txs.pooled.unwrap();
2806 assert_eq!(txs.len(), 1);
2807
2808 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2809 builder.push(&tx);
2810
2811 let txs = builder.clone().build();
2812 let pooled = txs.pooled.unwrap();
2813 assert_eq!(pooled.len(), 1);
2814 let txs = txs.full.unwrap();
2815 assert_eq!(txs.len(), 1);
2816 }
2817
2818 #[tokio::test]
2819 async fn test_propagate_full() {
2820 reth_tracing::init_test_tracing();
2821
2822 let (mut tx_manager, network) = new_tx_manager().await;
2823 let peer_id = PeerId::random();
2824
2825 network.handle().update_sync_state(SyncState::Idle);
2827
2828 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2830
2831 let session_info = SessionInfo {
2832 peer_id,
2833 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2834 client_version: Arc::from(""),
2835 capabilities: Arc::new(vec![].into()),
2836 status: Arc::new(Default::default()),
2837 version: EthVersion::Eth68,
2838 peer_kind: PeerKind::Basic,
2839 };
2840 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2841 tx_manager
2842 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2843 let mut propagate = vec![];
2844 let mut factory = MockTransactionFactory::default();
2845 let eip1559_tx = Arc::new(factory.create_eip1559());
2846 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2847 let eip4844_tx = Arc::new(factory.create_eip4844());
2848 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2849
2850 let propagated =
2851 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2852 assert_eq!(propagated.0.len(), 2);
2853 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2854 assert_eq!(prop_txs.len(), 1);
2855 assert!(prop_txs[0].is_full());
2856
2857 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2858 assert_eq!(prop_txs.len(), 1);
2859 assert!(prop_txs[0].is_hash());
2860
2861 let peer = tx_manager.peers.get(&peer_id).unwrap();
2862 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2863 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2864 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2865
2866 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2868 assert!(propagated.0.is_empty());
2869 }
2870
2871 #[tokio::test]
2872 async fn test_relaxed_filter_ignores_unknown_tx_types() {
2873 reth_tracing::init_test_tracing();
2874
2875 let transactions_manager_config = TransactionsManagerConfig::default();
2876
2877 let propagation_policy = TransactionPropagationKind::default();
2878 let announcement_policy = RelaxedEthAnnouncementFilter::default();
2879
2880 let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2881
2882 let pool = testing_pool();
2883 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2884 let client = NoopProvider::default();
2885
2886 let network_config = NetworkConfigBuilder::new(secret_key)
2887 .listener_port(0)
2888 .disable_discovery()
2889 .build(client.clone());
2890
2891 let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2892 let (to_tx_manager_tx, from_network_rx) =
2893 mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2894 network_manager.set_transactions(to_tx_manager_tx);
2895 let network_handle = network_manager.handle().clone();
2896 let network_service_handle = tokio::spawn(network_manager);
2897
2898 let mut tx_manager = TransactionsManager::<
2899 TestPool,
2900 EthNetworkPrimitives,
2901 NetworkPolicies<TransactionPropagationKind, RelaxedEthAnnouncementFilter>,
2902 >::with_policy(
2903 network_handle.clone(),
2904 pool.clone(),
2905 from_network_rx,
2906 transactions_manager_config,
2907 policy_bundle,
2908 );
2909
2910 let peer_id = PeerId::random();
2911 let eth_version = EthVersion::Eth68;
2912 let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2913 tx_manager.peers.insert(peer_id, mock_peer_metadata);
2914
2915 let mut tx_factory = MockTransactionFactory::default();
2916
2917 let valid_known_tx = tx_factory.create_eip1559();
2918 let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2919
2920 let known_tx_hash = *known_tx_signed.hash();
2921 let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2922 let known_tx_size = known_tx_signed.encoded_length();
2923
2924 let unknown_tx_hash = B256::random();
2925 let unknown_tx_type_byte = 0xff_u8;
2926 let unknown_tx_size = 150;
2927
2928 let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2929 types: vec![known_tx_type_byte, unknown_tx_type_byte],
2930 sizes: vec![known_tx_size, unknown_tx_size],
2931 hashes: vec![known_tx_hash, unknown_tx_hash],
2932 });
2933
2934 tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2935
2936 poll_fn(|cx| {
2937 let _ = tx_manager.poll_unpin(cx);
2938 Poll::Ready(())
2939 })
2940 .await;
2941
2942 let mut requested_hashes_in_getpooled = HashSet::new();
2943 let mut unexpected_request_received = false;
2944
2945 match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2946 .await
2947 {
2948 Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2949 let GetPooledTransactions(hashes) = request;
2950 for hash in hashes {
2951 requested_hashes_in_getpooled.insert(hash);
2952 }
2953 let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2954 }
2955 Ok(Some(other_request)) => {
2956 tracing::error!(?other_request, "Received unexpected PeerRequest type");
2957 unexpected_request_received = true;
2958 }
2959 Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2960 Err(_timeout_err) => {
2961 tracing::info!("Timeout: No GetPooledTransactions request received.")
2962 }
2963 }
2964
2965 assert!(
2966 requested_hashes_in_getpooled.contains(&known_tx_hash),
2967 "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2968 );
2969 assert!(
2970 !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2971 "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2972 );
2973 assert!(
2974 !unexpected_request_received,
2975 "An unexpected P2P request was received by the mock peer."
2976 );
2977
2978 network_service_handle.abort();
2979 }
2980}