reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3/// Aggregation on configurable parameters for [`TransactionsManager`].
4pub mod config;
5/// Default and spec'd bounds.
6pub mod constants;
7/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
8pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15pub use config::{TransactionFetcherConfig, TransactionPropagationMode, TransactionsManagerConfig};
16pub use validation::*;
17
18pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
19
20use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
21use crate::{
22    budget::{
23        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
24        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
25        DEFAULT_BUDGET_TRY_DRAIN_STREAM,
26    },
27    cache::LruCache,
28    duration_metered_exec, metered_poll_nested_stream_with_budget,
29    metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
30    NetworkHandle,
31};
32use alloy_primitives::{TxHash, B256};
33use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
34use futures::{stream::FuturesUnordered, Future, StreamExt};
35use reth_eth_wire::{
36    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
37    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
38    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
39    RequestTxHashes, Transactions,
40};
41use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
42use reth_network_api::{
43    events::{PeerEvent, SessionInfo},
44    NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers,
45};
46use reth_network_p2p::{
47    error::{RequestError, RequestResult},
48    sync::SyncStateProvider,
49};
50use reth_network_peers::PeerId;
51use reth_network_types::ReputationChangeKind;
52use reth_primitives::TransactionSigned;
53use reth_primitives_traits::SignedTransaction;
54use reth_tokio_util::EventStream;
55use reth_transaction_pool::{
56    error::{PoolError, PoolResult},
57    GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
58    TransactionPool, ValidPoolTransaction,
59};
60use std::{
61    collections::{hash_map::Entry, HashMap, HashSet},
62    pin::Pin,
63    sync::{
64        atomic::{AtomicUsize, Ordering},
65        Arc,
66    },
67    task::{Context, Poll},
68    time::{Duration, Instant},
69};
70use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
71use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
72use tracing::{debug, trace};
73
74/// The future for importing transactions into the pool.
75///
76/// Resolves with the result of each transaction import.
77pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
78
79/// Api to interact with [`TransactionsManager`] task.
80///
81/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
82/// with the [`TransactionsManager`] task once it is spawned.
83///
84/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
85/// known by a specific peer.
86#[derive(Debug, Clone)]
87pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
88    /// Command channel to the [`TransactionsManager`]
89    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
90}
91
92/// Implementation of the `TransactionsHandle` API for use in testnet via type
93/// [`PeerHandle`](crate::test_utils::PeerHandle).
94impl<N: NetworkPrimitives> TransactionsHandle<N> {
95    fn send(&self, cmd: TransactionsCommand<N>) {
96        let _ = self.manager_tx.send(cmd);
97    }
98
99    /// Fetch the [`PeerRequestSender`] for the given peer.
100    async fn peer_handle(
101        &self,
102        peer_id: PeerId,
103    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
104        let (tx, rx) = oneshot::channel();
105        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
106        rx.await
107    }
108
109    /// Manually propagate the transaction that belongs to the hash.
110    pub fn propagate(&self, hash: TxHash) {
111        self.send(TransactionsCommand::PropagateHash(hash))
112    }
113
114    /// Manually propagate the transaction hash to a specific peer.
115    ///
116    /// Note: this only propagates if the pool contains the transaction.
117    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
118        self.propagate_hashes_to(Some(hash), peer)
119    }
120
121    /// Manually propagate the transaction hashes to a specific peer.
122    ///
123    /// Note: this only propagates the transactions that are known to the pool.
124    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
125        let hashes = hash.into_iter().collect::<Vec<_>>();
126        if hashes.is_empty() {
127            return
128        }
129        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
130    }
131
132    /// Request the active peer IDs from the [`TransactionsManager`].
133    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
134        let (tx, rx) = oneshot::channel();
135        self.send(TransactionsCommand::GetActivePeers(tx));
136        rx.await
137    }
138
139    /// Manually propagate full transaction hashes to a specific peer.
140    ///
141    /// Do nothing if transactions are empty.
142    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
143        if transactions.is_empty() {
144            return
145        }
146        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
147    }
148
149    /// Manually propagate the given transaction hashes to all peers.
150    ///
151    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
152    /// full.
153    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
154        if transactions.is_empty() {
155            return
156        }
157        self.send(TransactionsCommand::PropagateTransactions(transactions))
158    }
159
160    /// Manually propagate the given transactions to all peers.
161    ///
162    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
163    /// full.
164    pub fn broadcast_transactions(
165        &self,
166        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
167    ) {
168        let transactions =
169            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
170        if transactions.is_empty() {
171            return
172        }
173        self.send(TransactionsCommand::BroadcastTransactions(transactions))
174    }
175
176    /// Request the transaction hashes known by specific peers.
177    pub async fn get_transaction_hashes(
178        &self,
179        peers: Vec<PeerId>,
180    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
181        if peers.is_empty() {
182            return Ok(Default::default())
183        }
184        let (tx, rx) = oneshot::channel();
185        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
186        rx.await
187    }
188
189    /// Request the transaction hashes known by a specific peer.
190    pub async fn get_peer_transaction_hashes(
191        &self,
192        peer: PeerId,
193    ) -> Result<HashSet<TxHash>, RecvError> {
194        let res = self.get_transaction_hashes(vec![peer]).await?;
195        Ok(res.into_values().next().unwrap_or_default())
196    }
197
198    /// Requests the transactions directly from the given peer.
199    ///
200    /// Returns `None` if the peer is not connected.
201    ///
202    /// **Note**: this returns the response from the peer as received.
203    pub async fn get_pooled_transactions_from(
204        &self,
205        peer_id: PeerId,
206        hashes: Vec<B256>,
207    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
208        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
209
210        let (tx, rx) = oneshot::channel();
211        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
212        peer.try_send(request).ok();
213
214        rx.await?.map(|res| Some(res.0))
215    }
216}
217
218/// Manages transactions on top of the p2p network.
219///
220/// This can be spawned to another task and is supposed to be run as background service.
221/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
222/// interact with it.
223///
224/// The [`TransactionsManager`] is responsible for:
225///    - handling incoming eth messages for transactions.
226///    - serving transaction requests.
227///    - propagate transactions
228///
229/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
230///   - receives incoming network messages.
231///   - sends messages to dispatch (responses, propagate tx)
232///
233/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
234/// propagate new transactions over the network.
235#[derive(Debug)]
236#[must_use = "Manager does nothing unless polled."]
237pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
238    /// Access to the transaction pool.
239    pool: Pool,
240    /// Network access.
241    network: NetworkHandle<N>,
242    /// Subscriptions to all network related events.
243    ///
244    /// From which we get all new incoming transaction related messages.
245    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
246    /// Transaction fetcher to handle inflight and missing transaction requests.
247    transaction_fetcher: TransactionFetcher<N>,
248    /// All currently pending transactions grouped by peers.
249    ///
250    /// This way we can track incoming transactions and prevent multiple pool imports for the same
251    /// transaction
252    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
253    /// Transactions that are currently imported into the `Pool`.
254    ///
255    /// The import process includes:
256    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
257    ///    valid, or for 4844 transaction the blobs are valid. See also
258    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
259    /// - if the transaction is valid, it is added into the pool.
260    ///
261    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
262    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
263    /// receiver.
264    pool_imports: FuturesUnordered<PoolImportFuture>,
265    /// Stats on pending pool imports that help the node self-monitor.
266    pending_pool_imports_info: PendingPoolImportsInfo,
267    /// Bad imports.
268    bad_imports: LruCache<TxHash>,
269    /// All the connected peers.
270    peers: HashMap<PeerId, PeerMetadata<N>>,
271    /// Send half for the command channel.
272    ///
273    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
274    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
275    /// Incoming commands from [`TransactionsHandle`].
276    ///
277    /// This will only receive commands if a user manually sends a command to the manager through
278    /// the [`TransactionsHandle`] to interact with this type directly.
279    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
280    /// A stream that yields new __pending__ transactions.
281    ///
282    /// A transaction is considered __pending__ if it is executable on the current state of the
283    /// chain. In other words, this only yields transactions that satisfy all consensus
284    /// requirements, these include:
285    ///   - no nonce gaps
286    ///   - all dynamic fee requirements are (currently) met
287    ///   - account has enough balance to cover the transaction's gas
288    pending_transactions: ReceiverStream<TxHash>,
289    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
290    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
291    /// How the `TransactionsManager` is configured.
292    config: TransactionsManagerConfig,
293    /// `TransactionsManager` metrics
294    metrics: TransactionsManagerMetrics,
295}
296
297impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
298    /// Sets up a new instance.
299    ///
300    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
301    pub fn new(
302        network: NetworkHandle<N>,
303        pool: Pool,
304        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
305        transactions_manager_config: TransactionsManagerConfig,
306    ) -> Self {
307        let network_events = network.event_listener();
308
309        let (command_tx, command_rx) = mpsc::unbounded_channel();
310
311        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
312            &transactions_manager_config.transaction_fetcher_config,
313        );
314
315        // install a listener for new __pending__ transactions that are allowed to be propagated
316        // over the network
317        let pending = pool.pending_transactions_listener();
318        let pending_pool_imports_info = PendingPoolImportsInfo::default();
319        let metrics = TransactionsManagerMetrics::default();
320        metrics
321            .capacity_pending_pool_imports
322            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
323
324        Self {
325            pool,
326            network,
327            network_events,
328            transaction_fetcher,
329            transactions_by_peers: Default::default(),
330            pool_imports: Default::default(),
331            pending_pool_imports_info: PendingPoolImportsInfo::new(
332                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
333            ),
334            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
335            peers: Default::default(),
336            command_tx,
337            command_rx: UnboundedReceiverStream::new(command_rx),
338            pending_transactions: ReceiverStream::new(pending),
339            transaction_events: UnboundedMeteredReceiver::new(
340                from_network,
341                NETWORK_POOL_TRANSACTIONS_SCOPE,
342            ),
343            config: transactions_manager_config,
344            metrics,
345        }
346    }
347
348    /// Returns a new handle that can send commands to this type.
349    pub fn handle(&self) -> TransactionsHandle<N> {
350        TransactionsHandle { manager_tx: self.command_tx.clone() }
351    }
352
353    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
354    /// `false` if [`TransactionsManager`] is operating close to full capacity.
355    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
356        self.pending_pool_imports_info
357            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
358            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
359    }
360
361    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
362        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
363        self.metrics.reported_bad_transactions.increment(1);
364    }
365
366    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
367        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
368        self.network.reputation_change(peer_id, kind);
369    }
370
371    fn report_already_seen(&self, peer_id: PeerId) {
372        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
373        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
374    }
375
376    /// Clear the transaction
377    fn on_good_import(&mut self, hash: TxHash) {
378        self.transactions_by_peers.remove(&hash);
379    }
380
381    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
382    /// fetching or importing it again.
383    ///
384    /// Errors that count as bad transactions are:
385    ///
386    /// - intrinsic gas too low
387    /// - exceeds gas limit
388    /// - gas uint overflow
389    /// - exceeds max init code size
390    /// - oversized data
391    /// - signer account has bytecode
392    /// - chain id mismatch
393    /// - old legacy chain id
394    /// - tx type not supported
395    ///
396    /// (and additionally for blobs txns...)
397    ///
398    /// - no blobs
399    /// - too many blobs
400    /// - invalid kzg proof
401    /// - kzg error
402    /// - not blob transaction (tx type mismatch)
403    /// - wrong versioned kzg commitment hash
404    fn on_bad_import(&mut self, err: PoolError) {
405        let peers = self.transactions_by_peers.remove(&err.hash);
406
407        // if we're _currently_ syncing, we ignore a bad transaction
408        if !err.is_bad_transaction() || self.network.is_syncing() {
409            return
410        }
411        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
412        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
413        if let Some(peers) = peers {
414            for peer_id in peers {
415                self.report_peer_bad_transactions(peer_id);
416            }
417        }
418        self.metrics.bad_imports.increment(1);
419        self.bad_imports.insert(err.hash);
420    }
421
422    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
423    fn on_fetch_hashes_pending_fetch(&mut self) {
424        // try drain transaction hashes pending fetch
425        let info = &self.pending_pool_imports_info;
426        let max_pending_pool_imports = info.max_pending_pool_imports;
427        let has_capacity_wrt_pending_pool_imports =
428            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
429
430        self.transaction_fetcher
431            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
432    }
433
434    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
435        let kind = match req_err {
436            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
437            RequestError::Timeout => ReputationChangeKind::Timeout,
438            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
439                // peer is already disconnected
440                return
441            }
442            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
443        };
444        self.report_peer(peer_id, kind);
445    }
446
447    #[inline]
448    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
449        let metrics = &self.metrics;
450
451        let TxManagerPollDurations {
452            acc_network_events,
453            acc_pending_imports,
454            acc_tx_events,
455            acc_imported_txns,
456            acc_fetch_events,
457            acc_pending_fetch,
458            acc_cmds,
459        } = poll_durations;
460
461        // update metrics for whole poll function
462        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
463        // update metrics for nested expressions
464        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
465        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
466        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
467        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
468        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
469        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
470        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
471    }
472}
473
474impl<Pool, N> TransactionsManager<Pool, N>
475where
476    Pool: TransactionPool,
477    N: NetworkPrimitives,
478{
479    /// Processes a batch import results.
480    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
481        for res in batch_results {
482            match res {
483                Ok(hash) => {
484                    self.on_good_import(hash);
485                }
486                Err(err) => {
487                    self.on_bad_import(err);
488                }
489            }
490        }
491    }
492
493    /// Request handler for an incoming `NewPooledTransactionHashes`
494    fn on_new_pooled_transaction_hashes(
495        &mut self,
496        peer_id: PeerId,
497        msg: NewPooledTransactionHashes,
498    ) {
499        // If the node is initially syncing, ignore transactions
500        if self.network.is_initially_syncing() {
501            return
502        }
503        if self.network.tx_gossip_disabled() {
504            return
505        }
506
507        // get handle to peer's session, if the session is still active
508        let Some(peer) = self.peers.get_mut(&peer_id) else {
509            trace!(
510                peer_id = format!("{peer_id:#}"),
511                ?msg,
512                "discarding announcement from inactive peer"
513            );
514
515            return
516        };
517        let client = peer.client_version.clone();
518
519        // keep track of the transactions the peer knows
520        let mut count_txns_already_seen_by_peer = 0;
521        for tx in msg.iter_hashes().copied() {
522            if !peer.seen_transactions.insert(tx) {
523                count_txns_already_seen_by_peer += 1;
524            }
525        }
526        if count_txns_already_seen_by_peer > 0 {
527            // this may occur if transactions are sent or announced to a peer, at the same time as
528            // the peer sends/announces those hashes to us. this is because, marking
529            // txns as seen by a peer is done optimistically upon sending them to the
530            // peer.
531            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
532            self.metrics
533                .occurrences_hash_already_seen_by_peer
534                .increment(count_txns_already_seen_by_peer);
535
536            trace!(target: "net::tx",
537                %count_txns_already_seen_by_peer,
538                peer_id=format!("{peer_id:#}"),
539                ?client,
540                "Peer sent hashes that have already been marked as seen by peer"
541            );
542
543            self.report_already_seen(peer_id);
544        }
545
546        // 1. filter out spam
547        let (validation_outcome, mut partially_valid_msg) =
548            self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
549
550        if validation_outcome == FilterOutcome::ReportPeer {
551            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
552        }
553
554        // 2. filter out transactions pending import to pool
555        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
556
557        // 3. filter out known hashes
558        //
559        // known txns have already been successfully fetched or received over gossip.
560        //
561        // most hashes will be filtered out here since this the mempool protocol is a gossip
562        // protocol, healthy peers will send many of the same hashes.
563        //
564        let hashes_count_pre_pool_filter = partially_valid_msg.len();
565        self.pool.retain_unknown(&mut partially_valid_msg);
566        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
567            let already_known_hashes_count =
568                hashes_count_pre_pool_filter - partially_valid_msg.len();
569            self.metrics
570                .occurrences_hashes_already_in_pool
571                .increment(already_known_hashes_count as u64);
572        }
573
574        if partially_valid_msg.is_empty() {
575            // nothing to request
576            return
577        }
578
579        // 4. filter out invalid entries (spam)
580        //
581        // validates messages with respect to the given network, e.g. allowed tx types
582        //
583        let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
584            .msg_version()
585            .expect("partially valid announcement should have version")
586            .is_eth68()
587        {
588            // validate eth68 announcement data
589            self.transaction_fetcher
590                .filter_valid_message
591                .filter_valid_entries_68(partially_valid_msg)
592        } else {
593            // validate eth66 announcement data
594            self.transaction_fetcher
595                .filter_valid_message
596                .filter_valid_entries_66(partially_valid_msg)
597        };
598
599        if validation_outcome == FilterOutcome::ReportPeer {
600            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
601        }
602
603        if valid_announcement_data.is_empty() {
604            // no valid announcement data
605            return
606        }
607
608        // 5. filter out already seen unknown hashes
609        //
610        // seen hashes are already in the tx fetcher, pending fetch.
611        //
612        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
613        // fetcher, hence they should be valid at this point.
614        let bad_imports = &self.bad_imports;
615        self.transaction_fetcher.filter_unseen_and_pending_hashes(
616            &mut valid_announcement_data,
617            |hash| bad_imports.contains(hash),
618            &peer_id,
619            &client,
620        );
621
622        if valid_announcement_data.is_empty() {
623            // nothing to request
624            return
625        }
626
627        trace!(target: "net::tx::propagation",
628            peer_id=format!("{peer_id:#}"),
629            hashes_len=valid_announcement_data.iter().count(),
630            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
631            msg_version=%valid_announcement_data.msg_version(),
632            client_version=%client,
633            "received previously unseen and pending hashes in announcement from peer"
634        );
635
636        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
637        // fallback
638        if !self.transaction_fetcher.is_idle(&peer_id) {
639            // load message version before announcement data is destructed in packing
640            let msg_version = valid_announcement_data.msg_version();
641            let (hashes, _version) = valid_announcement_data.into_request_hashes();
642
643            trace!(target: "net::tx",
644                peer_id=format!("{peer_id:#}"),
645                hashes=?*hashes,
646                %msg_version,
647                %client,
648                "buffering hashes announced by busy peer"
649            );
650
651            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
652
653            return
654        }
655
656        let mut hashes_to_request =
657            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
658        let surplus_hashes =
659            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
660
661        if !surplus_hashes.is_empty() {
662            trace!(target: "net::tx",
663                peer_id=format!("{peer_id:#}"),
664                surplus_hashes=?*surplus_hashes,
665                %client,
666                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
667            );
668
669            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
670        }
671
672        trace!(target: "net::tx",
673            peer_id=format!("{peer_id:#}"),
674            hashes=?*hashes_to_request,
675            %client,
676            "sending hashes in `GetPooledTransactions` request to peer's session"
677        );
678
679        // request the missing transactions
680        //
681        // get handle to peer's session again, at this point we know it exists
682        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
683        if let Some(failed_to_request_hashes) =
684            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
685        {
686            let conn_eth_version = peer.version;
687
688            trace!(target: "net::tx",
689                peer_id=format!("{peer_id:#}"),
690                failed_to_request_hashes=?*failed_to_request_hashes,
691                %conn_eth_version,
692                %client,
693                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
694            );
695            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
696        }
697    }
698}
699
700impl<Pool, N> TransactionsManager<Pool, N>
701where
702    Pool: TransactionPool + 'static,
703    N: NetworkPrimitives<
704        BroadcastedTransaction: SignedTransaction,
705        PooledTransaction: SignedTransaction,
706    >,
707    Pool::Transaction:
708        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
709{
710    /// Invoked when transactions in the local mempool are considered __pending__.
711    ///
712    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
713    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
714    /// messages. The Transactions message relays complete transaction objects and is typically
715    /// sent to a small, random fraction of connected peers.
716    ///
717    /// All other peers receive a notification of the transaction hash and can request the
718    /// complete transaction object if it is unknown to them. The dissemination of complete
719    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
720    /// and won't need to request it.
721    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
722        // Nothing to propagate while initially syncing
723        if self.network.is_initially_syncing() {
724            return
725        }
726        if self.network.tx_gossip_disabled() {
727            return
728        }
729
730        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
731
732        self.propagate_all(hashes);
733    }
734
735    /// Propagate the full transactions to a specific peer.
736    ///
737    /// Returns the propagated transactions.
738    fn propagate_full_transactions_to_peer(
739        &mut self,
740        txs: Vec<TxHash>,
741        peer_id: PeerId,
742        propagation_mode: PropagationMode,
743    ) -> Option<PropagatedTransactions> {
744        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
745
746        let peer = self.peers.get_mut(&peer_id)?;
747        let mut propagated = PropagatedTransactions::default();
748
749        // filter all transactions unknown to the peer
750        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
751
752        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
753
754        if propagation_mode.is_forced() {
755            // skip cache check if forced
756            full_transactions.extend(to_propagate);
757        } else {
758            // Iterate through the transactions to propagate and fill the hashes and full
759            // transaction
760            for tx in to_propagate {
761                if !peer.seen_transactions.contains(tx.tx_hash()) {
762                    // Only include if the peer hasn't seen the transaction
763                    full_transactions.push(&tx);
764                }
765            }
766        }
767
768        if full_transactions.is_empty() {
769            // nothing to propagate
770            return None
771        }
772
773        let PropagateTransactions { pooled, full } = full_transactions.build();
774
775        // send hashes if any
776        if let Some(new_pooled_hashes) = pooled {
777            for hash in new_pooled_hashes.iter_hashes().copied() {
778                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
779                // mark transaction as seen by peer
780                peer.seen_transactions.insert(hash);
781            }
782
783            // send hashes of transactions
784            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
785        }
786
787        // send full transactions, if any
788        if let Some(new_full_transactions) = full {
789            for tx in &new_full_transactions {
790                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
791                // mark transaction as seen by peer
792                peer.seen_transactions.insert(*tx.tx_hash());
793            }
794
795            // send full transactions
796            self.network.send_transactions(peer_id, new_full_transactions);
797        }
798
799        // Update propagated transactions metrics
800        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
801
802        Some(propagated)
803    }
804
805    /// Propagate the transaction hashes to the given peer
806    ///
807    /// Note: This will only send the hashes for transactions that exist in the pool.
808    fn propagate_hashes_to(
809        &mut self,
810        hashes: Vec<TxHash>,
811        peer_id: PeerId,
812        propagation_mode: PropagationMode,
813    ) {
814        trace!(target: "net::tx", "Start propagating transactions as hashes");
815
816        // This fetches a transactions from the pool, including the blob transactions, which are
817        // only ever sent as hashes.
818        let propagated = {
819            let Some(peer) = self.peers.get_mut(&peer_id) else {
820                // no such peer
821                return
822            };
823
824            let to_propagate = self
825                .pool
826                .get_all(hashes)
827                .into_iter()
828                .map(PropagateTransaction::pool_tx)
829                .collect::<Vec<_>>();
830
831            let mut propagated = PropagatedTransactions::default();
832
833            // check if transaction is known to peer
834            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
835
836            if propagation_mode.is_forced() {
837                hashes.extend(to_propagate)
838            } else {
839                for tx in to_propagate {
840                    if !peer.seen_transactions.contains(tx.tx_hash()) {
841                        // Include if the peer hasn't seen it
842                        hashes.push(&tx);
843                    }
844                }
845            }
846
847            let new_pooled_hashes = hashes.build();
848
849            if new_pooled_hashes.is_empty() {
850                // nothing to propagate
851                return
852            }
853
854            for hash in new_pooled_hashes.iter_hashes().copied() {
855                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
856            }
857
858            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
859
860            // send hashes of transactions
861            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
862
863            // Update propagated transactions metrics
864            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
865
866            propagated
867        };
868
869        // notify pool so events get fired
870        self.pool.on_propagated(propagated);
871    }
872
873    /// Propagate the transactions to all connected peers either as full objects or hashes.
874    ///
875    /// The message for new pooled hashes depends on the negotiated version of the stream.
876    /// See [`NewPooledTransactionHashes`]
877    ///
878    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
879    fn propagate_transactions(
880        &mut self,
881        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
882        propagation_mode: PropagationMode,
883    ) -> PropagatedTransactions {
884        let mut propagated = PropagatedTransactions::default();
885        if self.network.tx_gossip_disabled() {
886            return propagated
887        }
888
889        // send full transactions to a set of the connected peers based on the configured mode
890        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
891
892        // Note: Assuming ~random~ order due to random state of the peers map hasher
893        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
894            // determine whether to send full tx objects or hashes.
895            let mut builder = if peer_idx > max_num_full {
896                PropagateTransactionsBuilder::pooled(peer.version)
897            } else {
898                PropagateTransactionsBuilder::full(peer.version)
899            };
900
901            if propagation_mode.is_forced() {
902                builder.extend(to_propagate.iter());
903            } else {
904                // Iterate through the transactions to propagate and fill the hashes and full
905                // transaction lists, before deciding whether or not to send full transactions to
906                // the peer.
907                for tx in &to_propagate {
908                    // Only proceed if the transaction is not in the peer's list of seen
909                    // transactions
910                    if !peer.seen_transactions.contains(tx.tx_hash()) {
911                        builder.push(tx);
912                    }
913                }
914            }
915
916            if builder.is_empty() {
917                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
918                continue
919            }
920
921            let PropagateTransactions { pooled, full } = builder.build();
922
923            // send hashes if any
924            if let Some(mut new_pooled_hashes) = pooled {
925                // enforce tx soft limit per message for the (unlikely) event the number of
926                // hashes exceeds it
927                new_pooled_hashes
928                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
929
930                for hash in new_pooled_hashes.iter_hashes().copied() {
931                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
932                    // mark transaction as seen by peer
933                    peer.seen_transactions.insert(hash);
934                }
935
936                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
937
938                // send hashes of transactions
939                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
940            }
941
942            // send full transactions, if any
943            if let Some(new_full_transactions) = full {
944                for tx in &new_full_transactions {
945                    propagated
946                        .0
947                        .entry(*tx.tx_hash())
948                        .or_default()
949                        .push(PropagateKind::Full(*peer_id));
950                    // mark transaction as seen by peer
951                    peer.seen_transactions.insert(*tx.tx_hash());
952                }
953
954                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
955
956                // send full transactions
957                self.network.send_transactions(*peer_id, new_full_transactions);
958            }
959        }
960
961        // Update propagated transactions metrics
962        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
963
964        propagated
965    }
966
967    /// Propagates the given transactions to the peers
968    ///
969    /// This fetches all transaction from the pool, including the 4844 blob transactions but
970    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
971    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
972        let propagated = self.propagate_transactions(
973            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
974            PropagationMode::Basic,
975        );
976
977        // notify pool so events get fired
978        self.pool.on_propagated(propagated);
979    }
980
981    /// Request handler for an incoming request for transactions
982    fn on_get_pooled_transactions(
983        &mut self,
984        peer_id: PeerId,
985        request: GetPooledTransactions,
986        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
987    ) {
988        if let Some(peer) = self.peers.get_mut(&peer_id) {
989            if self.network.tx_gossip_disabled() {
990                let _ = response.send(Ok(PooledTransactions::default()));
991                return
992            }
993            let transactions = self.pool.get_pooled_transaction_elements(
994                request.0,
995                GetPooledTransactionLimit::ResponseSizeSoftLimit(
996                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
997                ),
998            );
999            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1000
1001            // we sent a response at which point we assume that the peer is aware of the
1002            // transactions
1003            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1004
1005            let resp = PooledTransactions(transactions);
1006            let _ = response.send(Ok(resp));
1007        }
1008    }
1009
1010    /// Handles a command received from a detached [`TransactionsHandle`]
1011    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1012        match cmd {
1013            TransactionsCommand::PropagateHash(hash) => {
1014                self.on_new_pending_transactions(vec![hash])
1015            }
1016            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1017                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1018            }
1019            TransactionsCommand::GetActivePeers(tx) => {
1020                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1021                tx.send(peers).ok();
1022            }
1023            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1024                if let Some(propagated) =
1025                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1026                {
1027                    self.pool.on_propagated(propagated);
1028                }
1029            }
1030            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1031            TransactionsCommand::BroadcastTransactions(txs) => {
1032                self.propagate_transactions(txs, PropagationMode::Forced);
1033            }
1034            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1035                let mut res = HashMap::with_capacity(peers.len());
1036                for peer_id in peers {
1037                    let hashes = self
1038                        .peers
1039                        .get(&peer_id)
1040                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1041                        .unwrap_or_default();
1042                    res.insert(peer_id, hashes);
1043                }
1044                tx.send(res).ok();
1045            }
1046            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1047                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1048                peer_request_sender.send(sender).ok();
1049            }
1050        }
1051    }
1052
1053    /// Handles session establishment and peer transactions initialization.
1054    fn handle_peer_session(
1055        &mut self,
1056        info: SessionInfo,
1057        messages: PeerRequestSender<PeerRequest<N>>,
1058    ) {
1059        let SessionInfo { peer_id, client_version, version, .. } = info;
1060
1061        // Insert a new peer into the peerset.
1062        let peer = PeerMetadata::<N>::new(
1063            messages,
1064            version,
1065            client_version,
1066            self.config.max_transactions_seen_by_peer_history,
1067        );
1068        let peer = match self.peers.entry(peer_id) {
1069            Entry::Occupied(mut entry) => {
1070                entry.insert(peer);
1071                entry.into_mut()
1072            }
1073            Entry::Vacant(entry) => entry.insert(peer),
1074        };
1075
1076        // Send a `NewPooledTransactionHashes` to the peer with up to
1077        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1078        // transactions in the pool.
1079        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1080            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1081            return
1082        }
1083
1084        // Get transactions to broadcast
1085        let pooled_txs = self.pool.pooled_transactions_max(
1086            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1087        );
1088        if pooled_txs.is_empty() {
1089            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1090            return;
1091        }
1092
1093        // Build and send transaction hashes message
1094        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1095        for pooled_tx in pooled_txs {
1096            peer.seen_transactions.insert(*pooled_tx.hash());
1097            msg_builder.push_pooled(pooled_tx);
1098        }
1099
1100        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1101        let msg = msg_builder.build();
1102        self.network.send_transactions_hashes(peer_id, msg);
1103    }
1104
1105    /// Handles a received event related to common network events.
1106    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1107        match event_result {
1108            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1109                // remove the peer
1110                self.peers.remove(&peer_id);
1111                self.transaction_fetcher.remove_peer(&peer_id);
1112            }
1113            NetworkEvent::ActivePeerSession { info, messages } => {
1114                // process active peer session and broadcast available transaction from the pool
1115                self.handle_peer_session(info, messages);
1116            }
1117            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1118                let peer_id = info.peer_id;
1119                // get messages from existing peer
1120                let messages = match self.peers.get(&peer_id) {
1121                    Some(p) => p.request_tx.clone(),
1122                    None => {
1123                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1124                        return;
1125                    }
1126                };
1127                self.handle_peer_session(info, messages);
1128            }
1129            _ => {}
1130        }
1131    }
1132
1133    /// Handles dedicated transaction events related to the `eth` protocol.
1134    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1135        match event {
1136            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1137                // ensure we didn't receive any blob transactions as these are disallowed to be
1138                // broadcasted in full
1139
1140                let has_blob_txs = msg.has_eip4844();
1141
1142                let non_blob_txs = msg
1143                    .0
1144                    .into_iter()
1145                    .map(N::PooledTransaction::try_from)
1146                    .filter_map(Result::ok)
1147                    .collect();
1148
1149                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1150
1151                if has_blob_txs {
1152                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1153                    self.report_peer_bad_transactions(peer_id);
1154                }
1155            }
1156            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1157                self.on_new_pooled_transaction_hashes(peer_id, msg)
1158            }
1159            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1160                self.on_get_pooled_transactions(peer_id, request, response)
1161            }
1162            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1163                let _ = response.send(Some(self.handle()));
1164            }
1165        }
1166    }
1167
1168    /// Starts the import process for the given transactions.
1169    fn import_transactions(
1170        &mut self,
1171        peer_id: PeerId,
1172        transactions: PooledTransactions<N::PooledTransaction>,
1173        source: TransactionSource,
1174    ) {
1175        // If the node is pipeline syncing, ignore transactions
1176        if self.network.is_initially_syncing() {
1177            return
1178        }
1179        if self.network.tx_gossip_disabled() {
1180            return
1181        }
1182
1183        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1184        let mut transactions = transactions.0;
1185
1186        // mark the transactions as received
1187        self.transaction_fetcher
1188            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));
1189
1190        // track that the peer knows these transaction, but only if this is a new broadcast.
1191        // If we received the transactions as the response to our `GetPooledTransactions``
1192        // requests (based on received `NewPooledTransactionHashes`) then we already
1193        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1194        let mut num_already_seen_by_peer = 0;
1195        for tx in &transactions {
1196            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1197                num_already_seen_by_peer += 1;
1198            }
1199        }
1200
1201        // 1. filter out txns already inserted into pool
1202        let txns_count_pre_pool_filter = transactions.len();
1203        self.pool.retain_unknown(&mut transactions);
1204        if txns_count_pre_pool_filter > transactions.len() {
1205            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1206            self.metrics
1207                .occurrences_transactions_already_in_pool
1208                .increment(already_known_txns_count as u64);
1209        }
1210
1211        // tracks the quality of the given transactions
1212        let mut has_bad_transactions = false;
1213
1214        // 2. filter out transactions that are invalid or already pending import
1215        if let Some(peer) = self.peers.get_mut(&peer_id) {
1216            // pre-size to avoid reallocations
1217            let mut new_txs = Vec::with_capacity(transactions.len());
1218            for tx in transactions {
1219                // recover transaction
1220                let tx = match tx.try_into_recovered() {
1221                    Ok(tx) => tx,
1222                    Err(badtx) => {
1223                        trace!(target: "net::tx",
1224                            peer_id=format!("{peer_id:#}"),
1225                            hash=%badtx.tx_hash(),
1226                            client_version=%peer.client_version,
1227                            "failed ecrecovery for transaction"
1228                        );
1229                        has_bad_transactions = true;
1230                        continue
1231                    }
1232                };
1233
1234                match self.transactions_by_peers.entry(*tx.tx_hash()) {
1235                    Entry::Occupied(mut entry) => {
1236                        // transaction was already inserted
1237                        entry.get_mut().insert(peer_id);
1238                    }
1239                    Entry::Vacant(entry) => {
1240                        if self.bad_imports.contains(tx.tx_hash()) {
1241                            trace!(target: "net::tx",
1242                                peer_id=format!("{peer_id:#}"),
1243                                hash=%tx.tx_hash(),
1244                                client_version=%peer.client_version,
1245                                "received a known bad transaction from peer"
1246                            );
1247                            has_bad_transactions = true;
1248                        } else {
1249                            // this is a new transaction that should be imported into the pool
1250
1251                            let pool_transaction = Pool::Transaction::from_pooled(tx);
1252                            new_txs.push(pool_transaction);
1253
1254                            entry.insert(HashSet::from([peer_id]));
1255                        }
1256                    }
1257                }
1258            }
1259            new_txs.shrink_to_fit();
1260
1261            // 3. import new transactions as a batch to minimize lock contention on the underlying
1262            // pool
1263            if !new_txs.is_empty() {
1264                let pool = self.pool.clone();
1265                // update metrics
1266                let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1267                metric_pending_pool_imports.increment(new_txs.len() as f64);
1268
1269                // update self-monitoring info
1270                self.pending_pool_imports_info
1271                    .pending_pool_imports
1272                    .fetch_add(new_txs.len(), Ordering::Relaxed);
1273                let tx_manager_info_pending_pool_imports =
1274                    self.pending_pool_imports_info.pending_pool_imports.clone();
1275
1276                trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1277                let import = Box::pin(async move {
1278                    let added = new_txs.len();
1279                    let res = pool.add_external_transactions(new_txs).await;
1280
1281                    // update metrics
1282                    metric_pending_pool_imports.decrement(added as f64);
1283                    // update self-monitoring info
1284                    tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1285
1286                    res
1287                });
1288
1289                self.pool_imports.push(import);
1290            }
1291
1292            if num_already_seen_by_peer > 0 {
1293                self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1294                self.metrics
1295                    .occurrences_of_transaction_already_seen_by_peer
1296                    .increment(num_already_seen_by_peer);
1297                trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1298            }
1299        }
1300
1301        if has_bad_transactions {
1302            // peer sent us invalid transactions
1303            self.report_peer_bad_transactions(peer_id)
1304        }
1305
1306        if num_already_seen_by_peer > 0 {
1307            self.report_already_seen(peer_id);
1308        }
1309    }
1310
1311    /// Processes a [`FetchEvent`].
1312    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1313        match fetch_event {
1314            FetchEvent::TransactionsFetched { peer_id, transactions } => {
1315                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1316            }
1317            FetchEvent::FetchError { peer_id, error } => {
1318                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1319                self.on_request_error(peer_id, error);
1320            }
1321            FetchEvent::EmptyResponse { peer_id } => {
1322                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1323            }
1324        }
1325    }
1326}
1327
1328/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1329/// [`crate::NetworkManager`] for more context on the design pattern.
1330///
1331/// This should be spawned or used as part of `tokio::select!`.
1332//
1333// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1334// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1335impl<Pool, N> Future for TransactionsManager<Pool, N>
1336where
1337    Pool: TransactionPool + Unpin + 'static,
1338    N: NetworkPrimitives<
1339        BroadcastedTransaction: SignedTransaction,
1340        PooledTransaction: SignedTransaction,
1341    >,
1342    Pool::Transaction:
1343        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1344{
1345    type Output = ();
1346
1347    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1348        let start = Instant::now();
1349        let mut poll_durations = TxManagerPollDurations::default();
1350
1351        let this = self.get_mut();
1352
1353        // All streams are polled until their corresponding budget is exhausted, then we manually
1354        // yield back control to tokio. See `NetworkManager` for more context on the design
1355        // pattern.
1356
1357        // Advance network/peer related events (update peers map).
1358        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1359            poll_durations.acc_network_events,
1360            "net::tx",
1361            "Network events stream",
1362            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1363            this.network_events.poll_next_unpin(cx),
1364            |event| this.on_network_event(event)
1365        );
1366
1367        // Advances new __pending__ transactions, transactions that were successfully inserted into
1368        // pending set in pool (are valid), and propagates them (inform peers which
1369        // transactions we have seen).
1370        //
1371        // We try to drain this to batch the transactions in a single message.
1372        //
1373        // We don't expect this buffer to be large, since only pending transactions are
1374        // emitted here.
1375        let mut new_txs = Vec::new();
1376        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1377            poll_durations.acc_imported_txns,
1378            "net::tx",
1379            "Pending transactions stream",
1380            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1381            this.pending_transactions.poll_next_unpin(cx),
1382            |hash| new_txs.push(hash)
1383        );
1384        if !new_txs.is_empty() {
1385            this.on_new_pending_transactions(new_txs);
1386        }
1387
1388        // Advance inflight fetch requests (flush transaction fetcher and queue for
1389        // import to pool).
1390        //
1391        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1392        // (2 MiB / 10 bytes > 200k transactions).
1393        //
1394        // Since transactions aren't validated until they are inserted into the pool,
1395        // this can potentially queue >200k transactions for insertion to pool. More
1396        // if the message size is bigger than the soft limit on a `PooledTransactions`
1397        // response which is 2 MiB.
1398        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1399            poll_durations.acc_fetch_events,
1400            "net::tx",
1401            "Transaction fetch events stream",
1402            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1403            this.transaction_fetcher.poll_next_unpin(cx),
1404            |event| this.on_fetch_event(event),
1405        );
1406
1407        // Advance incoming transaction events (stream new txns/announcements from
1408        // network manager and queue for import to pool/fetch txns).
1409        //
1410        // This will potentially remove hashes from hashes pending fetch, it the event
1411        // is an announcement (if same hashes are announced that didn't fit into a
1412        // previous request).
1413        //
1414        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1415        // (128 KiB / 10 bytes > 13k transactions).
1416        //
1417        // If this is an event with `Transactions` message, since transactions aren't
1418        // validated until they are inserted into the pool, this can potentially queue
1419        // >13k transactions for insertion to pool. More if the message size is bigger
1420        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1421        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1422            poll_durations.acc_tx_events,
1423            "net::tx",
1424            "Network transaction events stream",
1425            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1426            this.transaction_events.poll_next_unpin(cx),
1427            |event| this.on_network_tx_event(event),
1428        );
1429
1430        // Advance pool imports (flush txns to pool).
1431        //
1432        // Note, this is done in batches. A batch is filled from one `Transactions`
1433        // broadcast messages or one `PooledTransactions` response at a time. The
1434        // minimum batch size is 1 transaction (and might often be the case with blob
1435        // transactions).
1436        //
1437        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1438        // (2 MiB / 10 bytes > 200k transactions).
1439        //
1440        // Since transactions aren't validated until they are inserted into the pool,
1441        // this can potentially validate >200k transactions. More if the message size
1442        // is bigger than the soft limit on a `PooledTransactions` response which is
1443        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1444        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1445            poll_durations.acc_pending_imports,
1446            "net::tx",
1447            "Batched pool imports stream",
1448            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1449            this.pool_imports.poll_next_unpin(cx),
1450            |batch_results| this.on_batch_import_result(batch_results)
1451        );
1452
1453        // Tries to drain hashes pending fetch cache if the tx manager currently has
1454        // capacity for this (fetch txns).
1455        //
1456        // Sends at most one request.
1457        duration_metered_exec!(
1458            {
1459                if this.has_capacity_for_fetching_pending_hashes() {
1460                    this.on_fetch_hashes_pending_fetch();
1461                }
1462            },
1463            poll_durations.acc_pending_fetch
1464        );
1465
1466        // Advance commands (propagate/fetch/serve txns).
1467        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1468            poll_durations.acc_cmds,
1469            "net::tx",
1470            "Commands channel",
1471            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1472            this.command_rx.poll_next_unpin(cx),
1473            |cmd| this.on_command(cmd)
1474        );
1475
1476        this.transaction_fetcher.update_metrics();
1477
1478        // all channels are fully drained and import futures pending
1479        if maybe_more_network_events ||
1480            maybe_more_commands ||
1481            maybe_more_tx_events ||
1482            maybe_more_tx_fetch_events ||
1483            maybe_more_pool_imports ||
1484            maybe_more_pending_txns
1485        {
1486            // make sure we're woken up again
1487            cx.waker().wake_by_ref();
1488            return Poll::Pending
1489        }
1490
1491        this.update_poll_metrics(start, poll_durations);
1492
1493        Poll::Pending
1494    }
1495}
1496
1497/// Represents the different modes of transaction propagation.
1498///
1499/// This enum is used to determine how transactions are propagated to peers in the network.
1500#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1501enum PropagationMode {
1502    /// Default propagation mode.
1503    ///
1504    /// Transactions are only sent to peers that haven't seen them yet.
1505    Basic,
1506    /// Forced propagation mode.
1507    ///
1508    /// Transactions are sent to all peers regardless of whether they have been sent or received
1509    /// before.
1510    Forced,
1511}
1512
1513impl PropagationMode {
1514    /// Returns `true` if the propagation kind is `Forced`.
1515    const fn is_forced(self) -> bool {
1516        matches!(self, Self::Forced)
1517    }
1518}
1519
1520/// A transaction that's about to be propagated to multiple peers.
1521#[derive(Debug, Clone)]
1522struct PropagateTransaction<T = TransactionSigned> {
1523    size: usize,
1524    transaction: Arc<T>,
1525}
1526
1527impl<T: SignedTransaction> PropagateTransaction<T> {
1528    /// Create a new instance from a transaction.
1529    pub fn new(transaction: T) -> Self {
1530        let size = transaction.length();
1531        Self { size, transaction: Arc::new(transaction) }
1532    }
1533
1534    /// Create a new instance from a pooled transaction
1535    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1536    where
1537        P: PoolTransaction<Consensus = T>,
1538    {
1539        let size = tx.encoded_length();
1540        let transaction = tx.transaction.clone_into_consensus();
1541        let transaction = Arc::new(transaction.into_inner());
1542        Self { size, transaction }
1543    }
1544
1545    fn tx_hash(&self) -> &TxHash {
1546        self.transaction.tx_hash()
1547    }
1548}
1549
1550/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1551/// should receive them in full or as pooled
1552#[derive(Debug, Clone)]
1553enum PropagateTransactionsBuilder<T> {
1554    Pooled(PooledTransactionsHashesBuilder),
1555    Full(FullTransactionsBuilder<T>),
1556}
1557
1558impl<T> PropagateTransactionsBuilder<T> {
1559    /// Create a builder for pooled transactions
1560    fn pooled(version: EthVersion) -> Self {
1561        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1562    }
1563
1564    /// Create a builder that sends transactions in full and records transactions that don't fit.
1565    fn full(version: EthVersion) -> Self {
1566        Self::Full(FullTransactionsBuilder::new(version))
1567    }
1568
1569    /// Returns true if no transactions are recorded.
1570    fn is_empty(&self) -> bool {
1571        match self {
1572            Self::Pooled(builder) => builder.is_empty(),
1573            Self::Full(builder) => builder.is_empty(),
1574        }
1575    }
1576
1577    /// Consumes the type and returns the built messages that should be sent to the peer.
1578    fn build(self) -> PropagateTransactions<T> {
1579        match self {
1580            Self::Pooled(pooled) => {
1581                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1582            }
1583            Self::Full(full) => full.build(),
1584        }
1585    }
1586}
1587
1588impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1589    /// Appends all transactions
1590    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1591        for tx in txs {
1592            self.push(tx);
1593        }
1594    }
1595
1596    /// Appends a transaction to the list.
1597    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1598        match self {
1599            Self::Pooled(builder) => builder.push(transaction),
1600            Self::Full(builder) => builder.push(transaction),
1601        }
1602    }
1603}
1604
1605/// Represents how the transactions should be sent to a peer if any.
1606struct PropagateTransactions<T> {
1607    /// The pooled transaction hashes to send.
1608    pooled: Option<NewPooledTransactionHashes>,
1609    /// The transactions to send in full.
1610    full: Option<Vec<Arc<T>>>,
1611}
1612
1613/// Helper type for constructing the full transaction message that enforces the
1614/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1615/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1616/// broadcasted in full.
1617#[derive(Debug, Clone)]
1618struct FullTransactionsBuilder<T> {
1619    /// The soft limit to enforce for a single broadcast message of full transactions.
1620    total_size: usize,
1621    /// All transactions to be broadcasted.
1622    transactions: Vec<Arc<T>>,
1623    /// Transactions that didn't fit into the broadcast message
1624    pooled: PooledTransactionsHashesBuilder,
1625}
1626
1627impl<T> FullTransactionsBuilder<T> {
1628    /// Create a builder for the negotiated version of the peer's session
1629    fn new(version: EthVersion) -> Self {
1630        Self {
1631            total_size: 0,
1632            pooled: PooledTransactionsHashesBuilder::new(version),
1633            transactions: vec![],
1634        }
1635    }
1636
1637    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1638    fn is_empty(&self) -> bool {
1639        self.transactions.is_empty() && self.pooled.is_empty()
1640    }
1641
1642    /// Returns the messages that should be propagated to the peer.
1643    fn build(self) -> PropagateTransactions<T> {
1644        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1645        let full = Some(self.transactions).filter(|full| !full.is_empty());
1646        PropagateTransactions { pooled, full }
1647    }
1648}
1649
1650impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1651    /// Appends all transactions.
1652    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1653        for tx in txs {
1654            self.push(&tx)
1655        }
1656    }
1657
1658    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1659    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1660    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1661    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1662    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1663    ///
1664    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1665    /// to list of pooled transactions, (e.g. 4844 transactions).
1666    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1667    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1668        // Do not send full 4844 transaction hashes to peers.
1669        //
1670        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1671        //  Instead, those transactions are only announced using
1672        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1673        //  via `GetPooledTransactions`.
1674        //
1675        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1676        if !transaction.transaction.is_broadcastable_in_full() {
1677            self.pooled.push(transaction);
1678            return
1679        }
1680
1681        let new_size = self.total_size + transaction.size;
1682        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1683            self.total_size > 0
1684        {
1685            // transaction does not fit into the message
1686            self.pooled.push(transaction);
1687            return
1688        }
1689
1690        self.total_size = new_size;
1691        self.transactions.push(Arc::clone(&transaction.transaction));
1692    }
1693}
1694
1695/// A helper type to create the pooled transactions message based on the negotiated version of the
1696/// session with the peer
1697#[derive(Debug, Clone)]
1698enum PooledTransactionsHashesBuilder {
1699    Eth66(NewPooledTransactionHashes66),
1700    Eth68(NewPooledTransactionHashes68),
1701}
1702
1703// === impl PooledTransactionsHashesBuilder ===
1704
1705impl PooledTransactionsHashesBuilder {
1706    /// Push a transaction from the pool to the list.
1707    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1708        match self {
1709            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1710            Self::Eth68(msg) => {
1711                msg.hashes.push(*pooled_tx.hash());
1712                msg.sizes.push(pooled_tx.encoded_length());
1713                msg.types.push(pooled_tx.transaction.ty());
1714            }
1715        }
1716    }
1717
1718    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1719    fn is_empty(&self) -> bool {
1720        match self {
1721            Self::Eth66(hashes) => hashes.is_empty(),
1722            Self::Eth68(hashes) => hashes.is_empty(),
1723        }
1724    }
1725
1726    /// Appends all hashes
1727    fn extend<T: SignedTransaction>(
1728        &mut self,
1729        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1730    ) {
1731        for tx in txs {
1732            self.push(&tx);
1733        }
1734    }
1735
1736    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1737        match self {
1738            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1739            Self::Eth68(msg) => {
1740                msg.hashes.push(*tx.tx_hash());
1741                msg.sizes.push(tx.size);
1742                msg.types.push(tx.transaction.ty());
1743            }
1744        }
1745    }
1746
1747    /// Create a builder for the negotiated version of the peer's session
1748    fn new(version: EthVersion) -> Self {
1749        match version {
1750            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1751            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1752        }
1753    }
1754
1755    fn build(self) -> NewPooledTransactionHashes {
1756        match self {
1757            Self::Eth66(msg) => msg.into(),
1758            Self::Eth68(msg) => msg.into(),
1759        }
1760    }
1761}
1762
1763/// How we received the transactions.
1764enum TransactionSource {
1765    /// Transactions were broadcast to us via [`Transactions`] message.
1766    Broadcast,
1767    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
1768    Response,
1769}
1770
1771// === impl TransactionSource ===
1772
1773impl TransactionSource {
1774    /// Whether the transaction were sent as broadcast.
1775    const fn is_broadcast(&self) -> bool {
1776        matches!(self, Self::Broadcast)
1777    }
1778}
1779
1780/// Tracks a single peer in the context of [`TransactionsManager`].
1781#[derive(Debug)]
1782pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1783    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
1784    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
1785    /// the peer.
1786    seen_transactions: LruCache<TxHash>,
1787    /// A communication channel directly to the peer's session task.
1788    request_tx: PeerRequestSender<PeerRequest<N>>,
1789    /// negotiated version of the session.
1790    version: EthVersion,
1791    /// The peer's client version.
1792    client_version: Arc<str>,
1793}
1794
1795impl<N: NetworkPrimitives> PeerMetadata<N> {
1796    /// Returns a new instance of [`PeerMetadata`].
1797    fn new(
1798        request_tx: PeerRequestSender<PeerRequest<N>>,
1799        version: EthVersion,
1800        client_version: Arc<str>,
1801        max_transactions_seen_by_peer: u32,
1802    ) -> Self {
1803        Self {
1804            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1805            request_tx,
1806            version,
1807            client_version,
1808        }
1809    }
1810}
1811
1812/// Commands to send to the [`TransactionsManager`]
1813#[derive(Debug)]
1814enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
1815    /// Propagate a transaction hash to the network.
1816    PropagateHash(B256),
1817    /// Propagate transaction hashes to a specific peer.
1818    PropagateHashesTo(Vec<B256>, PeerId),
1819    /// Request the list of active peer IDs from the [`TransactionsManager`].
1820    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
1821    /// Propagate a collection of full transactions to a specific peer.
1822    PropagateTransactionsTo(Vec<TxHash>, PeerId),
1823    /// Propagate a collection of hashes to all peers.
1824    PropagateTransactions(Vec<TxHash>),
1825    /// Propagate a collection of broadcastable transactions in full to all peers.
1826    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
1827    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
1828    GetTransactionHashes {
1829        peers: Vec<PeerId>,
1830        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
1831    },
1832    /// Requests a clone of the sender sender channel to the peer.
1833    GetPeerSender {
1834        peer_id: PeerId,
1835        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
1836    },
1837}
1838
1839/// All events related to transactions emitted by the network.
1840#[derive(Debug)]
1841pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
1842    /// Represents the event of receiving a list of transactions from a peer.
1843    ///
1844    /// This indicates transactions that were broadcasted to us from the peer.
1845    IncomingTransactions {
1846        /// The ID of the peer from which the transactions were received.
1847        peer_id: PeerId,
1848        /// The received transactions.
1849        msg: Transactions<N::BroadcastedTransaction>,
1850    },
1851    /// Represents the event of receiving a list of transaction hashes from a peer.
1852    IncomingPooledTransactionHashes {
1853        /// The ID of the peer from which the transaction hashes were received.
1854        peer_id: PeerId,
1855        /// The received new pooled transaction hashes.
1856        msg: NewPooledTransactionHashes,
1857    },
1858    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
1859    GetPooledTransactions {
1860        /// The ID of the peer from which the request was received.
1861        peer_id: PeerId,
1862        /// The received `GetPooledTransactions` request.
1863        request: GetPooledTransactions,
1864        /// The sender for responding to the request with a result of `PooledTransactions`.
1865        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1866    },
1867    /// Represents the event of receiving a `GetTransactionsHandle` request.
1868    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
1869}
1870
1871/// Tracks stats about the [`TransactionsManager`].
1872#[derive(Debug)]
1873pub struct PendingPoolImportsInfo {
1874    /// Number of transactions about to be inserted into the pool.
1875    pending_pool_imports: Arc<AtomicUsize>,
1876    /// Max number of transactions allowed to be imported concurrently.
1877    max_pending_pool_imports: usize,
1878}
1879
1880impl PendingPoolImportsInfo {
1881    /// Returns a new [`PendingPoolImportsInfo`].
1882    pub fn new(max_pending_pool_imports: usize) -> Self {
1883        Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
1884    }
1885
1886    /// Returns `true` if the number of pool imports is under a given tolerated max.
1887    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
1888        self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
1889    }
1890}
1891
1892impl Default for PendingPoolImportsInfo {
1893    fn default() -> Self {
1894        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1895    }
1896}
1897
1898#[derive(Debug, Default)]
1899struct TxManagerPollDurations {
1900    acc_network_events: Duration,
1901    acc_pending_imports: Duration,
1902    acc_tx_events: Duration,
1903    acc_imported_txns: Duration,
1904    acc_fetch_events: Duration,
1905    acc_pending_fetch: Duration,
1906    acc_cmds: Duration,
1907}
1908
1909#[cfg(test)]
1910mod tests {
1911    use super::*;
1912    use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
1913    use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy};
1914    use alloy_primitives::{hex, PrimitiveSignature as Signature, TxKind, U256};
1915    use alloy_rlp::Decodable;
1916    use constants::tx_fetcher::DEFAULT_MAX_COUNT_FALLBACK_PEERS;
1917    use futures::FutureExt;
1918    use reth_chainspec::MIN_TRANSACTION_GAS;
1919    use reth_ethereum_primitives::{Transaction, TransactionSigned};
1920    use reth_network_api::{NetworkInfo, PeerKind};
1921    use reth_network_p2p::{
1922        error::{RequestError, RequestResult},
1923        sync::{NetworkSyncUpdater, SyncState},
1924    };
1925    use reth_storage_api::noop::NoopProvider;
1926    use reth_transaction_pool::test_utils::{
1927        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
1928    };
1929    use secp256k1::SecretKey;
1930    use std::{
1931        fmt,
1932        future::poll_fn,
1933        hash,
1934        net::{IpAddr, Ipv4Addr, SocketAddr},
1935        str::FromStr,
1936    };
1937    use tests::fetcher::TxFetchMetadata;
1938    use tracing::error;
1939
1940    async fn new_tx_manager(
1941    ) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
1942    {
1943        let secret_key = SecretKey::new(&mut rand::thread_rng());
1944        let client = NoopProvider::default();
1945
1946        let config = NetworkConfigBuilder::new(secret_key)
1947            // let OS choose port
1948            .listener_port(0)
1949            .disable_discovery()
1950            .build(client);
1951
1952        let pool = testing_pool();
1953
1954        let transactions_manager_config = config.transactions_manager_config.clone();
1955        let (_network_handle, network, transactions, _) = NetworkManager::new(config)
1956            .await
1957            .unwrap()
1958            .into_builder()
1959            .transactions(pool.clone(), transactions_manager_config)
1960            .split_with_handle();
1961
1962        (transactions, network)
1963    }
1964
1965    pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
1966        LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32)
1967    }
1968
1969    // Returns (peer, channel-to-send-get-pooled-tx-response-on).
1970    pub(super) fn new_mock_session(
1971        peer_id: PeerId,
1972        version: EthVersion,
1973    ) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
1974        let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
1975
1976        (
1977            PeerMetadata::new(
1978                PeerRequestSender::new(peer_id, to_mock_session_tx),
1979                version,
1980                Arc::from(""),
1981                DEFAULT_MAX_COUNT_TRANSACTIONS_SEEN_BY_PEER,
1982            ),
1983            to_mock_session_rx,
1984        )
1985    }
1986
1987    #[tokio::test(flavor = "multi_thread")]
1988    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
1989        reth_tracing::init_test_tracing();
1990        let net = Testnet::create(3).await;
1991
1992        let mut handles = net.handles();
1993        let handle0 = handles.next().unwrap();
1994        let handle1 = handles.next().unwrap();
1995
1996        drop(handles);
1997        let handle = net.spawn();
1998
1999        let listener0 = handle0.event_listener();
2000        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2001        let secret_key = SecretKey::new(&mut rand::thread_rng());
2002
2003        let client = NoopProvider::default();
2004        let pool = testing_pool();
2005        let config = NetworkConfigBuilder::eth(secret_key)
2006            .disable_discovery()
2007            .listener_port(0)
2008            .build(client);
2009        let transactions_manager_config = config.transactions_manager_config.clone();
2010        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2011            .await
2012            .unwrap()
2013            .into_builder()
2014            .transactions(pool.clone(), transactions_manager_config)
2015            .split_with_handle();
2016
2017        tokio::task::spawn(network);
2018
2019        // go to syncing (pipeline sync)
2020        network_handle.update_sync_state(SyncState::Syncing);
2021        assert!(NetworkInfo::is_syncing(&network_handle));
2022        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2023
2024        // wait for all initiator connections
2025        let mut established = listener0.take(2);
2026        while let Some(ev) = established.next().await {
2027            match ev {
2028                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2029                    // to insert a new peer in transactions peerset
2030                    transactions
2031                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2032                }
2033                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2034                ev => {
2035                    error!("unexpected event {ev:?}")
2036                }
2037            }
2038        }
2039        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2040        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2041        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2042        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2043            peer_id: *handle1.peer_id(),
2044            msg: Transactions(vec![signed_tx.clone()]),
2045        });
2046        poll_fn(|cx| {
2047            let _ = transactions.poll_unpin(cx);
2048            Poll::Ready(())
2049        })
2050        .await;
2051        assert!(pool.is_empty());
2052        handle.terminate().await;
2053    }
2054
2055    #[tokio::test(flavor = "multi_thread")]
2056    async fn test_tx_broadcasts_through_two_syncs() {
2057        reth_tracing::init_test_tracing();
2058        let net = Testnet::create(3).await;
2059
2060        let mut handles = net.handles();
2061        let handle0 = handles.next().unwrap();
2062        let handle1 = handles.next().unwrap();
2063
2064        drop(handles);
2065        let handle = net.spawn();
2066
2067        let listener0 = handle0.event_listener();
2068        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2069        let secret_key = SecretKey::new(&mut rand::thread_rng());
2070
2071        let client = NoopProvider::default();
2072        let pool = testing_pool();
2073        let config = NetworkConfigBuilder::new(secret_key)
2074            .disable_discovery()
2075            .listener_port(0)
2076            .build(client);
2077        let transactions_manager_config = config.transactions_manager_config.clone();
2078        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2079            .await
2080            .unwrap()
2081            .into_builder()
2082            .transactions(pool.clone(), transactions_manager_config)
2083            .split_with_handle();
2084
2085        tokio::task::spawn(network);
2086
2087        // go to syncing (pipeline sync) to idle and then to syncing (live)
2088        network_handle.update_sync_state(SyncState::Syncing);
2089        assert!(NetworkInfo::is_syncing(&network_handle));
2090        network_handle.update_sync_state(SyncState::Idle);
2091        assert!(!NetworkInfo::is_syncing(&network_handle));
2092        network_handle.update_sync_state(SyncState::Syncing);
2093        assert!(NetworkInfo::is_syncing(&network_handle));
2094
2095        // wait for all initiator connections
2096        let mut established = listener0.take(2);
2097        while let Some(ev) = established.next().await {
2098            match ev {
2099                NetworkEvent::ActivePeerSession { .. } |
2100                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2101                    // to insert a new peer in transactions peerset
2102                    transactions.on_network_event(ev);
2103                }
2104                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2105                _ => {
2106                    error!("unexpected event {ev:?}")
2107                }
2108            }
2109        }
2110        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2111        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2112        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2113        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2114            peer_id: *handle1.peer_id(),
2115            msg: Transactions(vec![signed_tx.clone()]),
2116        });
2117        poll_fn(|cx| {
2118            let _ = transactions.poll_unpin(cx);
2119            Poll::Ready(())
2120        })
2121        .await;
2122        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2123        assert!(NetworkInfo::is_syncing(&network_handle));
2124        assert!(!pool.is_empty());
2125        handle.terminate().await;
2126    }
2127
2128    #[tokio::test(flavor = "multi_thread")]
2129    async fn test_handle_incoming_transactions() {
2130        reth_tracing::init_test_tracing();
2131        let net = Testnet::create(3).await;
2132
2133        let mut handles = net.handles();
2134        let handle0 = handles.next().unwrap();
2135        let handle1 = handles.next().unwrap();
2136
2137        drop(handles);
2138        let handle = net.spawn();
2139
2140        let listener0 = handle0.event_listener();
2141
2142        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2143        let secret_key = SecretKey::new(&mut rand::thread_rng());
2144
2145        let client = NoopProvider::default();
2146        let pool = testing_pool();
2147        let config = NetworkConfigBuilder::new(secret_key)
2148            .disable_discovery()
2149            .listener_port(0)
2150            .build(client);
2151        let transactions_manager_config = config.transactions_manager_config.clone();
2152        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2153            .await
2154            .unwrap()
2155            .into_builder()
2156            .transactions(pool.clone(), transactions_manager_config)
2157            .split_with_handle();
2158        tokio::task::spawn(network);
2159
2160        network_handle.update_sync_state(SyncState::Idle);
2161
2162        assert!(!NetworkInfo::is_syncing(&network_handle));
2163
2164        // wait for all initiator connections
2165        let mut established = listener0.take(2);
2166        while let Some(ev) = established.next().await {
2167            match ev {
2168                NetworkEvent::ActivePeerSession { .. } |
2169                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2170                    // to insert a new peer in transactions peerset
2171                    transactions.on_network_event(ev);
2172                }
2173                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2174                ev => {
2175                    error!("unexpected event {ev:?}")
2176                }
2177            }
2178        }
2179        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2180        let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
2181        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2182        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2183            peer_id: *handle1.peer_id(),
2184            msg: Transactions(vec![signed_tx.clone()]),
2185        });
2186        assert!(transactions
2187            .transactions_by_peers
2188            .get(signed_tx.tx_hash())
2189            .unwrap()
2190            .contains(handle1.peer_id()));
2191
2192        // advance the transaction manager future
2193        poll_fn(|cx| {
2194            let _ = transactions.poll_unpin(cx);
2195            Poll::Ready(())
2196        })
2197        .await;
2198
2199        assert!(!pool.is_empty());
2200        assert!(pool.get(signed_tx.tx_hash()).is_some());
2201        handle.terminate().await;
2202    }
2203
2204    #[tokio::test(flavor = "multi_thread")]
2205    async fn test_on_get_pooled_transactions_network() {
2206        reth_tracing::init_test_tracing();
2207        let net = Testnet::create(2).await;
2208
2209        let mut handles = net.handles();
2210        let handle0 = handles.next().unwrap();
2211        let handle1 = handles.next().unwrap();
2212
2213        drop(handles);
2214        let handle = net.spawn();
2215
2216        let listener0 = handle0.event_listener();
2217
2218        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2219        let secret_key = SecretKey::new(&mut rand::thread_rng());
2220
2221        let client = NoopProvider::default();
2222        let pool = testing_pool();
2223        let config = NetworkConfigBuilder::new(secret_key)
2224            .disable_discovery()
2225            .listener_port(0)
2226            .build(client);
2227        let transactions_manager_config = config.transactions_manager_config.clone();
2228        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2229            .await
2230            .unwrap()
2231            .into_builder()
2232            .transactions(pool.clone(), transactions_manager_config)
2233            .split_with_handle();
2234        tokio::task::spawn(network);
2235
2236        network_handle.update_sync_state(SyncState::Idle);
2237
2238        assert!(!NetworkInfo::is_syncing(&network_handle));
2239
2240        // wait for all initiator connections
2241        let mut established = listener0.take(2);
2242        while let Some(ev) = established.next().await {
2243            match ev {
2244                NetworkEvent::ActivePeerSession { .. } |
2245                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2246                    transactions.on_network_event(ev);
2247                }
2248                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2249                ev => {
2250                    error!("unexpected event {ev:?}")
2251                }
2252            }
2253        }
2254        handle.terminate().await;
2255
2256        let tx = MockTransaction::eip1559();
2257        let _ = transactions
2258            .pool
2259            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2260            .await;
2261
2262        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2263
2264        let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2265
2266        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2267            peer_id: *handle1.peer_id(),
2268            request,
2269            response: send,
2270        });
2271
2272        match receive.await.unwrap() {
2273            Ok(PooledTransactions(transactions)) => {
2274                assert_eq!(transactions.len(), 1);
2275            }
2276            Err(e) => {
2277                panic!("error: {e:?}");
2278            }
2279        }
2280    }
2281
2282    // Ensure that when the remote peer only returns part of the requested transactions, the
2283    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2284    // re-buffered.
2285    #[tokio::test]
2286    async fn test_partially_tx_response() {
2287        reth_tracing::init_test_tracing();
2288
2289        let mut tx_manager = new_tx_manager().await.0;
2290        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2291
2292        let peer_id_1 = PeerId::new([1; 64]);
2293        let eth_version = EthVersion::Eth66;
2294
2295        let txs = vec![
2296            TransactionSigned::new_unhashed(
2297                Transaction::Legacy(TxLegacy {
2298                    chain_id: Some(4),
2299                    nonce: 15u64,
2300                    gas_price: 2200000000,
2301                    gas_limit: 34811,
2302                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2303                    value: U256::from(1234u64),
2304                    input: Default::default(),
2305                }),
2306                Signature::new(
2307                    U256::from_str(
2308                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2309                    )
2310                    .unwrap(),
2311                    U256::from_str(
2312                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2313                    )
2314                    .unwrap(),
2315                    true,
2316                ),
2317            ),
2318            TransactionSigned::new_unhashed(
2319                Transaction::Eip1559(TxEip1559 {
2320                    chain_id: 4,
2321                    nonce: 26u64,
2322                    max_priority_fee_per_gas: 1500000000,
2323                    max_fee_per_gas: 1500000013,
2324                    gas_limit: MIN_TRANSACTION_GAS,
2325                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2326                    value: U256::from(3000000000000000000u64),
2327                    input: Default::default(),
2328                    access_list: Default::default(),
2329                }),
2330                Signature::new(
2331                    U256::from_str(
2332                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2333                    )
2334                    .unwrap(),
2335                    U256::from_str(
2336                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2337                    )
2338                    .unwrap(),
2339                    true,
2340                ),
2341            ),
2342        ];
2343
2344        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2345
2346        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2347        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2348        // fetch
2349        peer_1.seen_transactions.insert(txs_hashes[0]);
2350        peer_1.seen_transactions.insert(txs_hashes[1]);
2351        tx_manager.peers.insert(peer_id_1, peer_1);
2352
2353        let mut backups = default_cache();
2354        backups.insert(peer_id_1);
2355
2356        let mut backups1 = default_cache();
2357        backups1.insert(peer_id_1);
2358
2359        tx_fetcher
2360            .hashes_fetch_inflight_and_pending_fetch
2361            .insert(txs_hashes[0], TxFetchMetadata::new(1, backups, None));
2362        tx_fetcher
2363            .hashes_fetch_inflight_and_pending_fetch
2364            .insert(txs_hashes[1], TxFetchMetadata::new(1, backups1, None));
2365        tx_fetcher.hashes_pending_fetch.insert(txs_hashes[0]);
2366        tx_fetcher.hashes_pending_fetch.insert(txs_hashes[1]);
2367
2368        // peer_1 is idle
2369        assert!(tx_fetcher.is_idle(&peer_id_1));
2370        assert_eq!(tx_fetcher.active_peers.len(), 0);
2371
2372        // sends requests for buffered hashes to peer_1
2373        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2374
2375        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2376        // as long as request is in flight peer_1 is not idle
2377        assert!(!tx_fetcher.is_idle(&peer_id_1));
2378        assert_eq!(tx_fetcher.active_peers.len(), 1);
2379
2380        // mock session of peer_1 receives request
2381        let req = to_mock_session_rx
2382            .recv()
2383            .await
2384            .expect("peer_1 session should receive request with buffered hashes");
2385        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2386
2387        let message: Vec<PooledTransaction> = txs
2388            .into_iter()
2389            .take(1)
2390            .map(|tx| {
2391                PooledTransaction::try_from(tx)
2392                    .expect("Failed to convert MockTransaction to PooledTransaction")
2393            })
2394            .collect();
2395        // response partial request
2396        response
2397            .send(Ok(reth_eth_wire::PooledTransactions(message)))
2398            .expect("should send peer_1 response to tx manager");
2399        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2400            unreachable!()
2401        };
2402
2403        // request has resolved, peer_1 is idle again
2404        assert!(tx_fetcher.is_idle(&peer_id));
2405        assert_eq!(tx_fetcher.active_peers.len(), 0);
2406        // failing peer_1's request buffers requested hashes for retry.
2407        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 1);
2408    }
2409
2410    #[tokio::test]
2411    async fn test_max_retries_tx_request() {
2412        reth_tracing::init_test_tracing();
2413
2414        let mut tx_manager = new_tx_manager().await.0;
2415        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2416
2417        let peer_id_1 = PeerId::new([1; 64]);
2418        let peer_id_2 = PeerId::new([2; 64]);
2419        let eth_version = EthVersion::Eth66;
2420        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2421
2422        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2423        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2424        // fetch
2425        peer_1.seen_transactions.insert(seen_hashes[0]);
2426        peer_1.seen_transactions.insert(seen_hashes[1]);
2427        tx_manager.peers.insert(peer_id_1, peer_1);
2428
2429        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2430        // for first retry in reverse order to make index 0 lru
2431        let retries = 1;
2432        let mut backups = default_cache();
2433        backups.insert(peer_id_1);
2434
2435        let mut backups1 = default_cache();
2436        backups1.insert(peer_id_1);
2437        tx_fetcher
2438            .hashes_fetch_inflight_and_pending_fetch
2439            .insert(seen_hashes[1], TxFetchMetadata::new(retries, backups, None));
2440        tx_fetcher
2441            .hashes_fetch_inflight_and_pending_fetch
2442            .insert(seen_hashes[0], TxFetchMetadata::new(retries, backups1, None));
2443        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[1]);
2444        tx_fetcher.hashes_pending_fetch.insert(seen_hashes[0]);
2445
2446        // peer_1 is idle
2447        assert!(tx_fetcher.is_idle(&peer_id_1));
2448        assert_eq!(tx_fetcher.active_peers.len(), 0);
2449
2450        // sends request for buffered hashes to peer_1
2451        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2452
2453        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2454
2455        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2456        // as long as request is in inflight peer_1 is not idle
2457        assert!(!tx_fetcher.is_idle(&peer_id_1));
2458        assert_eq!(tx_fetcher.active_peers.len(), 1);
2459
2460        // mock session of peer_1 receives request
2461        let req = to_mock_session_rx
2462            .recv()
2463            .await
2464            .expect("peer_1 session should receive request with buffered hashes");
2465        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2466        let GetPooledTransactions(hashes) = request;
2467
2468        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2469
2470        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2471
2472        // fail request to peer_1
2473        response
2474            .send(Err(RequestError::BadResponse))
2475            .expect("should send peer_1 response to tx manager");
2476        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2477            unreachable!()
2478        };
2479
2480        // request has resolved, peer_1 is idle again
2481        assert!(tx_fetcher.is_idle(&peer_id));
2482        assert_eq!(tx_fetcher.active_peers.len(), 0);
2483        // failing peer_1's request buffers requested hashes for retry
2484        assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 2);
2485
2486        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2487        tx_manager.peers.insert(peer_id_2, peer_2);
2488
2489        // peer_2 announces same hashes as peer_1
2490        let msg =
2491            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2492        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2493
2494        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2495
2496        // peer_2 should be in active_peers.
2497        assert_eq!(tx_fetcher.active_peers.len(), 1);
2498
2499        // since hashes are already seen, no changes to length of unknown hashes
2500        assert_eq!(tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len(), 2);
2501        // but hashes are taken out of buffer and packed into request to peer_2
2502        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2503
2504        // mock session of peer_2 receives request
2505        let req = to_mock_session_rx
2506            .recv()
2507            .await
2508            .expect("peer_2 session should receive request with buffered hashes");
2509        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2510
2511        // report failed request to tx manager
2512        response
2513            .send(Err(RequestError::BadResponse))
2514            .expect("should send peer_2 response to tx manager");
2515        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2516
2517        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2518        // for retry
2519        assert!(tx_fetcher.hashes_pending_fetch.is_empty());
2520        assert_eq!(tx_fetcher.active_peers.len(), 0);
2521    }
2522
2523    #[test]
2524    fn test_transaction_builder_empty() {
2525        let mut builder =
2526            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2527        assert!(builder.is_empty());
2528
2529        let mut factory = MockTransactionFactory::default();
2530        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2531        builder.push(&tx);
2532        assert!(!builder.is_empty());
2533
2534        let txs = builder.build();
2535        assert!(txs.full.is_none());
2536        let txs = txs.pooled.unwrap();
2537        assert_eq!(txs.len(), 1);
2538    }
2539
2540    #[test]
2541    fn test_transaction_builder_large() {
2542        let mut builder =
2543            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2544        assert!(builder.is_empty());
2545
2546        let mut factory = MockTransactionFactory::default();
2547        let mut tx = factory.create_eip1559();
2548        // create a transaction that still fits
2549        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2550        let tx = Arc::new(tx);
2551        let tx = PropagateTransaction::pool_tx(tx);
2552        builder.push(&tx);
2553        assert!(!builder.is_empty());
2554
2555        let txs = builder.clone().build();
2556        assert!(txs.pooled.is_none());
2557        let txs = txs.full.unwrap();
2558        assert_eq!(txs.len(), 1);
2559
2560        builder.push(&tx);
2561
2562        let txs = builder.clone().build();
2563        let pooled = txs.pooled.unwrap();
2564        assert_eq!(pooled.len(), 1);
2565        let txs = txs.full.unwrap();
2566        assert_eq!(txs.len(), 1);
2567    }
2568
2569    #[test]
2570    fn test_transaction_builder_eip4844() {
2571        let mut builder =
2572            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2573        assert!(builder.is_empty());
2574
2575        let mut factory = MockTransactionFactory::default();
2576        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2577        builder.push(&tx);
2578        assert!(!builder.is_empty());
2579
2580        let txs = builder.clone().build();
2581        assert!(txs.full.is_none());
2582        let txs = txs.pooled.unwrap();
2583        assert_eq!(txs.len(), 1);
2584
2585        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2586        builder.push(&tx);
2587
2588        let txs = builder.clone().build();
2589        let pooled = txs.pooled.unwrap();
2590        assert_eq!(pooled.len(), 1);
2591        let txs = txs.full.unwrap();
2592        assert_eq!(txs.len(), 1);
2593    }
2594
2595    #[tokio::test]
2596    async fn test_propagate_full() {
2597        reth_tracing::init_test_tracing();
2598
2599        let (mut tx_manager, network) = new_tx_manager().await;
2600        let peer_id = PeerId::random();
2601
2602        // ensure not syncing
2603        network.handle().update_sync_state(SyncState::Idle);
2604
2605        // mock a peer
2606        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2607
2608        let session_info = SessionInfo {
2609            peer_id,
2610            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2611            client_version: Arc::from(""),
2612            capabilities: Arc::new(vec![].into()),
2613            status: Arc::new(Default::default()),
2614            version: EthVersion::Eth68,
2615            peer_kind: PeerKind::Basic,
2616        };
2617        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2618        tx_manager
2619            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2620        let mut propagate = vec![];
2621        let mut factory = MockTransactionFactory::default();
2622        let eip1559_tx = Arc::new(factory.create_eip1559());
2623        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2624        let eip4844_tx = Arc::new(factory.create_eip4844());
2625        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2626
2627        let propagated =
2628            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2629        assert_eq!(propagated.0.len(), 2);
2630        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2631        assert_eq!(prop_txs.len(), 1);
2632        assert!(prop_txs[0].is_full());
2633
2634        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2635        assert_eq!(prop_txs.len(), 1);
2636        assert!(prop_txs[0].is_hash());
2637
2638        let peer = tx_manager.peers.get(&peer_id).unwrap();
2639        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2640        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2641        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2642
2643        // propagate again
2644        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2645        assert!(propagated.0.is_empty());
2646    }
2647}