1pub mod config;
5pub mod constants;
7pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12 tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15use config::TransactionPropagationKind;
16pub use config::{
17 TransactionFetcherConfig, TransactionPropagationMode, TransactionPropagationPolicy,
18 TransactionsManagerConfig,
19};
20pub use validation::*;
21
22pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
23
24use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
25use crate::{
26 budget::{
27 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
28 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
29 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
30 },
31 cache::LruCache,
32 duration_metered_exec, metered_poll_nested_stream_with_budget,
33 metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
34 NetworkHandle,
35};
36use alloy_primitives::{TxHash, B256};
37use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
38use futures::{stream::FuturesUnordered, Future, StreamExt};
39use reth_eth_wire::{
40 DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
41 HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
42 NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
43 RequestTxHashes, Transactions,
44};
45use reth_ethereum_primitives::TransactionSigned;
46use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
47use reth_network_api::{
48 events::{PeerEvent, SessionInfo},
49 NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
50};
51use reth_network_p2p::{
52 error::{RequestError, RequestResult},
53 sync::SyncStateProvider,
54};
55use reth_network_peers::PeerId;
56use reth_network_types::ReputationChangeKind;
57use reth_primitives_traits::SignedTransaction;
58use reth_tokio_util::EventStream;
59use reth_transaction_pool::{
60 error::{PoolError, PoolResult},
61 GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
62 TransactionPool, ValidPoolTransaction,
63};
64use std::{
65 collections::{hash_map::Entry, HashMap, HashSet},
66 pin::Pin,
67 sync::{
68 atomic::{AtomicUsize, Ordering},
69 Arc,
70 },
71 task::{Context, Poll},
72 time::{Duration, Instant},
73};
74use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
75use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
76use tracing::{debug, trace};
77
78pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
82
83#[derive(Debug, Clone)]
91pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
92 manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
94}
95
96impl<N: NetworkPrimitives> TransactionsHandle<N> {
99 fn send(&self, cmd: TransactionsCommand<N>) {
100 let _ = self.manager_tx.send(cmd);
101 }
102
103 async fn peer_handle(
105 &self,
106 peer_id: PeerId,
107 ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
108 let (tx, rx) = oneshot::channel();
109 self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
110 rx.await
111 }
112
113 pub fn propagate(&self, hash: TxHash) {
115 self.send(TransactionsCommand::PropagateHash(hash))
116 }
117
118 pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
122 self.propagate_hashes_to(Some(hash), peer)
123 }
124
125 pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
129 let hashes = hash.into_iter().collect::<Vec<_>>();
130 if hashes.is_empty() {
131 return
132 }
133 self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
134 }
135
136 pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
138 let (tx, rx) = oneshot::channel();
139 self.send(TransactionsCommand::GetActivePeers(tx));
140 rx.await
141 }
142
143 pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
147 if transactions.is_empty() {
148 return
149 }
150 self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
151 }
152
153 pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
158 if transactions.is_empty() {
159 return
160 }
161 self.send(TransactionsCommand::PropagateTransactions(transactions))
162 }
163
164 pub fn broadcast_transactions(
169 &self,
170 transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
171 ) {
172 let transactions =
173 transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
174 if transactions.is_empty() {
175 return
176 }
177 self.send(TransactionsCommand::BroadcastTransactions(transactions))
178 }
179
180 pub async fn get_transaction_hashes(
182 &self,
183 peers: Vec<PeerId>,
184 ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
185 if peers.is_empty() {
186 return Ok(Default::default())
187 }
188 let (tx, rx) = oneshot::channel();
189 self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
190 rx.await
191 }
192
193 pub async fn get_peer_transaction_hashes(
195 &self,
196 peer: PeerId,
197 ) -> Result<HashSet<TxHash>, RecvError> {
198 let res = self.get_transaction_hashes(vec![peer]).await?;
199 Ok(res.into_values().next().unwrap_or_default())
200 }
201
202 pub async fn get_pooled_transactions_from(
208 &self,
209 peer_id: PeerId,
210 hashes: Vec<B256>,
211 ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
212 let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
213
214 let (tx, rx) = oneshot::channel();
215 let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
216 peer.try_send(request).ok();
217
218 rx.await?.map(|res| Some(res.0))
219 }
220}
221
222#[derive(Debug)]
240#[must_use = "Manager does nothing unless polled."]
241pub struct TransactionsManager<
242 Pool,
243 N: NetworkPrimitives = EthNetworkPrimitives,
244 P: TransactionPropagationPolicy = TransactionPropagationKind,
245> {
246 pool: Pool,
248 network: NetworkHandle<N>,
250 network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
254 transaction_fetcher: TransactionFetcher<N>,
256 transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
261 pool_imports: FuturesUnordered<PoolImportFuture>,
273 pending_pool_imports_info: PendingPoolImportsInfo,
275 bad_imports: LruCache<TxHash>,
277 peers: HashMap<PeerId, PeerMetadata<N>>,
279 command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
283 command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
288 pending_transactions: ReceiverStream<TxHash>,
297 transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
299 config: TransactionsManagerConfig,
301 propagation_policy: P,
303 metrics: TransactionsManagerMetrics,
305}
306
307impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
308 pub fn new(
312 network: NetworkHandle<N>,
313 pool: Pool,
314 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
315 transactions_manager_config: TransactionsManagerConfig,
316 ) -> Self {
317 Self::with_policy(
318 network,
319 pool,
320 from_network,
321 transactions_manager_config,
322 TransactionPropagationKind::default(),
323 )
324 }
325}
326
327impl<Pool: TransactionPool, N: NetworkPrimitives, P: TransactionPropagationPolicy>
328 TransactionsManager<Pool, N, P>
329{
330 pub fn with_policy(
334 network: NetworkHandle<N>,
335 pool: Pool,
336 from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
337 transactions_manager_config: TransactionsManagerConfig,
338 propagation_policy: P,
339 ) -> Self {
340 let network_events = network.event_listener();
341
342 let (command_tx, command_rx) = mpsc::unbounded_channel();
343
344 let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
345 &transactions_manager_config.transaction_fetcher_config,
346 );
347
348 let pending = pool.pending_transactions_listener();
351 let pending_pool_imports_info = PendingPoolImportsInfo::default();
352 let metrics = TransactionsManagerMetrics::default();
353 metrics
354 .capacity_pending_pool_imports
355 .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
356
357 Self {
358 pool,
359 network,
360 network_events,
361 transaction_fetcher,
362 transactions_by_peers: Default::default(),
363 pool_imports: Default::default(),
364 pending_pool_imports_info: PendingPoolImportsInfo::new(
365 DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
366 ),
367 bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
368 peers: Default::default(),
369 command_tx,
370 command_rx: UnboundedReceiverStream::new(command_rx),
371 pending_transactions: ReceiverStream::new(pending),
372 transaction_events: UnboundedMeteredReceiver::new(
373 from_network,
374 NETWORK_POOL_TRANSACTIONS_SCOPE,
375 ),
376 config: transactions_manager_config,
377 propagation_policy,
378 metrics,
379 }
380 }
381
382 pub fn handle(&self) -> TransactionsHandle<N> {
384 TransactionsHandle { manager_tx: self.command_tx.clone() }
385 }
386
387 fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
390 self.pending_pool_imports_info
391 .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
392 self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
393 }
394
395 fn report_peer_bad_transactions(&self, peer_id: PeerId) {
396 self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
397 self.metrics.reported_bad_transactions.increment(1);
398 }
399
400 fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
401 trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
402 self.network.reputation_change(peer_id, kind);
403 }
404
405 fn report_already_seen(&self, peer_id: PeerId) {
406 trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
407 self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
408 }
409
410 fn on_good_import(&mut self, hash: TxHash) {
412 self.transactions_by_peers.remove(&hash);
413 }
414
415 fn on_bad_import(&mut self, err: PoolError) {
439 let peers = self.transactions_by_peers.remove(&err.hash);
440
441 if !err.is_bad_transaction() || self.network.is_syncing() {
443 return
444 }
445 if let Some(peers) = peers {
448 for peer_id in peers {
449 self.report_peer_bad_transactions(peer_id);
450 }
451 }
452 self.metrics.bad_imports.increment(1);
453 self.bad_imports.insert(err.hash);
454 }
455
456 fn on_fetch_hashes_pending_fetch(&mut self) {
458 let info = &self.pending_pool_imports_info;
460 let max_pending_pool_imports = info.max_pending_pool_imports;
461 let has_capacity_wrt_pending_pool_imports =
462 |divisor| info.has_capacity(max_pending_pool_imports / divisor);
463
464 self.transaction_fetcher
465 .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
466 }
467
468 fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
469 let kind = match req_err {
470 RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
471 RequestError::Timeout => ReputationChangeKind::Timeout,
472 RequestError::ChannelClosed | RequestError::ConnectionDropped => {
473 return
475 }
476 RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
477 };
478 self.report_peer(peer_id, kind);
479 }
480
481 #[inline]
482 fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
483 let metrics = &self.metrics;
484
485 let TxManagerPollDurations {
486 acc_network_events,
487 acc_pending_imports,
488 acc_tx_events,
489 acc_imported_txns,
490 acc_fetch_events,
491 acc_pending_fetch,
492 acc_cmds,
493 } = poll_durations;
494
495 metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
497 metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
499 metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
500 metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
501 metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
502 metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
503 metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
504 metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
505 }
506}
507
508impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
509where
510 Pool: TransactionPool,
511 N: NetworkPrimitives,
512 Policy: TransactionPropagationPolicy,
513{
514 fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
516 for res in batch_results {
517 match res {
518 Ok(hash) => {
519 self.on_good_import(hash);
520 }
521 Err(err) => {
522 self.on_bad_import(err);
523 }
524 }
525 }
526 }
527
528 fn on_new_pooled_transaction_hashes(
530 &mut self,
531 peer_id: PeerId,
532 msg: NewPooledTransactionHashes,
533 ) {
534 if self.network.is_initially_syncing() {
536 return
537 }
538 if self.network.tx_gossip_disabled() {
539 return
540 }
541
542 let Some(peer) = self.peers.get_mut(&peer_id) else {
544 trace!(
545 peer_id = format!("{peer_id:#}"),
546 ?msg,
547 "discarding announcement from inactive peer"
548 );
549
550 return
551 };
552 let client = peer.client_version.clone();
553
554 let mut count_txns_already_seen_by_peer = 0;
556 for tx in msg.iter_hashes().copied() {
557 if !peer.seen_transactions.insert(tx) {
558 count_txns_already_seen_by_peer += 1;
559 }
560 }
561 if count_txns_already_seen_by_peer > 0 {
562 self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
567 self.metrics
568 .occurrences_hash_already_seen_by_peer
569 .increment(count_txns_already_seen_by_peer);
570
571 trace!(target: "net::tx",
572 %count_txns_already_seen_by_peer,
573 peer_id=format!("{peer_id:#}"),
574 ?client,
575 "Peer sent hashes that have already been marked as seen by peer"
576 );
577
578 self.report_already_seen(peer_id);
579 }
580
581 let (validation_outcome, mut partially_valid_msg) =
583 self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
584
585 if validation_outcome == FilterOutcome::ReportPeer {
586 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
587 }
588
589 partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
591
592 let hashes_count_pre_pool_filter = partially_valid_msg.len();
600 self.pool.retain_unknown(&mut partially_valid_msg);
601 if hashes_count_pre_pool_filter > partially_valid_msg.len() {
602 let already_known_hashes_count =
603 hashes_count_pre_pool_filter - partially_valid_msg.len();
604 self.metrics
605 .occurrences_hashes_already_in_pool
606 .increment(already_known_hashes_count as u64);
607 }
608
609 if partially_valid_msg.is_empty() {
610 return
612 }
613
614 let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
619 .msg_version()
620 .expect("partially valid announcement should have version")
621 .is_eth68()
622 {
623 self.transaction_fetcher
625 .filter_valid_message
626 .filter_valid_entries_68(partially_valid_msg)
627 } else {
628 self.transaction_fetcher
630 .filter_valid_message
631 .filter_valid_entries_66(partially_valid_msg)
632 };
633
634 if validation_outcome == FilterOutcome::ReportPeer {
635 self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
636 }
637
638 if valid_announcement_data.is_empty() {
639 return
641 }
642
643 let bad_imports = &self.bad_imports;
650 self.transaction_fetcher.filter_unseen_and_pending_hashes(
651 &mut valid_announcement_data,
652 |hash| bad_imports.contains(hash),
653 &peer_id,
654 &client,
655 );
656
657 if valid_announcement_data.is_empty() {
658 return
660 }
661
662 trace!(target: "net::tx::propagation",
663 peer_id=format!("{peer_id:#}"),
664 hashes_len=valid_announcement_data.iter().count(),
665 hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
666 msg_version=%valid_announcement_data.msg_version(),
667 client_version=%client,
668 "received previously unseen and pending hashes in announcement from peer"
669 );
670
671 if !self.transaction_fetcher.is_idle(&peer_id) {
674 let msg_version = valid_announcement_data.msg_version();
676 let (hashes, _version) = valid_announcement_data.into_request_hashes();
677
678 trace!(target: "net::tx",
679 peer_id=format!("{peer_id:#}"),
680 hashes=?*hashes,
681 %msg_version,
682 %client,
683 "buffering hashes announced by busy peer"
684 );
685
686 self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
687
688 return
689 }
690
691 let mut hashes_to_request =
692 RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
693 let surplus_hashes =
694 self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
695
696 if !surplus_hashes.is_empty() {
697 trace!(target: "net::tx",
698 peer_id=format!("{peer_id:#}"),
699 surplus_hashes=?*surplus_hashes,
700 %client,
701 "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
702 );
703
704 self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
705 }
706
707 trace!(target: "net::tx",
708 peer_id=format!("{peer_id:#}"),
709 hashes=?*hashes_to_request,
710 %client,
711 "sending hashes in `GetPooledTransactions` request to peer's session"
712 );
713
714 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
718 if let Some(failed_to_request_hashes) =
719 self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
720 {
721 let conn_eth_version = peer.version;
722
723 trace!(target: "net::tx",
724 peer_id=format!("{peer_id:#}"),
725 failed_to_request_hashes=?*failed_to_request_hashes,
726 %conn_eth_version,
727 %client,
728 "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
729 );
730 self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
731 }
732 }
733}
734
735impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
736where
737 Pool: TransactionPool + 'static,
738 N: NetworkPrimitives<
739 BroadcastedTransaction: SignedTransaction,
740 PooledTransaction: SignedTransaction,
741 >,
742 Pool::Transaction:
743 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
744 Policy: TransactionPropagationPolicy,
745{
746 fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
758 if self.network.is_initially_syncing() {
760 return
761 }
762 if self.network.tx_gossip_disabled() {
763 return
764 }
765
766 trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
767
768 self.propagate_all(hashes);
769 }
770
771 fn propagate_full_transactions_to_peer(
775 &mut self,
776 txs: Vec<TxHash>,
777 peer_id: PeerId,
778 propagation_mode: PropagationMode,
779 ) -> Option<PropagatedTransactions> {
780 trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
781
782 let peer = self.peers.get_mut(&peer_id)?;
783 let mut propagated = PropagatedTransactions::default();
784
785 let mut full_transactions = FullTransactionsBuilder::new(peer.version);
787
788 let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
789
790 if propagation_mode.is_forced() {
791 full_transactions.extend(to_propagate);
793 } else {
794 for tx in to_propagate {
797 if !peer.seen_transactions.contains(tx.tx_hash()) {
798 full_transactions.push(&tx);
800 }
801 }
802 }
803
804 if full_transactions.is_empty() {
805 return None
807 }
808
809 let PropagateTransactions { pooled, full } = full_transactions.build();
810
811 if let Some(new_pooled_hashes) = pooled {
813 for hash in new_pooled_hashes.iter_hashes().copied() {
814 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
815 peer.seen_transactions.insert(hash);
817 }
818
819 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
821 }
822
823 if let Some(new_full_transactions) = full {
825 for tx in &new_full_transactions {
826 propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
827 peer.seen_transactions.insert(*tx.tx_hash());
829 }
830
831 self.network.send_transactions(peer_id, new_full_transactions);
833 }
834
835 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
837
838 Some(propagated)
839 }
840
841 fn propagate_hashes_to(
845 &mut self,
846 hashes: Vec<TxHash>,
847 peer_id: PeerId,
848 propagation_mode: PropagationMode,
849 ) {
850 trace!(target: "net::tx", "Start propagating transactions as hashes");
851
852 let propagated = {
855 let Some(peer) = self.peers.get_mut(&peer_id) else {
856 return
858 };
859
860 let to_propagate = self
861 .pool
862 .get_all(hashes)
863 .into_iter()
864 .map(PropagateTransaction::pool_tx)
865 .collect::<Vec<_>>();
866
867 let mut propagated = PropagatedTransactions::default();
868
869 let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
871
872 if propagation_mode.is_forced() {
873 hashes.extend(to_propagate)
874 } else {
875 for tx in to_propagate {
876 if !peer.seen_transactions.contains(tx.tx_hash()) {
877 hashes.push(&tx);
879 }
880 }
881 }
882
883 let new_pooled_hashes = hashes.build();
884
885 if new_pooled_hashes.is_empty() {
886 return
888 }
889
890 for hash in new_pooled_hashes.iter_hashes().copied() {
891 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
892 }
893
894 trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
895
896 self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
898
899 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
901
902 propagated
903 };
904
905 self.pool.on_propagated(propagated);
907 }
908
909 fn propagate_transactions(
916 &mut self,
917 to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
918 propagation_mode: PropagationMode,
919 ) -> PropagatedTransactions {
920 let mut propagated = PropagatedTransactions::default();
921 if self.network.tx_gossip_disabled() {
922 return propagated
923 }
924
925 let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
927
928 for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
930 if !self.propagation_policy.can_propagate(peer) {
931 continue
933 }
934 let mut builder = if peer_idx > max_num_full {
936 PropagateTransactionsBuilder::pooled(peer.version)
937 } else {
938 PropagateTransactionsBuilder::full(peer.version)
939 };
940
941 if propagation_mode.is_forced() {
942 builder.extend(to_propagate.iter());
943 } else {
944 for tx in &to_propagate {
948 if !peer.seen_transactions.contains(tx.tx_hash()) {
951 builder.push(tx);
952 }
953 }
954 }
955
956 if builder.is_empty() {
957 trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
958 continue
959 }
960
961 let PropagateTransactions { pooled, full } = builder.build();
962
963 if let Some(mut new_pooled_hashes) = pooled {
965 new_pooled_hashes
968 .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
969
970 for hash in new_pooled_hashes.iter_hashes().copied() {
971 propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
972 peer.seen_transactions.insert(hash);
974 }
975
976 trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
977
978 self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
980 }
981
982 if let Some(new_full_transactions) = full {
984 for tx in &new_full_transactions {
985 propagated
986 .0
987 .entry(*tx.tx_hash())
988 .or_default()
989 .push(PropagateKind::Full(*peer_id));
990 peer.seen_transactions.insert(*tx.tx_hash());
992 }
993
994 trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
995
996 self.network.send_transactions(*peer_id, new_full_transactions);
998 }
999 }
1000
1001 self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1003
1004 propagated
1005 }
1006
1007 fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1012 let propagated = self.propagate_transactions(
1013 self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1014 PropagationMode::Basic,
1015 );
1016
1017 self.pool.on_propagated(propagated);
1019 }
1020
1021 fn on_get_pooled_transactions(
1023 &mut self,
1024 peer_id: PeerId,
1025 request: GetPooledTransactions,
1026 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1027 ) {
1028 if let Some(peer) = self.peers.get_mut(&peer_id) {
1029 if self.network.tx_gossip_disabled() {
1030 let _ = response.send(Ok(PooledTransactions::default()));
1031 return
1032 }
1033 let transactions = self.pool.get_pooled_transaction_elements(
1034 request.0,
1035 GetPooledTransactionLimit::ResponseSizeSoftLimit(
1036 self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1037 ),
1038 );
1039 trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1040
1041 peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1044
1045 let resp = PooledTransactions(transactions);
1046 let _ = response.send(Ok(resp));
1047 }
1048 }
1049
1050 fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1052 match cmd {
1053 TransactionsCommand::PropagateHash(hash) => {
1054 self.on_new_pending_transactions(vec![hash])
1055 }
1056 TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1057 self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1058 }
1059 TransactionsCommand::GetActivePeers(tx) => {
1060 let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1061 tx.send(peers).ok();
1062 }
1063 TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1064 if let Some(propagated) =
1065 self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1066 {
1067 self.pool.on_propagated(propagated);
1068 }
1069 }
1070 TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1071 TransactionsCommand::BroadcastTransactions(txs) => {
1072 self.propagate_transactions(txs, PropagationMode::Forced);
1073 }
1074 TransactionsCommand::GetTransactionHashes { peers, tx } => {
1075 let mut res = HashMap::with_capacity(peers.len());
1076 for peer_id in peers {
1077 let hashes = self
1078 .peers
1079 .get(&peer_id)
1080 .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1081 .unwrap_or_default();
1082 res.insert(peer_id, hashes);
1083 }
1084 tx.send(res).ok();
1085 }
1086 TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1087 let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1088 peer_request_sender.send(sender).ok();
1089 }
1090 }
1091 }
1092
1093 fn handle_peer_session(
1097 &mut self,
1098 info: SessionInfo,
1099 messages: PeerRequestSender<PeerRequest<N>>,
1100 ) {
1101 let SessionInfo { peer_id, client_version, version, .. } = info;
1102
1103 let peer = PeerMetadata::<N>::new(
1105 messages,
1106 version,
1107 client_version,
1108 self.config.max_transactions_seen_by_peer_history,
1109 info.peer_kind,
1110 );
1111 let peer = match self.peers.entry(peer_id) {
1112 Entry::Occupied(mut entry) => {
1113 entry.insert(peer);
1114 entry.into_mut()
1115 }
1116 Entry::Vacant(entry) => entry.insert(peer),
1117 };
1118
1119 self.propagation_policy.on_session_established(peer);
1120
1121 if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1125 trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1126 return
1127 }
1128
1129 let pooled_txs = self.pool.pooled_transactions_max(
1131 SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1132 );
1133 if pooled_txs.is_empty() {
1134 trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1135 return;
1136 }
1137
1138 let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1140 for pooled_tx in pooled_txs {
1141 peer.seen_transactions.insert(*pooled_tx.hash());
1142 msg_builder.push_pooled(pooled_tx);
1143 }
1144
1145 debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1146 let msg = msg_builder.build();
1147 self.network.send_transactions_hashes(peer_id, msg);
1148 }
1149
1150 fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1152 match event_result {
1153 NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1154 let peer = self.peers.remove(&peer_id);
1157 if let Some(mut peer) = peer {
1158 self.propagation_policy.on_session_closed(&mut peer);
1159 }
1160 self.transaction_fetcher.remove_peer(&peer_id);
1161 }
1162 NetworkEvent::ActivePeerSession { info, messages } => {
1163 self.handle_peer_session(info, messages);
1165 }
1166 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1167 let peer_id = info.peer_id;
1168 let messages = match self.peers.get(&peer_id) {
1170 Some(p) => p.request_tx.clone(),
1171 None => {
1172 debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1173 return;
1174 }
1175 };
1176 self.handle_peer_session(info, messages);
1177 }
1178 _ => {}
1179 }
1180 }
1181
1182 fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1184 match event {
1185 NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1186 let has_blob_txs = msg.has_eip4844();
1190
1191 let non_blob_txs = msg
1192 .0
1193 .into_iter()
1194 .map(N::PooledTransaction::try_from)
1195 .filter_map(Result::ok)
1196 .collect();
1197
1198 self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1199
1200 if has_blob_txs {
1201 debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1202 self.report_peer_bad_transactions(peer_id);
1203 }
1204 }
1205 NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1206 self.on_new_pooled_transaction_hashes(peer_id, msg)
1207 }
1208 NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1209 self.on_get_pooled_transactions(peer_id, request, response)
1210 }
1211 NetworkTransactionEvent::GetTransactionsHandle(response) => {
1212 let _ = response.send(Some(self.handle()));
1213 }
1214 }
1215 }
1216
1217 fn import_transactions(
1219 &mut self,
1220 peer_id: PeerId,
1221 transactions: PooledTransactions<N::PooledTransaction>,
1222 source: TransactionSource,
1223 ) {
1224 if self.network.is_initially_syncing() {
1226 return
1227 }
1228 if self.network.tx_gossip_disabled() {
1229 return
1230 }
1231
1232 let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1233 let mut transactions = transactions.0;
1234
1235 self.transaction_fetcher
1237 .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));
1238
1239 let mut num_already_seen_by_peer = 0;
1244 for tx in &transactions {
1245 if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1246 num_already_seen_by_peer += 1;
1247 }
1248 }
1249
1250 let txns_count_pre_pool_filter = transactions.len();
1252 self.pool.retain_unknown(&mut transactions);
1253 if txns_count_pre_pool_filter > transactions.len() {
1254 let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1255 self.metrics
1256 .occurrences_transactions_already_in_pool
1257 .increment(already_known_txns_count as u64);
1258 }
1259
1260 let mut has_bad_transactions = false;
1262
1263 if let Some(peer) = self.peers.get_mut(&peer_id) {
1265 let mut new_txs = Vec::with_capacity(transactions.len());
1267 for tx in transactions {
1268 let tx = match tx.try_into_recovered() {
1270 Ok(tx) => tx,
1271 Err(badtx) => {
1272 trace!(target: "net::tx",
1273 peer_id=format!("{peer_id:#}"),
1274 hash=%badtx.tx_hash(),
1275 client_version=%peer.client_version,
1276 "failed ecrecovery for transaction"
1277 );
1278 has_bad_transactions = true;
1279 continue
1280 }
1281 };
1282
1283 match self.transactions_by_peers.entry(*tx.tx_hash()) {
1284 Entry::Occupied(mut entry) => {
1285 entry.get_mut().insert(peer_id);
1287 }
1288 Entry::Vacant(entry) => {
1289 if self.bad_imports.contains(tx.tx_hash()) {
1290 trace!(target: "net::tx",
1291 peer_id=format!("{peer_id:#}"),
1292 hash=%tx.tx_hash(),
1293 client_version=%peer.client_version,
1294 "received a known bad transaction from peer"
1295 );
1296 has_bad_transactions = true;
1297 } else {
1298 let pool_transaction = Pool::Transaction::from_pooled(tx);
1301 new_txs.push(pool_transaction);
1302
1303 entry.insert(HashSet::from([peer_id]));
1304 }
1305 }
1306 }
1307 }
1308 new_txs.shrink_to_fit();
1309
1310 if !new_txs.is_empty() {
1313 let pool = self.pool.clone();
1314 let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1316 metric_pending_pool_imports.increment(new_txs.len() as f64);
1317
1318 self.pending_pool_imports_info
1320 .pending_pool_imports
1321 .fetch_add(new_txs.len(), Ordering::Relaxed);
1322 let tx_manager_info_pending_pool_imports =
1323 self.pending_pool_imports_info.pending_pool_imports.clone();
1324
1325 trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1326 let import = Box::pin(async move {
1327 let added = new_txs.len();
1328 let res = pool.add_external_transactions(new_txs).await;
1329
1330 metric_pending_pool_imports.decrement(added as f64);
1332 tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1334
1335 res
1336 });
1337
1338 self.pool_imports.push(import);
1339 }
1340
1341 if num_already_seen_by_peer > 0 {
1342 self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1343 self.metrics
1344 .occurrences_of_transaction_already_seen_by_peer
1345 .increment(num_already_seen_by_peer);
1346 trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1347 }
1348 }
1349
1350 if has_bad_transactions {
1351 self.report_peer_bad_transactions(peer_id)
1353 }
1354
1355 if num_already_seen_by_peer > 0 {
1356 self.report_already_seen(peer_id);
1357 }
1358 }
1359
1360 fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1362 match fetch_event {
1363 FetchEvent::TransactionsFetched { peer_id, transactions } => {
1364 self.import_transactions(peer_id, transactions, TransactionSource::Response);
1365 }
1366 FetchEvent::FetchError { peer_id, error } => {
1367 trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1368 self.on_request_error(peer_id, error);
1369 }
1370 FetchEvent::EmptyResponse { peer_id } => {
1371 trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1372 }
1373 }
1374 }
1375}
1376
1377impl<Pool, N, Policy> Future for TransactionsManager<Pool, N, Policy>
1385where
1386 Pool: TransactionPool + Unpin + 'static,
1387 N: NetworkPrimitives<
1388 BroadcastedTransaction: SignedTransaction,
1389 PooledTransaction: SignedTransaction,
1390 >,
1391 Pool::Transaction:
1392 PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1393 Policy: TransactionPropagationPolicy,
1394{
1395 type Output = ();
1396
1397 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1398 let start = Instant::now();
1399 let mut poll_durations = TxManagerPollDurations::default();
1400
1401 let this = self.get_mut();
1402
1403 let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1409 poll_durations.acc_network_events,
1410 "net::tx",
1411 "Network events stream",
1412 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1413 this.network_events.poll_next_unpin(cx),
1414 |event| this.on_network_event(event)
1415 );
1416
1417 let mut new_txs = Vec::new();
1426 let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1427 poll_durations.acc_imported_txns,
1428 "net::tx",
1429 "Pending transactions stream",
1430 DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1431 this.pending_transactions.poll_next_unpin(cx),
1432 |hash| new_txs.push(hash)
1433 );
1434 if !new_txs.is_empty() {
1435 this.on_new_pending_transactions(new_txs);
1436 }
1437
1438 let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1449 poll_durations.acc_fetch_events,
1450 "net::tx",
1451 "Transaction fetch events stream",
1452 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1453 this.transaction_fetcher.poll_next_unpin(cx),
1454 |event| this.on_fetch_event(event),
1455 );
1456
1457 let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1472 poll_durations.acc_tx_events,
1473 "net::tx",
1474 "Network transaction events stream",
1475 DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1476 this.transaction_events.poll_next_unpin(cx),
1477 |event| this.on_network_tx_event(event),
1478 );
1479
1480 let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1495 poll_durations.acc_pending_imports,
1496 "net::tx",
1497 "Batched pool imports stream",
1498 DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1499 this.pool_imports.poll_next_unpin(cx),
1500 |batch_results| this.on_batch_import_result(batch_results)
1501 );
1502
1503 duration_metered_exec!(
1508 {
1509 if this.has_capacity_for_fetching_pending_hashes() {
1510 this.on_fetch_hashes_pending_fetch();
1511 }
1512 },
1513 poll_durations.acc_pending_fetch
1514 );
1515
1516 let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1518 poll_durations.acc_cmds,
1519 "net::tx",
1520 "Commands channel",
1521 DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1522 this.command_rx.poll_next_unpin(cx),
1523 |cmd| this.on_command(cmd)
1524 );
1525
1526 this.transaction_fetcher.update_metrics();
1527
1528 if maybe_more_network_events ||
1530 maybe_more_commands ||
1531 maybe_more_tx_events ||
1532 maybe_more_tx_fetch_events ||
1533 maybe_more_pool_imports ||
1534 maybe_more_pending_txns
1535 {
1536 cx.waker().wake_by_ref();
1538 return Poll::Pending
1539 }
1540
1541 this.update_poll_metrics(start, poll_durations);
1542
1543 Poll::Pending
1544 }
1545}
1546
1547#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1551enum PropagationMode {
1552 Basic,
1556 Forced,
1561}
1562
1563impl PropagationMode {
1564 const fn is_forced(self) -> bool {
1566 matches!(self, Self::Forced)
1567 }
1568}
1569
1570#[derive(Debug, Clone)]
1572struct PropagateTransaction<T = TransactionSigned> {
1573 size: usize,
1574 transaction: Arc<T>,
1575}
1576
1577impl<T: SignedTransaction> PropagateTransaction<T> {
1578 pub fn new(transaction: T) -> Self {
1580 let size = transaction.length();
1581 Self { size, transaction: Arc::new(transaction) }
1582 }
1583
1584 fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1586 where
1587 P: PoolTransaction<Consensus = T>,
1588 {
1589 let size = tx.encoded_length();
1590 let transaction = tx.transaction.clone_into_consensus();
1591 let transaction = Arc::new(transaction.into_inner());
1592 Self { size, transaction }
1593 }
1594
1595 fn tx_hash(&self) -> &TxHash {
1596 self.transaction.tx_hash()
1597 }
1598}
1599
1600#[derive(Debug, Clone)]
1603enum PropagateTransactionsBuilder<T> {
1604 Pooled(PooledTransactionsHashesBuilder),
1605 Full(FullTransactionsBuilder<T>),
1606}
1607
1608impl<T> PropagateTransactionsBuilder<T> {
1609 fn pooled(version: EthVersion) -> Self {
1611 Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1612 }
1613
1614 fn full(version: EthVersion) -> Self {
1616 Self::Full(FullTransactionsBuilder::new(version))
1617 }
1618
1619 fn is_empty(&self) -> bool {
1621 match self {
1622 Self::Pooled(builder) => builder.is_empty(),
1623 Self::Full(builder) => builder.is_empty(),
1624 }
1625 }
1626
1627 fn build(self) -> PropagateTransactions<T> {
1629 match self {
1630 Self::Pooled(pooled) => {
1631 PropagateTransactions { pooled: Some(pooled.build()), full: None }
1632 }
1633 Self::Full(full) => full.build(),
1634 }
1635 }
1636}
1637
1638impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1639 fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1641 for tx in txs {
1642 self.push(tx);
1643 }
1644 }
1645
1646 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1648 match self {
1649 Self::Pooled(builder) => builder.push(transaction),
1650 Self::Full(builder) => builder.push(transaction),
1651 }
1652 }
1653}
1654
1655struct PropagateTransactions<T> {
1657 pooled: Option<NewPooledTransactionHashes>,
1659 full: Option<Vec<Arc<T>>>,
1661}
1662
1663#[derive(Debug, Clone)]
1668struct FullTransactionsBuilder<T> {
1669 total_size: usize,
1671 transactions: Vec<Arc<T>>,
1673 pooled: PooledTransactionsHashesBuilder,
1675}
1676
1677impl<T> FullTransactionsBuilder<T> {
1678 fn new(version: EthVersion) -> Self {
1680 Self {
1681 total_size: 0,
1682 pooled: PooledTransactionsHashesBuilder::new(version),
1683 transactions: vec![],
1684 }
1685 }
1686
1687 fn is_empty(&self) -> bool {
1689 self.transactions.is_empty() && self.pooled.is_empty()
1690 }
1691
1692 fn build(self) -> PropagateTransactions<T> {
1694 let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1695 let full = Some(self.transactions).filter(|full| !full.is_empty());
1696 PropagateTransactions { pooled, full }
1697 }
1698}
1699
1700impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1701 fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1703 for tx in txs {
1704 self.push(&tx)
1705 }
1706 }
1707
1708 fn push(&mut self, transaction: &PropagateTransaction<T>) {
1718 if !transaction.transaction.is_broadcastable_in_full() {
1727 self.pooled.push(transaction);
1728 return
1729 }
1730
1731 let new_size = self.total_size + transaction.size;
1732 if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1733 self.total_size > 0
1734 {
1735 self.pooled.push(transaction);
1737 return
1738 }
1739
1740 self.total_size = new_size;
1741 self.transactions.push(Arc::clone(&transaction.transaction));
1742 }
1743}
1744
1745#[derive(Debug, Clone)]
1748enum PooledTransactionsHashesBuilder {
1749 Eth66(NewPooledTransactionHashes66),
1750 Eth68(NewPooledTransactionHashes68),
1751}
1752
1753impl PooledTransactionsHashesBuilder {
1756 fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1758 match self {
1759 Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1760 Self::Eth68(msg) => {
1761 msg.hashes.push(*pooled_tx.hash());
1762 msg.sizes.push(pooled_tx.encoded_length());
1763 msg.types.push(pooled_tx.transaction.ty());
1764 }
1765 }
1766 }
1767
1768 fn is_empty(&self) -> bool {
1770 match self {
1771 Self::Eth66(hashes) => hashes.is_empty(),
1772 Self::Eth68(hashes) => hashes.is_empty(),
1773 }
1774 }
1775
1776 fn extend<T: SignedTransaction>(
1778 &mut self,
1779 txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1780 ) {
1781 for tx in txs {
1782 self.push(&tx);
1783 }
1784 }
1785
1786 fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1787 match self {
1788 Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1789 Self::Eth68(msg) => {
1790 msg.hashes.push(*tx.tx_hash());
1791 msg.sizes.push(tx.size);
1792 msg.types.push(tx.transaction.ty());
1793 }
1794 }
1795 }
1796
1797 fn new(version: EthVersion) -> Self {
1799 match version {
1800 EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1801 EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1802 }
1803 }
1804
1805 fn build(self) -> NewPooledTransactionHashes {
1806 match self {
1807 Self::Eth66(msg) => msg.into(),
1808 Self::Eth68(msg) => msg.into(),
1809 }
1810 }
1811}
1812
1813enum TransactionSource {
1815 Broadcast,
1817 Response,
1819}
1820
1821impl TransactionSource {
1824 const fn is_broadcast(&self) -> bool {
1826 matches!(self, Self::Broadcast)
1827 }
1828}
1829
1830#[derive(Debug)]
1832pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1833 seen_transactions: LruCache<TxHash>,
1837 request_tx: PeerRequestSender<PeerRequest<N>>,
1839 version: EthVersion,
1841 client_version: Arc<str>,
1843 peer_kind: PeerKind,
1845}
1846
1847impl<N: NetworkPrimitives> PeerMetadata<N> {
1848 pub fn new(
1850 request_tx: PeerRequestSender<PeerRequest<N>>,
1851 version: EthVersion,
1852 client_version: Arc<str>,
1853 max_transactions_seen_by_peer: u32,
1854 peer_kind: PeerKind,
1855 ) -> Self {
1856 Self {
1857 seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1858 request_tx,
1859 version,
1860 client_version,
1861 peer_kind,
1862 }
1863 }
1864
1865 pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1867 &self.request_tx
1868 }
1869
1870 pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1872 &mut self.seen_transactions
1873 }
1874
1875 pub const fn version(&self) -> EthVersion {
1877 self.version
1878 }
1879
1880 pub fn client_version(&self) -> &str {
1882 &self.client_version
1883 }
1884
1885 pub const fn peer_kind(&self) -> PeerKind {
1887 self.peer_kind
1888 }
1889}
1890
1891#[derive(Debug)]
1893enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
1894 PropagateHash(B256),
1896 PropagateHashesTo(Vec<B256>, PeerId),
1898 GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
1900 PropagateTransactionsTo(Vec<TxHash>, PeerId),
1902 PropagateTransactions(Vec<TxHash>),
1904 BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
1906 GetTransactionHashes {
1908 peers: Vec<PeerId>,
1909 tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
1910 },
1911 GetPeerSender {
1913 peer_id: PeerId,
1914 peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
1915 },
1916}
1917
1918#[derive(Debug)]
1920pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
1921 IncomingTransactions {
1925 peer_id: PeerId,
1927 msg: Transactions<N::BroadcastedTransaction>,
1929 },
1930 IncomingPooledTransactionHashes {
1932 peer_id: PeerId,
1934 msg: NewPooledTransactionHashes,
1936 },
1937 GetPooledTransactions {
1939 peer_id: PeerId,
1941 request: GetPooledTransactions,
1943 response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1945 },
1946 GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
1948}
1949
1950#[derive(Debug)]
1952pub struct PendingPoolImportsInfo {
1953 pending_pool_imports: Arc<AtomicUsize>,
1955 max_pending_pool_imports: usize,
1957}
1958
1959impl PendingPoolImportsInfo {
1960 pub fn new(max_pending_pool_imports: usize) -> Self {
1962 Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
1963 }
1964
1965 pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
1967 self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
1968 }
1969}
1970
1971impl Default for PendingPoolImportsInfo {
1972 fn default() -> Self {
1973 Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1974 }
1975}
1976
1977#[derive(Debug, Default)]
1978struct TxManagerPollDurations {
1979 acc_network_events: Duration,
1980 acc_pending_imports: Duration,
1981 acc_tx_events: Duration,
1982 acc_imported_txns: Duration,
1983 acc_fetch_events: Duration,
1984 acc_pending_fetch: Duration,
1985 acc_cmds: Duration,
1986}
1987
1988#[cfg(test)]
1989mod tests {
1990 use super::*;
1991 use crate::{
1992 test_utils::{
1993 transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
1994 Testnet,
1995 },
1996 NetworkConfigBuilder, NetworkManager,
1997 };
1998 use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy};
1999 use alloy_primitives::{hex, Signature, TxKind, U256};
2000 use alloy_rlp::Decodable;
2001 use futures::FutureExt;
2002 use reth_chainspec::MIN_TRANSACTION_GAS;
2003 use reth_ethereum_primitives::{Transaction, TransactionSigned};
2004 use reth_network_api::{NetworkInfo, PeerKind};
2005 use reth_network_p2p::{
2006 error::{RequestError, RequestResult},
2007 sync::{NetworkSyncUpdater, SyncState},
2008 };
2009 use reth_storage_api::noop::NoopProvider;
2010 use reth_transaction_pool::test_utils::{
2011 testing_pool, MockTransaction, MockTransactionFactory,
2012 };
2013 use secp256k1::SecretKey;
2014 use std::{
2015 future::poll_fn,
2016 net::{IpAddr, Ipv4Addr, SocketAddr},
2017 str::FromStr,
2018 };
2019 use tracing::error;
2020
2021 #[tokio::test(flavor = "multi_thread")]
2022 async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2023 reth_tracing::init_test_tracing();
2024 let net = Testnet::create(3).await;
2025
2026 let mut handles = net.handles();
2027 let handle0 = handles.next().unwrap();
2028 let handle1 = handles.next().unwrap();
2029
2030 drop(handles);
2031 let handle = net.spawn();
2032
2033 let listener0 = handle0.event_listener();
2034 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2035 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2036
2037 let client = NoopProvider::default();
2038 let pool = testing_pool();
2039 let config = NetworkConfigBuilder::eth(secret_key)
2040 .disable_discovery()
2041 .listener_port(0)
2042 .build(client);
2043 let transactions_manager_config = config.transactions_manager_config.clone();
2044 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2045 .await
2046 .unwrap()
2047 .into_builder()
2048 .transactions(pool.clone(), transactions_manager_config)
2049 .split_with_handle();
2050
2051 tokio::task::spawn(network);
2052
2053 network_handle.update_sync_state(SyncState::Syncing);
2055 assert!(NetworkInfo::is_syncing(&network_handle));
2056 assert!(NetworkInfo::is_initially_syncing(&network_handle));
2057
2058 let mut established = listener0.take(2);
2060 while let Some(ev) = established.next().await {
2061 match ev {
2062 NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2063 transactions
2065 .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2066 }
2067 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2068 ev => {
2069 error!("unexpected event {ev:?}")
2070 }
2071 }
2072 }
2073 let input = hex!(
2075 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2076 );
2077 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2078 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2079 peer_id: *handle1.peer_id(),
2080 msg: Transactions(vec![signed_tx.clone()]),
2081 });
2082 poll_fn(|cx| {
2083 let _ = transactions.poll_unpin(cx);
2084 Poll::Ready(())
2085 })
2086 .await;
2087 assert!(pool.is_empty());
2088 handle.terminate().await;
2089 }
2090
2091 #[tokio::test(flavor = "multi_thread")]
2092 async fn test_tx_broadcasts_through_two_syncs() {
2093 reth_tracing::init_test_tracing();
2094 let net = Testnet::create(3).await;
2095
2096 let mut handles = net.handles();
2097 let handle0 = handles.next().unwrap();
2098 let handle1 = handles.next().unwrap();
2099
2100 drop(handles);
2101 let handle = net.spawn();
2102
2103 let listener0 = handle0.event_listener();
2104 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2105 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2106
2107 let client = NoopProvider::default();
2108 let pool = testing_pool();
2109 let config = NetworkConfigBuilder::new(secret_key)
2110 .disable_discovery()
2111 .listener_port(0)
2112 .build(client);
2113 let transactions_manager_config = config.transactions_manager_config.clone();
2114 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2115 .await
2116 .unwrap()
2117 .into_builder()
2118 .transactions(pool.clone(), transactions_manager_config)
2119 .split_with_handle();
2120
2121 tokio::task::spawn(network);
2122
2123 network_handle.update_sync_state(SyncState::Syncing);
2125 assert!(NetworkInfo::is_syncing(&network_handle));
2126 network_handle.update_sync_state(SyncState::Idle);
2127 assert!(!NetworkInfo::is_syncing(&network_handle));
2128 network_handle.update_sync_state(SyncState::Syncing);
2129 assert!(NetworkInfo::is_syncing(&network_handle));
2130
2131 let mut established = listener0.take(2);
2133 while let Some(ev) = established.next().await {
2134 match ev {
2135 NetworkEvent::ActivePeerSession { .. } |
2136 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2137 transactions.on_network_event(ev);
2139 }
2140 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2141 _ => {
2142 error!("unexpected event {ev:?}")
2143 }
2144 }
2145 }
2146 let input = hex!(
2148 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2149 );
2150 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2151 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2152 peer_id: *handle1.peer_id(),
2153 msg: Transactions(vec![signed_tx.clone()]),
2154 });
2155 poll_fn(|cx| {
2156 let _ = transactions.poll_unpin(cx);
2157 Poll::Ready(())
2158 })
2159 .await;
2160 assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2161 assert!(NetworkInfo::is_syncing(&network_handle));
2162 assert!(!pool.is_empty());
2163 handle.terminate().await;
2164 }
2165
2166 #[tokio::test(flavor = "multi_thread")]
2169 async fn test_handle_incoming_transactions_hashes() {
2170 reth_tracing::init_test_tracing();
2171
2172 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2173 let client = NoopProvider::default();
2174
2175 let config = NetworkConfigBuilder::new(secret_key)
2176 .listener_port(0)
2178 .disable_discovery()
2179 .build(client);
2180
2181 let pool = testing_pool();
2182
2183 let transactions_manager_config = config.transactions_manager_config.clone();
2184 let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2185 .await
2186 .unwrap()
2187 .into_builder()
2188 .transactions(pool.clone(), transactions_manager_config)
2189 .split_with_handle();
2190
2191 let peer_id_1 = PeerId::new([1; 64]);
2192 let eth_version = EthVersion::Eth66;
2193
2194 let txs = vec![TransactionSigned::new_unhashed(
2195 Transaction::Legacy(TxLegacy {
2196 chain_id: Some(4),
2197 nonce: 15u64,
2198 gas_price: 2200000000,
2199 gas_limit: 34811,
2200 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2201 value: U256::from(1234u64),
2202 input: Default::default(),
2203 }),
2204 Signature::new(
2205 U256::from_str(
2206 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2207 )
2208 .unwrap(),
2209 U256::from_str(
2210 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2211 )
2212 .unwrap(),
2213 true,
2214 ),
2215 )];
2216
2217 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2218
2219 let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2220 tx_manager.peers.insert(peer_id_1, peer_1);
2221
2222 assert!(pool.is_empty());
2223
2224 tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2225 peer_id: peer_id_1,
2226 msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2227 txs_hashes.clone(),
2228 )),
2229 });
2230
2231 let req = to_mock_session_rx
2233 .recv()
2234 .await
2235 .expect("peer_1 session should receive request with buffered hashes");
2236 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2237 assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2238
2239 let message: Vec<PooledTransaction> = txs
2240 .into_iter()
2241 .map(|tx| {
2242 PooledTransaction::try_from(tx)
2243 .expect("Failed to convert MockTransaction to PooledTransaction")
2244 })
2245 .collect();
2246
2247 response
2249 .send(Ok(PooledTransactions(message)))
2250 .expect("should send peer_1 response to tx manager");
2251
2252 poll_fn(|cx| {
2254 let _ = tx_manager.poll_unpin(cx);
2255 Poll::Ready(())
2256 })
2257 .await;
2258
2259 assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2262 }
2263
2264 #[tokio::test(flavor = "multi_thread")]
2265 async fn test_handle_incoming_transactions() {
2266 reth_tracing::init_test_tracing();
2267 let net = Testnet::create(3).await;
2268
2269 let mut handles = net.handles();
2270 let handle0 = handles.next().unwrap();
2271 let handle1 = handles.next().unwrap();
2272
2273 drop(handles);
2274 let handle = net.spawn();
2275
2276 let listener0 = handle0.event_listener();
2277
2278 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2279 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2280
2281 let client = NoopProvider::default();
2282 let pool = testing_pool();
2283 let config = NetworkConfigBuilder::new(secret_key)
2284 .disable_discovery()
2285 .listener_port(0)
2286 .build(client);
2287 let transactions_manager_config = config.transactions_manager_config.clone();
2288 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2289 .await
2290 .unwrap()
2291 .into_builder()
2292 .transactions(pool.clone(), transactions_manager_config)
2293 .split_with_handle();
2294 tokio::task::spawn(network);
2295
2296 network_handle.update_sync_state(SyncState::Idle);
2297
2298 assert!(!NetworkInfo::is_syncing(&network_handle));
2299
2300 let mut established = listener0.take(2);
2302 while let Some(ev) = established.next().await {
2303 match ev {
2304 NetworkEvent::ActivePeerSession { .. } |
2305 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2306 transactions.on_network_event(ev);
2308 }
2309 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2310 ev => {
2311 error!("unexpected event {ev:?}")
2312 }
2313 }
2314 }
2315 let input = hex!(
2317 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2318 );
2319 let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2320 transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2321 peer_id: *handle1.peer_id(),
2322 msg: Transactions(vec![signed_tx.clone()]),
2323 });
2324 assert!(transactions
2325 .transactions_by_peers
2326 .get(signed_tx.tx_hash())
2327 .unwrap()
2328 .contains(handle1.peer_id()));
2329
2330 poll_fn(|cx| {
2332 let _ = transactions.poll_unpin(cx);
2333 Poll::Ready(())
2334 })
2335 .await;
2336
2337 assert!(!pool.is_empty());
2338 assert!(pool.get(signed_tx.tx_hash()).is_some());
2339 handle.terminate().await;
2340 }
2341
2342 #[tokio::test(flavor = "multi_thread")]
2343 async fn test_on_get_pooled_transactions_network() {
2344 reth_tracing::init_test_tracing();
2345 let net = Testnet::create(2).await;
2346
2347 let mut handles = net.handles();
2348 let handle0 = handles.next().unwrap();
2349 let handle1 = handles.next().unwrap();
2350
2351 drop(handles);
2352 let handle = net.spawn();
2353
2354 let listener0 = handle0.event_listener();
2355
2356 handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2357 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2358
2359 let client = NoopProvider::default();
2360 let pool = testing_pool();
2361 let config = NetworkConfigBuilder::new(secret_key)
2362 .disable_discovery()
2363 .listener_port(0)
2364 .build(client);
2365 let transactions_manager_config = config.transactions_manager_config.clone();
2366 let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2367 .await
2368 .unwrap()
2369 .into_builder()
2370 .transactions(pool.clone(), transactions_manager_config)
2371 .split_with_handle();
2372 tokio::task::spawn(network);
2373
2374 network_handle.update_sync_state(SyncState::Idle);
2375
2376 assert!(!NetworkInfo::is_syncing(&network_handle));
2377
2378 let mut established = listener0.take(2);
2380 while let Some(ev) = established.next().await {
2381 match ev {
2382 NetworkEvent::ActivePeerSession { .. } |
2383 NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2384 transactions.on_network_event(ev);
2385 }
2386 NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2387 ev => {
2388 error!("unexpected event {ev:?}")
2389 }
2390 }
2391 }
2392 handle.terminate().await;
2393
2394 let tx = MockTransaction::eip1559();
2395 let _ = transactions
2396 .pool
2397 .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2398 .await;
2399
2400 let request = GetPooledTransactions(vec![*tx.get_hash()]);
2401
2402 let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2403
2404 transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2405 peer_id: *handle1.peer_id(),
2406 request,
2407 response: send,
2408 });
2409
2410 match receive.await.unwrap() {
2411 Ok(PooledTransactions(transactions)) => {
2412 assert_eq!(transactions.len(), 1);
2413 }
2414 Err(e) => {
2415 panic!("error: {e:?}");
2416 }
2417 }
2418 }
2419
2420 #[tokio::test]
2424 async fn test_partially_tx_response() {
2425 reth_tracing::init_test_tracing();
2426
2427 let mut tx_manager = new_tx_manager().await.0;
2428 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2429
2430 let peer_id_1 = PeerId::new([1; 64]);
2431 let eth_version = EthVersion::Eth66;
2432
2433 let txs = vec![
2434 TransactionSigned::new_unhashed(
2435 Transaction::Legacy(TxLegacy {
2436 chain_id: Some(4),
2437 nonce: 15u64,
2438 gas_price: 2200000000,
2439 gas_limit: 34811,
2440 to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2441 value: U256::from(1234u64),
2442 input: Default::default(),
2443 }),
2444 Signature::new(
2445 U256::from_str(
2446 "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2447 )
2448 .unwrap(),
2449 U256::from_str(
2450 "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2451 )
2452 .unwrap(),
2453 true,
2454 ),
2455 ),
2456 TransactionSigned::new_unhashed(
2457 Transaction::Eip1559(TxEip1559 {
2458 chain_id: 4,
2459 nonce: 26u64,
2460 max_priority_fee_per_gas: 1500000000,
2461 max_fee_per_gas: 1500000013,
2462 gas_limit: MIN_TRANSACTION_GAS,
2463 to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2464 value: U256::from(3000000000000000000u64),
2465 input: Default::default(),
2466 access_list: Default::default(),
2467 }),
2468 Signature::new(
2469 U256::from_str(
2470 "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2471 )
2472 .unwrap(),
2473 U256::from_str(
2474 "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2475 )
2476 .unwrap(),
2477 true,
2478 ),
2479 ),
2480 ];
2481
2482 let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2483
2484 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2485 peer_1.seen_transactions.insert(txs_hashes[0]);
2488 peer_1.seen_transactions.insert(txs_hashes[1]);
2489 tx_manager.peers.insert(peer_id_1, peer_1);
2490
2491 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2492 buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2493
2494 assert!(tx_fetcher.is_idle(&peer_id_1));
2496 assert_eq!(tx_fetcher.active_peers.len(), 0);
2497
2498 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2500
2501 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2502 assert!(!tx_fetcher.is_idle(&peer_id_1));
2504 assert_eq!(tx_fetcher.active_peers.len(), 1);
2505
2506 let req = to_mock_session_rx
2508 .recv()
2509 .await
2510 .expect("peer_1 session should receive request with buffered hashes");
2511 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2512
2513 let message: Vec<PooledTransaction> = txs
2514 .into_iter()
2515 .take(1)
2516 .map(|tx| {
2517 PooledTransaction::try_from(tx)
2518 .expect("Failed to convert MockTransaction to PooledTransaction")
2519 })
2520 .collect();
2521 response
2523 .send(Ok(PooledTransactions(message)))
2524 .expect("should send peer_1 response to tx manager");
2525 let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2526 unreachable!()
2527 };
2528
2529 assert!(tx_fetcher.is_idle(&peer_id));
2531 assert_eq!(tx_fetcher.active_peers.len(), 0);
2532 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2534 }
2535
2536 #[tokio::test]
2537 async fn test_max_retries_tx_request() {
2538 reth_tracing::init_test_tracing();
2539
2540 let mut tx_manager = new_tx_manager().await.0;
2541 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2542
2543 let peer_id_1 = PeerId::new([1; 64]);
2544 let peer_id_2 = PeerId::new([2; 64]);
2545 let eth_version = EthVersion::Eth66;
2546 let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2547
2548 let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2549 peer_1.seen_transactions.insert(seen_hashes[0]);
2552 peer_1.seen_transactions.insert(seen_hashes[1]);
2553 tx_manager.peers.insert(peer_id_1, peer_1);
2554
2555 let retries = 1;
2558 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2559 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2560
2561 assert!(tx_fetcher.is_idle(&peer_id_1));
2563 assert_eq!(tx_fetcher.active_peers.len(), 0);
2564
2565 tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2567
2568 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2569
2570 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2571 assert!(!tx_fetcher.is_idle(&peer_id_1));
2573 assert_eq!(tx_fetcher.active_peers.len(), 1);
2574
2575 let req = to_mock_session_rx
2577 .recv()
2578 .await
2579 .expect("peer_1 session should receive request with buffered hashes");
2580 let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2581 let GetPooledTransactions(hashes) = request;
2582
2583 let hashes = hashes.into_iter().collect::<HashSet<_>>();
2584
2585 assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2586
2587 response
2589 .send(Err(RequestError::BadResponse))
2590 .expect("should send peer_1 response to tx manager");
2591 let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2592 unreachable!()
2593 };
2594
2595 assert!(tx_fetcher.is_idle(&peer_id));
2597 assert_eq!(tx_fetcher.active_peers.len(), 0);
2598 assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2600
2601 let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2602 tx_manager.peers.insert(peer_id_2, peer_2);
2603
2604 let msg =
2606 NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2607 tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2608
2609 let tx_fetcher = &mut tx_manager.transaction_fetcher;
2610
2611 assert_eq!(tx_fetcher.active_peers.len(), 1);
2613
2614 assert_eq!(tx_fetcher.num_all_hashes(), 2);
2616 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2618
2619 let req = to_mock_session_rx
2621 .recv()
2622 .await
2623 .expect("peer_2 session should receive request with buffered hashes");
2624 let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2625
2626 response
2628 .send(Err(RequestError::BadResponse))
2629 .expect("should send peer_2 response to tx manager");
2630 let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2631
2632 assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2635 assert_eq!(tx_fetcher.active_peers.len(), 0);
2636 }
2637
2638 #[test]
2639 fn test_transaction_builder_empty() {
2640 let mut builder =
2641 PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2642 assert!(builder.is_empty());
2643
2644 let mut factory = MockTransactionFactory::default();
2645 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2646 builder.push(&tx);
2647 assert!(!builder.is_empty());
2648
2649 let txs = builder.build();
2650 assert!(txs.full.is_none());
2651 let txs = txs.pooled.unwrap();
2652 assert_eq!(txs.len(), 1);
2653 }
2654
2655 #[test]
2656 fn test_transaction_builder_large() {
2657 let mut builder =
2658 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2659 assert!(builder.is_empty());
2660
2661 let mut factory = MockTransactionFactory::default();
2662 let mut tx = factory.create_eip1559();
2663 tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2665 let tx = Arc::new(tx);
2666 let tx = PropagateTransaction::pool_tx(tx);
2667 builder.push(&tx);
2668 assert!(!builder.is_empty());
2669
2670 let txs = builder.clone().build();
2671 assert!(txs.pooled.is_none());
2672 let txs = txs.full.unwrap();
2673 assert_eq!(txs.len(), 1);
2674
2675 builder.push(&tx);
2676
2677 let txs = builder.clone().build();
2678 let pooled = txs.pooled.unwrap();
2679 assert_eq!(pooled.len(), 1);
2680 let txs = txs.full.unwrap();
2681 assert_eq!(txs.len(), 1);
2682 }
2683
2684 #[test]
2685 fn test_transaction_builder_eip4844() {
2686 let mut builder =
2687 PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2688 assert!(builder.is_empty());
2689
2690 let mut factory = MockTransactionFactory::default();
2691 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2692 builder.push(&tx);
2693 assert!(!builder.is_empty());
2694
2695 let txs = builder.clone().build();
2696 assert!(txs.full.is_none());
2697 let txs = txs.pooled.unwrap();
2698 assert_eq!(txs.len(), 1);
2699
2700 let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2701 builder.push(&tx);
2702
2703 let txs = builder.clone().build();
2704 let pooled = txs.pooled.unwrap();
2705 assert_eq!(pooled.len(), 1);
2706 let txs = txs.full.unwrap();
2707 assert_eq!(txs.len(), 1);
2708 }
2709
2710 #[tokio::test]
2711 async fn test_propagate_full() {
2712 reth_tracing::init_test_tracing();
2713
2714 let (mut tx_manager, network) = new_tx_manager().await;
2715 let peer_id = PeerId::random();
2716
2717 network.handle().update_sync_state(SyncState::Idle);
2719
2720 let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2722
2723 let session_info = SessionInfo {
2724 peer_id,
2725 remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2726 client_version: Arc::from(""),
2727 capabilities: Arc::new(vec![].into()),
2728 status: Arc::new(Default::default()),
2729 version: EthVersion::Eth68,
2730 peer_kind: PeerKind::Basic,
2731 };
2732 let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2733 tx_manager
2734 .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2735 let mut propagate = vec![];
2736 let mut factory = MockTransactionFactory::default();
2737 let eip1559_tx = Arc::new(factory.create_eip1559());
2738 propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2739 let eip4844_tx = Arc::new(factory.create_eip4844());
2740 propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2741
2742 let propagated =
2743 tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2744 assert_eq!(propagated.0.len(), 2);
2745 let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2746 assert_eq!(prop_txs.len(), 1);
2747 assert!(prop_txs[0].is_full());
2748
2749 let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2750 assert_eq!(prop_txs.len(), 1);
2751 assert!(prop_txs[0].is_hash());
2752
2753 let peer = tx_manager.peers.get(&peer_id).unwrap();
2754 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2755 assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2756 peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2757
2758 let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2760 assert!(propagated.0.is_empty());
2761 }
2762}