reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
/// Aggregation of configurable parameters for [`TransactionsManager`].
7pub mod config;
8/// Default and spec'd bounds.
9pub mod constants;
10/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
11pub mod fetcher;
12/// Defines the traits for transaction-related policies.
13pub mod policy;
14
15pub use self::constants::{
16    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30    budget::{
31        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33    },
34    cache::LruCache,
35    duration_metered_exec, metered_poll_nested_stream_with_budget,
36    metrics::{
37        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38    },
39    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40    NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49    RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54    events::{PeerEvent, SessionInfo},
55    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58    error::{RequestError, RequestResult},
59    sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66    error::{PoolError, PoolResult},
67    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71    collections::{hash_map::Entry, HashMap, HashSet},
72    pin::Pin,
73    sync::{
74        atomic::{AtomicUsize, Ordering},
75        Arc,
76    },
77    task::{Context, Poll},
78    time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
/// The future for importing transactions into the pool.
///
/// Resolves with a `Vec` holding one [`PoolResult`] per transaction import, i.e. the individual
/// outcome of each import rather than a single aggregated result.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
/// Api to interact with [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer.
///
/// The handle only wraps the sending half of an unbounded command channel; every method on it
/// translates into a [`TransactionsCommand`] sent over that channel.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`]
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104    fn send(&self, cmd: TransactionsCommand<N>) {
105        let _ = self.manager_tx.send(cmd);
106    }
107
108    /// Fetch the [`PeerRequestSender`] for the given peer.
109    async fn peer_handle(
110        &self,
111        peer_id: PeerId,
112    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113        let (tx, rx) = oneshot::channel();
114        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115        rx.await
116    }
117
118    /// Manually propagate the transaction that belongs to the hash.
119    pub fn propagate(&self, hash: TxHash) {
120        self.send(TransactionsCommand::PropagateHash(hash))
121    }
122
123    /// Manually propagate the transaction hash to a specific peer.
124    ///
125    /// Note: this only propagates if the pool contains the transaction.
126    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127        self.propagate_hashes_to(Some(hash), peer)
128    }
129
130    /// Manually propagate the transaction hashes to a specific peer.
131    ///
132    /// Note: this only propagates the transactions that are known to the pool.
133    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134        let hashes = hash.into_iter().collect::<Vec<_>>();
135        if hashes.is_empty() {
136            return
137        }
138        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139    }
140
141    /// Request the active peer IDs from the [`TransactionsManager`].
142    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143        let (tx, rx) = oneshot::channel();
144        self.send(TransactionsCommand::GetActivePeers(tx));
145        rx.await
146    }
147
148    /// Manually propagate full transaction hashes to a specific peer.
149    ///
150    /// Do nothing if transactions are empty.
151    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152        if transactions.is_empty() {
153            return
154        }
155        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156    }
157
158    /// Manually propagate the given transaction hashes to all peers.
159    ///
160    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
161    /// full.
162    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163        if transactions.is_empty() {
164            return
165        }
166        self.send(TransactionsCommand::PropagateTransactions(transactions))
167    }
168
169    /// Manually propagate the given transactions to all peers.
170    ///
171    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
172    /// full.
173    pub fn broadcast_transactions(
174        &self,
175        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176    ) {
177        let transactions =
178            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179        if transactions.is_empty() {
180            return
181        }
182        self.send(TransactionsCommand::BroadcastTransactions(transactions))
183    }
184
185    /// Request the transaction hashes known by specific peers.
186    pub async fn get_transaction_hashes(
187        &self,
188        peers: Vec<PeerId>,
189    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190        if peers.is_empty() {
191            return Ok(Default::default())
192        }
193        let (tx, rx) = oneshot::channel();
194        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195        rx.await
196    }
197
198    /// Request the transaction hashes known by a specific peer.
199    pub async fn get_peer_transaction_hashes(
200        &self,
201        peer: PeerId,
202    ) -> Result<HashSet<TxHash>, RecvError> {
203        let res = self.get_transaction_hashes(vec![peer]).await?;
204        Ok(res.into_values().next().unwrap_or_default())
205    }
206
207    /// Requests the transactions directly from the given peer.
208    ///
209    /// Returns `None` if the peer is not connected.
210    ///
211    /// **Note**: this returns the response from the peer as received.
212    pub async fn get_pooled_transactions_from(
213        &self,
214        peer_id: PeerId,
215        hashes: Vec<B256>,
216    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219        let (tx, rx) = oneshot::channel();
220        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221        peer.try_send(request).ok();
222
223        rx.await?.map(|res| Some(res.0))
224    }
225}
226
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service.
/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All currently pending transactions, keyed by transaction hash, together with the set of
    /// peers associated with each hash.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction. Entries are removed again once the import result for the hash arrives.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose pool import failed with an error classified as a bad
    /// transaction.
    ///
    /// The cache is consulted when filtering announced hashes.
    bad_imports: LruCache<TxHash>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies, e.g. the announcement filter applied to incoming hash announcements.
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348    /// Sets up a new instance.
349    ///
350    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
351    pub fn new(
352        network: NetworkHandle<N>,
353        pool: Pool,
354        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355        transactions_manager_config: TransactionsManagerConfig,
356    ) -> Self {
357        Self::with_policy(
358            network,
359            pool,
360            from_network,
361            transactions_manager_config,
362            NetworkPolicies::new(
363                TransactionPropagationKind::default(),
364                StrictEthAnnouncementFilter::default(),
365            ),
366        )
367    }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371    /// Sets up a new instance with given the settings.
372    ///
373    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
374    pub fn with_policy(
375        network: NetworkHandle<N>,
376        pool: Pool,
377        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378        transactions_manager_config: TransactionsManagerConfig,
379        policies: NetworkPolicies<N>,
380    ) -> Self {
381        let network_events = network.event_listener();
382
383        let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386            &transactions_manager_config.transaction_fetcher_config,
387        );
388
389        // install a listener for new __pending__ transactions that are allowed to be propagated
390        // over the network
391        let pending = pool.pending_transactions_listener();
392        let pending_pool_imports_info = PendingPoolImportsInfo::default();
393        let metrics = TransactionsManagerMetrics::default();
394        metrics
395            .capacity_pending_pool_imports
396            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398        Self {
399            pool,
400            network,
401            network_events,
402            transaction_fetcher,
403            transactions_by_peers: Default::default(),
404            pool_imports: Default::default(),
405            pending_pool_imports_info: PendingPoolImportsInfo::new(
406                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
407            ),
408            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409            peers: Default::default(),
410            command_tx,
411            command_rx: UnboundedReceiverStream::new(command_rx),
412            pending_transactions: pending,
413            transaction_events: UnboundedMeteredReceiver::new(
414                from_network,
415                NETWORK_POOL_TRANSACTIONS_SCOPE,
416            ),
417            config: transactions_manager_config,
418            policies,
419            metrics,
420            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421        }
422    }
423
424    /// Returns a new handle that can send commands to this type.
425    pub fn handle(&self) -> TransactionsHandle<N> {
426        TransactionsHandle { manager_tx: self.command_tx.clone() }
427    }
428
429    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
430    /// `false` if [`TransactionsManager`] is operating close to full capacity.
431    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432        self.pending_pool_imports_info
433            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
434            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
435    }
436
437    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
438        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
439        self.metrics.reported_bad_transactions.increment(1);
440    }
441
442    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
443        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
444        self.network.reputation_change(peer_id, kind);
445    }
446
447    fn report_already_seen(&self, peer_id: PeerId) {
448        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
449        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
450    }
451
452    /// Clear the transaction
453    fn on_good_import(&mut self, hash: TxHash) {
454        self.transactions_by_peers.remove(&hash);
455    }
456
457    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
458    /// fetching or importing it again.
459    ///
460    /// Errors that count as bad transactions are:
461    ///
462    /// - intrinsic gas too low
463    /// - exceeds gas limit
464    /// - gas uint overflow
465    /// - exceeds max init code size
466    /// - oversized data
467    /// - signer account has bytecode
468    /// - chain id mismatch
469    /// - old legacy chain id
470    /// - tx type not supported
471    ///
472    /// (and additionally for blobs txns...)
473    ///
474    /// - no blobs
475    /// - too many blobs
476    /// - invalid kzg proof
477    /// - kzg error
478    /// - not blob transaction (tx type mismatch)
479    /// - wrong versioned kzg commitment hash
480    fn on_bad_import(&mut self, err: PoolError) {
481        let peers = self.transactions_by_peers.remove(&err.hash);
482
483        // if we're _currently_ syncing, we ignore a bad transaction
484        if !err.is_bad_transaction() || self.network.is_syncing() {
485            return
486        }
487        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
488        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
489        if let Some(peers) = peers {
490            for peer_id in peers {
491                self.report_peer_bad_transactions(peer_id);
492            }
493        }
494        self.metrics.bad_imports.increment(1);
495        self.bad_imports.insert(err.hash);
496    }
497
498    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
499    ///
500    /// Returns `true` if a request was sent.
501    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
502        // try drain transaction hashes pending fetch
503        let info = &self.pending_pool_imports_info;
504        let max_pending_pool_imports = info.max_pending_pool_imports;
505        let has_capacity_wrt_pending_pool_imports =
506            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
507
508        self.transaction_fetcher
509            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
510    }
511
512    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
513        let kind = match req_err {
514            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
515            RequestError::Timeout => ReputationChangeKind::Timeout,
516            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
517                // peer is already disconnected
518                return
519            }
520            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
521        };
522        self.report_peer(peer_id, kind);
523    }
524
525    #[inline]
526    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
527        let metrics = &self.metrics;
528
529        let TxManagerPollDurations {
530            acc_network_events,
531            acc_pending_imports,
532            acc_tx_events,
533            acc_imported_txns,
534            acc_fetch_events,
535            acc_pending_fetch,
536            acc_cmds,
537        } = poll_durations;
538
539        // update metrics for whole poll function
540        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
541        // update metrics for nested expressions
542        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
543        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
544        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
545        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
546        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
547        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
548        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
549    }
550}
551
552impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
553    /// Processes a batch import results.
554    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
555        for res in batch_results {
556            match res {
557                Ok(AddedTransactionOutcome { hash, .. }) => {
558                    self.on_good_import(hash);
559                }
560                Err(err) => {
561                    self.on_bad_import(err);
562                }
563            }
564        }
565    }
566
567    /// Request handler for an incoming `NewPooledTransactionHashes`
568    fn on_new_pooled_transaction_hashes(
569        &mut self,
570        peer_id: PeerId,
571        msg: NewPooledTransactionHashes,
572    ) {
573        // If the node is initially syncing, ignore transactions
574        if self.network.is_initially_syncing() {
575            return
576        }
577        if self.network.tx_gossip_disabled() {
578            return
579        }
580
581        // get handle to peer's session, if the session is still active
582        let Some(peer) = self.peers.get_mut(&peer_id) else {
583            trace!(
584                peer_id = format!("{peer_id:#}"),
585                ?msg,
586                "discarding announcement from inactive peer"
587            );
588
589            return
590        };
591        let client = peer.client_version.clone();
592
593        // keep track of the transactions the peer knows
594        let mut count_txns_already_seen_by_peer = 0;
595        for tx in msg.iter_hashes().copied() {
596            if !peer.seen_transactions.insert(tx) {
597                count_txns_already_seen_by_peer += 1;
598            }
599        }
600        if count_txns_already_seen_by_peer > 0 {
601            // this may occur if transactions are sent or announced to a peer, at the same time as
602            // the peer sends/announces those hashes to us. this is because, marking
603            // txns as seen by a peer is done optimistically upon sending them to the
604            // peer.
605            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
606            self.metrics
607                .occurrences_hash_already_seen_by_peer
608                .increment(count_txns_already_seen_by_peer);
609
610            trace!(target: "net::tx",
611                %count_txns_already_seen_by_peer,
612                peer_id=format!("{peer_id:#}"),
613                ?client,
614                "Peer sent hashes that have already been marked as seen by peer"
615            );
616
617            self.report_already_seen(peer_id);
618        }
619
620        // 1. filter out spam
621        if msg.is_empty() {
622            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
623            return;
624        }
625
626        let original_len = msg.len();
627        let mut partially_valid_msg = msg.dedup();
628
629        if partially_valid_msg.len() != original_len {
630            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
631        }
632
633        // 2. filter out transactions pending import to pool
634        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
635
636        // 3. filter out known hashes
637        //
638        // known txns have already been successfully fetched or received over gossip.
639        //
        // most hashes will be filtered out here: since the mempool protocol is a gossip
        // protocol, healthy peers will send many of the same hashes.
642        //
643        let hashes_count_pre_pool_filter = partially_valid_msg.len();
644        self.pool.retain_unknown(&mut partially_valid_msg);
645        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
646            let already_known_hashes_count =
647                hashes_count_pre_pool_filter - partially_valid_msg.len();
648            self.metrics
649                .occurrences_hashes_already_in_pool
650                .increment(already_known_hashes_count as u64);
651        }
652
653        if partially_valid_msg.is_empty() {
654            // nothing to request
655            return
656        }
657
658        // 4. filter out invalid entries (spam)
659        //
660        // validates messages with respect to the given network, e.g. allowed tx types
661        //
662        let mut should_report_peer = false;
663        let mut tx_types_counter = TxTypesCounter::default();
664
665        let is_eth68_message = partially_valid_msg
666            .msg_version()
667            .expect("partially valid announcement should have a version")
668            .is_eth68();
669
670        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
671            let (ty_byte, size_val) = match *metadata_ref_mut {
672                Some((ty, size)) => {
673                    if !is_eth68_message {
674                        should_report_peer = true;
675                    }
676                    (ty, size)
677                }
678                None => {
679                    if is_eth68_message {
680                        should_report_peer = true;
681                        return false;
682                    }
683                    (0u8, 0)
684                }
685            };
686
687            if is_eth68_message &&
688                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
689                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
690            {
691                tx_types_counter.increase_by_tx_type(parsed_tx_type);
692            }
693
694            let decision = self
695                .policies
696                .announcement_filter()
697                .decide_on_announcement(ty_byte, tx_hash, size_val);
698
699            match decision {
700                AnnouncementAcceptance::Accept => true,
701                AnnouncementAcceptance::Ignore => false,
702                AnnouncementAcceptance::Reject { penalize_peer } => {
703                    if penalize_peer {
704                        should_report_peer = true;
705                    }
706                    false
707                }
708            }
709        });
710
711        if is_eth68_message {
712            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
713        }
714
715        if should_report_peer {
716            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
717        }
718
719        let mut valid_announcement_data =
720            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
721
722        if valid_announcement_data.is_empty() {
723            // no valid announcement data
724            return
725        }
726
727        // 5. filter out already seen unknown hashes
728        //
729        // seen hashes are already in the tx fetcher, pending fetch.
730        //
731        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
732        // fetcher, hence they should be valid at this point.
733        let bad_imports = &self.bad_imports;
734        self.transaction_fetcher.filter_unseen_and_pending_hashes(
735            &mut valid_announcement_data,
736            |hash| bad_imports.contains(hash),
737            &peer_id,
738            &client,
739        );
740
741        if valid_announcement_data.is_empty() {
742            // nothing to request
743            return
744        }
745
746        trace!(target: "net::tx::propagation",
747            peer_id=format!("{peer_id:#}"),
748            hashes_len=valid_announcement_data.len(),
749            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
750            msg_version=%valid_announcement_data.msg_version(),
751            client_version=%client,
752            "received previously unseen and pending hashes in announcement from peer"
753        );
754
755        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
756        // fallback
757        if !self.transaction_fetcher.is_idle(&peer_id) {
758            // load message version before announcement data is destructed in packing
759            let msg_version = valid_announcement_data.msg_version();
760            let (hashes, _version) = valid_announcement_data.into_request_hashes();
761
762            trace!(target: "net::tx",
763                peer_id=format!("{peer_id:#}"),
764                hashes=?*hashes,
765                %msg_version,
766                %client,
767                "buffering hashes announced by busy peer"
768            );
769
770            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
771
772            return
773        }
774
775        let mut hashes_to_request =
776            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
777        let surplus_hashes =
778            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
779
780        if !surplus_hashes.is_empty() {
781            trace!(target: "net::tx",
782                peer_id=format!("{peer_id:#}"),
783                surplus_hashes=?*surplus_hashes,
784                %client,
785                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
786            );
787
788            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
789        }
790
791        trace!(target: "net::tx",
792            peer_id=format!("{peer_id:#}"),
793            hashes=?*hashes_to_request,
794            %client,
795            "sending hashes in `GetPooledTransactions` request to peer's session"
796        );
797
798        // request the missing transactions
799        //
800        // get handle to peer's session again, at this point we know it exists
801        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
802        if let Some(failed_to_request_hashes) =
803            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
804        {
805            let conn_eth_version = peer.version;
806
807            trace!(target: "net::tx",
808                peer_id=format!("{peer_id:#}"),
809                failed_to_request_hashes=?*failed_to_request_hashes,
810                %conn_eth_version,
811                %client,
812                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
813            );
814            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
815        }
816    }
817}
818
819impl<Pool, N> TransactionsManager<Pool, N>
820where
821    Pool: TransactionPool + Unpin + 'static,
822    N: NetworkPrimitives<
823            BroadcastedTransaction: SignedTransaction,
824            PooledTransaction: SignedTransaction,
825        > + Unpin,
826    Pool::Transaction:
827        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
828{
829    /// Invoked when transactions in the local mempool are considered __pending__.
830    ///
831    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
832    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
833    /// messages. The Transactions message relays complete transaction objects and is typically
834    /// sent to a small, random fraction of connected peers.
835    ///
836    /// All other peers receive a notification of the transaction hash and can request the
837    /// complete transaction object if it is unknown to them. The dissemination of complete
838    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
839    /// and won't need to request it.
840    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
841        // Nothing to propagate while initially syncing
842        if self.network.is_initially_syncing() {
843            return
844        }
845        if self.network.tx_gossip_disabled() {
846            return
847        }
848
849        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
850
851        self.propagate_all(hashes);
852    }
853
854    /// Propagate the full transactions to a specific peer.
855    ///
856    /// Returns the propagated transactions.
857    fn propagate_full_transactions_to_peer(
858        &mut self,
859        txs: Vec<TxHash>,
860        peer_id: PeerId,
861        propagation_mode: PropagationMode,
862    ) -> Option<PropagatedTransactions> {
863        let peer = self.peers.get_mut(&peer_id)?;
864        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
865        let mut propagated = PropagatedTransactions::default();
866
867        // filter all transactions unknown to the peer
868        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
869
870        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
871
872        if propagation_mode.is_forced() {
873            // skip cache check if forced
874            full_transactions.extend(to_propagate);
875        } else {
876            // Iterate through the transactions to propagate and fill the hashes and full
877            // transaction
878            for tx in to_propagate {
879                if !peer.seen_transactions.contains(tx.tx_hash()) {
880                    // Only include if the peer hasn't seen the transaction
881                    full_transactions.push(&tx);
882                }
883            }
884        }
885
886        if full_transactions.is_empty() {
887            // nothing to propagate
888            return None
889        }
890
891        let PropagateTransactions { pooled, full } = full_transactions.build();
892
893        // send hashes if any
894        if let Some(new_pooled_hashes) = pooled {
895            for hash in new_pooled_hashes.iter_hashes().copied() {
896                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
897                // mark transaction as seen by peer
898                peer.seen_transactions.insert(hash);
899            }
900
901            // send hashes of transactions
902            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
903        }
904
905        // send full transactions, if any
906        if let Some(new_full_transactions) = full {
907            for tx in &new_full_transactions {
908                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
909                // mark transaction as seen by peer
910                peer.seen_transactions.insert(*tx.tx_hash());
911            }
912
913            // send full transactions
914            self.network.send_transactions(peer_id, new_full_transactions);
915        }
916
917        // Update propagated transactions metrics
918        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
919
920        Some(propagated)
921    }
922
923    /// Propagate the transaction hashes to the given peer
924    ///
925    /// Note: This will only send the hashes for transactions that exist in the pool.
926    fn propagate_hashes_to(
927        &mut self,
928        hashes: Vec<TxHash>,
929        peer_id: PeerId,
930        propagation_mode: PropagationMode,
931    ) {
932        trace!(target: "net::tx", "Start propagating transactions as hashes");
933
934        // This fetches a transactions from the pool, including the blob transactions, which are
935        // only ever sent as hashes.
936        let propagated = {
937            let Some(peer) = self.peers.get_mut(&peer_id) else {
938                // no such peer
939                return
940            };
941
942            let to_propagate = self
943                .pool
944                .get_all(hashes)
945                .into_iter()
946                .map(PropagateTransaction::pool_tx)
947                .collect::<Vec<_>>();
948
949            let mut propagated = PropagatedTransactions::default();
950
951            // check if transaction is known to peer
952            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
953
954            if propagation_mode.is_forced() {
955                hashes.extend(to_propagate)
956            } else {
957                for tx in to_propagate {
958                    if !peer.seen_transactions.contains(tx.tx_hash()) {
959                        // Include if the peer hasn't seen it
960                        hashes.push(&tx);
961                    }
962                }
963            }
964
965            let new_pooled_hashes = hashes.build();
966
967            if new_pooled_hashes.is_empty() {
968                // nothing to propagate
969                return
970            }
971
972            for hash in new_pooled_hashes.iter_hashes().copied() {
973                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
974            }
975
976            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
977
978            // send hashes of transactions
979            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
980
981            // Update propagated transactions metrics
982            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
983
984            propagated
985        };
986
987        // notify pool so events get fired
988        self.pool.on_propagated(propagated);
989    }
990
991    /// Propagate the transactions to all connected peers either as full objects or hashes.
992    ///
993    /// The message for new pooled hashes depends on the negotiated version of the stream.
994    /// See [`NewPooledTransactionHashes`]
995    ///
996    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
997    fn propagate_transactions(
998        &mut self,
999        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1000        propagation_mode: PropagationMode,
1001    ) -> PropagatedTransactions {
1002        let mut propagated = PropagatedTransactions::default();
1003        if self.network.tx_gossip_disabled() {
1004            return propagated
1005        }
1006
1007        // send full transactions to a set of the connected peers based on the configured mode
1008        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1009
1010        // Note: Assuming ~random~ order due to random state of the peers map hasher
1011        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1012            if !self.policies.propagation_policy().can_propagate(peer) {
1013                // skip peers we should not propagate to
1014                continue
1015            }
1016            // determine whether to send full tx objects or hashes.
1017            let mut builder = if peer_idx > max_num_full {
1018                PropagateTransactionsBuilder::pooled(peer.version)
1019            } else {
1020                PropagateTransactionsBuilder::full(peer.version)
1021            };
1022
1023            if propagation_mode.is_forced() {
1024                builder.extend(to_propagate.iter());
1025            } else {
1026                // Iterate through the transactions to propagate and fill the hashes and full
1027                // transaction lists, before deciding whether or not to send full transactions to
1028                // the peer.
1029                for tx in &to_propagate {
1030                    // Only proceed if the transaction is not in the peer's list of seen
1031                    // transactions
1032                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1033                        builder.push(tx);
1034                    }
1035                }
1036            }
1037
1038            if builder.is_empty() {
1039                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1040                continue
1041            }
1042
1043            let PropagateTransactions { pooled, full } = builder.build();
1044
1045            // send hashes if any
1046            if let Some(mut new_pooled_hashes) = pooled {
1047                // enforce tx soft limit per message for the (unlikely) event the number of
1048                // hashes exceeds it
1049                new_pooled_hashes
1050                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1051
1052                for hash in new_pooled_hashes.iter_hashes().copied() {
1053                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1054                    // mark transaction as seen by peer
1055                    peer.seen_transactions.insert(hash);
1056                }
1057
1058                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1059
1060                // send hashes of transactions
1061                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1062            }
1063
1064            // send full transactions, if any
1065            if let Some(new_full_transactions) = full {
1066                for tx in &new_full_transactions {
1067                    propagated
1068                        .0
1069                        .entry(*tx.tx_hash())
1070                        .or_default()
1071                        .push(PropagateKind::Full(*peer_id));
1072                    // mark transaction as seen by peer
1073                    peer.seen_transactions.insert(*tx.tx_hash());
1074                }
1075
1076                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1077
1078                // send full transactions
1079                self.network.send_transactions(*peer_id, new_full_transactions);
1080            }
1081        }
1082
1083        // Update propagated transactions metrics
1084        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1085
1086        propagated
1087    }
1088
1089    /// Propagates the given transactions to the peers
1090    ///
1091    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1092    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1093    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1094        if self.peers.is_empty() {
1095            // nothing to propagate
1096            return
1097        }
1098        let propagated = self.propagate_transactions(
1099            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1100            PropagationMode::Basic,
1101        );
1102
1103        // notify pool so events get fired
1104        self.pool.on_propagated(propagated);
1105    }
1106
1107    /// Request handler for an incoming request for transactions
1108    fn on_get_pooled_transactions(
1109        &mut self,
1110        peer_id: PeerId,
1111        request: GetPooledTransactions,
1112        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1113    ) {
1114        if let Some(peer) = self.peers.get_mut(&peer_id) {
1115            if self.network.tx_gossip_disabled() {
1116                let _ = response.send(Ok(PooledTransactions::default()));
1117                return
1118            }
1119            let transactions = self.pool.get_pooled_transaction_elements(
1120                request.0,
1121                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1122                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1123                ),
1124            );
1125            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1126
1127            // we sent a response at which point we assume that the peer is aware of the
1128            // transactions
1129            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1130
1131            let resp = PooledTransactions(transactions);
1132            let _ = response.send(Ok(resp));
1133        }
1134    }
1135
1136    /// Handles a command received from a detached [`TransactionsHandle`]
1137    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1138        match cmd {
1139            TransactionsCommand::PropagateHash(hash) => {
1140                self.on_new_pending_transactions(vec![hash])
1141            }
1142            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1143                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1144            }
1145            TransactionsCommand::GetActivePeers(tx) => {
1146                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1147                tx.send(peers).ok();
1148            }
1149            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1150                if let Some(propagated) =
1151                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1152                {
1153                    self.pool.on_propagated(propagated);
1154                }
1155            }
1156            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1157            TransactionsCommand::BroadcastTransactions(txs) => {
1158                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1159                self.pool.on_propagated(propagated);
1160            }
1161            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1162                let mut res = HashMap::with_capacity(peers.len());
1163                for peer_id in peers {
1164                    let hashes = self
1165                        .peers
1166                        .get(&peer_id)
1167                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1168                        .unwrap_or_default();
1169                    res.insert(peer_id, hashes);
1170                }
1171                tx.send(res).ok();
1172            }
1173            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1174                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1175                peer_request_sender.send(sender).ok();
1176            }
1177        }
1178    }
1179
1180    /// Handles session establishment and peer transactions initialization.
1181    ///
1182    /// This is invoked when a new session is established.
1183    fn handle_peer_session(
1184        &mut self,
1185        info: SessionInfo,
1186        messages: PeerRequestSender<PeerRequest<N>>,
1187    ) {
1188        let SessionInfo { peer_id, client_version, version, .. } = info;
1189
1190        // Insert a new peer into the peerset.
1191        let peer = PeerMetadata::<N>::new(
1192            messages,
1193            version,
1194            client_version,
1195            self.config.max_transactions_seen_by_peer_history,
1196            info.peer_kind,
1197        );
1198        let peer = match self.peers.entry(peer_id) {
1199            Entry::Occupied(mut entry) => {
1200                entry.insert(peer);
1201                entry.into_mut()
1202            }
1203            Entry::Vacant(entry) => entry.insert(peer),
1204        };
1205
1206        self.policies.propagation_policy_mut().on_session_established(peer);
1207
1208        // Send a `NewPooledTransactionHashes` to the peer with up to
1209        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1210        // transactions in the pool.
1211        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1212            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1213            return
1214        }
1215
1216        // Get transactions to broadcast
1217        let pooled_txs = self.pool.pooled_transactions_max(
1218            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1219        );
1220        if pooled_txs.is_empty() {
1221            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1222            return;
1223        }
1224
1225        // Build and send transaction hashes message
1226        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1227        for pooled_tx in pooled_txs {
1228            peer.seen_transactions.insert(*pooled_tx.hash());
1229            msg_builder.push_pooled(pooled_tx);
1230        }
1231
1232        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1233        let msg = msg_builder.build();
1234        self.network.send_transactions_hashes(peer_id, msg);
1235    }
1236
1237    /// Handles a received event related to common network events.
1238    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1239        match event_result {
1240            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1241                // remove the peer
1242
1243                let peer = self.peers.remove(&peer_id);
1244                if let Some(mut peer) = peer {
1245                    self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1246                }
1247                self.transaction_fetcher.remove_peer(&peer_id);
1248            }
1249            NetworkEvent::ActivePeerSession { info, messages } => {
1250                // process active peer session and broadcast available transaction from the pool
1251                self.handle_peer_session(info, messages);
1252            }
1253            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1254                let peer_id = info.peer_id;
1255                // get messages from existing peer
1256                let messages = match self.peers.get(&peer_id) {
1257                    Some(p) => p.request_tx.clone(),
1258                    None => {
1259                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1260                        return;
1261                    }
1262                };
1263                self.handle_peer_session(info, messages);
1264            }
1265            _ => {}
1266        }
1267    }
1268
1269    /// Returns true if the ingress policy allows processing messages from the given peer.
1270    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1271        if self.config.ingress_policy.allows_all() {
1272            return true;
1273        }
1274        let Some(peer) = self.peers.get(peer_id) else {
1275            return false;
1276        };
1277        self.config.ingress_policy.allows(peer.peer_kind())
1278    }
1279
1280    /// Handles dedicated transaction events related to the `eth` protocol.
1281    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1282        match event {
1283            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1284                if !self.accepts_incoming_from(&peer_id) {
1285                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1286                    return;
1287                }
1288                // ensure we didn't receive any blob transactions as these are disallowed to be
1289                // broadcasted in full
1290
1291                let has_blob_txs = msg.has_eip4844();
1292
1293                let non_blob_txs = msg
1294                    .0
1295                    .into_iter()
1296                    .map(N::PooledTransaction::try_from)
1297                    .filter_map(Result::ok)
1298                    .collect();
1299
1300                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1301
1302                if has_blob_txs {
1303                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1304                    self.report_peer_bad_transactions(peer_id);
1305                }
1306            }
1307            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1308                if !self.accepts_incoming_from(&peer_id) {
1309                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1310                    return;
1311                }
1312                self.on_new_pooled_transaction_hashes(peer_id, msg)
1313            }
1314            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1315                self.on_get_pooled_transactions(peer_id, request, response)
1316            }
1317            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1318                let _ = response.send(Some(self.handle()));
1319            }
1320        }
1321    }
1322
1323    /// Starts the import process for the given transactions.
1324    fn import_transactions(
1325        &mut self,
1326        peer_id: PeerId,
1327        transactions: PooledTransactions<N::PooledTransaction>,
1328        source: TransactionSource,
1329    ) {
1330        // If the node is pipeline syncing, ignore transactions
1331        if self.network.is_initially_syncing() {
1332            return
1333        }
1334        if self.network.tx_gossip_disabled() {
1335            return
1336        }
1337
1338        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1339        let mut transactions = transactions.0;
1340
1341        let start = Instant::now();
1342
1343        // mark the transactions as received
1344        self.transaction_fetcher
1345            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1346
1347        // track that the peer knows these transaction, but only if this is a new broadcast.
1348        // If we received the transactions as the response to our `GetPooledTransactions``
1349        // requests (based on received `NewPooledTransactionHashes`) then we already
1350        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1351        let mut num_already_seen_by_peer = 0;
1352        for tx in &transactions {
1353            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1354                num_already_seen_by_peer += 1;
1355            }
1356        }
1357
1358        // 1. filter out txns already inserted into pool
1359        let txns_count_pre_pool_filter = transactions.len();
1360        self.pool.retain_unknown(&mut transactions);
1361        if txns_count_pre_pool_filter > transactions.len() {
1362            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1363            self.metrics
1364                .occurrences_transactions_already_in_pool
1365                .increment(already_known_txns_count as u64);
1366        }
1367
1368        // tracks the quality of the given transactions
1369        let mut has_bad_transactions = false;
1370
1371        // Remove known and invalid transactions
1372        transactions.retain(|tx| {
1373            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1374                entry.get_mut().insert(peer_id);
1375                return false
1376            }
1377            if self.bad_imports.contains(tx.tx_hash()) {
1378                trace!(target: "net::tx",
1379                    peer_id=format!("{peer_id:#}"),
1380                    hash=%tx.tx_hash(),
1381                    client_version=%peer.client_version,
1382                    "received a known bad transaction from peer"
1383                );
1384                has_bad_transactions = true;
1385                return false;
1386            }
1387            true
1388        });
1389
1390        let txs_len = transactions.len();
1391
1392        let new_txs = transactions
1393            .into_par_iter()
1394            .filter_map(|tx| match tx.try_into_recovered() {
1395                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1396                Err(badtx) => {
1397                    trace!(target: "net::tx",
1398                        peer_id=format!("{peer_id:#}"),
1399                        hash=%badtx.tx_hash(),
1400                        client_version=%peer.client_version,
1401                        "failed ecrecovery for transaction"
1402                    );
1403                    None
1404                }
1405            })
1406            .collect::<Vec<_>>();
1407
1408        has_bad_transactions |= new_txs.len() != txs_len;
1409
1410        // Record the transactions as seen by the peer
1411        for tx in &new_txs {
1412            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1413        }
1414
1415        // 3. import new transactions as a batch to minimize lock contention on the underlying
1416        // pool
1417        if !new_txs.is_empty() {
1418            let pool = self.pool.clone();
1419            // update metrics
1420            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1421            metric_pending_pool_imports.increment(new_txs.len() as f64);
1422
1423            // update self-monitoring info
1424            self.pending_pool_imports_info
1425                .pending_pool_imports
1426                .fetch_add(new_txs.len(), Ordering::Relaxed);
1427            let tx_manager_info_pending_pool_imports =
1428                self.pending_pool_imports_info.pending_pool_imports.clone();
1429
1430            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1431            let import = Box::pin(async move {
1432                let added = new_txs.len();
1433                let res = pool.add_external_transactions(new_txs).await;
1434
1435                // update metrics
1436                metric_pending_pool_imports.decrement(added as f64);
1437                // update self-monitoring info
1438                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1439
1440                res
1441            });
1442
1443            self.pool_imports.push(import);
1444        }
1445
1446        if num_already_seen_by_peer > 0 {
1447            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1448            self.metrics
1449                .occurrences_of_transaction_already_seen_by_peer
1450                .increment(num_already_seen_by_peer);
1451            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1452        }
1453
1454        if has_bad_transactions {
1455            // peer sent us invalid transactions
1456            self.report_peer_bad_transactions(peer_id)
1457        }
1458
1459        if num_already_seen_by_peer > 0 {
1460            self.report_already_seen(peer_id);
1461        }
1462
1463        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1464    }
1465
1466    /// Processes a [`FetchEvent`].
1467    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1468        match fetch_event {
1469            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1470                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1471                if report_peer {
1472                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1473                }
1474            }
1475            FetchEvent::FetchError { peer_id, error } => {
1476                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1477                self.on_request_error(peer_id, error);
1478            }
1479            FetchEvent::EmptyResponse { peer_id } => {
1480                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1481            }
1482        }
1483    }
1484}
1485
1486/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1487/// [`crate::NetworkManager`] for more context on the design pattern.
1488///
1489/// This should be spawned or used as part of `tokio::select!`.
1490//
1491// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1492// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1493impl<
1494        Pool: TransactionPool + Unpin + 'static,
1495        N: NetworkPrimitives<
1496                BroadcastedTransaction: SignedTransaction,
1497                PooledTransaction: SignedTransaction,
1498            > + Unpin,
1499    > Future for TransactionsManager<Pool, N>
1500where
1501    Pool::Transaction:
1502        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1503{
1504    type Output = ();
1505
1506    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1507        let start = Instant::now();
1508        let mut poll_durations = TxManagerPollDurations::default();
1509
1510        let this = self.get_mut();
1511
1512        // All streams are polled until their corresponding budget is exhausted, then we manually
1513        // yield back control to tokio. See `NetworkManager` for more context on the design
1514        // pattern.
1515
1516        // Advance network/peer related events (update peers map).
1517        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1518            poll_durations.acc_network_events,
1519            "net::tx",
1520            "Network events stream",
1521            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1522            this.network_events.poll_next_unpin(cx),
1523            |event| this.on_network_event(event)
1524        );
1525
1526        // Advances new __pending__ transactions, transactions that were successfully inserted into
1527        // pending set in pool (are valid), and propagates them (inform peers which
1528        // transactions we have seen).
1529        //
1530        // We try to drain this to batch the transactions in a single message.
1531        //
1532        // We don't expect this buffer to be large, since only pending transactions are
1533        // emitted here.
1534        let mut new_txs = Vec::new();
1535        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1536            cx,
1537            &mut new_txs,
1538            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1539        ) {
1540            Poll::Ready(count) => {
1541                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1542                    // we filled the entire buffer capacity and need to try again on the next poll
1543                    // immediately
1544                    true
1545                } else {
1546                    // try once more, because mostlikely the channel is now empty and the waker is
1547                    // registered if this is pending, if we filled additional hashes, we poll again
1548                    // on the next iteration
1549                    let limit =
1550                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1551                            new_txs.len();
1552                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1553                }
1554            }
1555            Poll::Pending => false,
1556        };
1557        if !new_txs.is_empty() {
1558            this.on_new_pending_transactions(new_txs);
1559        }
1560
1561        // Advance incoming transaction events (stream new txns/announcements from
1562        // network manager and queue for import to pool/fetch txns).
1563        //
1564        // This will potentially remove hashes from hashes pending fetch, it the event
1565        // is an announcement (if same hashes are announced that didn't fit into a
1566        // previous request).
1567        //
1568        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1569        // (128 KiB / 10 bytes > 13k transactions).
1570        //
1571        // If this is an event with `Transactions` message, since transactions aren't
1572        // validated until they are inserted into the pool, this can potentially queue
1573        // >13k transactions for insertion to pool. More if the message size is bigger
1574        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1575        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1576            poll_durations.acc_tx_events,
1577            "net::tx",
1578            "Network transaction events stream",
1579            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1580            this.transaction_events.poll_next_unpin(cx),
1581            |event| this.on_network_tx_event(event),
1582        );
1583
1584        // Advance inflight fetch requests (flush transaction fetcher and queue for
1585        // import to pool).
1586        //
1587        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1588        // (2 MiB / 10 bytes > 200k transactions).
1589        //
1590        // Since transactions aren't validated until they are inserted into the pool,
1591        // this can potentially queue >200k transactions for insertion to pool. More
1592        // if the message size is bigger than the soft limit on a `PooledTransactions`
1593        // response which is 2 MiB.
1594        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1595            poll_durations.acc_fetch_events,
1596            "net::tx",
1597            "Transaction fetch events stream",
1598            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1599            this.transaction_fetcher.poll_next_unpin(cx),
1600            |event| this.on_fetch_event(event),
1601        );
1602
1603        // Advance pool imports (flush txns to pool).
1604        //
1605        // Note, this is done in batches. A batch is filled from one `Transactions`
1606        // broadcast messages or one `PooledTransactions` response at a time. The
1607        // minimum batch size is 1 transaction (and might often be the case with blob
1608        // transactions).
1609        //
1610        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1611        // (2 MiB / 10 bytes > 200k transactions).
1612        //
1613        // Since transactions aren't validated until they are inserted into the pool,
1614        // this can potentially validate >200k transactions. More if the message size
1615        // is bigger than the soft limit on a `PooledTransactions` response which is
1616        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1617        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1618            poll_durations.acc_pending_imports,
1619            "net::tx",
1620            "Batched pool imports stream",
1621            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1622            this.pool_imports.poll_next_unpin(cx),
1623            |batch_results| this.on_batch_import_result(batch_results)
1624        );
1625
1626        // Tries to drain hashes pending fetch cache if the tx manager currently has
1627        // capacity for this (fetch txns).
1628        //
1629        // Sends at most one request.
1630        duration_metered_exec!(
1631            {
1632                if this.has_capacity_for_fetching_pending_hashes() &&
1633                    this.on_fetch_hashes_pending_fetch()
1634                {
1635                    maybe_more_tx_fetch_events = true;
1636                }
1637            },
1638            poll_durations.acc_pending_fetch
1639        );
1640
1641        // Advance commands (propagate/fetch/serve txns).
1642        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1643            poll_durations.acc_cmds,
1644            "net::tx",
1645            "Commands channel",
1646            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1647            this.command_rx.poll_next_unpin(cx),
1648            |cmd| this.on_command(cmd)
1649        );
1650
1651        this.transaction_fetcher.update_metrics();
1652
1653        // all channels are fully drained and import futures pending
1654        if maybe_more_network_events ||
1655            maybe_more_commands ||
1656            maybe_more_tx_events ||
1657            maybe_more_tx_fetch_events ||
1658            maybe_more_pool_imports ||
1659            maybe_more_pending_txns
1660        {
1661            // make sure we're woken up again
1662            cx.waker().wake_by_ref();
1663            return Poll::Pending
1664        }
1665
1666        this.update_poll_metrics(start, poll_durations);
1667
1668        Poll::Pending
1669    }
1670}
1671
/// The modes in which transactions can be propagated.
///
/// Determines how transactions are propagated to peers in the network.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PropagationMode {
    /// The default mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// The forced mode.
    ///
    /// A transaction is sent to all peers, regardless of whether it has been sent to or received
    /// from them before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for
    /// [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1694
/// A transaction that's about to be propagated to multiple peers.
///
/// The transaction is kept behind an [`Arc`], so the builders in this module can add it to a
/// message by cloning the pointer rather than the transaction itself.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction, as reported by `length()` / `encoded_length()` at construction
    /// time. Used for the size accounting of full broadcasts and as the advertised size in
    /// hash announcements that carry sizes.
    size: usize,
    /// The shared transaction.
    transaction: Arc<T>,
}
1701
1702impl<T: SignedTransaction> PropagateTransaction<T> {
1703    /// Create a new instance from a transaction.
1704    pub fn new(transaction: T) -> Self {
1705        let size = transaction.length();
1706        Self { size, transaction: Arc::new(transaction) }
1707    }
1708
1709    /// Create a new instance from a pooled transaction
1710    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1711    where
1712        P: PoolTransaction<Consensus = T>,
1713    {
1714        let size = tx.encoded_length();
1715        let transaction = tx.transaction.clone_into_consensus();
1716        let transaction = Arc::new(transaction.into_inner());
1717        Self { size, transaction }
1718    }
1719
1720    fn tx_hash(&self) -> &TxHash {
1721        self.transaction.tx_hash()
1722    }
1723}
1724
/// Helper type to construct the appropriate message to send to the peer based on whether the peer
/// should receive them in full or as pooled
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// The peer only receives an announcement of the transaction hashes.
    Pooled(PooledTransactionsHashesBuilder),
    /// The peer receives the transactions in full; transactions the inner builder does not
    /// accept for a full broadcast are recorded there as hashes instead.
    Full(FullTransactionsBuilder<T>),
}
1732
1733impl<T> PropagateTransactionsBuilder<T> {
1734    /// Create a builder for pooled transactions
1735    fn pooled(version: EthVersion) -> Self {
1736        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1737    }
1738
1739    /// Create a builder that sends transactions in full and records transactions that don't fit.
1740    fn full(version: EthVersion) -> Self {
1741        Self::Full(FullTransactionsBuilder::new(version))
1742    }
1743
1744    /// Returns true if no transactions are recorded.
1745    fn is_empty(&self) -> bool {
1746        match self {
1747            Self::Pooled(builder) => builder.is_empty(),
1748            Self::Full(builder) => builder.is_empty(),
1749        }
1750    }
1751
1752    /// Consumes the type and returns the built messages that should be sent to the peer.
1753    fn build(self) -> PropagateTransactions<T> {
1754        match self {
1755            Self::Pooled(pooled) => {
1756                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1757            }
1758            Self::Full(full) => full.build(),
1759        }
1760    }
1761}
1762
1763impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1764    /// Appends all transactions
1765    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1766        for tx in txs {
1767            self.push(tx);
1768        }
1769    }
1770
1771    /// Appends a transaction to the list.
1772    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1773        match self {
1774            Self::Pooled(builder) => builder.push(transaction),
1775            Self::Full(builder) => builder.push(transaction),
1776        }
1777    }
1778}
1779
/// Represents how the transactions should be sent to a peer if any.
///
/// The two parts are independent: either one, both, or neither may be present.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    full: Option<Vec<Arc<T>>>,
}
1787
/// Helper type for constructing the full transaction message that enforces the
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
/// broadcasted in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Running total of the sizes of all transactions collected in `transactions` so far. This
    /// is the value compared against the soft limit when another transaction is pushed; it is
    /// not the limit itself.
    total_size: usize,
    /// All transactions to be broadcasted.
    transactions: Vec<Arc<T>>,
    /// Transactions that didn't fit into the broadcast message, or that are not broadcastable in
    /// full; these are announced by hash instead.
    pooled: PooledTransactionsHashesBuilder,
}
1801
1802impl<T> FullTransactionsBuilder<T> {
1803    /// Create a builder for the negotiated version of the peer's session
1804    fn new(version: EthVersion) -> Self {
1805        Self {
1806            total_size: 0,
1807            pooled: PooledTransactionsHashesBuilder::new(version),
1808            transactions: vec![],
1809        }
1810    }
1811
1812    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1813    fn is_empty(&self) -> bool {
1814        self.transactions.is_empty() && self.pooled.is_empty()
1815    }
1816
1817    /// Returns the messages that should be propagated to the peer.
1818    fn build(self) -> PropagateTransactions<T> {
1819        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1820        let full = Some(self.transactions).filter(|full| !full.is_empty());
1821        PropagateTransactions { pooled, full }
1822    }
1823}
1824
1825impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1826    /// Appends all transactions.
1827    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1828        for tx in txs {
1829            self.push(&tx)
1830        }
1831    }
1832
1833    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1834    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1835    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1836    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1837    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1838    ///
1839    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1840    /// to list of pooled transactions, (e.g. 4844 transactions).
1841    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1842    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1843        // Do not send full 4844 transaction hashes to peers.
1844        //
1845        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1846        //  Instead, those transactions are only announced using
1847        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1848        //  via `GetPooledTransactions`.
1849        //
1850        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1851        if !transaction.transaction.is_broadcastable_in_full() {
1852            self.pooled.push(transaction);
1853            return
1854        }
1855
1856        let new_size = self.total_size + transaction.size;
1857        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1858            self.total_size > 0
1859        {
1860            // transaction does not fit into the message
1861            self.pooled.push(transaction);
1862            return
1863        }
1864
1865        self.total_size = new_size;
1866        self.transactions.push(Arc::clone(&transaction.transaction));
1867    }
1868}
1869
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Announcement that carries only the hashes. Selected for `Eth66` and `Eth67` sessions.
    Eth66(NewPooledTransactionHashes66),
    /// Announcement that carries a hash, size and type per transaction. Selected for `Eth68`,
    /// `Eth69` and `Eth70` sessions.
    Eth68(NewPooledTransactionHashes68),
}
1877
1878// === impl PooledTransactionsHashesBuilder ===
1879
1880impl PooledTransactionsHashesBuilder {
1881    /// Push a transaction from the pool to the list.
1882    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1883        match self {
1884            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1885            Self::Eth68(msg) => {
1886                msg.hashes.push(*pooled_tx.hash());
1887                msg.sizes.push(pooled_tx.encoded_length());
1888                msg.types.push(pooled_tx.transaction.ty());
1889            }
1890        }
1891    }
1892
1893    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1894    fn is_empty(&self) -> bool {
1895        match self {
1896            Self::Eth66(hashes) => hashes.is_empty(),
1897            Self::Eth68(hashes) => hashes.is_empty(),
1898        }
1899    }
1900
1901    /// Appends all hashes
1902    fn extend<T: SignedTransaction>(
1903        &mut self,
1904        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1905    ) {
1906        for tx in txs {
1907            self.push(&tx);
1908        }
1909    }
1910
1911    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1912        match self {
1913            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1914            Self::Eth68(msg) => {
1915                msg.hashes.push(*tx.tx_hash());
1916                msg.sizes.push(tx.size);
1917                msg.types.push(tx.transaction.ty());
1918            }
1919        }
1920    }
1921
1922    /// Create a builder for the negotiated version of the peer's session
1923    fn new(version: EthVersion) -> Self {
1924        match version {
1925            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1926            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 => {
1927                Self::Eth68(Default::default())
1928            }
1929        }
1930    }
1931
1932    fn build(self) -> NewPooledTransactionHashes {
1933        match self {
1934            Self::Eth66(mut msg) => {
1935                msg.0.shrink_to_fit();
1936                msg.into()
1937            }
1938            Self::Eth68(mut msg) => {
1939                msg.shrink_to_fit();
1940                msg.into()
1941            }
1942        }
1943    }
1944}
1945
/// The way in which a batch of transactions reached us.
enum TransactionSource {
    /// The transactions were broadcast to us in a [`Transactions`] message.
    Broadcast,
    /// The transactions arrived as the response to a [`fetcher::GetPooledTxRequest`] that we
    /// issued.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast, `false` if they were
    /// received as a response.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1962
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
    /// the peer.
    ///
    /// The cache is created with the `max_transactions_seen_by_peer` value passed to
    /// [`PeerMetadata::new`].
    seen_transactions: LruCache<TxHash>,
    /// A communication channel directly to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The negotiated version of the session.
    version: EthVersion,
    /// The peer's client version.
    client_version: Arc<str>,
    /// The kind of peer.
    peer_kind: PeerKind,
}
1979
1980impl<N: NetworkPrimitives> PeerMetadata<N> {
1981    /// Returns a new instance of [`PeerMetadata`].
1982    pub fn new(
1983        request_tx: PeerRequestSender<PeerRequest<N>>,
1984        version: EthVersion,
1985        client_version: Arc<str>,
1986        max_transactions_seen_by_peer: u32,
1987        peer_kind: PeerKind,
1988    ) -> Self {
1989        Self {
1990            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1991            request_tx,
1992            version,
1993            client_version,
1994            peer_kind,
1995        }
1996    }
1997
1998    /// Returns a reference to the peer's request sender channel.
1999    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2000        &self.request_tx
2001    }
2002
2003    /// Return a
2004    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2005        &mut self.seen_transactions
2006    }
2007
2008    /// Returns the negotiated `EthVersion` of the session.
2009    pub const fn version(&self) -> EthVersion {
2010        self.version
2011    }
2012
2013    /// Returns a reference to the peer's client version string.
2014    pub fn client_version(&self) -> &str {
2015        &self.client_version
2016    }
2017
2018    /// Returns the peer's kind.
2019    pub const fn peer_kind(&self) -> PeerKind {
2020        self.peer_kind
2021    }
2022}
2023
/// Commands to send to the [`TransactionsManager`]
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    ///
    /// The result is delivered through the contained oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to look up.
        peers: Vec<PeerId>,
        /// Channel over which the hashes, keyed by peer, are returned.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose sender channel is requested.
        peer_id: PeerId,
        /// Channel over which the sender is returned; the payload is an `Option`, so the
        /// request can be answered with `None`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2050
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The contained sender is used to answer with an optional [`TransactionsHandle`].
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2082
/// Stats that the [`TransactionsManager`] tracks about pending pool imports.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool. Held in an `Arc` so the
    /// counter can be shared.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a [`PendingPoolImportsInfo`] with the given max and a pending count of zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pending pool imports is strictly below the given
    /// tolerated max.
    ///
    /// Note that the comparison uses the `max_pending_pool_imports` argument, not the value
    /// stored at construction.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2103
2104impl Default for PendingPoolImportsInfo {
2105    fn default() -> Self {
2106        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2107    }
2108}
2109
/// Per-section durations collected during one call to `poll` on the [`TransactionsManager`].
///
/// A default (all zero) instance is created at the start of `poll`, its fields are handed to the
/// metering macros for the individual sections, and the whole value is passed to
/// `update_poll_metrics` at the end.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator for the "Network events stream" section.
    acc_network_events: Duration,
    /// Accumulator for the "Batched pool imports stream" section.
    acc_pending_imports: Duration,
    /// Accumulator for the "Network transaction events stream" section.
    acc_tx_events: Duration,
    /// NOTE(review): not written by `poll` itself; presumably reserved for the handling of newly
    /// imported (pending) transactions - confirm where this is populated.
    acc_imported_txns: Duration,
    /// Accumulator for the "Transaction fetch events stream" section.
    acc_fetch_events: Duration,
    /// Accumulator for the section that drains hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator for the "Commands channel" section.
    acc_cmds: Duration,
}
2120
2121#[cfg(test)]
2122mod tests {
2123    use super::*;
2124    use crate::{
2125        test_utils::{
2126            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2127            Testnet,
2128        },
2129        transactions::config::RelaxedEthAnnouncementFilter,
2130        NetworkConfigBuilder, NetworkManager,
2131    };
2132    use alloy_consensus::{TxEip1559, TxLegacy};
2133    use alloy_primitives::{hex, Signature, TxKind, U256};
2134    use alloy_rlp::Decodable;
2135    use futures::FutureExt;
2136    use reth_chainspec::MIN_TRANSACTION_GAS;
2137    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2138    use reth_network_api::{NetworkInfo, PeerKind};
2139    use reth_network_p2p::{
2140        error::{RequestError, RequestResult},
2141        sync::{NetworkSyncUpdater, SyncState},
2142    };
2143    use reth_storage_api::noop::NoopProvider;
2144    use reth_transaction_pool::test_utils::{
2145        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2146    };
2147    use secp256k1::SecretKey;
2148    use std::{
2149        future::poll_fn,
2150        net::{IpAddr, Ipv4Addr, SocketAddr},
2151        str::FromStr,
2152    };
2153    use tracing::error;
2154
    /// Checks that a transaction broadcast received while the node reports that it is initially
    /// syncing does not end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        // only the first two handles of the testnet are used
        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // separate network + transactions manager under test, backed by a test pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        // the network is spawned; the transactions manager is kept local and polled manually
        tokio::task::spawn(network);

        // go to syncing (pipeline sync)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    // to insert a new peer in transactions peerset
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // feed the transaction to the manager as if `handle1`'s peer had broadcast it
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the manager exactly once; its result is ignored and the helper future resolves
        // right away
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // the broadcast must not have been imported
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2224
2225    #[tokio::test(flavor = "multi_thread")]
2226    async fn test_tx_broadcasts_through_two_syncs() {
2227        reth_tracing::init_test_tracing();
2228        let net = Testnet::create(3).await;
2229
2230        let mut handles = net.handles();
2231        let handle0 = handles.next().unwrap();
2232        let handle1 = handles.next().unwrap();
2233
2234        drop(handles);
2235        let handle = net.spawn();
2236
2237        let listener0 = handle0.event_listener();
2238        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2239        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2240
2241        let client = NoopProvider::default();
2242        let pool = testing_pool();
2243        let config = NetworkConfigBuilder::new(secret_key)
2244            .disable_discovery()
2245            .listener_port(0)
2246            .build(client);
2247        let transactions_manager_config = config.transactions_manager_config.clone();
2248        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2249            .await
2250            .unwrap()
2251            .into_builder()
2252            .transactions(pool.clone(), transactions_manager_config)
2253            .split_with_handle();
2254
2255        tokio::task::spawn(network);
2256
2257        // go to syncing (pipeline sync) to idle and then to syncing (live)
2258        network_handle.update_sync_state(SyncState::Syncing);
2259        assert!(NetworkInfo::is_syncing(&network_handle));
2260        network_handle.update_sync_state(SyncState::Idle);
2261        assert!(!NetworkInfo::is_syncing(&network_handle));
2262        network_handle.update_sync_state(SyncState::Syncing);
2263        assert!(NetworkInfo::is_syncing(&network_handle));
2264
2265        // wait for all initiator connections
2266        let mut established = listener0.take(2);
2267        while let Some(ev) = established.next().await {
2268            match ev {
2269                NetworkEvent::ActivePeerSession { .. } |
2270                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2271                    // to insert a new peer in transactions peerset
2272                    transactions.on_network_event(ev);
2273                }
2274                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2275                _ => {
2276                    error!("unexpected event {ev:?}")
2277                }
2278            }
2279        }
2280        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2281        let input = hex!(
2282            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2283        );
2284        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2285        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2286            peer_id: *handle1.peer_id(),
2287            msg: Transactions(vec![signed_tx.clone()]),
2288        });
2289        poll_fn(|cx| {
2290            let _ = transactions.poll_unpin(cx);
2291            Poll::Ready(())
2292        })
2293        .await;
2294        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2295        assert!(NetworkInfo::is_syncing(&network_handle));
2296        assert!(!pool.is_empty());
2297        handle.terminate().await;
2298    }
2299
2300    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2301    // event and is able to retrieve the corresponding transactions.
2302    #[tokio::test(flavor = "multi_thread")]
2303    async fn test_handle_incoming_transactions_hashes() {
2304        reth_tracing::init_test_tracing();
2305
2306        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2307        let client = NoopProvider::default();
2308
2309        let config = NetworkConfigBuilder::new(secret_key)
2310            // let OS choose port
2311            .listener_port(0)
2312            .disable_discovery()
2313            .build(client);
2314
2315        let pool = testing_pool();
2316
2317        let transactions_manager_config = config.transactions_manager_config.clone();
2318        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2319            .await
2320            .unwrap()
2321            .into_builder()
2322            .transactions(pool.clone(), transactions_manager_config)
2323            .split_with_handle();
2324
2325        let peer_id_1 = PeerId::new([1; 64]);
2326        let eth_version = EthVersion::Eth66;
2327
2328        let txs = vec![TransactionSigned::new_unhashed(
2329            Transaction::Legacy(TxLegacy {
2330                chain_id: Some(4),
2331                nonce: 15u64,
2332                gas_price: 2200000000,
2333                gas_limit: 34811,
2334                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2335                value: U256::from(1234u64),
2336                input: Default::default(),
2337            }),
2338            Signature::new(
2339                U256::from_str(
2340                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2341                )
2342                .unwrap(),
2343                U256::from_str(
2344                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2345                )
2346                .unwrap(),
2347                true,
2348            ),
2349        )];
2350
2351        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2352
2353        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2354        tx_manager.peers.insert(peer_id_1, peer_1);
2355
2356        assert!(pool.is_empty());
2357
2358        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2359            peer_id: peer_id_1,
2360            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2361                txs_hashes.clone(),
2362            )),
2363        });
2364
2365        // mock session of peer_1 receives request
2366        let req = to_mock_session_rx
2367            .recv()
2368            .await
2369            .expect("peer_1 session should receive request with buffered hashes");
2370        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2371        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2372
2373        let message: Vec<PooledTransactionVariant> = txs
2374            .into_iter()
2375            .map(|tx| {
2376                PooledTransactionVariant::try_from(tx)
2377                    .expect("Failed to convert MockTransaction to PooledTransaction")
2378            })
2379            .collect();
2380
2381        // return the transactions corresponding to the transaction hashes.
2382        response
2383            .send(Ok(PooledTransactions(message)))
2384            .expect("should send peer_1 response to tx manager");
2385
2386        // adance the transaction manager future
2387        poll_fn(|cx| {
2388            let _ = tx_manager.poll_unpin(cx);
2389            Poll::Ready(())
2390        })
2391        .await;
2392
2393        // ensure that the transactions corresponding to the transaction hashes have been
2394        // successfully retrieved and stored in the Pool.
2395        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2396    }
2397
2398    #[tokio::test(flavor = "multi_thread")]
2399    async fn test_handle_incoming_transactions() {
2400        reth_tracing::init_test_tracing();
2401        let net = Testnet::create(3).await;
2402
2403        let mut handles = net.handles();
2404        let handle0 = handles.next().unwrap();
2405        let handle1 = handles.next().unwrap();
2406
2407        drop(handles);
2408        let handle = net.spawn();
2409
2410        let listener0 = handle0.event_listener();
2411
2412        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2413        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2414
2415        let client = NoopProvider::default();
2416        let pool = testing_pool();
2417        let config = NetworkConfigBuilder::new(secret_key)
2418            .disable_discovery()
2419            .listener_port(0)
2420            .build(client);
2421        let transactions_manager_config = config.transactions_manager_config.clone();
2422        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2423            .await
2424            .unwrap()
2425            .into_builder()
2426            .transactions(pool.clone(), transactions_manager_config)
2427            .split_with_handle();
2428        tokio::task::spawn(network);
2429
2430        network_handle.update_sync_state(SyncState::Idle);
2431
2432        assert!(!NetworkInfo::is_syncing(&network_handle));
2433
2434        // wait for all initiator connections
2435        let mut established = listener0.take(2);
2436        while let Some(ev) = established.next().await {
2437            match ev {
2438                NetworkEvent::ActivePeerSession { .. } |
2439                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2440                    // to insert a new peer in transactions peerset
2441                    transactions.on_network_event(ev);
2442                }
2443                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2444                ev => {
2445                    error!("unexpected event {ev:?}")
2446                }
2447            }
2448        }
2449        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2450        let input = hex!(
2451            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2452        );
2453        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2454        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2455            peer_id: *handle1.peer_id(),
2456            msg: Transactions(vec![signed_tx.clone()]),
2457        });
2458        assert!(transactions
2459            .transactions_by_peers
2460            .get(signed_tx.tx_hash())
2461            .unwrap()
2462            .contains(handle1.peer_id()));
2463
2464        // advance the transaction manager future
2465        poll_fn(|cx| {
2466            let _ = transactions.poll_unpin(cx);
2467            Poll::Ready(())
2468        })
2469        .await;
2470
2471        assert!(!pool.is_empty());
2472        assert!(pool.get(signed_tx.tx_hash()).is_some());
2473        handle.terminate().await;
2474    }
2475
2476    #[tokio::test(flavor = "multi_thread")]
2477    async fn test_on_get_pooled_transactions_network() {
2478        reth_tracing::init_test_tracing();
2479        let net = Testnet::create(2).await;
2480
2481        let mut handles = net.handles();
2482        let handle0 = handles.next().unwrap();
2483        let handle1 = handles.next().unwrap();
2484
2485        drop(handles);
2486        let handle = net.spawn();
2487
2488        let listener0 = handle0.event_listener();
2489
2490        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2491        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2492
2493        let client = NoopProvider::default();
2494        let pool = testing_pool();
2495        let config = NetworkConfigBuilder::new(secret_key)
2496            .disable_discovery()
2497            .listener_port(0)
2498            .build(client);
2499        let transactions_manager_config = config.transactions_manager_config.clone();
2500        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2501            .await
2502            .unwrap()
2503            .into_builder()
2504            .transactions(pool.clone(), transactions_manager_config)
2505            .split_with_handle();
2506        tokio::task::spawn(network);
2507
2508        network_handle.update_sync_state(SyncState::Idle);
2509
2510        assert!(!NetworkInfo::is_syncing(&network_handle));
2511
2512        // wait for all initiator connections
2513        let mut established = listener0.take(2);
2514        while let Some(ev) = established.next().await {
2515            match ev {
2516                NetworkEvent::ActivePeerSession { .. } |
2517                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2518                    transactions.on_network_event(ev);
2519                }
2520                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2521                ev => {
2522                    error!("unexpected event {ev:?}")
2523                }
2524            }
2525        }
2526        handle.terminate().await;
2527
2528        let tx = MockTransaction::eip1559();
2529        let _ = transactions
2530            .pool
2531            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2532            .await;
2533
2534        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2535
2536        let (send, receive) =
2537            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2538
2539        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2540            peer_id: *handle1.peer_id(),
2541            request,
2542            response: send,
2543        });
2544
2545        match receive.await.unwrap() {
2546            Ok(PooledTransactions(transactions)) => {
2547                assert_eq!(transactions.len(), 1);
2548            }
2549            Err(e) => {
2550                panic!("error: {e:?}");
2551            }
2552        }
2553    }
2554
2555    // Ensure that when the remote peer only returns part of the requested transactions, the
2556    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2557    // re-buffered.
2558    #[tokio::test]
2559    async fn test_partially_tx_response() {
2560        reth_tracing::init_test_tracing();
2561
2562        let mut tx_manager = new_tx_manager().await.0;
2563        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2564
2565        let peer_id_1 = PeerId::new([1; 64]);
2566        let eth_version = EthVersion::Eth66;
2567
2568        let txs = vec![
2569            TransactionSigned::new_unhashed(
2570                Transaction::Legacy(TxLegacy {
2571                    chain_id: Some(4),
2572                    nonce: 15u64,
2573                    gas_price: 2200000000,
2574                    gas_limit: 34811,
2575                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2576                    value: U256::from(1234u64),
2577                    input: Default::default(),
2578                }),
2579                Signature::new(
2580                    U256::from_str(
2581                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2582                    )
2583                    .unwrap(),
2584                    U256::from_str(
2585                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2586                    )
2587                    .unwrap(),
2588                    true,
2589                ),
2590            ),
2591            TransactionSigned::new_unhashed(
2592                Transaction::Eip1559(TxEip1559 {
2593                    chain_id: 4,
2594                    nonce: 26u64,
2595                    max_priority_fee_per_gas: 1500000000,
2596                    max_fee_per_gas: 1500000013,
2597                    gas_limit: MIN_TRANSACTION_GAS,
2598                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2599                    value: U256::from(3000000000000000000u64),
2600                    input: Default::default(),
2601                    access_list: Default::default(),
2602                }),
2603                Signature::new(
2604                    U256::from_str(
2605                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2606                    )
2607                    .unwrap(),
2608                    U256::from_str(
2609                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2610                    )
2611                    .unwrap(),
2612                    true,
2613                ),
2614            ),
2615        ];
2616
2617        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2618
2619        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2620        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2621        // fetch
2622        peer_1.seen_transactions.insert(txs_hashes[0]);
2623        peer_1.seen_transactions.insert(txs_hashes[1]);
2624        tx_manager.peers.insert(peer_id_1, peer_1);
2625
2626        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2627        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2628
2629        // peer_1 is idle
2630        assert!(tx_fetcher.is_idle(&peer_id_1));
2631        assert_eq!(tx_fetcher.active_peers.len(), 0);
2632
2633        // sends requests for buffered hashes to peer_1
2634        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2635
2636        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2637        // as long as request is in flight peer_1 is not idle
2638        assert!(!tx_fetcher.is_idle(&peer_id_1));
2639        assert_eq!(tx_fetcher.active_peers.len(), 1);
2640
2641        // mock session of peer_1 receives request
2642        let req = to_mock_session_rx
2643            .recv()
2644            .await
2645            .expect("peer_1 session should receive request with buffered hashes");
2646        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2647
2648        let message: Vec<PooledTransactionVariant> = txs
2649            .into_iter()
2650            .take(1)
2651            .map(|tx| {
2652                PooledTransactionVariant::try_from(tx)
2653                    .expect("Failed to convert MockTransaction to PooledTransaction")
2654            })
2655            .collect();
2656        // response partial request
2657        response
2658            .send(Ok(PooledTransactions(message)))
2659            .expect("should send peer_1 response to tx manager");
2660        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2661            unreachable!()
2662        };
2663
2664        // request has resolved, peer_1 is idle again
2665        assert!(tx_fetcher.is_idle(&peer_id));
2666        assert_eq!(tx_fetcher.active_peers.len(), 0);
2667        // failing peer_1's request buffers requested hashes for retry.
2668        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2669    }
2670
2671    #[tokio::test]
2672    async fn test_max_retries_tx_request() {
2673        reth_tracing::init_test_tracing();
2674
2675        let mut tx_manager = new_tx_manager().await.0;
2676        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2677
2678        let peer_id_1 = PeerId::new([1; 64]);
2679        let peer_id_2 = PeerId::new([2; 64]);
2680        let eth_version = EthVersion::Eth66;
2681        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2682
2683        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2684        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2685        // fetch
2686        peer_1.seen_transactions.insert(seen_hashes[0]);
2687        peer_1.seen_transactions.insert(seen_hashes[1]);
2688        tx_manager.peers.insert(peer_id_1, peer_1);
2689
2690        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2691        // for first retry in reverse order to make index 0 lru
2692        let retries = 1;
2693        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2694        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2695
2696        // peer_1 is idle
2697        assert!(tx_fetcher.is_idle(&peer_id_1));
2698        assert_eq!(tx_fetcher.active_peers.len(), 0);
2699
2700        // sends request for buffered hashes to peer_1
2701        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2702
2703        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2704
2705        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2706        // as long as request is in inflight peer_1 is not idle
2707        assert!(!tx_fetcher.is_idle(&peer_id_1));
2708        assert_eq!(tx_fetcher.active_peers.len(), 1);
2709
2710        // mock session of peer_1 receives request
2711        let req = to_mock_session_rx
2712            .recv()
2713            .await
2714            .expect("peer_1 session should receive request with buffered hashes");
2715        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2716        let GetPooledTransactions(hashes) = request;
2717
2718        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2719
2720        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2721
2722        // fail request to peer_1
2723        response
2724            .send(Err(RequestError::BadResponse))
2725            .expect("should send peer_1 response to tx manager");
2726        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2727            unreachable!()
2728        };
2729
2730        // request has resolved, peer_1 is idle again
2731        assert!(tx_fetcher.is_idle(&peer_id));
2732        assert_eq!(tx_fetcher.active_peers.len(), 0);
2733        // failing peer_1's request buffers requested hashes for retry
2734        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2735
2736        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2737        tx_manager.peers.insert(peer_id_2, peer_2);
2738
2739        // peer_2 announces same hashes as peer_1
2740        let msg =
2741            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2742        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2743
2744        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2745
2746        // peer_2 should be in active_peers.
2747        assert_eq!(tx_fetcher.active_peers.len(), 1);
2748
2749        // since hashes are already seen, no changes to length of unknown hashes
2750        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2751        // but hashes are taken out of buffer and packed into request to peer_2
2752        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2753
2754        // mock session of peer_2 receives request
2755        let req = to_mock_session_rx
2756            .recv()
2757            .await
2758            .expect("peer_2 session should receive request with buffered hashes");
2759        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2760
2761        // report failed request to tx manager
2762        response
2763            .send(Err(RequestError::BadResponse))
2764            .expect("should send peer_2 response to tx manager");
2765        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2766
2767        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2768        // for retry
2769        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2770        assert_eq!(tx_fetcher.active_peers.len(), 0);
2771    }
2772
// Ensure that a pooled builder starts out empty and, after one push, builds a pooled message
// with a single entry and no full message.
#[test]
fn test_transaction_builder_empty() {
    let mut factory = MockTransactionFactory::default();
    let mut pooled_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(pooled_builder.is_empty());

    let eip1559_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    pooled_builder.push(&eip1559_tx);
    assert!(!pooled_builder.is_empty());

    let built = pooled_builder.build();
    assert!(built.full.is_none());
    assert_eq!(built.pooled.unwrap().len(), 1);
}
2789
// Ensure that a full builder takes a first transaction sized just above the broadcast soft
// limit as a full transaction, while a second push of the same transaction ends up in the
// pooled message.
#[test]
fn test_transaction_builder_large() {
    let mut factory = MockTransactionFactory::default();
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    // one byte above the soft limit; the assertions below show the first push is still
    // included as a full transaction
    let mut oversized = factory.create_eip1559();
    oversized
        .transaction
        .set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
    let oversized = PropagateTransaction::pool_tx(Arc::new(oversized));

    full_builder.push(&oversized);
    assert!(!full_builder.is_empty());

    let after_first_push = full_builder.clone().build();
    assert!(after_first_push.pooled.is_none());
    assert_eq!(after_first_push.full.unwrap().len(), 1);

    full_builder.push(&oversized);

    let after_second_push = full_builder.build();
    assert_eq!(after_second_push.pooled.unwrap().len(), 1);
    assert_eq!(after_second_push.full.unwrap().len(), 1);
}
2818
// Ensure that a full builder puts an EIP-4844 transaction into the pooled message only, while
// an EIP-1559 transaction pushed afterwards goes into the full message.
#[test]
fn test_transaction_builder_eip4844() {
    let mut factory = MockTransactionFactory::default();
    let mut full_builder =
        PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
    assert!(full_builder.is_empty());

    let blob_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
    full_builder.push(&blob_tx);
    assert!(!full_builder.is_empty());

    let blob_only = full_builder.clone().build();
    assert!(blob_only.full.is_none());
    assert_eq!(blob_only.pooled.unwrap().len(), 1);

    let eip1559_tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
    full_builder.push(&eip1559_tx);

    let mixed = full_builder.build();
    assert_eq!(mixed.pooled.unwrap().len(), 1);
    assert_eq!(mixed.full.unwrap().len(), 1);
}
2844
2845    #[tokio::test]
2846    async fn test_propagate_full() {
2847        reth_tracing::init_test_tracing();
2848
2849        let (mut tx_manager, network) = new_tx_manager().await;
2850        let peer_id = PeerId::random();
2851
2852        // ensure not syncing
2853        network.handle().update_sync_state(SyncState::Idle);
2854
2855        // mock a peer
2856        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2857
2858        let session_info = SessionInfo {
2859            peer_id,
2860            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2861            client_version: Arc::from(""),
2862            capabilities: Arc::new(vec![].into()),
2863            status: Arc::new(Default::default()),
2864            version: EthVersion::Eth68,
2865            peer_kind: PeerKind::Basic,
2866        };
2867        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2868        tx_manager
2869            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2870        let mut propagate = vec![];
2871        let mut factory = MockTransactionFactory::default();
2872        let eip1559_tx = Arc::new(factory.create_eip1559());
2873        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2874        let eip4844_tx = Arc::new(factory.create_eip4844());
2875        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2876
2877        let propagated =
2878            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2879        assert_eq!(propagated.0.len(), 2);
2880        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2881        assert_eq!(prop_txs.len(), 1);
2882        assert!(prop_txs[0].is_full());
2883
2884        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2885        assert_eq!(prop_txs.len(), 1);
2886        assert!(prop_txs[0].is_hash());
2887
2888        let peer = tx_manager.peers.get(&peer_id).unwrap();
2889        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2890        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2891        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2892
2893        // propagate again
2894        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2895        assert!(propagated.0.is_empty());
2896    }
2897
2898    #[tokio::test]
2899    async fn test_relaxed_filter_ignores_unknown_tx_types() {
2900        reth_tracing::init_test_tracing();
2901
2902        let transactions_manager_config = TransactionsManagerConfig::default();
2903
2904        let propagation_policy = TransactionPropagationKind::default();
2905        let announcement_policy = RelaxedEthAnnouncementFilter::default();
2906
2907        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2908
2909        let pool = testing_pool();
2910        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2911        let client = NoopProvider::default();
2912
2913        let network_config = NetworkConfigBuilder::new(secret_key)
2914            .listener_port(0)
2915            .disable_discovery()
2916            .build(client.clone());
2917
2918        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2919        let (to_tx_manager_tx, from_network_rx) =
2920            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2921        network_manager.set_transactions(to_tx_manager_tx);
2922        let network_handle = network_manager.handle().clone();
2923        let network_service_handle = tokio::spawn(network_manager);
2924
2925        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2926            network_handle.clone(),
2927            pool.clone(),
2928            from_network_rx,
2929            transactions_manager_config,
2930            policy_bundle,
2931        );
2932
2933        let peer_id = PeerId::random();
2934        let eth_version = EthVersion::Eth68;
2935        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2936        tx_manager.peers.insert(peer_id, mock_peer_metadata);
2937
2938        let mut tx_factory = MockTransactionFactory::default();
2939
2940        let valid_known_tx = tx_factory.create_eip1559();
2941        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2942
2943        let known_tx_hash = *known_tx_signed.hash();
2944        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2945        let known_tx_size = known_tx_signed.encoded_length();
2946
2947        let unknown_tx_hash = B256::random();
2948        let unknown_tx_type_byte = 0xff_u8;
2949        let unknown_tx_size = 150;
2950
2951        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2952            types: vec![known_tx_type_byte, unknown_tx_type_byte],
2953            sizes: vec![known_tx_size, unknown_tx_size],
2954            hashes: vec![known_tx_hash, unknown_tx_hash],
2955        });
2956
2957        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2958
2959        poll_fn(|cx| {
2960            let _ = tx_manager.poll_unpin(cx);
2961            Poll::Ready(())
2962        })
2963        .await;
2964
2965        let mut requested_hashes_in_getpooled = HashSet::new();
2966        let mut unexpected_request_received = false;
2967
2968        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2969            .await
2970        {
2971            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2972                let GetPooledTransactions(hashes) = request;
2973                for hash in hashes {
2974                    requested_hashes_in_getpooled.insert(hash);
2975                }
2976                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2977            }
2978            Ok(Some(other_request)) => {
2979                tracing::error!(?other_request, "Received unexpected PeerRequest type");
2980                unexpected_request_received = true;
2981            }
2982            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2983            Err(_timeout_err) => {
2984                tracing::info!("Timeout: No GetPooledTransactions request received.")
2985            }
2986        }
2987
2988        assert!(
2989            requested_hashes_in_getpooled.contains(&known_tx_hash),
2990            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2991        );
2992        assert!(
2993            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2994            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2995        );
2996        assert!(
2997            !unexpected_request_received,
2998            "An unexpected P2P request was received by the mock peer."
2999        );
3000
3001        network_service_handle.abort();
3002    }
3003}