// reth_network/transactions/mod.rs

1//! Transactions management for the p2p network.
2
3/// Aggregation on configurable parameters for [`TransactionsManager`].
4pub mod config;
5/// Default and spec'd bounds.
6pub mod constants;
7/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
8pub mod fetcher;
9pub mod validation;
10
11pub use self::constants::{
12    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
13    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
14};
15use config::TransactionPropagationKind;
16pub use config::{
17    TransactionFetcherConfig, TransactionPropagationMode, TransactionPropagationPolicy,
18    TransactionsManagerConfig,
19};
20pub use validation::*;
21
22pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
23
24use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
25use crate::{
26    budget::{
27        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
28        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
29        DEFAULT_BUDGET_TRY_DRAIN_STREAM,
30    },
31    cache::LruCache,
32    duration_metered_exec, metered_poll_nested_stream_with_budget,
33    metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
34    NetworkHandle,
35};
36use alloy_primitives::{TxHash, B256};
37use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
38use futures::{stream::FuturesUnordered, Future, StreamExt};
39use reth_eth_wire::{
40    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
41    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
42    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
43    RequestTxHashes, Transactions,
44};
45use reth_ethereum_primitives::TransactionSigned;
46use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
47use reth_network_api::{
48    events::{PeerEvent, SessionInfo},
49    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
50};
51use reth_network_p2p::{
52    error::{RequestError, RequestResult},
53    sync::SyncStateProvider,
54};
55use reth_network_peers::PeerId;
56use reth_network_types::ReputationChangeKind;
57use reth_primitives_traits::SignedTransaction;
58use reth_tokio_util::EventStream;
59use reth_transaction_pool::{
60    error::{PoolError, PoolResult},
61    GetPooledTransactionLimit, PoolTransaction, PropagateKind, PropagatedTransactions,
62    TransactionPool, ValidPoolTransaction,
63};
64use std::{
65    collections::{hash_map::Entry, HashMap, HashSet},
66    pin::Pin,
67    sync::{
68        atomic::{AtomicUsize, Ordering},
69        Arc,
70    },
71    task::{Context, Poll},
72    time::{Duration, Instant},
73};
74use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
75use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
76use tracing::{debug, trace};
77
78/// The future for importing transactions into the pool.
79///
80/// Resolves with the result of each transaction import.
81pub type PoolImportFuture = Pin<Box<dyn Future<Output = Vec<PoolResult<TxHash>>> + Send + 'static>>;
82
83/// Api to interact with [`TransactionsManager`] task.
84///
85/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
86/// with the [`TransactionsManager`] task once it is spawned.
87///
88/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
89/// known by a specific peer.
90#[derive(Debug, Clone)]
91pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
92    /// Command channel to the [`TransactionsManager`]
93    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
94}
95
96/// Implementation of the `TransactionsHandle` API for use in testnet via type
97/// [`PeerHandle`](crate::test_utils::PeerHandle).
98impl<N: NetworkPrimitives> TransactionsHandle<N> {
99    fn send(&self, cmd: TransactionsCommand<N>) {
100        let _ = self.manager_tx.send(cmd);
101    }
102
103    /// Fetch the [`PeerRequestSender`] for the given peer.
104    async fn peer_handle(
105        &self,
106        peer_id: PeerId,
107    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
108        let (tx, rx) = oneshot::channel();
109        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
110        rx.await
111    }
112
113    /// Manually propagate the transaction that belongs to the hash.
114    pub fn propagate(&self, hash: TxHash) {
115        self.send(TransactionsCommand::PropagateHash(hash))
116    }
117
118    /// Manually propagate the transaction hash to a specific peer.
119    ///
120    /// Note: this only propagates if the pool contains the transaction.
121    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
122        self.propagate_hashes_to(Some(hash), peer)
123    }
124
125    /// Manually propagate the transaction hashes to a specific peer.
126    ///
127    /// Note: this only propagates the transactions that are known to the pool.
128    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
129        let hashes = hash.into_iter().collect::<Vec<_>>();
130        if hashes.is_empty() {
131            return
132        }
133        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
134    }
135
136    /// Request the active peer IDs from the [`TransactionsManager`].
137    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
138        let (tx, rx) = oneshot::channel();
139        self.send(TransactionsCommand::GetActivePeers(tx));
140        rx.await
141    }
142
143    /// Manually propagate full transaction hashes to a specific peer.
144    ///
145    /// Do nothing if transactions are empty.
146    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
147        if transactions.is_empty() {
148            return
149        }
150        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
151    }
152
153    /// Manually propagate the given transaction hashes to all peers.
154    ///
155    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
156    /// full.
157    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
158        if transactions.is_empty() {
159            return
160        }
161        self.send(TransactionsCommand::PropagateTransactions(transactions))
162    }
163
164    /// Manually propagate the given transactions to all peers.
165    ///
166    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
167    /// full.
168    pub fn broadcast_transactions(
169        &self,
170        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
171    ) {
172        let transactions =
173            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
174        if transactions.is_empty() {
175            return
176        }
177        self.send(TransactionsCommand::BroadcastTransactions(transactions))
178    }
179
180    /// Request the transaction hashes known by specific peers.
181    pub async fn get_transaction_hashes(
182        &self,
183        peers: Vec<PeerId>,
184    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
185        if peers.is_empty() {
186            return Ok(Default::default())
187        }
188        let (tx, rx) = oneshot::channel();
189        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
190        rx.await
191    }
192
193    /// Request the transaction hashes known by a specific peer.
194    pub async fn get_peer_transaction_hashes(
195        &self,
196        peer: PeerId,
197    ) -> Result<HashSet<TxHash>, RecvError> {
198        let res = self.get_transaction_hashes(vec![peer]).await?;
199        Ok(res.into_values().next().unwrap_or_default())
200    }
201
202    /// Requests the transactions directly from the given peer.
203    ///
204    /// Returns `None` if the peer is not connected.
205    ///
206    /// **Note**: this returns the response from the peer as received.
207    pub async fn get_pooled_transactions_from(
208        &self,
209        peer_id: PeerId,
210        hashes: Vec<B256>,
211    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
212        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
213
214        let (tx, rx) = oneshot::channel();
215        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
216        peer.try_send(request).ok();
217
218        rx.await?.map(|res| Some(res.0))
219    }
220}
221
222/// Manages transactions on top of the p2p network.
223///
224/// This can be spawned to another task and is supposed to be run as background service.
225/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
226/// interact with it.
227///
228/// The [`TransactionsManager`] is responsible for:
229///    - handling incoming eth messages for transactions.
230///    - serving transaction requests.
231///    - propagate transactions
232///
233/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
234///   - receives incoming network messages.
235///   - sends messages to dispatch (responses, propagate tx)
236///
237/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
238/// propagate new transactions over the network.
239#[derive(Debug)]
240#[must_use = "Manager does nothing unless polled."]
241pub struct TransactionsManager<
242    Pool,
243    N: NetworkPrimitives = EthNetworkPrimitives,
244    P: TransactionPropagationPolicy = TransactionPropagationKind,
245> {
246    /// Access to the transaction pool.
247    pool: Pool,
248    /// Network access.
249    network: NetworkHandle<N>,
250    /// Subscriptions to all network related events.
251    ///
252    /// From which we get all new incoming transaction related messages.
253    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
254    /// Transaction fetcher to handle inflight and missing transaction requests.
255    transaction_fetcher: TransactionFetcher<N>,
256    /// All currently pending transactions grouped by peers.
257    ///
258    /// This way we can track incoming transactions and prevent multiple pool imports for the same
259    /// transaction
260    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
261    /// Transactions that are currently imported into the `Pool`.
262    ///
263    /// The import process includes:
264    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
265    ///    valid, or for 4844 transaction the blobs are valid. See also
266    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
267    /// - if the transaction is valid, it is added into the pool.
268    ///
269    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
270    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
271    /// receiver.
272    pool_imports: FuturesUnordered<PoolImportFuture>,
273    /// Stats on pending pool imports that help the node self-monitor.
274    pending_pool_imports_info: PendingPoolImportsInfo,
275    /// Bad imports.
276    bad_imports: LruCache<TxHash>,
277    /// All the connected peers.
278    peers: HashMap<PeerId, PeerMetadata<N>>,
279    /// Send half for the command channel.
280    ///
281    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
282    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
283    /// Incoming commands from [`TransactionsHandle`].
284    ///
285    /// This will only receive commands if a user manually sends a command to the manager through
286    /// the [`TransactionsHandle`] to interact with this type directly.
287    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
288    /// A stream that yields new __pending__ transactions.
289    ///
290    /// A transaction is considered __pending__ if it is executable on the current state of the
291    /// chain. In other words, this only yields transactions that satisfy all consensus
292    /// requirements, these include:
293    ///   - no nonce gaps
294    ///   - all dynamic fee requirements are (currently) met
295    ///   - account has enough balance to cover the transaction's gas
296    pending_transactions: ReceiverStream<TxHash>,
297    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
298    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
299    /// How the `TransactionsManager` is configured.
300    config: TransactionsManagerConfig,
301    /// The policy to use when propagating transactions.
302    propagation_policy: P,
303    /// `TransactionsManager` metrics
304    metrics: TransactionsManagerMetrics,
305}
306
307impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
308    /// Sets up a new instance.
309    ///
310    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
311    pub fn new(
312        network: NetworkHandle<N>,
313        pool: Pool,
314        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
315        transactions_manager_config: TransactionsManagerConfig,
316    ) -> Self {
317        Self::with_policy(
318            network,
319            pool,
320            from_network,
321            transactions_manager_config,
322            TransactionPropagationKind::default(),
323        )
324    }
325}
326
327impl<Pool: TransactionPool, N: NetworkPrimitives, P: TransactionPropagationPolicy>
328    TransactionsManager<Pool, N, P>
329{
330    /// Sets up a new instance with given the settings.
331    ///
332    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
333    pub fn with_policy(
334        network: NetworkHandle<N>,
335        pool: Pool,
336        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
337        transactions_manager_config: TransactionsManagerConfig,
338        propagation_policy: P,
339    ) -> Self {
340        let network_events = network.event_listener();
341
342        let (command_tx, command_rx) = mpsc::unbounded_channel();
343
344        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
345            &transactions_manager_config.transaction_fetcher_config,
346        );
347
348        // install a listener for new __pending__ transactions that are allowed to be propagated
349        // over the network
350        let pending = pool.pending_transactions_listener();
351        let pending_pool_imports_info = PendingPoolImportsInfo::default();
352        let metrics = TransactionsManagerMetrics::default();
353        metrics
354            .capacity_pending_pool_imports
355            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
356
357        Self {
358            pool,
359            network,
360            network_events,
361            transaction_fetcher,
362            transactions_by_peers: Default::default(),
363            pool_imports: Default::default(),
364            pending_pool_imports_info: PendingPoolImportsInfo::new(
365                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
366            ),
367            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
368            peers: Default::default(),
369            command_tx,
370            command_rx: UnboundedReceiverStream::new(command_rx),
371            pending_transactions: ReceiverStream::new(pending),
372            transaction_events: UnboundedMeteredReceiver::new(
373                from_network,
374                NETWORK_POOL_TRANSACTIONS_SCOPE,
375            ),
376            config: transactions_manager_config,
377            propagation_policy,
378            metrics,
379        }
380    }
381
382    /// Returns a new handle that can send commands to this type.
383    pub fn handle(&self) -> TransactionsHandle<N> {
384        TransactionsHandle { manager_tx: self.command_tx.clone() }
385    }
386
387    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
388    /// `false` if [`TransactionsManager`] is operating close to full capacity.
389    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
390        self.pending_pool_imports_info
391            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
392            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
393    }
394
395    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
396        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
397        self.metrics.reported_bad_transactions.increment(1);
398    }
399
400    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
401        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
402        self.network.reputation_change(peer_id, kind);
403    }
404
405    fn report_already_seen(&self, peer_id: PeerId) {
406        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
407        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
408    }
409
410    /// Clear the transaction
411    fn on_good_import(&mut self, hash: TxHash) {
412        self.transactions_by_peers.remove(&hash);
413    }
414
415    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
416    /// fetching or importing it again.
417    ///
418    /// Errors that count as bad transactions are:
419    ///
420    /// - intrinsic gas too low
421    /// - exceeds gas limit
422    /// - gas uint overflow
423    /// - exceeds max init code size
424    /// - oversized data
425    /// - signer account has bytecode
426    /// - chain id mismatch
427    /// - old legacy chain id
428    /// - tx type not supported
429    ///
430    /// (and additionally for blobs txns...)
431    ///
432    /// - no blobs
433    /// - too many blobs
434    /// - invalid kzg proof
435    /// - kzg error
436    /// - not blob transaction (tx type mismatch)
437    /// - wrong versioned kzg commitment hash
438    fn on_bad_import(&mut self, err: PoolError) {
439        let peers = self.transactions_by_peers.remove(&err.hash);
440
441        // if we're _currently_ syncing, we ignore a bad transaction
442        if !err.is_bad_transaction() || self.network.is_syncing() {
443            return
444        }
445        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
446        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
447        if let Some(peers) = peers {
448            for peer_id in peers {
449                self.report_peer_bad_transactions(peer_id);
450            }
451        }
452        self.metrics.bad_imports.increment(1);
453        self.bad_imports.insert(err.hash);
454    }
455
456    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
457    fn on_fetch_hashes_pending_fetch(&mut self) {
458        // try drain transaction hashes pending fetch
459        let info = &self.pending_pool_imports_info;
460        let max_pending_pool_imports = info.max_pending_pool_imports;
461        let has_capacity_wrt_pending_pool_imports =
462            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
463
464        self.transaction_fetcher
465            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
466    }
467
468    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
469        let kind = match req_err {
470            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
471            RequestError::Timeout => ReputationChangeKind::Timeout,
472            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
473                // peer is already disconnected
474                return
475            }
476            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
477        };
478        self.report_peer(peer_id, kind);
479    }
480
481    #[inline]
482    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
483        let metrics = &self.metrics;
484
485        let TxManagerPollDurations {
486            acc_network_events,
487            acc_pending_imports,
488            acc_tx_events,
489            acc_imported_txns,
490            acc_fetch_events,
491            acc_pending_fetch,
492            acc_cmds,
493        } = poll_durations;
494
495        // update metrics for whole poll function
496        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
497        // update metrics for nested expressions
498        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
499        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
500        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
501        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
502        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
503        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
504        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
505    }
506}
507
508impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
509where
510    Pool: TransactionPool,
511    N: NetworkPrimitives,
512    Policy: TransactionPropagationPolicy,
513{
514    /// Processes a batch import results.
515    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
516        for res in batch_results {
517            match res {
518                Ok(hash) => {
519                    self.on_good_import(hash);
520                }
521                Err(err) => {
522                    self.on_bad_import(err);
523                }
524            }
525        }
526    }
527
528    /// Request handler for an incoming `NewPooledTransactionHashes`
529    fn on_new_pooled_transaction_hashes(
530        &mut self,
531        peer_id: PeerId,
532        msg: NewPooledTransactionHashes,
533    ) {
534        // If the node is initially syncing, ignore transactions
535        if self.network.is_initially_syncing() {
536            return
537        }
538        if self.network.tx_gossip_disabled() {
539            return
540        }
541
542        // get handle to peer's session, if the session is still active
543        let Some(peer) = self.peers.get_mut(&peer_id) else {
544            trace!(
545                peer_id = format!("{peer_id:#}"),
546                ?msg,
547                "discarding announcement from inactive peer"
548            );
549
550            return
551        };
552        let client = peer.client_version.clone();
553
554        // keep track of the transactions the peer knows
555        let mut count_txns_already_seen_by_peer = 0;
556        for tx in msg.iter_hashes().copied() {
557            if !peer.seen_transactions.insert(tx) {
558                count_txns_already_seen_by_peer += 1;
559            }
560        }
561        if count_txns_already_seen_by_peer > 0 {
562            // this may occur if transactions are sent or announced to a peer, at the same time as
563            // the peer sends/announces those hashes to us. this is because, marking
564            // txns as seen by a peer is done optimistically upon sending them to the
565            // peer.
566            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
567            self.metrics
568                .occurrences_hash_already_seen_by_peer
569                .increment(count_txns_already_seen_by_peer);
570
571            trace!(target: "net::tx",
572                %count_txns_already_seen_by_peer,
573                peer_id=format!("{peer_id:#}"),
574                ?client,
575                "Peer sent hashes that have already been marked as seen by peer"
576            );
577
578            self.report_already_seen(peer_id);
579        }
580
581        // 1. filter out spam
582        let (validation_outcome, mut partially_valid_msg) =
583            self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
584
585        if validation_outcome == FilterOutcome::ReportPeer {
586            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
587        }
588
589        // 2. filter out transactions pending import to pool
590        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
591
592        // 3. filter out known hashes
593        //
594        // known txns have already been successfully fetched or received over gossip.
595        //
596        // most hashes will be filtered out here since this the mempool protocol is a gossip
597        // protocol, healthy peers will send many of the same hashes.
598        //
599        let hashes_count_pre_pool_filter = partially_valid_msg.len();
600        self.pool.retain_unknown(&mut partially_valid_msg);
601        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
602            let already_known_hashes_count =
603                hashes_count_pre_pool_filter - partially_valid_msg.len();
604            self.metrics
605                .occurrences_hashes_already_in_pool
606                .increment(already_known_hashes_count as u64);
607        }
608
609        if partially_valid_msg.is_empty() {
610            // nothing to request
611            return
612        }
613
614        // 4. filter out invalid entries (spam)
615        //
616        // validates messages with respect to the given network, e.g. allowed tx types
617        //
618        let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
619            .msg_version()
620            .expect("partially valid announcement should have version")
621            .is_eth68()
622        {
623            // validate eth68 announcement data
624            self.transaction_fetcher
625                .filter_valid_message
626                .filter_valid_entries_68(partially_valid_msg)
627        } else {
628            // validate eth66 announcement data
629            self.transaction_fetcher
630                .filter_valid_message
631                .filter_valid_entries_66(partially_valid_msg)
632        };
633
634        if validation_outcome == FilterOutcome::ReportPeer {
635            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
636        }
637
638        if valid_announcement_data.is_empty() {
639            // no valid announcement data
640            return
641        }
642
643        // 5. filter out already seen unknown hashes
644        //
645        // seen hashes are already in the tx fetcher, pending fetch.
646        //
647        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
648        // fetcher, hence they should be valid at this point.
649        let bad_imports = &self.bad_imports;
650        self.transaction_fetcher.filter_unseen_and_pending_hashes(
651            &mut valid_announcement_data,
652            |hash| bad_imports.contains(hash),
653            &peer_id,
654            &client,
655        );
656
657        if valid_announcement_data.is_empty() {
658            // nothing to request
659            return
660        }
661
662        trace!(target: "net::tx::propagation",
663            peer_id=format!("{peer_id:#}"),
664            hashes_len=valid_announcement_data.iter().count(),
665            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
666            msg_version=%valid_announcement_data.msg_version(),
667            client_version=%client,
668            "received previously unseen and pending hashes in announcement from peer"
669        );
670
671        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
672        // fallback
673        if !self.transaction_fetcher.is_idle(&peer_id) {
674            // load message version before announcement data is destructed in packing
675            let msg_version = valid_announcement_data.msg_version();
676            let (hashes, _version) = valid_announcement_data.into_request_hashes();
677
678            trace!(target: "net::tx",
679                peer_id=format!("{peer_id:#}"),
680                hashes=?*hashes,
681                %msg_version,
682                %client,
683                "buffering hashes announced by busy peer"
684            );
685
686            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
687
688            return
689        }
690
691        let mut hashes_to_request =
692            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
693        let surplus_hashes =
694            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
695
696        if !surplus_hashes.is_empty() {
697            trace!(target: "net::tx",
698                peer_id=format!("{peer_id:#}"),
699                surplus_hashes=?*surplus_hashes,
700                %client,
701                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
702            );
703
704            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
705        }
706
707        trace!(target: "net::tx",
708            peer_id=format!("{peer_id:#}"),
709            hashes=?*hashes_to_request,
710            %client,
711            "sending hashes in `GetPooledTransactions` request to peer's session"
712        );
713
714        // request the missing transactions
715        //
716        // get handle to peer's session again, at this point we know it exists
717        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
718        if let Some(failed_to_request_hashes) =
719            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
720        {
721            let conn_eth_version = peer.version;
722
723            trace!(target: "net::tx",
724                peer_id=format!("{peer_id:#}"),
725                failed_to_request_hashes=?*failed_to_request_hashes,
726                %conn_eth_version,
727                %client,
728                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
729            );
730            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
731        }
732    }
733}
734
735impl<Pool, N, Policy> TransactionsManager<Pool, N, Policy>
736where
737    Pool: TransactionPool + 'static,
738    N: NetworkPrimitives<
739        BroadcastedTransaction: SignedTransaction,
740        PooledTransaction: SignedTransaction,
741    >,
742    Pool::Transaction:
743        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
744    Policy: TransactionPropagationPolicy,
745{
746    /// Invoked when transactions in the local mempool are considered __pending__.
747    ///
748    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
749    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
750    /// messages. The Transactions message relays complete transaction objects and is typically
751    /// sent to a small, random fraction of connected peers.
752    ///
753    /// All other peers receive a notification of the transaction hash and can request the
754    /// complete transaction object if it is unknown to them. The dissemination of complete
755    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
756    /// and won't need to request it.
757    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
758        // Nothing to propagate while initially syncing
759        if self.network.is_initially_syncing() {
760            return
761        }
762        if self.network.tx_gossip_disabled() {
763            return
764        }
765
766        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
767
768        self.propagate_all(hashes);
769    }
770
771    /// Propagate the full transactions to a specific peer.
772    ///
773    /// Returns the propagated transactions.
774    fn propagate_full_transactions_to_peer(
775        &mut self,
776        txs: Vec<TxHash>,
777        peer_id: PeerId,
778        propagation_mode: PropagationMode,
779    ) -> Option<PropagatedTransactions> {
780        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
781
782        let peer = self.peers.get_mut(&peer_id)?;
783        let mut propagated = PropagatedTransactions::default();
784
785        // filter all transactions unknown to the peer
786        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
787
788        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
789
790        if propagation_mode.is_forced() {
791            // skip cache check if forced
792            full_transactions.extend(to_propagate);
793        } else {
794            // Iterate through the transactions to propagate and fill the hashes and full
795            // transaction
796            for tx in to_propagate {
797                if !peer.seen_transactions.contains(tx.tx_hash()) {
798                    // Only include if the peer hasn't seen the transaction
799                    full_transactions.push(&tx);
800                }
801            }
802        }
803
804        if full_transactions.is_empty() {
805            // nothing to propagate
806            return None
807        }
808
809        let PropagateTransactions { pooled, full } = full_transactions.build();
810
811        // send hashes if any
812        if let Some(new_pooled_hashes) = pooled {
813            for hash in new_pooled_hashes.iter_hashes().copied() {
814                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
815                // mark transaction as seen by peer
816                peer.seen_transactions.insert(hash);
817            }
818
819            // send hashes of transactions
820            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
821        }
822
823        // send full transactions, if any
824        if let Some(new_full_transactions) = full {
825            for tx in &new_full_transactions {
826                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
827                // mark transaction as seen by peer
828                peer.seen_transactions.insert(*tx.tx_hash());
829            }
830
831            // send full transactions
832            self.network.send_transactions(peer_id, new_full_transactions);
833        }
834
835        // Update propagated transactions metrics
836        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
837
838        Some(propagated)
839    }
840
841    /// Propagate the transaction hashes to the given peer
842    ///
843    /// Note: This will only send the hashes for transactions that exist in the pool.
844    fn propagate_hashes_to(
845        &mut self,
846        hashes: Vec<TxHash>,
847        peer_id: PeerId,
848        propagation_mode: PropagationMode,
849    ) {
850        trace!(target: "net::tx", "Start propagating transactions as hashes");
851
852        // This fetches a transactions from the pool, including the blob transactions, which are
853        // only ever sent as hashes.
854        let propagated = {
855            let Some(peer) = self.peers.get_mut(&peer_id) else {
856                // no such peer
857                return
858            };
859
860            let to_propagate = self
861                .pool
862                .get_all(hashes)
863                .into_iter()
864                .map(PropagateTransaction::pool_tx)
865                .collect::<Vec<_>>();
866
867            let mut propagated = PropagatedTransactions::default();
868
869            // check if transaction is known to peer
870            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
871
872            if propagation_mode.is_forced() {
873                hashes.extend(to_propagate)
874            } else {
875                for tx in to_propagate {
876                    if !peer.seen_transactions.contains(tx.tx_hash()) {
877                        // Include if the peer hasn't seen it
878                        hashes.push(&tx);
879                    }
880                }
881            }
882
883            let new_pooled_hashes = hashes.build();
884
885            if new_pooled_hashes.is_empty() {
886                // nothing to propagate
887                return
888            }
889
890            for hash in new_pooled_hashes.iter_hashes().copied() {
891                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
892            }
893
894            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
895
896            // send hashes of transactions
897            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
898
899            // Update propagated transactions metrics
900            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
901
902            propagated
903        };
904
905        // notify pool so events get fired
906        self.pool.on_propagated(propagated);
907    }
908
909    /// Propagate the transactions to all connected peers either as full objects or hashes.
910    ///
911    /// The message for new pooled hashes depends on the negotiated version of the stream.
912    /// See [`NewPooledTransactionHashes`]
913    ///
914    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
915    fn propagate_transactions(
916        &mut self,
917        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
918        propagation_mode: PropagationMode,
919    ) -> PropagatedTransactions {
920        let mut propagated = PropagatedTransactions::default();
921        if self.network.tx_gossip_disabled() {
922            return propagated
923        }
924
925        // send full transactions to a set of the connected peers based on the configured mode
926        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
927
928        // Note: Assuming ~random~ order due to random state of the peers map hasher
929        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
930            if !self.propagation_policy.can_propagate(peer) {
931                // skip peers we should not propagate to
932                continue
933            }
934            // determine whether to send full tx objects or hashes.
935            let mut builder = if peer_idx > max_num_full {
936                PropagateTransactionsBuilder::pooled(peer.version)
937            } else {
938                PropagateTransactionsBuilder::full(peer.version)
939            };
940
941            if propagation_mode.is_forced() {
942                builder.extend(to_propagate.iter());
943            } else {
944                // Iterate through the transactions to propagate and fill the hashes and full
945                // transaction lists, before deciding whether or not to send full transactions to
946                // the peer.
947                for tx in &to_propagate {
948                    // Only proceed if the transaction is not in the peer's list of seen
949                    // transactions
950                    if !peer.seen_transactions.contains(tx.tx_hash()) {
951                        builder.push(tx);
952                    }
953                }
954            }
955
956            if builder.is_empty() {
957                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
958                continue
959            }
960
961            let PropagateTransactions { pooled, full } = builder.build();
962
963            // send hashes if any
964            if let Some(mut new_pooled_hashes) = pooled {
965                // enforce tx soft limit per message for the (unlikely) event the number of
966                // hashes exceeds it
967                new_pooled_hashes
968                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
969
970                for hash in new_pooled_hashes.iter_hashes().copied() {
971                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
972                    // mark transaction as seen by peer
973                    peer.seen_transactions.insert(hash);
974                }
975
976                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
977
978                // send hashes of transactions
979                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
980            }
981
982            // send full transactions, if any
983            if let Some(new_full_transactions) = full {
984                for tx in &new_full_transactions {
985                    propagated
986                        .0
987                        .entry(*tx.tx_hash())
988                        .or_default()
989                        .push(PropagateKind::Full(*peer_id));
990                    // mark transaction as seen by peer
991                    peer.seen_transactions.insert(*tx.tx_hash());
992                }
993
994                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
995
996                // send full transactions
997                self.network.send_transactions(*peer_id, new_full_transactions);
998            }
999        }
1000
1001        // Update propagated transactions metrics
1002        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1003
1004        propagated
1005    }
1006
1007    /// Propagates the given transactions to the peers
1008    ///
1009    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1010    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1011    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1012        let propagated = self.propagate_transactions(
1013            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1014            PropagationMode::Basic,
1015        );
1016
1017        // notify pool so events get fired
1018        self.pool.on_propagated(propagated);
1019    }
1020
1021    /// Request handler for an incoming request for transactions
1022    fn on_get_pooled_transactions(
1023        &mut self,
1024        peer_id: PeerId,
1025        request: GetPooledTransactions,
1026        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1027    ) {
1028        if let Some(peer) = self.peers.get_mut(&peer_id) {
1029            if self.network.tx_gossip_disabled() {
1030                let _ = response.send(Ok(PooledTransactions::default()));
1031                return
1032            }
1033            let transactions = self.pool.get_pooled_transaction_elements(
1034                request.0,
1035                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1036                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1037                ),
1038            );
1039            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1040
1041            // we sent a response at which point we assume that the peer is aware of the
1042            // transactions
1043            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1044
1045            let resp = PooledTransactions(transactions);
1046            let _ = response.send(Ok(resp));
1047        }
1048    }
1049
1050    /// Handles a command received from a detached [`TransactionsHandle`]
1051    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1052        match cmd {
1053            TransactionsCommand::PropagateHash(hash) => {
1054                self.on_new_pending_transactions(vec![hash])
1055            }
1056            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1057                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1058            }
1059            TransactionsCommand::GetActivePeers(tx) => {
1060                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1061                tx.send(peers).ok();
1062            }
1063            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1064                if let Some(propagated) =
1065                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1066                {
1067                    self.pool.on_propagated(propagated);
1068                }
1069            }
1070            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1071            TransactionsCommand::BroadcastTransactions(txs) => {
1072                self.propagate_transactions(txs, PropagationMode::Forced);
1073            }
1074            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1075                let mut res = HashMap::with_capacity(peers.len());
1076                for peer_id in peers {
1077                    let hashes = self
1078                        .peers
1079                        .get(&peer_id)
1080                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1081                        .unwrap_or_default();
1082                    res.insert(peer_id, hashes);
1083                }
1084                tx.send(res).ok();
1085            }
1086            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1087                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1088                peer_request_sender.send(sender).ok();
1089            }
1090        }
1091    }
1092
1093    /// Handles session establishment and peer transactions initialization.
1094    ///
1095    /// This is invoked when a new session is established.
1096    fn handle_peer_session(
1097        &mut self,
1098        info: SessionInfo,
1099        messages: PeerRequestSender<PeerRequest<N>>,
1100    ) {
1101        let SessionInfo { peer_id, client_version, version, .. } = info;
1102
1103        // Insert a new peer into the peerset.
1104        let peer = PeerMetadata::<N>::new(
1105            messages,
1106            version,
1107            client_version,
1108            self.config.max_transactions_seen_by_peer_history,
1109            info.peer_kind,
1110        );
1111        let peer = match self.peers.entry(peer_id) {
1112            Entry::Occupied(mut entry) => {
1113                entry.insert(peer);
1114                entry.into_mut()
1115            }
1116            Entry::Vacant(entry) => entry.insert(peer),
1117        };
1118
1119        self.propagation_policy.on_session_established(peer);
1120
1121        // Send a `NewPooledTransactionHashes` to the peer with up to
1122        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1123        // transactions in the pool.
1124        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1125            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1126            return
1127        }
1128
1129        // Get transactions to broadcast
1130        let pooled_txs = self.pool.pooled_transactions_max(
1131            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1132        );
1133        if pooled_txs.is_empty() {
1134            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1135            return;
1136        }
1137
1138        // Build and send transaction hashes message
1139        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1140        for pooled_tx in pooled_txs {
1141            peer.seen_transactions.insert(*pooled_tx.hash());
1142            msg_builder.push_pooled(pooled_tx);
1143        }
1144
1145        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1146        let msg = msg_builder.build();
1147        self.network.send_transactions_hashes(peer_id, msg);
1148    }
1149
1150    /// Handles a received event related to common network events.
1151    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1152        match event_result {
1153            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1154                // remove the peer
1155
1156                let peer = self.peers.remove(&peer_id);
1157                if let Some(mut peer) = peer {
1158                    self.propagation_policy.on_session_closed(&mut peer);
1159                }
1160                self.transaction_fetcher.remove_peer(&peer_id);
1161            }
1162            NetworkEvent::ActivePeerSession { info, messages } => {
1163                // process active peer session and broadcast available transaction from the pool
1164                self.handle_peer_session(info, messages);
1165            }
1166            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1167                let peer_id = info.peer_id;
1168                // get messages from existing peer
1169                let messages = match self.peers.get(&peer_id) {
1170                    Some(p) => p.request_tx.clone(),
1171                    None => {
1172                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1173                        return;
1174                    }
1175                };
1176                self.handle_peer_session(info, messages);
1177            }
1178            _ => {}
1179        }
1180    }
1181
1182    /// Handles dedicated transaction events related to the `eth` protocol.
1183    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1184        match event {
1185            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1186                // ensure we didn't receive any blob transactions as these are disallowed to be
1187                // broadcasted in full
1188
1189                let has_blob_txs = msg.has_eip4844();
1190
1191                let non_blob_txs = msg
1192                    .0
1193                    .into_iter()
1194                    .map(N::PooledTransaction::try_from)
1195                    .filter_map(Result::ok)
1196                    .collect();
1197
1198                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1199
1200                if has_blob_txs {
1201                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1202                    self.report_peer_bad_transactions(peer_id);
1203                }
1204            }
1205            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1206                self.on_new_pooled_transaction_hashes(peer_id, msg)
1207            }
1208            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1209                self.on_get_pooled_transactions(peer_id, request, response)
1210            }
1211            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1212                let _ = response.send(Some(self.handle()));
1213            }
1214        }
1215    }
1216
1217    /// Starts the import process for the given transactions.
1218    fn import_transactions(
1219        &mut self,
1220        peer_id: PeerId,
1221        transactions: PooledTransactions<N::PooledTransaction>,
1222        source: TransactionSource,
1223    ) {
1224        // If the node is pipeline syncing, ignore transactions
1225        if self.network.is_initially_syncing() {
1226            return
1227        }
1228        if self.network.tx_gossip_disabled() {
1229            return
1230        }
1231
1232        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1233        let mut transactions = transactions.0;
1234
1235        // mark the transactions as received
1236        self.transaction_fetcher
1237            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));
1238
1239        // track that the peer knows these transaction, but only if this is a new broadcast.
1240        // If we received the transactions as the response to our `GetPooledTransactions``
1241        // requests (based on received `NewPooledTransactionHashes`) then we already
1242        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1243        let mut num_already_seen_by_peer = 0;
1244        for tx in &transactions {
1245            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1246                num_already_seen_by_peer += 1;
1247            }
1248        }
1249
1250        // 1. filter out txns already inserted into pool
1251        let txns_count_pre_pool_filter = transactions.len();
1252        self.pool.retain_unknown(&mut transactions);
1253        if txns_count_pre_pool_filter > transactions.len() {
1254            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1255            self.metrics
1256                .occurrences_transactions_already_in_pool
1257                .increment(already_known_txns_count as u64);
1258        }
1259
1260        // tracks the quality of the given transactions
1261        let mut has_bad_transactions = false;
1262
1263        // 2. filter out transactions that are invalid or already pending import
1264        if let Some(peer) = self.peers.get_mut(&peer_id) {
1265            // pre-size to avoid reallocations
1266            let mut new_txs = Vec::with_capacity(transactions.len());
1267            for tx in transactions {
1268                // recover transaction
1269                let tx = match tx.try_into_recovered() {
1270                    Ok(tx) => tx,
1271                    Err(badtx) => {
1272                        trace!(target: "net::tx",
1273                            peer_id=format!("{peer_id:#}"),
1274                            hash=%badtx.tx_hash(),
1275                            client_version=%peer.client_version,
1276                            "failed ecrecovery for transaction"
1277                        );
1278                        has_bad_transactions = true;
1279                        continue
1280                    }
1281                };
1282
1283                match self.transactions_by_peers.entry(*tx.tx_hash()) {
1284                    Entry::Occupied(mut entry) => {
1285                        // transaction was already inserted
1286                        entry.get_mut().insert(peer_id);
1287                    }
1288                    Entry::Vacant(entry) => {
1289                        if self.bad_imports.contains(tx.tx_hash()) {
1290                            trace!(target: "net::tx",
1291                                peer_id=format!("{peer_id:#}"),
1292                                hash=%tx.tx_hash(),
1293                                client_version=%peer.client_version,
1294                                "received a known bad transaction from peer"
1295                            );
1296                            has_bad_transactions = true;
1297                        } else {
1298                            // this is a new transaction that should be imported into the pool
1299
1300                            let pool_transaction = Pool::Transaction::from_pooled(tx);
1301                            new_txs.push(pool_transaction);
1302
1303                            entry.insert(HashSet::from([peer_id]));
1304                        }
1305                    }
1306                }
1307            }
1308            new_txs.shrink_to_fit();
1309
1310            // 3. import new transactions as a batch to minimize lock contention on the underlying
1311            // pool
1312            if !new_txs.is_empty() {
1313                let pool = self.pool.clone();
1314                // update metrics
1315                let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1316                metric_pending_pool_imports.increment(new_txs.len() as f64);
1317
1318                // update self-monitoring info
1319                self.pending_pool_imports_info
1320                    .pending_pool_imports
1321                    .fetch_add(new_txs.len(), Ordering::Relaxed);
1322                let tx_manager_info_pending_pool_imports =
1323                    self.pending_pool_imports_info.pending_pool_imports.clone();
1324
1325                trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1326                let import = Box::pin(async move {
1327                    let added = new_txs.len();
1328                    let res = pool.add_external_transactions(new_txs).await;
1329
1330                    // update metrics
1331                    metric_pending_pool_imports.decrement(added as f64);
1332                    // update self-monitoring info
1333                    tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1334
1335                    res
1336                });
1337
1338                self.pool_imports.push(import);
1339            }
1340
1341            if num_already_seen_by_peer > 0 {
1342                self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1343                self.metrics
1344                    .occurrences_of_transaction_already_seen_by_peer
1345                    .increment(num_already_seen_by_peer);
1346                trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1347            }
1348        }
1349
1350        if has_bad_transactions {
1351            // peer sent us invalid transactions
1352            self.report_peer_bad_transactions(peer_id)
1353        }
1354
1355        if num_already_seen_by_peer > 0 {
1356            self.report_already_seen(peer_id);
1357        }
1358    }
1359
1360    /// Processes a [`FetchEvent`].
1361    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1362        match fetch_event {
1363            FetchEvent::TransactionsFetched { peer_id, transactions } => {
1364                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1365            }
1366            FetchEvent::FetchError { peer_id, error } => {
1367                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1368                self.on_request_error(peer_id, error);
1369            }
1370            FetchEvent::EmptyResponse { peer_id } => {
1371                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1372            }
1373        }
1374    }
1375}
1376
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
1378/// [`crate::NetworkManager`] for more context on the design pattern.
1379///
1380/// This should be spawned or used as part of `tokio::select!`.
1381//
1382// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1383// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1384impl<Pool, N, Policy> Future for TransactionsManager<Pool, N, Policy>
1385where
1386    Pool: TransactionPool + Unpin + 'static,
1387    N: NetworkPrimitives<
1388        BroadcastedTransaction: SignedTransaction,
1389        PooledTransaction: SignedTransaction,
1390    >,
1391    Pool::Transaction:
1392        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1393    Policy: TransactionPropagationPolicy,
1394{
1395    type Output = ();
1396
1397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1398        let start = Instant::now();
1399        let mut poll_durations = TxManagerPollDurations::default();
1400
1401        let this = self.get_mut();
1402
1403        // All streams are polled until their corresponding budget is exhausted, then we manually
1404        // yield back control to tokio. See `NetworkManager` for more context on the design
1405        // pattern.
1406
1407        // Advance network/peer related events (update peers map).
1408        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1409            poll_durations.acc_network_events,
1410            "net::tx",
1411            "Network events stream",
1412            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1413            this.network_events.poll_next_unpin(cx),
1414            |event| this.on_network_event(event)
1415        );
1416
1417        // Advances new __pending__ transactions, transactions that were successfully inserted into
1418        // pending set in pool (are valid), and propagates them (inform peers which
1419        // transactions we have seen).
1420        //
1421        // We try to drain this to batch the transactions in a single message.
1422        //
1423        // We don't expect this buffer to be large, since only pending transactions are
1424        // emitted here.
1425        let mut new_txs = Vec::new();
1426        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1427            poll_durations.acc_imported_txns,
1428            "net::tx",
1429            "Pending transactions stream",
1430            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1431            this.pending_transactions.poll_next_unpin(cx),
1432            |hash| new_txs.push(hash)
1433        );
1434        if !new_txs.is_empty() {
1435            this.on_new_pending_transactions(new_txs);
1436        }
1437
1438        // Advance inflight fetch requests (flush transaction fetcher and queue for
1439        // import to pool).
1440        //
1441        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1442        // (2 MiB / 10 bytes > 200k transactions).
1443        //
1444        // Since transactions aren't validated until they are inserted into the pool,
1445        // this can potentially queue >200k transactions for insertion to pool. More
1446        // if the message size is bigger than the soft limit on a `PooledTransactions`
1447        // response which is 2 MiB.
1448        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1449            poll_durations.acc_fetch_events,
1450            "net::tx",
1451            "Transaction fetch events stream",
1452            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1453            this.transaction_fetcher.poll_next_unpin(cx),
1454            |event| this.on_fetch_event(event),
1455        );
1456
1457        // Advance incoming transaction events (stream new txns/announcements from
1458        // network manager and queue for import to pool/fetch txns).
1459        //
        // This will potentially remove hashes from hashes pending fetch, if the event
1461        // is an announcement (if same hashes are announced that didn't fit into a
1462        // previous request).
1463        //
1464        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1465        // (128 KiB / 10 bytes > 13k transactions).
1466        //
1467        // If this is an event with `Transactions` message, since transactions aren't
1468        // validated until they are inserted into the pool, this can potentially queue
1469        // >13k transactions for insertion to pool. More if the message size is bigger
1470        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1471        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1472            poll_durations.acc_tx_events,
1473            "net::tx",
1474            "Network transaction events stream",
1475            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1476            this.transaction_events.poll_next_unpin(cx),
1477            |event| this.on_network_tx_event(event),
1478        );
1479
1480        // Advance pool imports (flush txns to pool).
1481        //
1482        // Note, this is done in batches. A batch is filled from one `Transactions`
1483        // broadcast messages or one `PooledTransactions` response at a time. The
1484        // minimum batch size is 1 transaction (and might often be the case with blob
1485        // transactions).
1486        //
1487        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1488        // (2 MiB / 10 bytes > 200k transactions).
1489        //
1490        // Since transactions aren't validated until they are inserted into the pool,
1491        // this can potentially validate >200k transactions. More if the message size
1492        // is bigger than the soft limit on a `PooledTransactions` response which is
1493        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1494        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1495            poll_durations.acc_pending_imports,
1496            "net::tx",
1497            "Batched pool imports stream",
1498            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1499            this.pool_imports.poll_next_unpin(cx),
1500            |batch_results| this.on_batch_import_result(batch_results)
1501        );
1502
1503        // Tries to drain hashes pending fetch cache if the tx manager currently has
1504        // capacity for this (fetch txns).
1505        //
1506        // Sends at most one request.
1507        duration_metered_exec!(
1508            {
1509                if this.has_capacity_for_fetching_pending_hashes() {
1510                    this.on_fetch_hashes_pending_fetch();
1511                }
1512            },
1513            poll_durations.acc_pending_fetch
1514        );
1515
1516        // Advance commands (propagate/fetch/serve txns).
1517        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1518            poll_durations.acc_cmds,
1519            "net::tx",
1520            "Commands channel",
1521            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1522            this.command_rx.poll_next_unpin(cx),
1523            |cmd| this.on_command(cmd)
1524        );
1525
1526        this.transaction_fetcher.update_metrics();
1527
1528        // all channels are fully drained and import futures pending
1529        if maybe_more_network_events ||
1530            maybe_more_commands ||
1531            maybe_more_tx_events ||
1532            maybe_more_tx_fetch_events ||
1533            maybe_more_pool_imports ||
1534            maybe_more_pending_txns
1535        {
1536            // make sure we're woken up again
1537            cx.waker().wake_by_ref();
1538            return Poll::Pending
1539        }
1540
1541        this.update_poll_metrics(start, poll_durations);
1542
1543        Poll::Pending
1544    }
1545}
1546
/// Strategy that decides which peers a transaction is propagated to.
///
/// The mode controls whether peers that already know a transaction are skipped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Forced propagation mode.
    ///
    /// A transaction is sent to every peer, even if it was already sent to, or received from,
    /// that peer before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`], `false` for [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1569
1570/// A transaction that's about to be propagated to multiple peers.
1571#[derive(Debug, Clone)]
1572struct PropagateTransaction<T = TransactionSigned> {
1573    size: usize,
1574    transaction: Arc<T>,
1575}
1576
1577impl<T: SignedTransaction> PropagateTransaction<T> {
1578    /// Create a new instance from a transaction.
1579    pub fn new(transaction: T) -> Self {
1580        let size = transaction.length();
1581        Self { size, transaction: Arc::new(transaction) }
1582    }
1583
1584    /// Create a new instance from a pooled transaction
1585    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1586    where
1587        P: PoolTransaction<Consensus = T>,
1588    {
1589        let size = tx.encoded_length();
1590        let transaction = tx.transaction.clone_into_consensus();
1591        let transaction = Arc::new(transaction.into_inner());
1592        Self { size, transaction }
1593    }
1594
1595    fn tx_hash(&self) -> &TxHash {
1596        self.transaction.tx_hash()
1597    }
1598}
1599
1600/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1601/// should receive them in full or as pooled
1602#[derive(Debug, Clone)]
1603enum PropagateTransactionsBuilder<T> {
1604    Pooled(PooledTransactionsHashesBuilder),
1605    Full(FullTransactionsBuilder<T>),
1606}
1607
1608impl<T> PropagateTransactionsBuilder<T> {
1609    /// Create a builder for pooled transactions
1610    fn pooled(version: EthVersion) -> Self {
1611        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1612    }
1613
1614    /// Create a builder that sends transactions in full and records transactions that don't fit.
1615    fn full(version: EthVersion) -> Self {
1616        Self::Full(FullTransactionsBuilder::new(version))
1617    }
1618
1619    /// Returns true if no transactions are recorded.
1620    fn is_empty(&self) -> bool {
1621        match self {
1622            Self::Pooled(builder) => builder.is_empty(),
1623            Self::Full(builder) => builder.is_empty(),
1624        }
1625    }
1626
1627    /// Consumes the type and returns the built messages that should be sent to the peer.
1628    fn build(self) -> PropagateTransactions<T> {
1629        match self {
1630            Self::Pooled(pooled) => {
1631                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1632            }
1633            Self::Full(full) => full.build(),
1634        }
1635    }
1636}
1637
1638impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1639    /// Appends all transactions
1640    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1641        for tx in txs {
1642            self.push(tx);
1643        }
1644    }
1645
1646    /// Appends a transaction to the list.
1647    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1648        match self {
1649            Self::Pooled(builder) => builder.push(transaction),
1650            Self::Full(builder) => builder.push(transaction),
1651        }
1652    }
1653}
1654
/// Represents how the transactions should be sent to a peer if any.
///
/// Produced by `PropagateTransactionsBuilder::build` and `FullTransactionsBuilder::build`.
/// Either field may be `None`, in which case there is no message of that kind.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    full: Option<Vec<Arc<T>>>,
}
1662
1663/// Helper type for constructing the full transaction message that enforces the
1664/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1665/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1666/// broadcasted in full.
1667#[derive(Debug, Clone)]
1668struct FullTransactionsBuilder<T> {
1669    /// The soft limit to enforce for a single broadcast message of full transactions.
1670    total_size: usize,
1671    /// All transactions to be broadcasted.
1672    transactions: Vec<Arc<T>>,
1673    /// Transactions that didn't fit into the broadcast message
1674    pooled: PooledTransactionsHashesBuilder,
1675}
1676
1677impl<T> FullTransactionsBuilder<T> {
1678    /// Create a builder for the negotiated version of the peer's session
1679    fn new(version: EthVersion) -> Self {
1680        Self {
1681            total_size: 0,
1682            pooled: PooledTransactionsHashesBuilder::new(version),
1683            transactions: vec![],
1684        }
1685    }
1686
1687    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1688    fn is_empty(&self) -> bool {
1689        self.transactions.is_empty() && self.pooled.is_empty()
1690    }
1691
1692    /// Returns the messages that should be propagated to the peer.
1693    fn build(self) -> PropagateTransactions<T> {
1694        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1695        let full = Some(self.transactions).filter(|full| !full.is_empty());
1696        PropagateTransactions { pooled, full }
1697    }
1698}
1699
1700impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1701    /// Appends all transactions.
1702    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1703        for tx in txs {
1704            self.push(&tx)
1705        }
1706    }
1707
1708    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1709    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1710    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1711    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1712    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1713    ///
1714    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1715    /// to list of pooled transactions, (e.g. 4844 transactions).
1716    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1717    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1718        // Do not send full 4844 transaction hashes to peers.
1719        //
1720        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1721        //  Instead, those transactions are only announced using
1722        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1723        //  via `GetPooledTransactions`.
1724        //
1725        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1726        if !transaction.transaction.is_broadcastable_in_full() {
1727            self.pooled.push(transaction);
1728            return
1729        }
1730
1731        let new_size = self.total_size + transaction.size;
1732        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1733            self.total_size > 0
1734        {
1735            // transaction does not fit into the message
1736            self.pooled.push(transaction);
1737            return
1738        }
1739
1740        self.total_size = new_size;
1741        self.transactions.push(Arc::clone(&transaction.transaction));
1742    }
1743}
1744
1745/// A helper type to create the pooled transactions message based on the negotiated version of the
1746/// session with the peer
1747#[derive(Debug, Clone)]
1748enum PooledTransactionsHashesBuilder {
1749    Eth66(NewPooledTransactionHashes66),
1750    Eth68(NewPooledTransactionHashes68),
1751}
1752
1753// === impl PooledTransactionsHashesBuilder ===
1754
1755impl PooledTransactionsHashesBuilder {
1756    /// Push a transaction from the pool to the list.
1757    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1758        match self {
1759            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1760            Self::Eth68(msg) => {
1761                msg.hashes.push(*pooled_tx.hash());
1762                msg.sizes.push(pooled_tx.encoded_length());
1763                msg.types.push(pooled_tx.transaction.ty());
1764            }
1765        }
1766    }
1767
1768    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1769    fn is_empty(&self) -> bool {
1770        match self {
1771            Self::Eth66(hashes) => hashes.is_empty(),
1772            Self::Eth68(hashes) => hashes.is_empty(),
1773        }
1774    }
1775
1776    /// Appends all hashes
1777    fn extend<T: SignedTransaction>(
1778        &mut self,
1779        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1780    ) {
1781        for tx in txs {
1782            self.push(&tx);
1783        }
1784    }
1785
1786    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1787        match self {
1788            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1789            Self::Eth68(msg) => {
1790                msg.hashes.push(*tx.tx_hash());
1791                msg.sizes.push(tx.size);
1792                msg.types.push(tx.transaction.ty());
1793            }
1794        }
1795    }
1796
1797    /// Create a builder for the negotiated version of the peer's session
1798    fn new(version: EthVersion) -> Self {
1799        match version {
1800            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1801            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1802        }
1803    }
1804
1805    fn build(self) -> NewPooledTransactionHashes {
1806        match self {
1807            Self::Eth66(msg) => msg.into(),
1808            Self::Eth68(msg) => msg.into(),
1809        }
1810    }
1811}
1812
/// How we received the transactions.
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Whether the transactions were sent as broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1829
1830/// Tracks a single peer in the context of [`TransactionsManager`].
1831#[derive(Debug)]
1832pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1833    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
1834    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
1835    /// the peer.
1836    seen_transactions: LruCache<TxHash>,
1837    /// A communication channel directly to the peer's session task.
1838    request_tx: PeerRequestSender<PeerRequest<N>>,
1839    /// negotiated version of the session.
1840    version: EthVersion,
1841    /// The peer's client version.
1842    client_version: Arc<str>,
1843    /// The kind of peer.
1844    peer_kind: PeerKind,
1845}
1846
1847impl<N: NetworkPrimitives> PeerMetadata<N> {
1848    /// Returns a new instance of [`PeerMetadata`].
1849    pub fn new(
1850        request_tx: PeerRequestSender<PeerRequest<N>>,
1851        version: EthVersion,
1852        client_version: Arc<str>,
1853        max_transactions_seen_by_peer: u32,
1854        peer_kind: PeerKind,
1855    ) -> Self {
1856        Self {
1857            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1858            request_tx,
1859            version,
1860            client_version,
1861            peer_kind,
1862        }
1863    }
1864
1865    /// Returns a reference to the peer's request sender channel.
1866    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1867        &self.request_tx
1868    }
1869
1870    /// Return a
1871    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1872        &mut self.seen_transactions
1873    }
1874
1875    /// Returns the negotiated `EthVersion` of the session.
1876    pub const fn version(&self) -> EthVersion {
1877        self.version
1878    }
1879
1880    /// Returns a reference to the peer's client version string.
1881    pub fn client_version(&self) -> &str {
1882        &self.client_version
1883    }
1884
1885    /// Returns the peer's kind.
1886    pub const fn peer_kind(&self) -> PeerKind {
1887        self.peer_kind
1888    }
1889}
1890
/// Commands to send to the [`TransactionsManager`]
///
/// Variants that expect an answer carry a `oneshot::Sender` for the reply.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are given by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to look up.
        peers: Vec<PeerId>,
        /// Channel for the reply, a set of hashes keyed by peer.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the reply, an `Option` of the peer's request sender.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
1917
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The carried sender is the reply channel; it accepts an optional [`TransactionsHandle`].
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
1949
/// Bookkeeping for transactions of the [`TransactionsManager`] that are about to be imported
/// into the pool.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Returns a new [`PendingPoolImportsInfo`] with a counter of zero and the given max.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pool imports is under a given tolerated max.
    ///
    /// The comparison uses the `max_pending_pool_imports` argument, not the stored field.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
1970
1971impl Default for PendingPoolImportsInfo {
1972    fn default() -> Self {
1973        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
1974    }
1975}
1976
/// Durations recorded for the individual stages of one `poll` of the transactions manager.
///
/// Each field is handed to the metered stage named in its comment; the whole struct is passed
/// to `update_poll_metrics` at the end of `poll`.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    // NOTE(review): presumably the network events stream stage - confirm at the start of `poll`.
    acc_network_events: Duration,
    // "Batched pool imports stream" stage.
    acc_pending_imports: Duration,
    // "Network transaction events stream" stage.
    acc_tx_events: Duration,
    // "Pending transactions stream" stage.
    acc_imported_txns: Duration,
    // "Transaction fetch events stream" stage.
    acc_fetch_events: Duration,
    // Stage that tries to fetch hashes pending fetch.
    acc_pending_fetch: Duration,
    // "Commands channel" stage.
    acc_cmds: Duration,
}
1987
1988#[cfg(test)]
1989mod tests {
1990    use super::*;
1991    use crate::{
1992        test_utils::{
1993            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
1994            Testnet,
1995        },
1996        NetworkConfigBuilder, NetworkManager,
1997    };
1998    use alloy_consensus::{transaction::PooledTransaction, TxEip1559, TxLegacy};
1999    use alloy_primitives::{hex, Signature, TxKind, U256};
2000    use alloy_rlp::Decodable;
2001    use futures::FutureExt;
2002    use reth_chainspec::MIN_TRANSACTION_GAS;
2003    use reth_ethereum_primitives::{Transaction, TransactionSigned};
2004    use reth_network_api::{NetworkInfo, PeerKind};
2005    use reth_network_p2p::{
2006        error::{RequestError, RequestResult},
2007        sync::{NetworkSyncUpdater, SyncState},
2008    };
2009    use reth_storage_api::noop::NoopProvider;
2010    use reth_transaction_pool::test_utils::{
2011        testing_pool, MockTransaction, MockTransactionFactory,
2012    };
2013    use secp256k1::SecretKey;
2014    use std::{
2015        future::poll_fn,
2016        net::{IpAddr, Ipv4Addr, SocketAddr},
2017        str::FromStr,
2018    };
2019    use tracing::error;
2020
    // Ensure that a transaction broadcast by a peer is not imported into the pool while the
    // node is initially syncing.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // network + transactions manager under test, backed by the testing pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    // to insert a new peer in transactions peerset
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // hand the transaction to the manager as if `handle1`'s peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the transactions manager exactly once
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // the broadcast must have been ignored
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2090
    // Ensure that a transaction broadcast by a peer is imported into the pool when the node is
    // syncing, but not initially syncing (sync state went syncing -> idle -> syncing).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_tx_broadcasts_through_two_syncs() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // network + transactions manager under test, backed by the testing pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::new(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync) to idle and then to syncing (live)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Idle);
        assert!(!NetworkInfo::is_syncing(&network_handle));
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::ActivePeerSession { .. } |
                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
                    // to insert a new peer in transactions peerset
                    transactions.on_network_event(ev);
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                _ => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // hand the transaction to the manager as if `handle1`'s peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the transactions manager exactly once
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // syncing, but no longer initially syncing: the broadcast must have reached the pool
        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(!pool.is_empty());
        handle.terminate().await;
    }
2165
2166    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2167    // event and is able to retrieve the corresponding transactions.
2168    #[tokio::test(flavor = "multi_thread")]
2169    async fn test_handle_incoming_transactions_hashes() {
2170        reth_tracing::init_test_tracing();
2171
2172        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2173        let client = NoopProvider::default();
2174
2175        let config = NetworkConfigBuilder::new(secret_key)
2176            // let OS choose port
2177            .listener_port(0)
2178            .disable_discovery()
2179            .build(client);
2180
2181        let pool = testing_pool();
2182
2183        let transactions_manager_config = config.transactions_manager_config.clone();
2184        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2185            .await
2186            .unwrap()
2187            .into_builder()
2188            .transactions(pool.clone(), transactions_manager_config)
2189            .split_with_handle();
2190
2191        let peer_id_1 = PeerId::new([1; 64]);
2192        let eth_version = EthVersion::Eth66;
2193
2194        let txs = vec![TransactionSigned::new_unhashed(
2195            Transaction::Legacy(TxLegacy {
2196                chain_id: Some(4),
2197                nonce: 15u64,
2198                gas_price: 2200000000,
2199                gas_limit: 34811,
2200                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2201                value: U256::from(1234u64),
2202                input: Default::default(),
2203            }),
2204            Signature::new(
2205                U256::from_str(
2206                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2207                )
2208                .unwrap(),
2209                U256::from_str(
2210                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2211                )
2212                .unwrap(),
2213                true,
2214            ),
2215        )];
2216
2217        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2218
2219        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2220        tx_manager.peers.insert(peer_id_1, peer_1);
2221
2222        assert!(pool.is_empty());
2223
2224        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2225            peer_id: peer_id_1,
2226            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2227                txs_hashes.clone(),
2228            )),
2229        });
2230
2231        // mock session of peer_1 receives request
2232        let req = to_mock_session_rx
2233            .recv()
2234            .await
2235            .expect("peer_1 session should receive request with buffered hashes");
2236        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2237        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2238
2239        let message: Vec<PooledTransaction> = txs
2240            .into_iter()
2241            .map(|tx| {
2242                PooledTransaction::try_from(tx)
2243                    .expect("Failed to convert MockTransaction to PooledTransaction")
2244            })
2245            .collect();
2246
2247        // return the transactions corresponding to the transaction hashes.
2248        response
2249            .send(Ok(PooledTransactions(message)))
2250            .expect("should send peer_1 response to tx manager");
2251
2252        // adance the transaction manager future
2253        poll_fn(|cx| {
2254            let _ = tx_manager.poll_unpin(cx);
2255            Poll::Ready(())
2256        })
2257        .await;
2258
2259        // ensure that the transactions corresponding to the transaction hashes have been
2260        // successfully retrieved and stored in the Pool.
2261        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2262    }
2263
2264    #[tokio::test(flavor = "multi_thread")]
2265    async fn test_handle_incoming_transactions() {
2266        reth_tracing::init_test_tracing();
2267        let net = Testnet::create(3).await;
2268
2269        let mut handles = net.handles();
2270        let handle0 = handles.next().unwrap();
2271        let handle1 = handles.next().unwrap();
2272
2273        drop(handles);
2274        let handle = net.spawn();
2275
2276        let listener0 = handle0.event_listener();
2277
2278        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2279        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2280
2281        let client = NoopProvider::default();
2282        let pool = testing_pool();
2283        let config = NetworkConfigBuilder::new(secret_key)
2284            .disable_discovery()
2285            .listener_port(0)
2286            .build(client);
2287        let transactions_manager_config = config.transactions_manager_config.clone();
2288        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2289            .await
2290            .unwrap()
2291            .into_builder()
2292            .transactions(pool.clone(), transactions_manager_config)
2293            .split_with_handle();
2294        tokio::task::spawn(network);
2295
2296        network_handle.update_sync_state(SyncState::Idle);
2297
2298        assert!(!NetworkInfo::is_syncing(&network_handle));
2299
2300        // wait for all initiator connections
2301        let mut established = listener0.take(2);
2302        while let Some(ev) = established.next().await {
2303            match ev {
2304                NetworkEvent::ActivePeerSession { .. } |
2305                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2306                    // to insert a new peer in transactions peerset
2307                    transactions.on_network_event(ev);
2308                }
2309                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2310                ev => {
2311                    error!("unexpected event {ev:?}")
2312                }
2313            }
2314        }
2315        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2316        let input = hex!(
2317            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2318        );
2319        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2320        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2321            peer_id: *handle1.peer_id(),
2322            msg: Transactions(vec![signed_tx.clone()]),
2323        });
2324        assert!(transactions
2325            .transactions_by_peers
2326            .get(signed_tx.tx_hash())
2327            .unwrap()
2328            .contains(handle1.peer_id()));
2329
2330        // advance the transaction manager future
2331        poll_fn(|cx| {
2332            let _ = transactions.poll_unpin(cx);
2333            Poll::Ready(())
2334        })
2335        .await;
2336
2337        assert!(!pool.is_empty());
2338        assert!(pool.get(signed_tx.tx_hash()).is_some());
2339        handle.terminate().await;
2340    }
2341
2342    #[tokio::test(flavor = "multi_thread")]
2343    async fn test_on_get_pooled_transactions_network() {
2344        reth_tracing::init_test_tracing();
2345        let net = Testnet::create(2).await;
2346
2347        let mut handles = net.handles();
2348        let handle0 = handles.next().unwrap();
2349        let handle1 = handles.next().unwrap();
2350
2351        drop(handles);
2352        let handle = net.spawn();
2353
2354        let listener0 = handle0.event_listener();
2355
2356        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2357        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2358
2359        let client = NoopProvider::default();
2360        let pool = testing_pool();
2361        let config = NetworkConfigBuilder::new(secret_key)
2362            .disable_discovery()
2363            .listener_port(0)
2364            .build(client);
2365        let transactions_manager_config = config.transactions_manager_config.clone();
2366        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2367            .await
2368            .unwrap()
2369            .into_builder()
2370            .transactions(pool.clone(), transactions_manager_config)
2371            .split_with_handle();
2372        tokio::task::spawn(network);
2373
2374        network_handle.update_sync_state(SyncState::Idle);
2375
2376        assert!(!NetworkInfo::is_syncing(&network_handle));
2377
2378        // wait for all initiator connections
2379        let mut established = listener0.take(2);
2380        while let Some(ev) = established.next().await {
2381            match ev {
2382                NetworkEvent::ActivePeerSession { .. } |
2383                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2384                    transactions.on_network_event(ev);
2385                }
2386                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2387                ev => {
2388                    error!("unexpected event {ev:?}")
2389                }
2390            }
2391        }
2392        handle.terminate().await;
2393
2394        let tx = MockTransaction::eip1559();
2395        let _ = transactions
2396            .pool
2397            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2398            .await;
2399
2400        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2401
2402        let (send, receive) = oneshot::channel::<RequestResult<PooledTransactions>>();
2403
2404        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2405            peer_id: *handle1.peer_id(),
2406            request,
2407            response: send,
2408        });
2409
2410        match receive.await.unwrap() {
2411            Ok(PooledTransactions(transactions)) => {
2412                assert_eq!(transactions.len(), 1);
2413            }
2414            Err(e) => {
2415                panic!("error: {e:?}");
2416            }
2417        }
2418    }
2419
2420    // Ensure that when the remote peer only returns part of the requested transactions, the
2421    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2422    // re-buffered.
2423    #[tokio::test]
2424    async fn test_partially_tx_response() {
2425        reth_tracing::init_test_tracing();
2426
2427        let mut tx_manager = new_tx_manager().await.0;
2428        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2429
2430        let peer_id_1 = PeerId::new([1; 64]);
2431        let eth_version = EthVersion::Eth66;
2432
2433        let txs = vec![
2434            TransactionSigned::new_unhashed(
2435                Transaction::Legacy(TxLegacy {
2436                    chain_id: Some(4),
2437                    nonce: 15u64,
2438                    gas_price: 2200000000,
2439                    gas_limit: 34811,
2440                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2441                    value: U256::from(1234u64),
2442                    input: Default::default(),
2443                }),
2444                Signature::new(
2445                    U256::from_str(
2446                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2447                    )
2448                    .unwrap(),
2449                    U256::from_str(
2450                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2451                    )
2452                    .unwrap(),
2453                    true,
2454                ),
2455            ),
2456            TransactionSigned::new_unhashed(
2457                Transaction::Eip1559(TxEip1559 {
2458                    chain_id: 4,
2459                    nonce: 26u64,
2460                    max_priority_fee_per_gas: 1500000000,
2461                    max_fee_per_gas: 1500000013,
2462                    gas_limit: MIN_TRANSACTION_GAS,
2463                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2464                    value: U256::from(3000000000000000000u64),
2465                    input: Default::default(),
2466                    access_list: Default::default(),
2467                }),
2468                Signature::new(
2469                    U256::from_str(
2470                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2471                    )
2472                    .unwrap(),
2473                    U256::from_str(
2474                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2475                    )
2476                    .unwrap(),
2477                    true,
2478                ),
2479            ),
2480        ];
2481
2482        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2483
2484        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2485        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2486        // fetch
2487        peer_1.seen_transactions.insert(txs_hashes[0]);
2488        peer_1.seen_transactions.insert(txs_hashes[1]);
2489        tx_manager.peers.insert(peer_id_1, peer_1);
2490
2491        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2492        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2493
2494        // peer_1 is idle
2495        assert!(tx_fetcher.is_idle(&peer_id_1));
2496        assert_eq!(tx_fetcher.active_peers.len(), 0);
2497
2498        // sends requests for buffered hashes to peer_1
2499        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2500
2501        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2502        // as long as request is in flight peer_1 is not idle
2503        assert!(!tx_fetcher.is_idle(&peer_id_1));
2504        assert_eq!(tx_fetcher.active_peers.len(), 1);
2505
2506        // mock session of peer_1 receives request
2507        let req = to_mock_session_rx
2508            .recv()
2509            .await
2510            .expect("peer_1 session should receive request with buffered hashes");
2511        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2512
2513        let message: Vec<PooledTransaction> = txs
2514            .into_iter()
2515            .take(1)
2516            .map(|tx| {
2517                PooledTransaction::try_from(tx)
2518                    .expect("Failed to convert MockTransaction to PooledTransaction")
2519            })
2520            .collect();
2521        // response partial request
2522        response
2523            .send(Ok(PooledTransactions(message)))
2524            .expect("should send peer_1 response to tx manager");
2525        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2526            unreachable!()
2527        };
2528
2529        // request has resolved, peer_1 is idle again
2530        assert!(tx_fetcher.is_idle(&peer_id));
2531        assert_eq!(tx_fetcher.active_peers.len(), 0);
2532        // failing peer_1's request buffers requested hashes for retry.
2533        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2534    }
2535
2536    #[tokio::test]
2537    async fn test_max_retries_tx_request() {
2538        reth_tracing::init_test_tracing();
2539
2540        let mut tx_manager = new_tx_manager().await.0;
2541        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2542
2543        let peer_id_1 = PeerId::new([1; 64]);
2544        let peer_id_2 = PeerId::new([2; 64]);
2545        let eth_version = EthVersion::Eth66;
2546        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2547
2548        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2549        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2550        // fetch
2551        peer_1.seen_transactions.insert(seen_hashes[0]);
2552        peer_1.seen_transactions.insert(seen_hashes[1]);
2553        tx_manager.peers.insert(peer_id_1, peer_1);
2554
2555        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2556        // for first retry in reverse order to make index 0 lru
2557        let retries = 1;
2558        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2559        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2560
2561        // peer_1 is idle
2562        assert!(tx_fetcher.is_idle(&peer_id_1));
2563        assert_eq!(tx_fetcher.active_peers.len(), 0);
2564
2565        // sends request for buffered hashes to peer_1
2566        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2567
2568        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2569
2570        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2571        // as long as request is in inflight peer_1 is not idle
2572        assert!(!tx_fetcher.is_idle(&peer_id_1));
2573        assert_eq!(tx_fetcher.active_peers.len(), 1);
2574
2575        // mock session of peer_1 receives request
2576        let req = to_mock_session_rx
2577            .recv()
2578            .await
2579            .expect("peer_1 session should receive request with buffered hashes");
2580        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2581        let GetPooledTransactions(hashes) = request;
2582
2583        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2584
2585        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2586
2587        // fail request to peer_1
2588        response
2589            .send(Err(RequestError::BadResponse))
2590            .expect("should send peer_1 response to tx manager");
2591        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2592            unreachable!()
2593        };
2594
2595        // request has resolved, peer_1 is idle again
2596        assert!(tx_fetcher.is_idle(&peer_id));
2597        assert_eq!(tx_fetcher.active_peers.len(), 0);
2598        // failing peer_1's request buffers requested hashes for retry
2599        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2600
2601        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2602        tx_manager.peers.insert(peer_id_2, peer_2);
2603
2604        // peer_2 announces same hashes as peer_1
2605        let msg =
2606            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2607        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2608
2609        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2610
2611        // peer_2 should be in active_peers.
2612        assert_eq!(tx_fetcher.active_peers.len(), 1);
2613
2614        // since hashes are already seen, no changes to length of unknown hashes
2615        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2616        // but hashes are taken out of buffer and packed into request to peer_2
2617        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2618
2619        // mock session of peer_2 receives request
2620        let req = to_mock_session_rx
2621            .recv()
2622            .await
2623            .expect("peer_2 session should receive request with buffered hashes");
2624        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2625
2626        // report failed request to tx manager
2627        response
2628            .send(Err(RequestError::BadResponse))
2629            .expect("should send peer_2 response to tx manager");
2630        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2631
2632        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2633        // for retry
2634        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2635        assert_eq!(tx_fetcher.active_peers.len(), 0);
2636    }
2637
2638    #[test]
2639    fn test_transaction_builder_empty() {
2640        let mut builder =
2641            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2642        assert!(builder.is_empty());
2643
2644        let mut factory = MockTransactionFactory::default();
2645        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2646        builder.push(&tx);
2647        assert!(!builder.is_empty());
2648
2649        let txs = builder.build();
2650        assert!(txs.full.is_none());
2651        let txs = txs.pooled.unwrap();
2652        assert_eq!(txs.len(), 1);
2653    }
2654
2655    #[test]
2656    fn test_transaction_builder_large() {
2657        let mut builder =
2658            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2659        assert!(builder.is_empty());
2660
2661        let mut factory = MockTransactionFactory::default();
2662        let mut tx = factory.create_eip1559();
2663        // create a transaction that still fits
2664        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2665        let tx = Arc::new(tx);
2666        let tx = PropagateTransaction::pool_tx(tx);
2667        builder.push(&tx);
2668        assert!(!builder.is_empty());
2669
2670        let txs = builder.clone().build();
2671        assert!(txs.pooled.is_none());
2672        let txs = txs.full.unwrap();
2673        assert_eq!(txs.len(), 1);
2674
2675        builder.push(&tx);
2676
2677        let txs = builder.clone().build();
2678        let pooled = txs.pooled.unwrap();
2679        assert_eq!(pooled.len(), 1);
2680        let txs = txs.full.unwrap();
2681        assert_eq!(txs.len(), 1);
2682    }
2683
2684    #[test]
2685    fn test_transaction_builder_eip4844() {
2686        let mut builder =
2687            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2688        assert!(builder.is_empty());
2689
2690        let mut factory = MockTransactionFactory::default();
2691        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2692        builder.push(&tx);
2693        assert!(!builder.is_empty());
2694
2695        let txs = builder.clone().build();
2696        assert!(txs.full.is_none());
2697        let txs = txs.pooled.unwrap();
2698        assert_eq!(txs.len(), 1);
2699
2700        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2701        builder.push(&tx);
2702
2703        let txs = builder.clone().build();
2704        let pooled = txs.pooled.unwrap();
2705        assert_eq!(pooled.len(), 1);
2706        let txs = txs.full.unwrap();
2707        assert_eq!(txs.len(), 1);
2708    }
2709
2710    #[tokio::test]
2711    async fn test_propagate_full() {
2712        reth_tracing::init_test_tracing();
2713
2714        let (mut tx_manager, network) = new_tx_manager().await;
2715        let peer_id = PeerId::random();
2716
2717        // ensure not syncing
2718        network.handle().update_sync_state(SyncState::Idle);
2719
2720        // mock a peer
2721        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2722
2723        let session_info = SessionInfo {
2724            peer_id,
2725            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2726            client_version: Arc::from(""),
2727            capabilities: Arc::new(vec![].into()),
2728            status: Arc::new(Default::default()),
2729            version: EthVersion::Eth68,
2730            peer_kind: PeerKind::Basic,
2731        };
2732        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2733        tx_manager
2734            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2735        let mut propagate = vec![];
2736        let mut factory = MockTransactionFactory::default();
2737        let eip1559_tx = Arc::new(factory.create_eip1559());
2738        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2739        let eip4844_tx = Arc::new(factory.create_eip4844());
2740        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2741
2742        let propagated =
2743            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2744        assert_eq!(propagated.0.len(), 2);
2745        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2746        assert_eq!(prop_txs.len(), 1);
2747        assert!(prop_txs[0].is_full());
2748
2749        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2750        assert_eq!(prop_txs.len(), 1);
2751        assert!(prop_txs[0].is_hash());
2752
2753        let peer = tx_manager.peers.get(&peer_id).unwrap();
2754        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2755        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2756        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2757
2758        // propagate again
2759        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2760        assert!(propagated.0.is_empty());
2761    }
2762}