1pub mod config;
5pub mod constants;
7pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
16pub use validation::*;
17
18pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
19
20use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
21use crate::{
22 budget::{
23 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
24 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
25 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
26 },
27 cache::LruCache,
28 duration_metered_exec, metered_poll_nested_stream_with_budget,
29 metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
30 NetworkHandle,
31};
32use alloy_primitives::{TxHash, B256};
33use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
34use futures::{stream::FuturesUnordered, Future, StreamExt};
35use reth_eth_wire::{
36 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
37 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
38 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
39 RequestTxHashes, Transactions,
40};
41use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
42use reth_network_api::{
43 events::{PeerEvent, SessionInfo},
44 NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
45};
46use reth_network_p2p::{
47 error::{RequestError, RequestResult},
48 sync::SyncStateProvider,
49};
50use reth_network_peers::PeerId;
51use reth_network_types::ReputationChangeKind;
52use reth_primitives::TransactionSigned;
53use reth_primitives_traits::SignedTransaction;
54use reth_tokio_util::EventStream;
55use reth_transaction_pool::{
56 error::{PoolError, PoolResult},
57 GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
58 TransactionPool, ValidPoolTransaction,
59};
60use std::{
61 collections::{hash_map::Entry, HashMap, HashSet},
62 pin::Pin,
63 sync::{
64 atomic::{AtomicUsize, Ordering},
65 Arc,
66 },
67 task::{Context, Poll},
68 time::{Duration, Instant},
69};
70use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
71use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
72use tracing::{debug, trace};
73
/// A boxed, sendable future that resolves to the per-transaction results of a batch import into
/// the pool.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
78
/// Cloneable handle for sending [`TransactionsCommand`]s to a [`TransactionsManager`].
///
/// Instances are created via [`TransactionsManager::handle`].
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sender half of the manager's unbounded command channel.
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
91
92impl<N: NetworkPrimitives> TransactionsHandle<N> {
95 fn send(&self, cmd: TransactionsCommand<N>) {
96 let _ = self.manager_tx.send(cmd);
97 }
98
99 async fn peer_handle(
101 &self,
102 peer_id: PeerId,
103 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
104 let (tx, rx) = oneshot::channel();
105 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
106 rx.await
107 }
108
109 pub fn propagate(&self, hash: TxHash) {
111 self.send(TransactionsCommand::PropagateHash(hash))
112 }
113
114 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
118 self.propagate_hashes_to(Some(hash), peer)
119 }
120
121 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
125 let hashes = hash.into_iter().collect::<Vec<_>>();
126 if hashes.is_empty() {
127 return
128 }
129 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
130 }
131
132 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
134 let (tx, rx) = oneshot::channel();
135 self.send(TransactionsCommand::GetActivePeers(tx));
136 rx.await
137 }
138
139 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
143 if transactions.is_empty() {
144 return
145 }
146 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
147 }
148
149 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
154 if transactions.is_empty() {
155 return
156 }
157 self.send(TransactionsCommand::PropagateTransactions(transactions))
158 }
159
160 pub fn broadcast_transactions(
165 &self,
166 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
167 ) {
168 let transactions =
169 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
170 if transactions.is_empty() {
171 return
172 }
173 self.send(TransactionsCommand::BroadcastTransactions(transactions))
174 }
175
176 pub async fn get_transaction_hashes(
178 &self,
179 peers: Vec<PeerId>,
180 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
181 if peers.is_empty() {
182 return Ok(Default::default())
183 }
184 let (tx, rx) = oneshot::channel();
185 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
186 rx.await
187 }
188
189 pub async fn get_peer_transaction_hashes(
191 &self,
192 peer: PeerId,
193 ) -> Result<HashSet<TxHash>, RecvError> {
194 let res = self.get_transaction_hashes(vec![peer]).await?;
195 Ok(res.into_values().next().unwrap_or_default())
196 }
197
198 pub async fn get_pooled_transactions_from(
204 &self,
205 peer_id: PeerId,
206 hashes: Vec<B256>,
207 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
208 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
209
210 let (tx, rx) = oneshot::channel();
211 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
212 peer.try_send(request).ok();
213
214 rx.await?.map(|res| Some(res.0))
215 }
216}
217
/// Manages transaction gossip between the network and the transaction pool.
///
/// It reacts to network events, transaction events and commands, imports received transactions
/// into the pool and propagates pool transactions to peers.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Handle to the transaction pool used for lookups, imports and propagation bookkeeping.
    pool: Pool,
    /// Network handle used to send messages to peers and to report reputation changes.
    network: NetworkHandle<N>,
    /// Stream of network events obtained from the network handle's event listener.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Packs, sends and buffers `GetPooledTransactions` requests for announced hashes.
    transaction_fetcher: TransactionFetcher<N>,
    /// Peers that sent each transaction handed to the pool for import. An entry is removed once
    /// the import result for its hash is processed.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Batch imports into the pool that have been started.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Counter and limit for transactions pending import into the pool.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// LRU cache of hashes whose import failed as a bad transaction.
    bad_imports: LruCache<TxHash>,
    /// Metadata of every peer with a tracked session.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Sender half of the command channel; cloned into every [`TransactionsHandle`].
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Receiver half of the command channel.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// Stream of transaction hashes from the pool's pending-transactions listener.
    pending_transactions: ReceiverStream<TxHash>,
    /// Metered receiver of [`NetworkTransactionEvent`]s coming from the network.
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// Configuration of the manager.
    config: TransactionsManagerConfig,
    /// Metrics of the manager.
    metrics: TransactionsManagerMetrics,
}
296
297impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
298 pub fn new(
302 network: NetworkHandle<N>,
303 pool: Pool,
304 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
305 transactions_manager_config: TransactionsManagerConfig,
306 ) -> Self {
307 let network_events = network.event_listener();
308
309 let (command_tx, command_rx) = mpsc::unbounded_channel();
310
311 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
312 &transactions_manager_config.transaction_fetcher_config,
313 );
314
315 let pending = pool.pending_transactions_listener();
318 let pending_pool_imports_info = PendingPoolImportsInfo::default();
319 let metrics = TransactionsManagerMetrics::default();
320 metrics
321 .capacity_pending_pool_imports
322 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
323
324 Self {
325 pool,
326 network,
327 network_events,
328 transaction_fetcher,
329 transactions_by_peers: Default::default(),
330 pool_imports: Default::default(),
331 pending_pool_imports_info: PendingPoolImportsInfo::new(
332 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
333 ),
334 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
335 peers: Default::default(),
336 command_tx,
337 command_rx: UnboundedReceiverStream::new(command_rx),
338 pending_transactions: ReceiverStream::new(pending),
339 transaction_events: UnboundedMeteredReceiver::new(
340 from_network,
341 NETWORK_POOL_TRANSACTIONS_SCOPE,
342 ),
343 config: transactions_manager_config,
344 metrics,
345 }
346 }
347
348 pub fn handle(&self) -> TransactionsHandle<N> {
350 TransactionsHandle { manager_tx: self.command_tx.clone() }
351 }
352
353 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
356 self.pending_pool_imports_info
357 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
358 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
359 }
360
361 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
362 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
363 self.metrics.reported_bad_transactions.increment(1);
364 }
365
366 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
367 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
368 self.network.reputation_change(peer_id, kind);
369 }
370
371 fn report_already_seen(&self, peer_id: PeerId) {
372 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
373 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
374 }
375
376 fn on_good_import(&mut self, hash: TxHash) {
378 self.transactions_by_peers.remove(&hash);
379 }
380
381 fn on_bad_import(&mut self, err: PoolError) {
405 let peers = self.transactions_by_peers.remove(&err.hash);
406
407 if !err.is_bad_transaction() || self.network.is_syncing() {
409 return
410 }
411 if let Some(peers) = peers {
414 for peer_id in peers {
415 self.report_peer_bad_transactions(peer_id);
416 }
417 }
418 self.metrics.bad_imports.increment(1);
419 self.bad_imports.insert(err.hash);
420 }
421
422 fn on_fetch_hashes_pending_fetch(&mut self) {
424 let info = &self.pending_pool_imports_info;
426 let max_pending_pool_imports = info.max_pending_pool_imports;
427 let has_capacity_wrt_pending_pool_imports =
428 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
429
430 self.transaction_fetcher
431 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
432 }
433
434 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
435 let kind = match req_err {
436 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
437 RequestError::Timeout => ReputationChangeKind::Timeout,
438 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
439 return
441 }
442 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
443 };
444 self.report_peer(peer_id, kind);
445 }
446
447 #[inline]
448 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
449 let metrics = &self.metrics;
450
451 let TxManagerPollDurations {
452 acc_network_events,
453 acc_pending_imports,
454 acc_tx_events,
455 acc_imported_txns,
456 acc_fetch_events,
457 acc_pending_fetch,
458 acc_cmds,
459 } = poll_durations;
460
461 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
463 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
465 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
466 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
467 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
468 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
469 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
470 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
471 }
472}
473
474impl<Pool, N> TransactionsManager<Pool, N>
475where
476 Pool: TransactionPool,
477 N: NetworkPrimitives,
478{
479 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
481 for res in batch_results {
482 match res {
483 Ok(hash) => {
484 self.on_good_import(hash);
485 }
486 Err(err) => {
487 self.on_bad_import(err);
488 }
489 }
490 }
491 }
492
    /// Handles a `NewPooledTransactionHashes` announcement received from `peer_id`.
    ///
    /// The announcement is dropped while the node is initially syncing, when tx gossip is
    /// disabled, or when the peer has no tracked session. Otherwise the announced hashes are
    /// recorded as seen by the peer and filtered in several passes. Whatever remains is either
    /// requested from the peer right away or buffered in the [`TransactionFetcher`].
    fn on_new_pooled_transaction_hashes(
        &mut self,
        peer_id: PeerId,
        msg: NewPooledTransactionHashes,
    ) {
        // ignore announcements while the node is initially syncing
        if self.network.is_initially_syncing() {
            return
        }
        // ignore announcements when tx gossip is disabled
        if self.network.tx_gossip_disabled() {
            return
        }

        // the peer must still have a tracked session
        let Some(peer) = self.peers.get_mut(&peer_id) else {
            trace!(
                peer_id = format!("{peer_id:#}"),
                ?msg,
                "discarding announcement from inactive peer"
            );

            return
        };
        let client = peer.client_version.clone();

        // record every announced hash as seen by the peer and count the ones that were already
        // recorded before
        let mut count_txns_already_seen_by_peer = 0;
        for tx in msg.iter_hashes().copied() {
            if !peer.seen_transactions.insert(tx) {
                count_txns_already_seen_by_peer += 1;
            }
        }
        if count_txns_already_seen_by_peer > 0 {
            // one offending message, possibly many offending hashes
            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_hash_already_seen_by_peer
                .increment(count_txns_already_seen_by_peer);

            trace!(target: "net::tx",
                %count_txns_already_seen_by_peer,
                peer_id=format!("{peer_id:#}"),
                ?client,
                "Peer sent hashes that have already been marked as seen by peer"
            );

            self.report_already_seen(peer_id);
        }

        // first validation pass over the raw message
        let (validation_outcome, mut partially_valid_msg) =
            self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);

        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        // drop hashes that already have an entry in `transactions_by_peers`
        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));

        // drop hashes the pool filters out as known, and count how many were removed
        let hashes_count_pre_pool_filter = partially_valid_msg.len();
        self.pool.retain_unknown(&mut partially_valid_msg);
        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
            let already_known_hashes_count =
                hashes_count_pre_pool_filter - partially_valid_msg.len();
            self.metrics
                .occurrences_hashes_already_in_pool
                .increment(already_known_hashes_count as u64);
        }

        if partially_valid_msg.is_empty() {
            // nothing left to request
            return
        }

        // second validation pass, selected by the announcement's message version
        let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
            .msg_version()
            .expect("partially valid announcement should have version")
            .is_eth68()
        {
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_68(partially_valid_msg)
        } else {
            self.transaction_fetcher
                .filter_valid_message
                .filter_valid_entries_66(partially_valid_msg)
        };

        if validation_outcome == FilterOutcome::ReportPeer {
            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
        }

        if valid_announcement_data.is_empty() {
            // nothing left to request
            return
        }

        // let the fetcher filter the remaining hashes; the closure tells it which hashes are
        // cached as bad imports
        let bad_imports = &self.bad_imports;
        self.transaction_fetcher.filter_unseen_and_pending_hashes(
            &mut valid_announcement_data,
            |hash| bad_imports.contains(hash),
            &peer_id,
            &client,
        );

        if valid_announcement_data.is_empty() {
            // nothing left to request
            return
        }

        trace!(target: "net::tx::propagation",
            peer_id=format!("{peer_id:#}"),
            hashes_len=valid_announcement_data.iter().count(),
            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
            msg_version=%valid_announcement_data.msg_version(),
            client_version=%client,
            "received previously unseen and pending hashes in announcement from peer"
        );

        // the fetcher does not consider the peer idle: buffer all hashes instead of sending a
        // request now
        if !self.transaction_fetcher.is_idle(&peer_id) {
            // read the version before the announcement data is consumed below
            let msg_version = valid_announcement_data.msg_version();
            let (hashes, _version) = valid_announcement_data.into_request_hashes();

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                hashes=?*hashes,
                %msg_version,
                %client,
                "buffering hashes announced by busy peer"
            );

            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));

            return
        }

        // pack hashes into a request; hashes that don't fit are returned as surplus
        let mut hashes_to_request =
            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
        let surplus_hashes =
            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);

        if !surplus_hashes.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                surplus_hashes=?*surplus_hashes,
                %client,
                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
            );

            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
        }

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %client,
            "sending hashes in `GetPooledTransactions` request to peer's session"
        );

        // look the peer up again to send the request through its session
        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        // hashes that could not be requested are handed back and buffered
        if let Some(failed_to_request_hashes) =
            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
        {
            let conn_eth_version = peer.version;

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                failed_to_request_hashes=?*failed_to_request_hashes,
                %conn_eth_version,
                %client,
                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
            );
            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
698}
699
700impl<Pool, N> TransactionsManager<Pool, N>
701where
702 Pool: TransactionPool + 'static,
703 N: NetworkPrimitives<
704 BroadcastedTransaction: SignedTransaction,
705 PooledTransaction: SignedTransaction,
706 >,
707 Pool::Transaction:
708 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
709{
710 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
722 if self.network.is_initially_syncing() {
724 return
725 }
726 if self.network.tx_gossip_disabled() {
727 return
728 }
729
730 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
731
732 self.propagate_all(hashes);
733 }
734
    /// Propagates the pool transactions with the given hashes to a single peer.
    ///
    /// In forced mode all transactions found in the pool are handed to the builder; otherwise
    /// only those not yet recorded as seen by the peer. Everything that is sent is recorded as
    /// seen by the peer.
    ///
    /// Returns `None` if the peer is not tracked or if there is nothing to send, otherwise the
    /// record of what was propagated. The caller is responsible for passing it on to the pool.
    fn propagate_full_transactions_to_peer(
        &mut self,
        txs: Vec<TxHash>,
        peer_id: PeerId,
        propagation_mode: PropagationMode,
    ) -> Option<PropagatedTransactions> {
        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");

        // bail out if the peer is not tracked
        let peer = self.peers.get_mut(&peer_id)?;
        let mut propagated = PropagatedTransactions::default();

        // builder for the message(s) to send, created from the peer's version
        let mut full_transactions = FullTransactionsBuilder::new(peer.version);

        // hashes that are not found in the pool are skipped here
        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);

        if propagation_mode.is_forced() {
            // forced: skip the seen-cache check
            full_transactions.extend(to_propagate);
        } else {
            for tx in to_propagate {
                if !peer.seen_transactions.contains(tx.tx_hash()) {
                    // only include transactions the peer has not seen yet
                    full_transactions.push(&tx);
                }
            }
        }

        if full_transactions.is_empty() {
            // nothing to propagate
            return None
        }

        // the builder may produce a hashes message, a full-transactions message, or both
        let PropagateTransactions { pooled, full } = full_transactions.build();

        if let Some(new_pooled_hashes) = pooled {
            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
                // mark the transaction as seen by the peer
                peer.seen_transactions.insert(hash);
            }

            // send the hashes
            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
        }

        if let Some(new_full_transactions) = full {
            for tx in &new_full_transactions {
                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
                // mark the transaction as seen by the peer
                peer.seen_transactions.insert(*tx.tx_hash());
            }

            // send the full transactions
            self.network.send_transactions(peer_id, new_full_transactions);
        }

        // count the distinct transactions that were propagated
        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

        Some(propagated)
    }
804
    /// Propagates the hashes of the given pool transactions to a single peer.
    ///
    /// In forced mode all transactions found in the pool are announced; otherwise only those not
    /// yet recorded as seen by the peer. Returns early if the peer is not tracked or if nothing
    /// is left to announce. The pool is notified about what was propagated.
    fn propagate_hashes_to(
        &mut self,
        hashes: Vec<TxHash>,
        peer_id: PeerId,
        propagation_mode: PropagationMode,
    ) {
        trace!(target: "net::tx", "Start propagating transactions as hashes");

        // scoped so the peer borrow ends before the pool is notified
        let propagated = {
            let Some(peer) = self.peers.get_mut(&peer_id) else {
                // no such peer
                return
            };

            // hashes that are not found in the pool are skipped here
            let to_propagate = self
                .pool
                .get_all(hashes)
                .into_iter()
                .map(PropagateTransaction::pool_tx)
                .collect::<Vec<_>>();

            let mut propagated = PropagatedTransactions::default();

            // builder for the hashes message, created from the peer's version
            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);

            if propagation_mode.is_forced() {
                hashes.extend(to_propagate)
            } else {
                for tx in to_propagate {
                    if !peer.seen_transactions.contains(tx.tx_hash()) {
                        // only include transactions the peer has not seen yet
                        hashes.push(&tx);
                    }
                }
            }

            let new_pooled_hashes = hashes.build();

            if new_pooled_hashes.is_empty() {
                // nothing to propagate
                return
            }

            // NOTE(review): unlike `propagate_full_transactions_to_peer`, the hashes are not
            // inserted into `peer.seen_transactions` here — confirm this is intended.
            for hash in new_pooled_hashes.iter_hashes().copied() {
                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
            }

            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");

            // send the hashes
            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);

            // count the distinct transactions that were propagated
            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

            propagated
        };

        // notify the pool about what was propagated
        self.pool.on_propagated(propagated);
    }
872
    /// Propagates the given transactions to all tracked peers.
    ///
    /// Depending on its position in the iteration over the peer map, a peer gets either a
    /// builder created with `PropagateTransactionsBuilder::full` or one created with
    /// `PropagateTransactionsBuilder::pooled`. In forced mode every transaction is handed to the
    /// builder; otherwise only those not yet recorded as seen by the peer. Everything that is
    /// sent is recorded as seen by the peer.
    ///
    /// Returns the record of what was propagated to whom; it is empty when tx gossip is
    /// disabled. The caller is responsible for passing it on to the pool.
    fn propagate_transactions(
        &mut self,
        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
        propagation_mode: PropagationMode,
    ) -> PropagatedTransactions {
        let mut propagated = PropagatedTransactions::default();
        if self.network.tx_gossip_disabled() {
            return propagated
        }

        // number of peers that should get the full builder, derived from the configured mode
        // and the current peer count
        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());

        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
            // NOTE(review): `peer_idx` is zero-based, so peers at indices `0..=max_num_full`
            // (up to `max_num_full + 1` peers) get the full builder — confirm whether `>=` was
            // intended.
            let mut builder = if peer_idx > max_num_full {
                PropagateTransactionsBuilder::pooled(peer.version)
            } else {
                PropagateTransactionsBuilder::full(peer.version)
            };

            if propagation_mode.is_forced() {
                // forced: skip the seen-cache check
                builder.extend(to_propagate.iter());
            } else {
                for tx in &to_propagate {
                    if !peer.seen_transactions.contains(tx.tx_hash()) {
                        // only include transactions the peer has not seen yet
                        builder.push(tx);
                    }
                }
            }

            if builder.is_empty() {
                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
                continue
            }

            let PropagateTransactions { pooled, full } = builder.build();

            if let Some(mut new_pooled_hashes) = pooled {
                // cap the announcement at the soft limit for hashes per broadcast message
                new_pooled_hashes
                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);

                for hash in new_pooled_hashes.iter_hashes().copied() {
                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
                    // mark the transaction as seen by the peer
                    peer.seen_transactions.insert(hash);
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");

                // send the hashes
                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
            }

            if let Some(new_full_transactions) = full {
                for tx in &new_full_transactions {
                    propagated
                        .0
                        .entry(*tx.tx_hash())
                        .or_default()
                        .push(PropagateKind::Full(*peer_id));
                    // mark the transaction as seen by the peer
                    peer.seen_transactions.insert(*tx.tx_hash());
                }

                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");

                // send the full transactions
                self.network.send_transactions(*peer_id, new_full_transactions);
            }
        }

        // count the distinct transactions that were propagated
        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);

        propagated
    }
966
967 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
972 let propagated = self.propagate_transactions(
973 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
974 PropagationMode::Basic,
975 );
976
977 self.pool.on_propagated(propagated);
979 }
980
981 fn on_get_pooled_transactions(
983 &mut self,
984 peer_id: PeerId,
985 request: GetPooledTransactions,
986 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
987 ) {
988 if let Some(peer) = self.peers.get_mut(&peer_id) {
989 if self.network.tx_gossip_disabled() {
990 let _ = response.send(Ok(PooledTransactions::default()));
991 return
992 }
993 let transactions = self.pool.get_pooled_transaction_elements(
994 request.0,
995 GetPooledTransactionLimit::ResponseSizeSoftLimit(
996 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
997 ),
998 );
999 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1000
1001 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1004
1005 let resp = PooledTransactions(transactions);
1006 let _ = response.send(Ok(resp));
1007 }
1008 }
1009
1010 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1012 match cmd {
1013 TransactionsCommand::PropagateHash(hash) => {
1014 self.on_new_pending_transactions(vec![hash])
1015 }
1016 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1017 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1018 }
1019 TransactionsCommand::GetActivePeers(tx) => {
1020 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1021 tx.send(peers).ok();
1022 }
1023 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1024 if let Some(propagated) =
1025 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1026 {
1027 self.pool.on_propagated(propagated);
1028 }
1029 }
1030 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1031 TransactionsCommand::BroadcastTransactions(txs) => {
1032 self.propagate_transactions(txs, PropagationMode::Forced);
1033 }
1034 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1035 let mut res = HashMap::with_capacity(peers.len());
1036 for peer_id in peers {
1037 let hashes = self
1038 .peers
1039 .get(&peer_id)
1040 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1041 .unwrap_or_default();
1042 res.insert(peer_id, hashes);
1043 }
1044 tx.send(res).ok();
1045 }
1046 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1047 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1048 peer_request_sender.send(sender).ok();
1049 }
1050 }
1051 }
1052
1053 fn handle_peer_session(
1055 &mut self,
1056 info: SessionInfo,
1057 messages: PeerRequestSender<PeerRequest<N>>,
1058 ) {
1059 let SessionInfo { peer_id, client_version, version, .. } = info;
1060
1061 let peer = PeerMetadata::<N>::new(
1063 messages,
1064 version,
1065 client_version,
1066 self.config.max_transactions_seen_by_peer_history,
1067 );
1068 let peer = match self.peers.entry(peer_id) {
1069 Entry::Occupied(mut entry) => {
1070 entry.insert(peer);
1071 entry.into_mut()
1072 }
1073 Entry::Vacant(entry) => entry.insert(peer),
1074 };
1075
1076 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1080 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1081 return
1082 }
1083
1084 let pooled_txs = self.pool.pooled_transactions_max(
1086 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1087 );
1088 if pooled_txs.is_empty() {
1089 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1090 return;
1091 }
1092
1093 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1095 for pooled_tx in pooled_txs {
1096 peer.seen_transactions.insert(*pooled_tx.hash());
1097 msg_builder.push_pooled(pooled_tx);
1098 }
1099
1100 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1101 let msg = msg_builder.build();
1102 self.network.send_transactions_hashes(peer_id, msg);
1103 }
1104
1105 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1107 match event_result {
1108 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1109 self.peers.remove(&peer_id);
1111 self.transaction_fetcher.remove_peer(&peer_id);
1112 }
1113 NetworkEvent::ActivePeerSession { info, messages } => {
1114 self.handle_peer_session(info, messages);
1116 }
1117 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1118 let peer_id = info.peer_id;
1119 let messages = match self.peers.get(&peer_id) {
1121 Some(p) => p.request_tx.clone(),
1122 None => {
1123 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1124 return;
1125 }
1126 };
1127 self.handle_peer_session(info, messages);
1128 }
1129 _ => {}
1130 }
1131 }
1132
1133 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1135 match event {
1136 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1137 let has_blob_txs = msg.has_eip4844();
1141
1142 let non_blob_txs = msg
1143 .0
1144 .into_iter()
1145 .map(N::PooledTransaction::try_from)
1146 .filter_map(Result::ok)
1147 .collect();
1148
1149 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1150
1151 if has_blob_txs {
1152 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1153 self.report_peer_bad_transactions(peer_id);
1154 }
1155 }
1156 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1157 self.on_new_pooled_transaction_hashes(peer_id, msg)
1158 }
1159 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1160 self.on_get_pooled_transactions(peer_id, request, response)
1161 }
1162 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1163 let _ = response.send(Some(self.handle()));
1164 }
1165 }
1166 }
1167
    /// Starts importing the given transactions, received from `peer_id`, into the pool.
    ///
    /// Nothing happens while the node is initially syncing, when tx gossip is disabled, or when
    /// the peer is not tracked. Transactions the pool filters out as known, transactions that
    /// fail signer recovery, transactions already pending import and transactions cached as bad
    /// imports are not imported. The rest is imported as one batch whose future is pushed to
    /// `pool_imports`.
    ///
    /// The peer is penalized if it sent a transaction that failed recovery or is a known bad
    /// import, and, for broadcasts, if it sent a transaction already recorded as seen by it.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // ignore transactions while the node is initially syncing
        if self.network.is_initially_syncing() {
            return
        }
        // ignore transactions when tx gossip is disabled
        if self.network.tx_gossip_disabled() {
            return
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let mut transactions = transactions.0;

        // tell the fetcher that these hashes have been received
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));

        // For broadcasts only: record the transactions as seen by the peer and count the ones
        // that were already recorded. For other sources the seen-cache is left untouched here.
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // drop transactions the pool filters out as known, and count how many were removed
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        // set if any transaction failed recovery or is a known bad import
        let mut has_bad_transactions = false;

        // filter out transactions that are invalid or already pending import
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // pre-size for the upper bound; shrunk after filtering
            let mut new_txs = Vec::with_capacity(transactions.len());
            for tx in transactions {
                // recover the transaction's signer
                let tx = match tx.try_into_recovered() {
                    Ok(tx) => tx,
                    Err(badtx) => {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            hash=%badtx.tx_hash(),
                            client_version=%peer.client_version,
                            "failed ecrecovery for transaction"
                        );
                        has_bad_transactions = true;
                        continue
                    }
                };

                match self.transactions_by_peers.entry(*tx.tx_hash()) {
                    Entry::Occupied(mut entry) => {
                        // already pending import: only remember this peer as another sender
                        entry.get_mut().insert(peer_id);
                    }
                    Entry::Vacant(entry) => {
                        if self.bad_imports.contains(tx.tx_hash()) {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                hash=%tx.tx_hash(),
                                client_version=%peer.client_version,
                                "received a known bad transaction from peer"
                            );
                            has_bad_transactions = true;
                        } else {
                            // new transaction that should be imported into the pool
                            let pool_transaction = Pool::Transaction::from_pooled(tx);
                            new_txs.push(pool_transaction);

                            entry.insert(HashSet::from([peer_id]));
                        }
                    }
                }
            }
            new_txs.shrink_to_fit();

            // import the new transactions as a single batch
            if !new_txs.is_empty() {
                let pool = self.pool.clone();
                // raise the pending-imports gauge now; the import future lowers it again
                let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
                metric_pending_pool_imports.increment(new_txs.len() as f64);

                // same for the shared counter used for capacity checks
                self.pending_pool_imports_info
                    .pending_pool_imports
                    .fetch_add(new_txs.len(), Ordering::Relaxed);
                let tx_manager_info_pending_pool_imports =
                    self.pending_pool_imports_info.pending_pool_imports.clone();

                trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
                let import = Box::pin(async move {
                    let added = new_txs.len();
                    let res = pool.add_external_transactions(new_txs).await;

                    // the batch is done: lower the gauge
                    metric_pending_pool_imports.decrement(added as f64);
                    // and the shared counter
                    tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                    res
                });

                self.pool_imports.push(import);
            }

            if num_already_seen_by_peer > 0 {
                self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
                self.metrics
                    .occurrences_of_transaction_already_seen_by_peer
                    .increment(num_already_seen_by_peer);
                trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
            }
        }

        if has_bad_transactions {
            // the peer sent at least one bad transaction
            self.report_peer_bad_transactions(peer_id)
        }

        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }
    }
1310
1311 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1313 match fetch_event {
1314 FetchEvent::TransactionsFetched { peer_id, transactions } => {
1315 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1316 }
1317 FetchEvent::FetchError { peer_id, error } => {
1318 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1319 self.on_request_error(peer_id, error);
1320 }
1321 FetchEvent::EmptyResponse { peer_id } => {
1322 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1323 }
1324 }
1325 }
1326}
1327
1328impl<Pool, N> Future for TransactionsManager<Pool, N>
1336where
1337 Pool: TransactionPool + Unpin + 'static,
1338 N: NetworkPrimitives<
1339 BroadcastedTransaction: SignedTransaction,
1340 PooledTransaction: SignedTransaction,
1341 >,
1342 Pool::Transaction:
1343 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1344{
1345 type Output = ();
1346
1347 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1348 let start = Instant::now();
1349 let mut poll_durations = TxManagerPollDurations::default();
1350
1351 let this = self.get_mut();
1352
1353 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1359 poll_durations.acc_network_events,
1360 "net::tx",
1361 "Network events stream",
1362 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1363 this.network_events.poll_next_unpin(cx),
1364 |event| this.on_network_event(event)
1365 );
1366
1367 let mut new_txs = Vec::new();
1376 let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1377 poll_durations.acc_imported_txns,
1378 "net::tx",
1379 "Pending transactions stream",
1380 DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1381 this.pending_transactions.poll_next_unpin(cx),
1382 |hash| new_txs.push(hash)
1383 );
1384 if !new_txs.is_empty() {
1385 this.on_new_pending_transactions(new_txs);
1386 }
1387
1388 let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1399 poll_durations.acc_fetch_events,
1400 "net::tx",
1401 "Transaction fetch events stream",
1402 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1403 this.transaction_fetcher.poll_next_unpin(cx),
1404 |event| this.on_fetch_event(event),
1405 );
1406
1407 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1422 poll_durations.acc_tx_events,
1423 "net::tx",
1424 "Network transaction events stream",
1425 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1426 this.transaction_events.poll_next_unpin(cx),
1427 |event| this.on_network_tx_event(event),
1428 );
1429
1430 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1445 poll_durations.acc_pending_imports,
1446 "net::tx",
1447 "Batched pool imports stream",
1448 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1449 this.pool_imports.poll_next_unpin(cx),
1450 |batch_results| this.on_batch_import_result(batch_results)
1451 );
1452
1453 duration_metered_exec!(
1458 {
1459 if this.has_capacity_for_fetching_pending_hashes() {
1460 this.on_fetch_hashes_pending_fetch();
1461 }
1462 },
1463 poll_durations.acc_pending_fetch
1464 );
1465
1466 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1468 poll_durations.acc_cmds,
1469 "net::tx",
1470 "Commands channel",
1471 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1472 this.command_rx.poll_next_unpin(cx),
1473 |cmd| this.on_command(cmd)
1474 );
1475
1476 this.transaction_fetcher.update_metrics();
1477
1478 if maybe_more_network_events ||
1480 maybe_more_commands ||
1481 maybe_more_tx_events ||
1482 maybe_more_tx_fetch_events ||
1483 maybe_more_pool_imports ||
1484 maybe_more_pending_txns
1485 {
1486 cx.waker().wake_by_ref();
1488 return Poll::Pending
1489 }
1490
1491 this.update_poll_metrics(start, poll_durations);
1492
1493 Poll::Pending
1494 }
1495}
1496
/// Selects how transactions are propagated.
///
/// NOTE(review): the propagation code that consumes this enum is outside this excerpt; confirm
/// the exact semantics of each mode there.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The regular propagation mode.
    Basic,
    /// The forced propagation mode; the only variant for which
    /// [`PropagationMode::is_forced`] returns `true`.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for every other mode.
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1519
/// A transaction prepared for propagation, together with its size.
///
/// The transaction is stored behind an [`Arc`], so cloning this value only bumps a reference
/// count instead of copying the transaction.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction, captured when the value is constructed (`length()` in `new`,
    /// `encoded_length()` in `pool_tx`).
    size: usize,
    /// The shared transaction.
    transaction: Arc<T>,
}
1526
1527impl<T: SignedTransaction> PropagateTransaction<T> {
1528 pub fn new(transaction: T) -> Self {
1530 let size = transaction.length();
1531 Self { size, transaction: Arc::new(transaction) }
1532 }
1533
1534 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1536 where
1537 P: PoolTransaction<Consensus = T>,
1538 {
1539 let size = tx.encoded_length();
1540 let transaction = tx.transaction.clone_into_consensus();
1541 let transaction = Arc::new(transaction.into_inner());
1542 Self { size, transaction }
1543 }
1544
1545 fn tx_hash(&self) -> &TxHash {
1546 self.transaction.tx_hash()
1547 }
1548}
1549
/// Accumulates transactions for a propagation message and produces a [`PropagateTransactions`].
///
/// The variant decides what gets collected: only hashes, or full transactions with a hash
/// fallback (see [`FullTransactionsBuilder`]).
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Collects transaction hashes only.
    Pooled(PooledTransactionsHashesBuilder),
    /// Collects full transactions, plus hashes for those not sent in full.
    Full(FullTransactionsBuilder<T>),
}
1557
1558impl<T> PropagateTransactionsBuilder<T> {
1559 fn pooled(version: EthVersion) -> Self {
1561 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1562 }
1563
1564 fn full(version: EthVersion) -> Self {
1566 Self::Full(FullTransactionsBuilder::new(version))
1567 }
1568
1569 fn is_empty(&self) -> bool {
1571 match self {
1572 Self::Pooled(builder) => builder.is_empty(),
1573 Self::Full(builder) => builder.is_empty(),
1574 }
1575 }
1576
1577 fn build(self) -> PropagateTransactions<T> {
1579 match self {
1580 Self::Pooled(pooled) => {
1581 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1582 }
1583 Self::Full(full) => full.build(),
1584 }
1585 }
1586}
1587
1588impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1589 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1591 for tx in txs {
1592 self.push(tx);
1593 }
1594 }
1595
1596 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1598 match self {
1599 Self::Pooled(builder) => builder.push(transaction),
1600 Self::Full(builder) => builder.push(transaction),
1601 }
1602 }
1603}
1604
/// The output of a [`PropagateTransactionsBuilder`]: up to two messages to send.
struct PropagateTransactions<T> {
    /// Hashes to announce, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// Full transactions to send, if any.
    full: Option<Vec<Arc<T>>>,
}
1612
/// Collects full transactions for a broadcast message, bounded by a soft byte-size limit.
///
/// Transactions that are not sent in full (see `push`) are recorded as hashes in `pooled`
/// instead.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Sum of the `size` of all transactions accepted into `transactions`.
    total_size: usize,
    /// Transactions that will be sent in full.
    transactions: Vec<Arc<T>>,
    /// Hashes of the transactions that were not added to `transactions`.
    pooled: PooledTransactionsHashesBuilder,
}
1626
1627impl<T> FullTransactionsBuilder<T> {
1628 fn new(version: EthVersion) -> Self {
1630 Self {
1631 total_size: 0,
1632 pooled: PooledTransactionsHashesBuilder::new(version),
1633 transactions: vec![],
1634 }
1635 }
1636
1637 fn is_empty(&self) -> bool {
1639 self.transactions.is_empty() && self.pooled.is_empty()
1640 }
1641
1642 fn build(self) -> PropagateTransactions<T> {
1644 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1645 let full = Some(self.transactions).filter(|full| !full.is_empty());
1646 PropagateTransactions { pooled, full }
1647 }
1648}
1649
1650impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1651 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1653 for tx in txs {
1654 self.push(&tx)
1655 }
1656 }
1657
1658 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1668 if !transaction.transaction.is_broadcastable_in_full() {
1677 self.pooled.push(transaction);
1678 return
1679 }
1680
1681 let new_size = self.total_size + transaction.size;
1682 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1683 self.total_size > 0
1684 {
1685 self.pooled.push(transaction);
1687 return
1688 }
1689
1690 self.total_size = new_size;
1691 self.transactions.push(Arc::clone(&transaction.transaction));
1692 }
1693}
1694
/// Builds a [`NewPooledTransactionHashes`] message in the format matching a protocol version.
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Message format that carries hashes only.
    Eth66(NewPooledTransactionHashes66),
    /// Message format that carries hashes together with sizes and types.
    Eth68(NewPooledTransactionHashes68),
}
1702
1703impl PooledTransactionsHashesBuilder {
1706 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1708 match self {
1709 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1710 Self::Eth68(msg) => {
1711 msg.hashes.push(*pooled_tx.hash());
1712 msg.sizes.push(pooled_tx.encoded_length());
1713 msg.types.push(pooled_tx.transaction.ty());
1714 }
1715 }
1716 }
1717
1718 fn is_empty(&self) -> bool {
1720 match self {
1721 Self::Eth66(hashes) => hashes.is_empty(),
1722 Self::Eth68(hashes) => hashes.is_empty(),
1723 }
1724 }
1725
1726 fn extend<T: SignedTransaction>(
1728 &mut self,
1729 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1730 ) {
1731 for tx in txs {
1732 self.push(&tx);
1733 }
1734 }
1735
1736 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1737 match self {
1738 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1739 Self::Eth68(msg) => {
1740 msg.hashes.push(*tx.tx_hash());
1741 msg.sizes.push(tx.size);
1742 msg.types.push(tx.transaction.ty());
1743 }
1744 }
1745 }
1746
1747 fn new(version: EthVersion) -> Self {
1749 match version {
1750 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1751 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1752 }
1753 }
1754
1755 fn build(self) -> NewPooledTransactionHashes {
1756 match self {
1757 Self::Eth66(msg) => msg.into(),
1758 Self::Eth68(msg) => msg.into(),
1759 }
1760 }
1761}
1762
/// How a batch of transactions reached the manager.
enum TransactionSource {
    /// The transactions arrived as a broadcast from a peer.
    Broadcast,
    /// The transactions arrived as the response to a request made by this node.
    Response,
}

impl TransactionSource {
    /// Returns `true` if the transactions were received through a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1779
/// Per-peer state tracked by the transactions manager.
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Bounded cache of transaction hashes recorded as seen by this peer.
    seen_transactions: LruCache<TxHash>,
    /// Sender for requests addressed to this peer.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The `eth` protocol version associated with this peer.
    version: EthVersion,
    /// The peer's client version string (included in trace logs).
    client_version: Arc<str>,
}
1794
1795impl<N: NetworkPrimitives> PeerMetadata<N> {
1796 fn new(
1798 request_tx: PeerRequestSender<PeerRequest<N>>,
1799 version: EthVersion,
1800 client_version: Arc<str>,
1801 max_transactions_seen_by_peer: u32,
1802 ) -> Self {
1803 Self {
1804 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1805 request_tx,
1806 version,
1807 client_version,
1808 }
1809 }
1810}
1811
/// Commands consumed by the transactions manager through its command channel.
///
/// NOTE(review): the handler (`on_command`) is outside this excerpt, so the variant descriptions
/// below follow the variant names and payload types only; confirm against the handler.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a single transaction hash.
    PropagateHash(B256),
    /// Propagate the given hashes to one specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the set of peer ids; the answer is sent through the oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate the transactions identified by the given hashes to one specific peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate the transactions identified by the given hashes.
    PropagateTransactions(Vec<TxHash>),
    /// Broadcast the given, already prepared transactions.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes for a set of peers.
    GetTransactionHashes {
        /// The peers to report on.
        peers: Vec<PeerId>,
        /// Channel on which the per-peer hash sets are returned.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Request the request sender of a peer.
    GetPeerSender {
        /// The peer whose sender is requested.
        peer_id: PeerId,
        /// Channel on which the sender is returned; `None` is a possible answer.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
1838
/// Transaction related events delivered to the transactions manager, which processes them in
/// `on_network_tx_event`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A peer sent full transactions.
    IncomingTransactions {
        /// The peer that sent the message.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// A peer announced transaction hashes.
    IncomingPooledTransactionHashes {
        /// The peer that sent the announcement.
        peer_id: PeerId,
        /// The announced hashes.
        msg: NewPooledTransactionHashes,
    },
    /// A peer requested pooled transactions.
    GetPooledTransactions {
        /// The peer that sent the request.
        peer_id: PeerId,
        /// The hashes of the requested transactions.
        request: GetPooledTransactions,
        /// Channel on which the result of the request is returned.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Request for a [`TransactionsHandle`]; the answer is sent through the oneshot sender and
    /// may be `None`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
1870
/// Tracks how many transactions are currently waiting to be imported into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of pending imports. It is increased when an import is queued and decreased
    /// again once that import has finished.
    pending_pool_imports: Arc<AtomicUsize>,
    /// The maximum number of pending imports this instance was configured with.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates an instance with no pending imports and the given configured maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending imports is strictly below the
    /// `max_pending_pool_imports` passed as argument.
    ///
    /// Note that the comparison uses the argument, not the maximum stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
1891
1892impl Default for PendingPoolImportsInfo {
1893 fn default() -> Self {
1894 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1895 }
1896}
1897
/// Time accumulated in each stage of a single `TransactionsManager::poll` call.
///
/// All durations start at zero (`Default`) and are passed to `update_poll_metrics` once a poll
/// has fully drained its sources.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time spent on the "Network events stream".
    acc_network_events: Duration,
    /// Time spent on the "Batched pool imports stream".
    acc_pending_imports: Duration,
    /// Time spent on the "Network transaction events stream".
    acc_tx_events: Duration,
    /// Time spent on the "Pending transactions stream".
    acc_imported_txns: Duration,
    /// Time spent on the "Transaction fetch events stream".
    acc_fetch_events: Duration,
    /// Time spent requesting hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent on the "Commands channel".
    acc_cmds: Duration,
}
1908
1909#[cfg(test)]
1910mod tests {
1911 use super::*;
1912 use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
1913 use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy};
1914 use alloy_primitives::{hex, PrimitiveSignature as Signature, TxKind, U256};
1915 use alloy_rlp::Decodable;
1916 use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
1917 use futures::FutureExt;
1918 use reth_chainspec::MIN_TRANSACTION_GAS;
1919 use reth_ethereum_primitives::{Transaction, TransactionSigned};
1920 use reth_network_api::{NetworkInfo, PeerKind};
1921 use reth_network_p2p::{
1922 error::{RequestError, RequestResult},
1923 sync::{NetworkSyncUpdater, SyncState},
1924 };
1925 use reth_storage_api::noop::NoopProvider;
1926 use reth_transaction_pool::test_utils::{
1927 testing_pool, MockTransaction, MockTransactionFactory, TestPool,
1928 };
1929 use secp256k1::SecretKey;
1930 use std::{
1931 fmt,
1932 future::poll_fn,
1933 hash,
1934 net::{IpAddr, Ipv4Addr, SocketAddr},
1935 str::FromStr,
1936 };
1937 use tests::fetcher::TxFetchMetadata;
1938 use tracing::error;
1939
1940 async fn new_tx_manager(
1941 ) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
1942 {
1943 let secret_key = SecretKey::new(&mut rand::thread_rng());
1944 let client = NoopProvider::default();
1945
1946 let config = NetworkConfigBuilder::new(secret_key)
1947 .listener_port(0)
1949 .disable_discovery()
1950 .build(client);
1951
1952 let pool = testing_pool();
1953
1954 let transactions_manager_config = config.transactions_manager_config.clone();
1955 let (_network_handle, network, transactions, _) = NetworkManager::new(config)
1956 .await
1957 .unwrap()
1958 .into_builder()
1959 .transactions(pool.clone(), transactions_manager_config)
1960 .split_with_handle();
1961
1962 (transactions, network)
1963 }
1964
1965 pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
1966 LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32)
1967 }
1968
1969 pub(super) fn new_mock_session(
1971 peer_id: PeerId,
1972 version: EthVersion,
1973 ) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
1974 let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
1975
1976 (
1977 PeerMetadata::new(
1978 PeerRequestSender::new(peer_id, to_mock_session_tx),
1979 version,
1980 Arc::from(""),
1981 DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
1982 ),
1983 to_mock_session_rx,
1984 )
1985 }
1986
1987 #[tokio::test(flavor = "multi_thread")]
1988 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
1989 reth_tracing::init_test_tracing();
1990 let net = Testnet::create(3).await;
1991
1992 let mut handles = net.handles();
1993 let handle0 = handles.next().unwrap();
1994 let handle1 = handles.next().unwrap();
1995
1996 drop(handles);
1997 let handle = net.spawn();
1998
1999 let listener0 = handle0.event_listener();
2000 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2001 let secret_key = SecretKey::new(&mut rand::thread_rng());
2002
2003 let client = NoopProvider::default();
2004 let pool = testing_pool();
2005 let config = NetworkConfigBuilder::eth(secret_key)
2006 .disable_discovery()
2007 .listener_port(0)
2008 .build(client);
2009 let transactions_manager_config = config.transactions_manager_config.clone();
2010 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2011 .await
2012 .unwrap()
2013 .into_builder()
2014 .transactions(pool.clone(), transactions_manager_config)
2015 .split_with_handle();
2016
2017 tokio::task::spawn(network);
2018
2019 network_handle.update_sync_state(SyncState::Syncing);
2021 assert!(NetworkInfo::is_syncing(&network_handle));
2022 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2023
2024 let mut established = listener0.take(2);
2026 while let Some(ev) = established.next().await {
2027 match ev {
2028 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2029 transactions
2031 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2032 }
2033 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2034 ev => {
2035 error!("unexpected event {ev:?}")
2036 }
2037 }
2038 }
2039 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2041 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2042 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2043 peer_id: *handle1.peer_id(),
2044 msg: Transactions(vec![signed_tx.clone()]),
2045 });
2046 poll_fn(|cx| {
2047 let _ = transactions.poll_unpin(cx);
2048 Poll::Ready(())
2049 })
2050 .await;
2051 assert!(pool.is_empty());
2052 handle.terminate().await;
2053 }
2054
2055 #[tokio::test(flavor = "multi_thread")]
2056 async fn test_tx_broadcasts_through_two_syncs() {
2057 reth_tracing::init_test_tracing();
2058 let net = Testnet::create(3).await;
2059
2060 let mut handles = net.handles();
2061 let handle0 = handles.next().unwrap();
2062 let handle1 = handles.next().unwrap();
2063
2064 drop(handles);
2065 let handle = net.spawn();
2066
2067 let listener0 = handle0.event_listener();
2068 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2069 let secret_key = SecretKey::new(&mut rand::thread_rng());
2070
2071 let client = NoopProvider::default();
2072 let pool = testing_pool();
2073 let config = NetworkConfigBuilder::new(secret_key)
2074 .disable_discovery()
2075 .listener_port(0)
2076 .build(client);
2077 let transactions_manager_config = config.transactions_manager_config.clone();
2078 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2079 .await
2080 .unwrap()
2081 .into_builder()
2082 .transactions(pool.clone(), transactions_manager_config)
2083 .split_with_handle();
2084
2085 tokio::task::spawn(network);
2086
2087 network_handle.update_sync_state(SyncState::Syncing);
2089 assert!(NetworkInfo::is_syncing(&network_handle));
2090 network_handle.update_sync_state(SyncState::Idle);
2091 assert!(!NetworkInfo::is_syncing(&network_handle));
2092 network_handle.update_sync_state(SyncState::Syncing);
2093 assert!(NetworkInfo::is_syncing(&network_handle));
2094
2095 let mut established = listener0.take(2);
2097 while let Some(ev) = established.next().await {
2098 match ev {
2099 NetworkEvent::ActivePeerSession { .. } |
2100 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2101 transactions.on_network_event(ev);
2103 }
2104 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2105 _ => {
2106 error!("unexpected event {ev:?}")
2107 }
2108 }
2109 }
2110 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2112 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2113 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2114 peer_id: *handle1.peer_id(),
2115 msg: Transactions(vec![signed_tx.clone()]),
2116 });
2117 poll_fn(|cx| {
2118 let _ = transactions.poll_unpin(cx);
2119 Poll::Ready(())
2120 })
2121 .await;
2122 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2123 assert!(NetworkInfo::is_syncing(&network_handle));
2124 assert!(!pool.is_empty());
2125 handle.terminate().await;
2126 }
2127
2128 #[tokio::test(flavor = "multi_thread")]
2129 async fn test_handle_incoming_transactions() {
2130 reth_tracing::init_test_tracing();
2131 let net = Testnet::create(3).await;
2132
2133 let mut handles = net.handles();
2134 let handle0 = handles.next().unwrap();
2135 let handle1 = handles.next().unwrap();
2136
2137 drop(handles);
2138 let handle = net.spawn();
2139
2140 let listener0 = handle0.event_listener();
2141
2142 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2143 let secret_key = SecretKey::new(&mut rand::thread_rng());
2144
2145 let client = NoopProvider::default();
2146 let pool = testing_pool();
2147 let config = NetworkConfigBuilder::new(secret_key)
2148 .disable_discovery()
2149 .listener_port(0)
2150 .build(client);
2151 let transactions_manager_config = config.transactions_manager_config.clone();
2152 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2153 .await
2154 .unwrap()
2155 .into_builder()
2156 .transactions(pool.clone(), transactions_manager_config)
2157 .split_with_handle();
2158 tokio::task::spawn(network);
2159
2160 network_handle.update_sync_state(SyncState::Idle);
2161
2162 assert!(!NetworkInfo::is_syncing(&network_handle));
2163
2164 let mut established = listener0.take(2);
2166 while let Some(ev) = established.next().await {
2167 match ev {
2168 NetworkEvent::ActivePeerSession { .. } |
2169 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2170 transactions.on_network_event(ev);
2172 }
2173 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2174 ev => {
2175 error!("unexpected event {ev:?}")
2176 }
2177 }
2178 }
2179 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2181 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2182 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2183 peer_id: *handle1.peer_id(),
2184 msg: Transactions(vec![signed_tx.clone()]),
2185 });
2186 assert!(transactions
2187 .transactions_by_peers
2188 .get(signed_tx.tx_hash())
2189 .unwrap()
2190 .contains(handle1.peer_id()));
2191
2192 poll_fn(|cx| {
2194 let _ = transactions.poll_unpin(cx);
2195 Poll::Ready(())
2196 })
2197 .await;
2198
2199 assert!(!pool.is_empty());
2200 assert!(pool.get(signed_tx.tx_hash()).is_some());
2201 handle.terminate().await;
2202 }
2203
2204 #[tokio::test(flavor = "multi_thread")]
2205 async fn test_on_get_pooled_transactions_network() {
2206 reth_tracing::init_test_tracing();
2207 let net = Testnet::create(2).await;
2208
2209 let mut handles = net.handles();
2210 let handle0 = handles.next().unwrap();
2211 let handle1 = handles.next().unwrap();
2212
2213 drop(handles);
2214 let handle = net.spawn();
2215
2216 let listener0 = handle0.event_listener();
2217
2218 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2219 let secret_key = SecretKey::new(&mut rand::thread_rng());
2220
2221 let client = NoopProvider::default();
2222 let pool = testing_pool();
2223 let config = NetworkConfigBuilder::new(secret_key)
2224 .disable_discovery()
2225 .listener_port(0)
2226 .build(client);
2227 let transactions_manager_config = config.transactions_manager_config.clone();
2228 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2229 .await
2230 .unwrap()
2231 .into_builder()
2232 .transactions(pool.clone(), transactions_manager_config)
2233 .split_with_handle();
2234 tokio::task::spawn(network);
2235
2236 network_handle.update_sync_state(SyncState::Idle);
2237
2238 assert!(!NetworkInfo::is_syncing(&network_handle));
2239
2240 let mut established = listener0.take(2);
2242 while let Some(ev) = established.next().await {
2243 match ev {
2244 NetworkEvent::ActivePeerSession { .. } |
2245 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2246 transactions.on_network_event(ev);
2247 }
2248 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2249 ev => {
2250 error!("unexpected event {ev:?}")
2251 }
2252 }
2253 }
2254 handle.terminate().await;
2255
2256 let tx = MockTransaction::eip1559();
2257 let _ = transactions
2258 .pool
2259 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2260 .await;
2261
2262 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2263
2264 let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2265
2266 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2267 peer_id: *handle1.peer_id(),
2268 request,
2269 response: send,
2270 });
2271
2272 match receive.await.unwrap() {
2273 Ok(PooledTransactions(transactions)) => {
2274 assert_eq!(transactions.len(), 1);
2275 }
2276 Err(e) => {
2277 panic!("error: {e:?}");
2278 }
2279 }
2280 }
2281
    /// Two hashes are requested from a single peer, but the peer only returns the first
    /// transaction. After the response has been processed the peer is idle again and one hash is
    /// back in the pending-fetch set.
    #[tokio::test]
    async fn test_partially_tx_response() {
        reth_tracing::init_test_tracing();

        let mut tx_manager = new_tx_manager().await.0;
        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        let peer_id_1 = PeerId::new([1; 64]);
        let eth_version = EthVersion::Eth66;

        // One legacy and one EIP-1559 transaction with fixed signatures.
        let txs = vec![
            TransactionSigned::new_unhashed(
                Transaction::Legacy(TxLegacy {
                    chain_id: Some(4),
                    nonce: 15u64,
                    gas_price: 2200000000,
                    gas_limit: 34811,
                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
                    value: U256::from(1234u64),
                    input: Default::default(),
                }),
                Signature::new(
                    U256::from_str(
                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
                    )
                    .unwrap(),
                    U256::from_str(
                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
                    )
                    .unwrap(),
                    true,
                ),
            ),
            TransactionSigned::new_unhashed(
                Transaction::Eip1559(TxEip1559 {
                    chain_id: 4,
                    nonce: 26u64,
                    max_priority_fee_per_gas: 1500000000,
                    max_fee_per_gas: 1500000013,
                    gas_limit: MIN_TRANSACTION_GAS,
                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
                    value: U256::from(3000000000000000000u64),
                    input: Default::default(),
                    access_list: Default::default(),
                }),
                Signature::new(
                    U256::from_str(
                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
                    )
                    .unwrap(),
                    U256::from_str(
                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
                    )
                    .unwrap(),
                    true,
                ),
            ),
        ];

        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();

        // Register peer_1 and mark both hashes as seen by it.
        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        peer_1.seen_transactions.insert(txs_hashes[0]);
        peer_1.seen_transactions.insert(txs_hashes[1]);
        tx_manager.peers.insert(peer_id_1, peer_1);

        // Each hash gets its own cache containing peer_1 for its fetch metadata.
        let mut backups = default_cache();
        backups.insert(peer_id_1);

        let mut backups1 = default_cache();
        backups1.insert(peer_id_1);

        // Seed the fetcher: both hashes are tracked and pending fetch.
        tx_fetcher
            .hashes_fetch_inflight_and_pending_fetch
            .insert(txs_hashes[0], TxFetchMetadata::new(1, backups, None));
        tx_fetcher
            .hashes_fetch_inflight_and_pending_fetch
            .insert(txs_hashes[1], TxFetchMetadata::new(1, backups1, None));
        tx_fetcher.hashes_pending_fetch.insert(txs_hashes[0]);
        tx_fetcher.hashes_pending_fetch.insert(txs_hashes[1]);

        // Nothing is in flight yet.
        assert!(tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 0);

        // Trigger the request for the pending hashes.
        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);

        // Both hashes left the pending set and peer_1 now has a request in flight.
        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
        assert!(!tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 1);

        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };

        // Answer with the first transaction only (`take(1)`).
        let message: Vec<PooledTransaction> = txs
            .into_iter()
            .take(1)
            .map(|tx| {
                PooledTransaction::try_from(tx)
                    .expect("Failed to convert MockTransaction to PooledTransaction")
            })
            .collect();
        response
            .send(Ok(reth_eth_wire::PooledTransactions(message)))
            .expect("should send peer_1 response to tx manager");
        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
            unreachable!()
        };

        // The request has resolved: the peer is idle again and exactly one hash is pending fetch.
        assert!(tx_fetcher.is_idle(&peer_id));
        assert_eq!(tx_fetcher.active_peers.len(), 0);
        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 1);
    }
2409
    /// Two hashes are requested from peer_1 and then from peer_2; both requests fail. After the
    /// first failure the hashes are pending fetch again, after the second failure they are not.
    #[tokio::test]
    async fn test_max_retries_tx_request() {
        reth_tracing::init_test_tracing();

        let mut tx_manager = new_tx_manager().await.0;
        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        let peer_id_1 = PeerId::new([1; 64]);
        let peer_id_2 = PeerId::new([2; 64]);
        let eth_version = EthVersion::Eth66;
        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];

        // Register peer_1 and mark both hashes as seen by it.
        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        peer_1.seen_transactions.insert(seen_hashes[0]);
        peer_1.seen_transactions.insert(seen_hashes[1]);
        tx_manager.peers.insert(peer_id_1, peer_1);

        // Value passed as the first argument of `TxFetchMetadata::new` for both hashes.
        let retries = 1;
        let mut backups = default_cache();
        backups.insert(peer_id_1);

        let mut backups1 = default_cache();
        backups1.insert(peer_id_1);
        // Seed the fetcher: both hashes are tracked and pending fetch.
        tx_fetcher
            .hashes_fetch_inflight_and_pending_fetch
            .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups, None));
        tx_fetcher
            .hashes_fetch_inflight_and_pending_fetch
            .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups1, None));
        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]);
        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]);

        // Nothing is in flight yet.
        assert!(tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 0);

        // Trigger the request for the pending hashes.
        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);

        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        // Both hashes left the pending set and peer_1 now has a request in flight.
        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
        assert!(!tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 1);

        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        let GetPooledTransactions(hashes) = request;

        let hashes = hashes.into_iter().collect::<HashSet<_>>();

        // The request contains exactly the two seeded hashes, in any order.
        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());

        // Fail peer_1's request.
        response
            .send(Err(RequestError::BadResponse))
            .expect("should send peer_1 response to tx manager");
        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
            unreachable!()
        };

        // peer_1 is idle again and both hashes are pending fetch once more.
        assert!(tx_fetcher.is_idle(&peer_id));
        assert_eq!(tx_fetcher.active_peers.len(), 0);
        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2);

        // Register peer_2 and let it announce the same hashes.
        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
        tx_manager.peers.insert(peer_id_2, peer_2);

        let msg =
            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);

        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        // One peer has a request in flight again.
        assert_eq!(tx_fetcher.active_peers.len(), 1);

        // Both hashes are still tracked, and none of them is waiting in the pending set.
        assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2);
        assert!(tx_fetcher.hashes_pending_fetch.is_empty());

        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_2 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };

        // Fail peer_2's request as well.
        response
            .send(Err(RequestError::BadResponse))
            .expect("should send peer_2 response to tx manager");
        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };

        // This time the hashes are not put back into the pending set.
        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
        assert_eq!(tx_fetcher.active_peers.len(), 0);
    }
2522
/// A builder created via `pooled` starts out empty; after one push it is non-empty and builds
/// an output whose `full` part is absent and whose `pooled` part holds exactly one entry.
#[test]
fn test_transaction_builder_empty() {
    let mut factory = MockTransactionFactory::default();
    let mut pooled_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);

    // Nothing has been pushed yet.
    assert!(pooled_builder.is_empty());

    let eip1559 = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    pooled_builder.push(&eip1559);
    assert!(!pooled_builder.is_empty());

    let built = pooled_builder.build();
    assert!(built.full.is_none());
    assert_eq!(built.pooled.unwrap().len(), 1);
}
2539
/// Pushes the same oversized transaction twice into a builder created via `full`.
///
/// After the first push only the `full` part is populated; after the second push the `full`
/// part still holds one entry and the `pooled` part holds one entry as well.
#[test]
fn test_transaction_builder_large() {
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    // Report a size one byte above the broadcast-message soft limit.
    let over_limit = DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1;
    let mut factory = MockTransactionFactory::default();
    let mut oversized = factory.create_eip1559();
    oversized.transaction.set_size(over_limit);
    let oversized = PropagateTransaction::pool_tx(Arc::new(oversized));

    full_builder.push(&oversized);
    assert!(!full_builder.is_empty());

    // Build from a clone so the builder itself can keep accepting pushes.
    let after_first = full_builder.clone().build();
    assert!(after_first.pooled.is_none());
    assert_eq!(after_first.full.unwrap().len(), 1);

    full_builder.push(&oversized);

    let after_second = full_builder.clone().build();
    assert_eq!(after_second.pooled.unwrap().len(), 1);
    assert_eq!(after_second.full.unwrap().len(), 1);
}
2568
/// Pushes an EIP-4844 transaction followed by an EIP-1559 transaction into a builder created
/// via `full`.
///
/// With only the EIP-4844 transaction pushed, the built output has no `full` part and one
/// `pooled` entry; once the EIP-1559 transaction is added, both parts hold one entry each.
#[test]
fn test_transaction_builder_eip4844() {
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    let mut factory = MockTransactionFactory::default();

    let blob_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
    full_builder.push(&blob_tx);
    assert!(!full_builder.is_empty());

    // Build from a clone so the builder itself can keep accepting pushes.
    let blob_only = full_builder.clone().build();
    assert!(blob_only.full.is_none());
    assert_eq!(blob_only.pooled.unwrap().len(), 1);

    let dynamic_fee_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    full_builder.push(&dynamic_fee_tx);

    let mixed = full_builder.clone().build();
    assert_eq!(mixed.pooled.unwrap().len(), 1);
    assert_eq!(mixed.full.unwrap().len(), 1);
}
2594
2595 #[tokio::test]
2596 async fn test_propagate_full() {
2597 reth_tracing::init_test_tracing();
2598
2599 let (mut tx_manager, network) = new_tx_manager().await;
2600 let peer_id = PeerId::random();
2601
2602 network.handle().update_sync_state(SyncState::Idle);
2604
2605 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2607
2608 let session_info = SessionInfo {
2609 peer_id,
2610 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2611 client_version: Arc::from(""),
2612 capabilities: Arc::new(vec![].into()),
2613 status: Arc::new(Default::default()),
2614 version: EthVersion::Eth68,
2615 peer_kind: PeerKind::Basic,
2616 };
2617 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2618 tx_manager
2619 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2620 let mut propagate = vec![];
2621 let mut factory = MockTransactionFactory::default();
2622 let eip1559_tx = Arc::new(factory.create_eip1559());
2623 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2624 let eip4844_tx = Arc::new(factory.create_eip4844());
2625 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2626
2627 let propagated =
2628 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2629 assert_eq!(propagated.0.len(), 2);
2630 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2631 assert_eq!(prop_txs.len(), 1);
2632 assert!(prop_txs[0].is_full());
2633
2634 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2635 assert_eq!(prop_txs.len(), 1);
2636 assert!(prop_txs[0].is_hash());
2637
2638 let peer = tx_manager.peers.get(&peer_id).unwrap();
2639 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2640 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2641 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2642
2643 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2645 assert!(propagated.0.is_empty());
2646 }
2647}