Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6/// Aggregation on configurable parameters for [`TransactionsManager`].
7pub mod config;
8/// Default and spec'd bounds.
9pub mod constants;
10/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
11pub mod fetcher;
12/// Defines the traits for transaction-related policies.
13pub mod policy;
14
15pub use self::constants::{
16    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30    budget::{
31        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33    },
34    cache::LruCache,
35    duration_metered_exec, metered_poll_nested_stream_with_budget,
36    metrics::{
37        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38    },
39    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40    NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49    RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54    events::{PeerEvent, SessionInfo},
55    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58    error::{RequestError, RequestResult},
59    sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66    error::{PoolError, PoolResult},
67    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71    collections::{hash_map::Entry, HashMap, HashSet},
72    pin::Pin,
73    sync::{
74        atomic::{AtomicUsize, Ordering},
75        Arc,
76    },
77    task::{Context, Poll},
78    time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
84/// The future for importing transactions into the pool.
85///
86/// Resolves with the result of each transaction import.
87pub type PoolImportFuture =
88    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
90/// Api to interact with [`TransactionsManager`] task.
91///
92/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
93/// with the [`TransactionsManager`] task once it is spawned.
94///
95/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
96/// known by a specific peer.
97#[derive(Debug, Clone)]
98pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
99    /// Command channel to the [`TransactionsManager`]
100    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
101}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104    fn send(&self, cmd: TransactionsCommand<N>) {
105        let _ = self.manager_tx.send(cmd);
106    }
107
108    /// Fetch the [`PeerRequestSender`] for the given peer.
109    async fn peer_handle(
110        &self,
111        peer_id: PeerId,
112    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113        let (tx, rx) = oneshot::channel();
114        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115        rx.await
116    }
117
118    /// Manually propagate the transaction that belongs to the hash.
119    pub fn propagate(&self, hash: TxHash) {
120        self.send(TransactionsCommand::PropagateHash(hash))
121    }
122
123    /// Manually propagate the transaction hash to a specific peer.
124    ///
125    /// Note: this only propagates if the pool contains the transaction.
126    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127        self.propagate_hashes_to(Some(hash), peer)
128    }
129
130    /// Manually propagate the transaction hashes to a specific peer.
131    ///
132    /// Note: this only propagates the transactions that are known to the pool.
133    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134        let hashes = hash.into_iter().collect::<Vec<_>>();
135        if hashes.is_empty() {
136            return
137        }
138        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139    }
140
141    /// Request the active peer IDs from the [`TransactionsManager`].
142    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143        let (tx, rx) = oneshot::channel();
144        self.send(TransactionsCommand::GetActivePeers(tx));
145        rx.await
146    }
147
148    /// Manually propagate full transaction hashes to a specific peer.
149    ///
150    /// Do nothing if transactions are empty.
151    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152        if transactions.is_empty() {
153            return
154        }
155        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156    }
157
158    /// Manually propagate the given transaction hashes to all peers.
159    ///
160    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
161    /// full.
162    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163        if transactions.is_empty() {
164            return
165        }
166        self.send(TransactionsCommand::PropagateTransactions(transactions))
167    }
168
169    /// Manually propagate the given transactions to all peers.
170    ///
171    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
172    /// full.
173    pub fn broadcast_transactions(
174        &self,
175        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176    ) {
177        let transactions =
178            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179        if transactions.is_empty() {
180            return
181        }
182        self.send(TransactionsCommand::BroadcastTransactions(transactions))
183    }
184
185    /// Request the transaction hashes known by specific peers.
186    pub async fn get_transaction_hashes(
187        &self,
188        peers: Vec<PeerId>,
189    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190        if peers.is_empty() {
191            return Ok(Default::default())
192        }
193        let (tx, rx) = oneshot::channel();
194        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195        rx.await
196    }
197
198    /// Request the transaction hashes known by a specific peer.
199    pub async fn get_peer_transaction_hashes(
200        &self,
201        peer: PeerId,
202    ) -> Result<HashSet<TxHash>, RecvError> {
203        let res = self.get_transaction_hashes(vec![peer]).await?;
204        Ok(res.into_values().next().unwrap_or_default())
205    }
206
207    /// Requests the transactions directly from the given peer.
208    ///
209    /// Returns `None` if the peer is not connected.
210    ///
211    /// **Note**: this returns the response from the peer as received.
212    pub async fn get_pooled_transactions_from(
213        &self,
214        peer_id: PeerId,
215        hashes: Vec<B256>,
216    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219        let (tx, rx) = oneshot::channel();
220        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221        peer.try_send(request).ok();
222
223        rx.await?.map(|res| Some(res.0))
224    }
225}
226
227/// Manages transactions on top of the p2p network.
228///
229/// This can be spawned to another task and is supposed to be run as background service.
230/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
231/// interact with it.
232///
233/// The [`TransactionsManager`] is responsible for:
234///    - handling incoming eth messages for transactions.
235///    - serving transaction requests.
236///    - propagate transactions
237///
238/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
239///   - receives incoming network messages.
240///   - sends messages to dispatch (responses, propagate tx)
241///
242/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
243/// propagate new transactions over the network.
244///
245/// It can be configured with different policies for transaction propagation and announcement
246/// filtering. See [`NetworkPolicies`] for more details.
247///
248/// ## Network Transaction Processing
249///
250/// ### Message Types
251///
252/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
253/// - **`NewPooledTransactionHashes`**: Hash announcements
254///
255/// ### Peer Tracking
256///
257/// - Maintains per-peer transaction cache (default: 10,240 entries)
258/// - Prevents duplicate imports and enables efficient propagation
259///
260/// ### Bad Transaction Handling
261///
262/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
263/// Penalizes peers sending invalid transactions.
264///
265/// ### Import Management
266///
267/// Limits concurrent pool imports and backs off when approaching capacity.
268///
269/// ### Transaction Fetching
270///
271/// For announced transactions: filters known → queues unknown → fetches → imports
272///
273/// ### Propagation Rules
274///
275/// Based on: origin (Local/External/Private), peer capabilities, and network state.
276/// Disabled during initial sync.
277///
278/// ### Security
279///
280/// Rate limiting via reputation, bad transaction isolation, peer scoring.
281#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
284    /// Access to the transaction pool.
285    pool: Pool,
286    /// Network access.
287    network: NetworkHandle<N>,
288    /// Subscriptions to all network related events.
289    ///
290    /// From which we get all new incoming transaction related messages.
291    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
292    /// Transaction fetcher to handle inflight and missing transaction requests.
293    transaction_fetcher: TransactionFetcher<N>,
294    /// All currently pending transactions grouped by peers.
295    ///
296    /// This way we can track incoming transactions and prevent multiple pool imports for the same
297    /// transaction
298    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
299    /// Transactions that are currently imported into the `Pool`.
300    ///
301    /// The import process includes:
302    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
303    ///    valid, or for 4844 transaction the blobs are valid. See also
304    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
305    /// - if the transaction is valid, it is added into the pool.
306    ///
307    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
308    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
309    /// receiver.
310    pool_imports: FuturesUnordered<PoolImportFuture>,
311    /// Stats on pending pool imports that help the node self-monitor.
312    pending_pool_imports_info: PendingPoolImportsInfo,
313    /// Bad imports.
314    bad_imports: LruCache<TxHash>,
315    /// All the connected peers.
316    peers: HashMap<PeerId, PeerMetadata<N>>,
317    /// Send half for the command channel.
318    ///
319    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
320    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
321    /// Incoming commands from [`TransactionsHandle`].
322    ///
323    /// This will only receive commands if a user manually sends a command to the manager through
324    /// the [`TransactionsHandle`] to interact with this type directly.
325    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
326    /// A stream that yields new __pending__ transactions.
327    ///
328    /// A transaction is considered __pending__ if it is executable on the current state of the
329    /// chain. In other words, this only yields transactions that satisfy all consensus
330    /// requirements, these include:
331    ///   - no nonce gaps
332    ///   - all dynamic fee requirements are (currently) met
333    ///   - account has enough balance to cover the transaction's gas
334    pending_transactions: mpsc::Receiver<TxHash>,
335    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
336    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
337    /// How the `TransactionsManager` is configured.
338    config: TransactionsManagerConfig,
339    /// Network Policies
340    policies: NetworkPolicies<N>,
341    /// `TransactionsManager` metrics
342    metrics: TransactionsManagerMetrics,
343    /// `AnnouncedTxTypes` metrics
344    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
345}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348    /// Sets up a new instance.
349    ///
350    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
351    pub fn new(
352        network: NetworkHandle<N>,
353        pool: Pool,
354        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355        transactions_manager_config: TransactionsManagerConfig,
356    ) -> Self {
357        Self::with_policy(
358            network,
359            pool,
360            from_network,
361            transactions_manager_config,
362            NetworkPolicies::new(
363                TransactionPropagationKind::default(),
364                StrictEthAnnouncementFilter::default(),
365            ),
366        )
367    }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371    /// Sets up a new instance with given the settings.
372    ///
373    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
374    pub fn with_policy(
375        network: NetworkHandle<N>,
376        pool: Pool,
377        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378        transactions_manager_config: TransactionsManagerConfig,
379        policies: NetworkPolicies<N>,
380    ) -> Self {
381        let network_events = network.event_listener();
382
383        let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386            &transactions_manager_config.transaction_fetcher_config,
387        );
388
389        // install a listener for new __pending__ transactions that are allowed to be propagated
390        // over the network
391        let pending = pool.pending_transactions_listener();
392        let pending_pool_imports_info = PendingPoolImportsInfo::default();
393        let metrics = TransactionsManagerMetrics::default();
394        metrics
395            .capacity_pending_pool_imports
396            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398        Self {
399            pool,
400            network,
401            network_events,
402            transaction_fetcher,
403            transactions_by_peers: Default::default(),
404            pool_imports: Default::default(),
405            pending_pool_imports_info: PendingPoolImportsInfo::new(
406                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
407            ),
408            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409            peers: Default::default(),
410            command_tx,
411            command_rx: UnboundedReceiverStream::new(command_rx),
412            pending_transactions: pending,
413            transaction_events: UnboundedMeteredReceiver::new(
414                from_network,
415                NETWORK_POOL_TRANSACTIONS_SCOPE,
416            ),
417            config: transactions_manager_config,
418            policies,
419            metrics,
420            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421        }
422    }
423
424    /// Returns a new handle that can send commands to this type.
425    pub fn handle(&self) -> TransactionsHandle<N> {
426        TransactionsHandle { manager_tx: self.command_tx.clone() }
427    }
428
429    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
430    /// `false` if [`TransactionsManager`] is operating close to full capacity.
431    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432        self.has_capacity_for_pending_pool_imports() &&
433            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434    }
435
436    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
437    fn has_capacity_for_pending_pool_imports(&self) -> bool {
438        self.remaining_pool_import_capacity() > 0
439    }
440
441    /// Returns the remaining capacity for pending pool imports.
442    fn remaining_pool_import_capacity(&self) -> usize {
443        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445        )
446    }
447
448    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450        self.metrics.reported_bad_transactions.increment(1);
451    }
452
453    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455        self.network.reputation_change(peer_id, kind);
456    }
457
458    fn report_already_seen(&self, peer_id: PeerId) {
459        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461    }
462
463    /// Clear the transaction
464    fn on_good_import(&mut self, hash: TxHash) {
465        self.transactions_by_peers.remove(&hash);
466    }
467
468    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
469    /// fetching or importing it again.
470    ///
471    /// Errors that count as bad transactions are:
472    ///
473    /// - intrinsic gas too low
474    /// - exceeds gas limit
475    /// - gas uint overflow
476    /// - exceeds max init code size
477    /// - oversized data
478    /// - signer account has bytecode
479    /// - chain id mismatch
480    /// - old legacy chain id
481    /// - tx type not supported
482    ///
483    /// (and additionally for blobs txns...)
484    ///
485    /// - no blobs
486    /// - too many blobs
487    /// - invalid kzg proof
488    /// - kzg error
489    /// - not blob transaction (tx type mismatch)
490    /// - wrong versioned kzg commitment hash
491    fn on_bad_import(&mut self, err: PoolError) {
492        let peers = self.transactions_by_peers.remove(&err.hash);
493
494        // if we're _currently_ syncing, we ignore a bad transaction
495        if !err.is_bad_transaction() || self.network.is_syncing() {
496            return
497        }
498        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
499        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
500        if let Some(peers) = peers {
501            for peer_id in peers {
502                self.report_peer_bad_transactions(peer_id);
503            }
504        }
505        self.metrics.bad_imports.increment(1);
506        self.bad_imports.insert(err.hash);
507    }
508
509    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
510    ///
511    /// Returns `true` if a request was sent.
512    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
513        // try drain transaction hashes pending fetch
514        let info = &self.pending_pool_imports_info;
515        let max_pending_pool_imports = info.max_pending_pool_imports;
516        let has_capacity_wrt_pending_pool_imports =
517            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
518
519        self.transaction_fetcher
520            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
521    }
522
523    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
524        let kind = match req_err {
525            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
526            RequestError::Timeout => ReputationChangeKind::Timeout,
527            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
528                // peer is already disconnected
529                return
530            }
531            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
532        };
533        self.report_peer(peer_id, kind);
534    }
535
536    #[inline]
537    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
538        let metrics = &self.metrics;
539
540        let TxManagerPollDurations {
541            acc_network_events,
542            acc_pending_imports,
543            acc_tx_events,
544            acc_imported_txns,
545            acc_fetch_events,
546            acc_pending_fetch,
547            acc_cmds,
548        } = poll_durations;
549
550        // update metrics for whole poll function
551        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
552        // update metrics for nested expressions
553        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
554        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
555        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
556        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
557        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
558        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
559        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
560    }
561}
562
563impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
564    /// Processes a batch import results.
565    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
566        for res in batch_results {
567            match res {
568                Ok(AddedTransactionOutcome { hash, .. }) => {
569                    self.on_good_import(hash);
570                }
571                Err(err) => {
572                    self.on_bad_import(err);
573                }
574            }
575        }
576    }
577
578    /// Request handler for an incoming `NewPooledTransactionHashes`
579    fn on_new_pooled_transaction_hashes(
580        &mut self,
581        peer_id: PeerId,
582        msg: NewPooledTransactionHashes,
583    ) {
584        // If the node is initially syncing, ignore transactions
585        if self.network.is_initially_syncing() {
586            return
587        }
588        if self.network.tx_gossip_disabled() {
589            return
590        }
591
592        // get handle to peer's session, if the session is still active
593        let Some(peer) = self.peers.get_mut(&peer_id) else {
594            trace!(
595                peer_id = format!("{peer_id:#}"),
596                ?msg,
597                "discarding announcement from inactive peer"
598            );
599
600            return
601        };
602        let client = peer.client_version.clone();
603
604        // keep track of the transactions the peer knows
605        let mut count_txns_already_seen_by_peer = 0;
606        for tx in msg.iter_hashes().copied() {
607            if !peer.seen_transactions.insert(tx) {
608                count_txns_already_seen_by_peer += 1;
609            }
610        }
611        if count_txns_already_seen_by_peer > 0 {
612            // this may occur if transactions are sent or announced to a peer, at the same time as
613            // the peer sends/announces those hashes to us. this is because, marking
614            // txns as seen by a peer is done optimistically upon sending them to the
615            // peer.
616            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
617            self.metrics
618                .occurrences_hash_already_seen_by_peer
619                .increment(count_txns_already_seen_by_peer);
620
621            trace!(target: "net::tx",
622                %count_txns_already_seen_by_peer,
623                peer_id=format!("{peer_id:#}"),
624                ?client,
625                "Peer sent hashes that have already been marked as seen by peer"
626            );
627
628            self.report_already_seen(peer_id);
629        }
630
631        // 1. filter out spam
632        if msg.is_empty() {
633            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
634            return;
635        }
636
637        let original_len = msg.len();
638        let mut partially_valid_msg = msg.dedup();
639
640        if partially_valid_msg.len() != original_len {
641            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
642        }
643
644        // 2. filter out transactions pending import to pool
645        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
646
647        // 3. filter out invalid entries (spam)
648        //
649        // validates messages with respect to the given network, e.g. allowed tx types.
650        // done before the pool lookup since these are cheap in-memory checks that shrink
651        // the set before acquiring the pool lock.
652        //
653        let mut should_report_peer = false;
654        let mut tx_types_counter = TxTypesCounter::default();
655
656        let is_eth68_message = partially_valid_msg
657            .msg_version()
658            .expect("partially valid announcement should have a version")
659            .is_eth68();
660
661        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
662            let (ty_byte, size_val) = match *metadata_ref_mut {
663                Some((ty, size)) => {
664                    if !is_eth68_message {
665                        should_report_peer = true;
666                    }
667                    (ty, size)
668                }
669                None => {
670                    if is_eth68_message {
671                        should_report_peer = true;
672                        return false;
673                    }
674                    (0u8, 0)
675                }
676            };
677
678            if is_eth68_message &&
679                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
680                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
681            {
682                tx_types_counter.increase_by_tx_type(parsed_tx_type);
683            }
684
685            let decision = self
686                .policies
687                .announcement_filter()
688                .decide_on_announcement(ty_byte, tx_hash, size_val);
689
690            match decision {
691                AnnouncementAcceptance::Accept => true,
692                AnnouncementAcceptance::Ignore => false,
693                AnnouncementAcceptance::Reject { penalize_peer } => {
694                    if penalize_peer {
695                        should_report_peer = true;
696                    }
697                    false
698                }
699            }
700        });
701
702        if is_eth68_message {
703            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
704        }
705
706        if should_report_peer {
707            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
708        }
709
710        // 4. filter out known hashes
711        //
712        // known txns have already been successfully fetched or received over gossip.
713        //
714        // most hashes will be filtered out here since the mempool protocol is a gossip
715        // protocol, healthy peers will send many of the same hashes.
716        //
717        let hashes_count_pre_pool_filter = partially_valid_msg.len();
718        self.pool.retain_unknown(&mut partially_valid_msg);
719        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
720            let already_known_hashes_count =
721                hashes_count_pre_pool_filter - partially_valid_msg.len();
722            self.metrics
723                .occurrences_hashes_already_in_pool
724                .increment(already_known_hashes_count as u64);
725        }
726
727        if partially_valid_msg.is_empty() {
728            // nothing to request
729            return
730        }
731
732        let mut valid_announcement_data =
733            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
734
735        if valid_announcement_data.is_empty() {
736            // no valid announcement data
737            return
738        }
739
740        // 5. filter out already seen unknown hashes
741        //
742        // seen hashes are already in the tx fetcher, pending fetch.
743        //
744        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
745        // fetcher, hence they should be valid at this point.
746        let bad_imports = &self.bad_imports;
747        self.transaction_fetcher.filter_unseen_and_pending_hashes(
748            &mut valid_announcement_data,
749            |hash| bad_imports.contains(hash),
750            &peer_id,
751            &client,
752        );
753
754        if valid_announcement_data.is_empty() {
755            // nothing to request
756            return
757        }
758
759        trace!(target: "net::tx::propagation",
760            peer_id=format!("{peer_id:#}"),
761            hashes_len=valid_announcement_data.len(),
762            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
763            msg_version=%valid_announcement_data.msg_version(),
764            client_version=%client,
765            "received previously unseen and pending hashes in announcement from peer"
766        );
767
768        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
769        // fallback
770        if !self.transaction_fetcher.is_idle(&peer_id) {
771            // load message version before announcement data is destructed in packing
772            let msg_version = valid_announcement_data.msg_version();
773            let (hashes, _version) = valid_announcement_data.into_request_hashes();
774
775            trace!(target: "net::tx",
776                peer_id=format!("{peer_id:#}"),
777                hashes=?*hashes,
778                %msg_version,
779                %client,
780                "buffering hashes announced by busy peer"
781            );
782
783            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
784
785            return
786        }
787
788        let mut hashes_to_request =
789            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
790        let surplus_hashes =
791            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
792
793        if !surplus_hashes.is_empty() {
794            trace!(target: "net::tx",
795                peer_id=format!("{peer_id:#}"),
796                surplus_hashes=?*surplus_hashes,
797                %client,
798                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
799            );
800
801            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
802        }
803
804        trace!(target: "net::tx",
805            peer_id=format!("{peer_id:#}"),
806            hashes=?*hashes_to_request,
807            %client,
808            "sending hashes in `GetPooledTransactions` request to peer's session"
809        );
810
811        // request the missing transactions
812        //
813        // get handle to peer's session again, at this point we know it exists
814        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
815        if let Some(failed_to_request_hashes) =
816            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
817        {
818            let conn_eth_version = peer.version;
819
820            trace!(target: "net::tx",
821                peer_id=format!("{peer_id:#}"),
822                failed_to_request_hashes=?*failed_to_request_hashes,
823                %conn_eth_version,
824                %client,
825                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
826            );
827            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
828        }
829    }
830}
831
832impl<Pool, N> TransactionsManager<Pool, N>
833where
834    Pool: TransactionPool + Unpin + 'static,
835    N: NetworkPrimitives<
836            BroadcastedTransaction: SignedTransaction,
837            PooledTransaction: SignedTransaction,
838        > + Unpin,
839    Pool::Transaction:
840        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
841{
842    /// Invoked when transactions in the local mempool are considered __pending__.
843    ///
844    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
845    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
846    /// messages. The Transactions message relays complete transaction objects and is typically
847    /// sent to a small, random fraction of connected peers.
848    ///
849    /// All other peers receive a notification of the transaction hash and can request the
850    /// complete transaction object if it is unknown to them. The dissemination of complete
851    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
852    /// and won't need to request it.
853    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
854        // We intentionally do not gate this on initial sync.
855        // During initial sync we skip importing tx announcements from peers in
856        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
857        if self.network.tx_gossip_disabled() {
858            return
859        }
860
861        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
862
863        self.propagate_all(hashes);
864    }
865
866    /// Propagate the full transactions to a specific peer.
867    ///
868    /// Returns the propagated transactions.
869    fn propagate_full_transactions_to_peer(
870        &mut self,
871        txs: Vec<TxHash>,
872        peer_id: PeerId,
873        propagation_mode: PropagationMode,
874    ) -> Option<PropagatedTransactions> {
875        let peer = self.peers.get_mut(&peer_id)?;
876        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
877        let mut propagated = PropagatedTransactions::default();
878
879        // filter all transactions unknown to the peer
880        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
881
882        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
883
884        if propagation_mode.is_forced() {
885            // skip cache check if forced
886            full_transactions.extend(to_propagate);
887        } else {
888            // Iterate through the transactions to propagate and fill the hashes and full
889            // transaction
890            for tx in to_propagate {
891                if !peer.seen_transactions.contains(tx.tx_hash()) {
892                    // Only include if the peer hasn't seen the transaction
893                    full_transactions.push(&tx);
894                }
895            }
896        }
897
898        if full_transactions.is_empty() {
899            // nothing to propagate
900            return None
901        }
902
903        let PropagateTransactions { pooled, full } = full_transactions.build();
904
905        // send hashes if any
906        if let Some(new_pooled_hashes) = pooled {
907            for hash in new_pooled_hashes.iter_hashes().copied() {
908                propagated.record(hash, PropagateKind::Hash(peer_id));
909                // mark transaction as seen by peer
910                peer.seen_transactions.insert(hash);
911            }
912
913            // send hashes of transactions
914            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
915        }
916
917        // send full transactions, if any
918        if let Some(new_full_transactions) = full {
919            for tx in &new_full_transactions {
920                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
921                // mark transaction as seen by peer
922                peer.seen_transactions.insert(*tx.tx_hash());
923            }
924
925            // send full transactions
926            self.network.send_transactions(peer_id, new_full_transactions);
927        }
928
929        // Update propagated transactions metrics
930        self.metrics.propagated_transactions.increment(propagated.len() as u64);
931
932        Some(propagated)
933    }
934
935    /// Propagate the transaction hashes to the given peer
936    ///
937    /// Note: This will only send the hashes for transactions that exist in the pool.
938    fn propagate_hashes_to(
939        &mut self,
940        hashes: Vec<TxHash>,
941        peer_id: PeerId,
942        propagation_mode: PropagationMode,
943    ) {
944        trace!(target: "net::tx", "Start propagating transactions as hashes");
945
946        // This fetches a transactions from the pool, including the blob transactions, which are
947        // only ever sent as hashes.
948        let propagated = {
949            let Some(peer) = self.peers.get_mut(&peer_id) else {
950                // no such peer
951                return
952            };
953
954            let to_propagate =
955                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
956
957            let mut propagated = PropagatedTransactions::default();
958
959            // check if transaction is known to peer
960            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
961
962            if propagation_mode.is_forced() {
963                hashes.extend(to_propagate)
964            } else {
965                for tx in to_propagate {
966                    if !peer.seen_transactions.contains(tx.tx_hash()) {
967                        // Include if the peer hasn't seen it
968                        hashes.push(&tx);
969                    }
970                }
971            }
972
973            let new_pooled_hashes = hashes.build();
974
975            if new_pooled_hashes.is_empty() {
976                // nothing to propagate
977                return
978            }
979
980            if let Some(peer) = self.peers.get_mut(&peer_id) {
981                for hash in new_pooled_hashes.iter_hashes().copied() {
982                    propagated.record(hash, PropagateKind::Hash(peer_id));
983                    peer.seen_transactions.insert(hash);
984                }
985            }
986
987            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
988
989            // send hashes of transactions
990            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
991
992            // Update propagated transactions metrics
993            self.metrics.propagated_transactions.increment(propagated.len() as u64);
994
995            propagated
996        };
997
998        // notify pool so events get fired
999        self.pool.on_propagated(propagated);
1000    }
1001
1002    /// Propagate the transactions to all connected peers either as full objects or hashes.
1003    ///
1004    /// The message for new pooled hashes depends on the negotiated version of the stream.
1005    /// See [`NewPooledTransactionHashes`]
1006    ///
1007    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1008    fn propagate_transactions(
1009        &mut self,
1010        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1011        propagation_mode: PropagationMode,
1012    ) -> PropagatedTransactions {
1013        let mut propagated = PropagatedTransactions::default();
1014        if self.network.tx_gossip_disabled() {
1015            return propagated
1016        }
1017
1018        // send full transactions to a set of the connected peers based on the configured mode
1019        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1020
1021        // Note: Assuming ~random~ order due to random state of the peers map hasher
1022        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1023            if !self.policies.propagation_policy().can_propagate(peer) {
1024                // skip peers we should not propagate to
1025                continue
1026            }
1027            // determine whether to send full tx objects or hashes.
1028            let mut builder = if peer_idx > max_num_full {
1029                PropagateTransactionsBuilder::pooled(peer.version)
1030            } else {
1031                PropagateTransactionsBuilder::full(peer.version)
1032            };
1033
1034            if propagation_mode.is_forced() {
1035                builder.extend(to_propagate.iter());
1036            } else {
1037                // Iterate through the transactions to propagate and fill the hashes and full
1038                // transaction lists, before deciding whether or not to send full transactions to
1039                // the peer.
1040                for tx in &to_propagate {
1041                    // Only proceed if the transaction is not in the peer's list of seen
1042                    // transactions
1043                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1044                        builder.push(tx);
1045                    }
1046                }
1047            }
1048
1049            if builder.is_empty() {
1050                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1051                continue
1052            }
1053
1054            let PropagateTransactions { pooled, full } = builder.build();
1055
1056            // send hashes if any
1057            if let Some(mut new_pooled_hashes) = pooled {
1058                // enforce tx soft limit per message for the (unlikely) event the number of
1059                // hashes exceeds it
1060                new_pooled_hashes
1061                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1062
1063                for hash in new_pooled_hashes.iter_hashes().copied() {
1064                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1065                    // mark transaction as seen by peer
1066                    peer.seen_transactions.insert(hash);
1067                }
1068
1069                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1070
1071                // send hashes of transactions
1072                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1073            }
1074
1075            // send full transactions, if any
1076            if let Some(new_full_transactions) = full {
1077                for tx in &new_full_transactions {
1078                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1079                    // mark transaction as seen by peer
1080                    peer.seen_transactions.insert(*tx.tx_hash());
1081                }
1082
1083                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1084
1085                // send full transactions
1086                self.network.send_transactions(*peer_id, new_full_transactions);
1087            }
1088        }
1089
1090        // Update propagated transactions metrics
1091        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1092
1093        propagated
1094    }
1095
1096    /// Propagates the given transactions to the peers
1097    ///
1098    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1099    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1100    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1101        if self.peers.is_empty() {
1102            // nothing to propagate
1103            return
1104        }
1105        let propagated = self.propagate_transactions(
1106            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1107            PropagationMode::Basic,
1108        );
1109
1110        // notify pool so events get fired
1111        self.pool.on_propagated(propagated);
1112    }
1113
1114    /// Request handler for an incoming request for transactions
1115    fn on_get_pooled_transactions(
1116        &mut self,
1117        peer_id: PeerId,
1118        request: GetPooledTransactions,
1119        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1120    ) {
1121        // fast exit if gossip is disabled
1122        if self.network.tx_gossip_disabled() {
1123            let _ = response.send(Ok(PooledTransactions::default()));
1124            return
1125        }
1126        if let Some(peer) = self.peers.get_mut(&peer_id) {
1127            let transactions = self.pool.get_pooled_transaction_elements(
1128                request.0,
1129                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1130                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1131                ),
1132            );
1133            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1134
1135            // we sent a response at which point we assume that the peer is aware of the
1136            // transactions
1137            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1138
1139            let resp = PooledTransactions(transactions);
1140            let _ = response.send(Ok(resp));
1141        }
1142    }
1143
1144    /// Handles a command received from a detached [`TransactionsHandle`]
1145    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1146        match cmd {
1147            TransactionsCommand::PropagateHash(hash) => {
1148                self.on_new_pending_transactions(vec![hash])
1149            }
1150            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1151                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1152            }
1153            TransactionsCommand::GetActivePeers(tx) => {
1154                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1155                tx.send(peers).ok();
1156            }
1157            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1158                if let Some(propagated) =
1159                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1160                {
1161                    self.pool.on_propagated(propagated);
1162                }
1163            }
1164            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1165            TransactionsCommand::BroadcastTransactions(txs) => {
1166                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1167                self.pool.on_propagated(propagated);
1168            }
1169            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1170                let mut res = HashMap::with_capacity(peers.len());
1171                for peer_id in peers {
1172                    let hashes = self
1173                        .peers
1174                        .get(&peer_id)
1175                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1176                        .unwrap_or_default();
1177                    res.insert(peer_id, hashes);
1178                }
1179                tx.send(res).ok();
1180            }
1181            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1182                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1183                peer_request_sender.send(sender).ok();
1184            }
1185        }
1186    }
1187
1188    /// Handles session establishment and peer transactions initialization.
1189    ///
1190    /// This is invoked when a new session is established.
1191    fn handle_peer_session(
1192        &mut self,
1193        info: SessionInfo,
1194        messages: PeerRequestSender<PeerRequest<N>>,
1195    ) {
1196        let SessionInfo { peer_id, client_version, version, .. } = info;
1197
1198        // Insert a new peer into the peerset.
1199        let peer = PeerMetadata::<N>::new(
1200            messages,
1201            version,
1202            client_version,
1203            self.config.max_transactions_seen_by_peer_history,
1204            info.peer_kind,
1205        );
1206        let peer = match self.peers.entry(peer_id) {
1207            Entry::Occupied(mut entry) => {
1208                entry.insert(peer);
1209                entry.into_mut()
1210            }
1211            Entry::Vacant(entry) => entry.insert(peer),
1212        };
1213
1214        self.policies.propagation_policy_mut().on_session_established(peer);
1215
1216        // Send a `NewPooledTransactionHashes` to the peer with up to
1217        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1218        // transactions in the pool.
1219        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1220            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1221            return
1222        }
1223
1224        // Get transactions to broadcast
1225        let pooled_txs = self.pool.pooled_transactions_max(
1226            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1227        );
1228        if pooled_txs.is_empty() {
1229            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1230            return;
1231        }
1232
1233        // Build and send transaction hashes message
1234        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1235        for pooled_tx in pooled_txs {
1236            peer.seen_transactions.insert(*pooled_tx.hash());
1237            msg_builder.push_pooled(pooled_tx);
1238        }
1239
1240        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1241        let msg = msg_builder.build();
1242        self.network.send_transactions_hashes(peer_id, msg);
1243    }
1244
1245    /// Handles a received event related to common network events.
1246    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1247        match event_result {
1248            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1249                // remove the peer
1250
1251                let peer = self.peers.remove(&peer_id);
1252                if let Some(mut peer) = peer {
1253                    self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1254                }
1255                self.transaction_fetcher.remove_peer(&peer_id);
1256            }
1257            NetworkEvent::ActivePeerSession { info, messages } => {
1258                // process active peer session and broadcast available transaction from the pool
1259                self.handle_peer_session(info, messages);
1260            }
1261            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1262                let peer_id = info.peer_id;
1263                // get messages from existing peer
1264                let messages = match self.peers.get(&peer_id) {
1265                    Some(p) => p.request_tx.clone(),
1266                    None => {
1267                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1268                        return;
1269                    }
1270                };
1271                self.handle_peer_session(info, messages);
1272            }
1273            _ => {}
1274        }
1275    }
1276
1277    /// Returns true if the ingress policy allows processing messages from the given peer.
1278    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1279        if self.config.ingress_policy.allows_all() {
1280            return true;
1281        }
1282        let Some(peer) = self.peers.get(peer_id) else {
1283            return false;
1284        };
1285        self.config.ingress_policy.allows(peer.peer_kind())
1286    }
1287
1288    /// Handles dedicated transaction events related to the `eth` protocol.
1289    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1290        match event {
1291            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1292                if !self.accepts_incoming_from(&peer_id) {
1293                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1294                    return;
1295                }
1296
1297                // ensure we didn't receive any blob transactions as these are disallowed to be
1298                // broadcasted in full
1299
1300                let has_blob_txs = msg.has_eip4844();
1301
1302                let non_blob_txs = msg
1303                    .into_iter()
1304                    .map(N::PooledTransaction::try_from)
1305                    .filter_map(Result::ok)
1306                    .collect();
1307
1308                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1309
1310                if has_blob_txs {
1311                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1312                    self.report_peer_bad_transactions(peer_id);
1313                }
1314            }
1315            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1316                if !self.accepts_incoming_from(&peer_id) {
1317                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1318                    return;
1319                }
1320                self.on_new_pooled_transaction_hashes(peer_id, msg)
1321            }
1322            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1323                self.on_get_pooled_transactions(peer_id, request, response)
1324            }
1325            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1326                let _ = response.send(Some(self.handle()));
1327            }
1328        }
1329    }
1330
1331    /// Starts the import process for the given transactions.
1332    fn import_transactions(
1333        &mut self,
1334        peer_id: PeerId,
1335        transactions: PooledTransactions<N::PooledTransaction>,
1336        source: TransactionSource,
1337    ) {
1338        // If the node is pipeline syncing, ignore transactions
1339        if self.network.is_initially_syncing() {
1340            return
1341        }
1342        if self.network.tx_gossip_disabled() {
1343            return
1344        }
1345
1346        // Early return if we don't have capacity for any imports
1347        if !self.has_capacity_for_pending_pool_imports() {
1348            return
1349        }
1350
1351        let mut transactions = transactions.0;
1352
1353        // Truncate to remaining capacity early to bound work on all subsequent processing.
1354        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
1355        // malicious and we avoid wasting CPU on them.
1356        let capacity = self.remaining_pool_import_capacity();
1357        if transactions.len() > capacity {
1358            let skipped = transactions.len() - capacity;
1359            transactions.truncate(capacity);
1360            self.metrics
1361                .skipped_transactions_pending_pool_imports_at_capacity
1362                .increment(skipped as u64);
1363            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1364        }
1365
1366        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1367        let client_version = peer.client_version.clone();
1368
1369        let start = Instant::now();
1370
1371        // mark the transactions as received
1372        self.transaction_fetcher
1373            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1374
1375        // track that the peer knows these transaction, but only if this is a new broadcast.
1376        // If we received the transactions as the response to our `GetPooledTransactions``
1377        // requests (based on received `NewPooledTransactionHashes`) then we already
1378        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1379        let mut num_already_seen_by_peer = 0;
1380        for tx in &transactions {
1381            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1382                num_already_seen_by_peer += 1;
1383            }
1384        }
1385
1386        // tracks the quality of the given transactions
1387        let mut has_bad_transactions = false;
1388
1389        // 1. Remove known, already-tracked, and invalid transactions first since these are
1390        // cheap in-memory checks against local maps
1391        transactions.retain(|tx| {
1392            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1393                entry.get_mut().insert(peer_id);
1394                return false
1395            }
1396            if self.bad_imports.contains(tx.tx_hash()) {
1397                trace!(target: "net::tx",
1398                    peer_id=format!("{peer_id:#}"),
1399                    hash=%tx.tx_hash(),
1400                    %client_version,
1401                    "received a known bad transaction from peer"
1402                );
1403                has_bad_transactions = true;
1404                return false;
1405            }
1406            true
1407        });
1408
1409        // 2. filter out txns already inserted into pool
1410        let txns_count_pre_pool_filter = transactions.len();
1411        self.pool.retain_unknown(&mut transactions);
1412        if txns_count_pre_pool_filter > transactions.len() {
1413            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1414            self.metrics
1415                .occurrences_transactions_already_in_pool
1416                .increment(already_known_txns_count as u64);
1417        }
1418
1419        let txs_len = transactions.len();
1420
1421        let new_txs = transactions
1422            .into_par_iter()
1423            .filter_map(|tx| match tx.try_into_recovered() {
1424                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1425                Err(badtx) => {
1426                    trace!(target: "net::tx",
1427                        peer_id=format!("{peer_id:#}"),
1428                        hash=%badtx.tx_hash(),
1429                        client_version=%client_version,
1430                        "failed ecrecovery for transaction"
1431                    );
1432                    None
1433                }
1434            })
1435            .collect::<Vec<_>>();
1436
1437        has_bad_transactions |= new_txs.len() != txs_len;
1438
1439        // Record the transactions as seen by the peer
1440        for tx in &new_txs {
1441            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1442        }
1443
1444        // 3. import new transactions as a batch to minimize lock contention on the underlying
1445        // pool
1446        if !new_txs.is_empty() {
1447            let pool = self.pool.clone();
1448            // update metrics
1449            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1450            metric_pending_pool_imports.increment(new_txs.len() as f64);
1451
1452            // update self-monitoring info
1453            self.pending_pool_imports_info
1454                .pending_pool_imports
1455                .fetch_add(new_txs.len(), Ordering::Relaxed);
1456            let tx_manager_info_pending_pool_imports =
1457                self.pending_pool_imports_info.pending_pool_imports.clone();
1458
1459            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1460            let import = Box::pin(async move {
1461                let added = new_txs.len();
1462                let res = pool.add_external_transactions(new_txs).await;
1463
1464                // update metrics
1465                metric_pending_pool_imports.decrement(added as f64);
1466                // update self-monitoring info
1467                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1468
1469                res
1470            });
1471
1472            self.pool_imports.push(import);
1473        }
1474
1475        if num_already_seen_by_peer > 0 {
1476            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1477            self.metrics
1478                .occurrences_of_transaction_already_seen_by_peer
1479                .increment(num_already_seen_by_peer);
1480            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1481        }
1482
1483        if has_bad_transactions {
1484            // peer sent us invalid transactions
1485            self.report_peer_bad_transactions(peer_id)
1486        }
1487
1488        if num_already_seen_by_peer > 0 {
1489            self.report_already_seen(peer_id);
1490        }
1491
1492        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1493    }
1494
1495    /// Processes a [`FetchEvent`].
1496    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1497        match fetch_event {
1498            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1499                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1500                if report_peer {
1501                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1502                }
1503            }
1504            FetchEvent::FetchError { peer_id, error } => {
1505                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1506                self.on_request_error(peer_id, error);
1507            }
1508            FetchEvent::EmptyResponse { peer_id } => {
1509                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1510            }
1511        }
1512    }
1513}
1514
1515/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1516/// [`crate::NetworkManager`] for more context on the design pattern.
1517///
1518/// This should be spawned or used as part of `tokio::select!`.
1519//
1520// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1521// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1522impl<
1523        Pool: TransactionPool + Unpin + 'static,
1524        N: NetworkPrimitives<
1525                BroadcastedTransaction: SignedTransaction,
1526                PooledTransaction: SignedTransaction,
1527            > + Unpin,
1528    > Future for TransactionsManager<Pool, N>
1529where
1530    Pool::Transaction:
1531        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1532{
1533    type Output = ();
1534
1535    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1536        let start = Instant::now();
1537        let mut poll_durations = TxManagerPollDurations::default();
1538
1539        let this = self.get_mut();
1540
1541        // All streams are polled until their corresponding budget is exhausted, then we manually
1542        // yield back control to tokio. See `NetworkManager` for more context on the design
1543        // pattern.
1544
1545        // Advance network/peer related events (update peers map).
1546        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1547            poll_durations.acc_network_events,
1548            "net::tx",
1549            "Network events stream",
1550            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1551            this.network_events.poll_next_unpin(cx),
1552            |event| this.on_network_event(event)
1553        );
1554
1555        // Advances new __pending__ transactions, transactions that were successfully inserted into
1556        // pending set in pool (are valid), and propagates them (inform peers which
1557        // transactions we have seen).
1558        //
1559        // We try to drain this to batch the transactions in a single message.
1560        //
1561        // We don't expect this buffer to be large, since only pending transactions are
1562        // emitted here.
1563        let mut new_txs = Vec::new();
1564        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1565            cx,
1566            &mut new_txs,
1567            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1568        ) {
1569            Poll::Ready(count) => {
1570                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1571                    // we filled the entire buffer capacity and need to try again on the next poll
1572                    // immediately
1573                    true
1574                } else {
1575                    // try once more, because mostlikely the channel is now empty and the waker is
1576                    // registered if this is pending, if we filled additional hashes, we poll again
1577                    // on the next iteration
1578                    let limit =
1579                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1580                            new_txs.len();
1581                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1582                }
1583            }
1584            Poll::Pending => false,
1585        };
1586        if !new_txs.is_empty() {
1587            this.on_new_pending_transactions(new_txs);
1588        }
1589
1590        // Advance incoming transaction events (stream new txns/announcements from
1591        // network manager and queue for import to pool/fetch txns).
1592        //
1593        // This will potentially remove hashes from hashes pending fetch, it the event
1594        // is an announcement (if same hashes are announced that didn't fit into a
1595        // previous request).
1596        //
1597        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1598        // (128 KiB / 10 bytes > 13k transactions).
1599        //
1600        // If this is an event with `Transactions` message, since transactions aren't
1601        // validated until they are inserted into the pool, this can potentially queue
1602        // >13k transactions for insertion to pool. More if the message size is bigger
1603        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1604        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1605            poll_durations.acc_tx_events,
1606            "net::tx",
1607            "Network transaction events stream",
1608            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1609            this.transaction_events.poll_next_unpin(cx),
1610            |event| this.on_network_tx_event(event),
1611        );
1612
1613        // Advance inflight fetch requests (flush transaction fetcher and queue for
1614        // import to pool).
1615        //
1616        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1617        // (2 MiB / 10 bytes > 200k transactions).
1618        //
1619        // Since transactions aren't validated until they are inserted into the pool,
1620        // this can potentially queue >200k transactions for insertion to pool. More
1621        // if the message size is bigger than the soft limit on a `PooledTransactions`
1622        // response which is 2 MiB.
1623        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1624            poll_durations.acc_fetch_events,
1625            "net::tx",
1626            "Transaction fetch events stream",
1627            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1628            this.transaction_fetcher.poll_next_unpin(cx),
1629            |event| this.on_fetch_event(event),
1630        );
1631
1632        // Advance pool imports (flush txns to pool).
1633        //
1634        // Note, this is done in batches. A batch is filled from one `Transactions`
1635        // broadcast messages or one `PooledTransactions` response at a time. The
1636        // minimum batch size is 1 transaction (and might often be the case with blob
1637        // transactions).
1638        //
1639        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1640        // (2 MiB / 10 bytes > 200k transactions).
1641        //
1642        // Since transactions aren't validated until they are inserted into the pool,
1643        // this can potentially validate >200k transactions. More if the message size
1644        // is bigger than the soft limit on a `PooledTransactions` response which is
1645        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1646        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1647            poll_durations.acc_pending_imports,
1648            "net::tx",
1649            "Batched pool imports stream",
1650            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1651            this.pool_imports.poll_next_unpin(cx),
1652            |batch_results| this.on_batch_import_result(batch_results)
1653        );
1654
1655        // Tries to drain hashes pending fetch cache if the tx manager currently has
1656        // capacity for this (fetch txns).
1657        //
1658        // Sends at most one request.
1659        duration_metered_exec!(
1660            {
1661                if this.has_capacity_for_fetching_pending_hashes() &&
1662                    this.on_fetch_hashes_pending_fetch()
1663                {
1664                    maybe_more_tx_fetch_events = true;
1665                }
1666            },
1667            poll_durations.acc_pending_fetch
1668        );
1669
1670        // Advance commands (propagate/fetch/serve txns).
1671        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1672            poll_durations.acc_cmds,
1673            "net::tx",
1674            "Commands channel",
1675            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1676            this.command_rx.poll_next_unpin(cx),
1677            |cmd| this.on_command(cmd)
1678        );
1679
1680        this.transaction_fetcher.update_metrics();
1681
1682        // all channels are fully drained and import futures pending
1683        if maybe_more_network_events ||
1684            maybe_more_commands ||
1685            maybe_more_tx_events ||
1686            maybe_more_tx_fetch_events ||
1687            maybe_more_pool_imports ||
1688            maybe_more_pending_txns
1689        {
1690            // make sure we're woken up again
1691            cx.waker().wake_by_ref();
1692            return Poll::Pending
1693        }
1694
1695        this.update_poll_metrics(start, poll_durations);
1696
1697        Poll::Pending
1698    }
1699}
1700
1701/// Represents the different modes of transaction propagation.
1702///
1703/// This enum is used to determine how transactions are propagated to peers in the network.
1704#[derive(Debug, Copy, Clone, Eq, PartialEq)]
1705enum PropagationMode {
1706    /// Default propagation mode.
1707    ///
1708    /// Transactions are only sent to peers that haven't seen them yet.
1709    Basic,
1710    /// Forced propagation mode.
1711    ///
1712    /// Transactions are sent to all peers regardless of whether they have been sent or received
1713    /// before.
1714    Forced,
1715}
1716
1717impl PropagationMode {
1718    /// Returns `true` if the propagation kind is `Forced`.
1719    const fn is_forced(self) -> bool {
1720        matches!(self, Self::Forced)
1721    }
1722}
1723
1724/// A transaction that's about to be propagated to multiple peers.
1725#[derive(Debug, Clone)]
1726struct PropagateTransaction<T = TransactionSigned> {
1727    size: usize,
1728    transaction: Arc<T>,
1729}
1730
1731impl<T: SignedTransaction> PropagateTransaction<T> {
1732    /// Create a new instance from a transaction.
1733    pub fn new(transaction: T) -> Self {
1734        let size = transaction.length();
1735        Self { size, transaction: Arc::new(transaction) }
1736    }
1737
1738    /// Create a new instance from a pooled transaction
1739    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1740    where
1741        P: PoolTransaction<Consensus = T>,
1742    {
1743        let size = tx.encoded_length();
1744        let transaction = tx.transaction.clone_into_consensus();
1745        let transaction = Arc::new(transaction.into_inner());
1746        Self { size, transaction }
1747    }
1748
1749    fn tx_hash(&self) -> &TxHash {
1750        self.transaction.tx_hash()
1751    }
1752}
1753
1754/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1755/// should receive them in full or as pooled
1756#[derive(Debug, Clone)]
1757enum PropagateTransactionsBuilder<T> {
1758    Pooled(PooledTransactionsHashesBuilder),
1759    Full(FullTransactionsBuilder<T>),
1760}
1761
1762impl<T> PropagateTransactionsBuilder<T> {
1763    /// Create a builder for pooled transactions
1764    fn pooled(version: EthVersion) -> Self {
1765        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1766    }
1767
1768    /// Create a builder that sends transactions in full and records transactions that don't fit.
1769    fn full(version: EthVersion) -> Self {
1770        Self::Full(FullTransactionsBuilder::new(version))
1771    }
1772
1773    /// Returns true if no transactions are recorded.
1774    fn is_empty(&self) -> bool {
1775        match self {
1776            Self::Pooled(builder) => builder.is_empty(),
1777            Self::Full(builder) => builder.is_empty(),
1778        }
1779    }
1780
1781    /// Consumes the type and returns the built messages that should be sent to the peer.
1782    fn build(self) -> PropagateTransactions<T> {
1783        match self {
1784            Self::Pooled(pooled) => {
1785                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1786            }
1787            Self::Full(full) => full.build(),
1788        }
1789    }
1790}
1791
1792impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1793    /// Appends all transactions
1794    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1795        for tx in txs {
1796            self.push(tx);
1797        }
1798    }
1799
1800    /// Appends a transaction to the list.
1801    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1802        match self {
1803            Self::Pooled(builder) => builder.push(transaction),
1804            Self::Full(builder) => builder.push(transaction),
1805        }
1806    }
1807}
1808
1809/// Represents how the transactions should be sent to a peer if any.
1810struct PropagateTransactions<T> {
1811    /// The pooled transaction hashes to send.
1812    pooled: Option<NewPooledTransactionHashes>,
1813    /// The transactions to send in full.
1814    full: Option<Vec<Arc<T>>>,
1815}
1816
1817/// Helper type for constructing the full transaction message that enforces the
1818/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1819/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1820/// broadcasted in full.
1821#[derive(Debug, Clone)]
1822struct FullTransactionsBuilder<T> {
1823    /// The soft limit to enforce for a single broadcast message of full transactions.
1824    total_size: usize,
1825    /// All transactions to be broadcasted.
1826    transactions: Vec<Arc<T>>,
1827    /// Transactions that didn't fit into the broadcast message
1828    pooled: PooledTransactionsHashesBuilder,
1829}
1830
1831impl<T> FullTransactionsBuilder<T> {
1832    /// Create a builder for the negotiated version of the peer's session
1833    fn new(version: EthVersion) -> Self {
1834        Self {
1835            total_size: 0,
1836            pooled: PooledTransactionsHashesBuilder::new(version),
1837            transactions: vec![],
1838        }
1839    }
1840
1841    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1842    fn is_empty(&self) -> bool {
1843        self.transactions.is_empty() && self.pooled.is_empty()
1844    }
1845
1846    /// Returns the messages that should be propagated to the peer.
1847    fn build(self) -> PropagateTransactions<T> {
1848        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1849        let full = Some(self.transactions).filter(|full| !full.is_empty());
1850        PropagateTransactions { pooled, full }
1851    }
1852}
1853
1854impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1855    /// Appends all transactions.
1856    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1857        for tx in txs {
1858            self.push(&tx)
1859        }
1860    }
1861
1862    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1863    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1864    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1865    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1866    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1867    ///
1868    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1869    /// to list of pooled transactions, (e.g. 4844 transactions).
1870    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1871    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1872        // Do not send full 4844 transaction hashes to peers.
1873        //
1874        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1875        //  Instead, those transactions are only announced using
1876        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1877        //  via `GetPooledTransactions`.
1878        //
1879        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1880        if !transaction.transaction.is_broadcastable_in_full() {
1881            self.pooled.push(transaction);
1882            return
1883        }
1884
1885        let new_size = self.total_size + transaction.size;
1886        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1887            self.total_size > 0
1888        {
1889            // transaction does not fit into the message
1890            self.pooled.push(transaction);
1891            return
1892        }
1893
1894        self.total_size = new_size;
1895        self.transactions.push(Arc::clone(&transaction.transaction));
1896    }
1897}
1898
1899/// A helper type to create the pooled transactions message based on the negotiated version of the
1900/// session with the peer
1901#[derive(Debug, Clone)]
1902enum PooledTransactionsHashesBuilder {
1903    Eth66(NewPooledTransactionHashes66),
1904    Eth68(NewPooledTransactionHashes68),
1905}
1906
1907// === impl PooledTransactionsHashesBuilder ===
1908
1909impl PooledTransactionsHashesBuilder {
1910    /// Push a transaction from the pool to the list.
1911    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1912        match self {
1913            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1914            Self::Eth68(msg) => {
1915                msg.hashes.push(*pooled_tx.hash());
1916                msg.sizes.push(pooled_tx.encoded_length());
1917                msg.types.push(pooled_tx.transaction.ty());
1918            }
1919        }
1920    }
1921
1922    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1923    fn is_empty(&self) -> bool {
1924        match self {
1925            Self::Eth66(hashes) => hashes.is_empty(),
1926            Self::Eth68(hashes) => hashes.is_empty(),
1927        }
1928    }
1929
1930    /// Returns the number of transactions in the builder.
1931    fn len(&self) -> usize {
1932        match self {
1933            Self::Eth66(hashes) => hashes.len(),
1934            Self::Eth68(hashes) => hashes.len(),
1935        }
1936    }
1937
1938    /// Appends all hashes
1939    fn extend<T: SignedTransaction>(
1940        &mut self,
1941        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1942    ) {
1943        for tx in txs {
1944            self.push(&tx);
1945        }
1946    }
1947
1948    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1949        match self {
1950            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1951            Self::Eth68(msg) => {
1952                msg.hashes.push(*tx.tx_hash());
1953                msg.sizes.push(tx.size);
1954                msg.types.push(tx.transaction.ty());
1955            }
1956        }
1957    }
1958
1959    /// Create a builder for the negotiated version of the peer's session
1960    fn new(version: EthVersion) -> Self {
1961        match version {
1962            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1963            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1964                Self::Eth68(Default::default())
1965            }
1966        }
1967    }
1968
1969    fn build(self) -> NewPooledTransactionHashes {
1970        match self {
1971            Self::Eth66(mut msg) => {
1972                msg.0.shrink_to_fit();
1973                msg.into()
1974            }
1975            Self::Eth68(mut msg) => {
1976                msg.shrink_to_fit();
1977                msg.into()
1978            }
1979        }
1980    }
1981}
1982
1983/// How we received the transactions.
1984enum TransactionSource {
1985    /// Transactions were broadcast to us via [`Transactions`] message.
1986    Broadcast,
1987    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
1988    Response,
1989}
1990
1991// === impl TransactionSource ===
1992
1993impl TransactionSource {
1994    /// Whether the transaction were sent as broadcast.
1995    const fn is_broadcast(&self) -> bool {
1996        matches!(self, Self::Broadcast)
1997    }
1998}
1999
2000/// Tracks a single peer in the context of [`TransactionsManager`].
2001#[derive(Debug)]
2002pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2003    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
2004    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
2005    /// the peer.
2006    seen_transactions: LruCache<TxHash>,
2007    /// A communication channel directly to the peer's session task.
2008    request_tx: PeerRequestSender<PeerRequest<N>>,
2009    /// negotiated version of the session.
2010    version: EthVersion,
2011    /// The peer's client version.
2012    client_version: Arc<str>,
2013    /// The kind of peer.
2014    peer_kind: PeerKind,
2015}
2016
2017impl<N: NetworkPrimitives> PeerMetadata<N> {
2018    /// Returns a new instance of [`PeerMetadata`].
2019    pub fn new(
2020        request_tx: PeerRequestSender<PeerRequest<N>>,
2021        version: EthVersion,
2022        client_version: Arc<str>,
2023        max_transactions_seen_by_peer: u32,
2024        peer_kind: PeerKind,
2025    ) -> Self {
2026        Self {
2027            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2028            request_tx,
2029            version,
2030            client_version,
2031            peer_kind,
2032        }
2033    }
2034
2035    /// Returns a reference to the peer's request sender channel.
2036    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2037        &self.request_tx
2038    }
2039
2040    /// Returns a mutable reference to the seen transactions LRU cache.
2041    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2042        &mut self.seen_transactions
2043    }
2044
2045    /// Returns the negotiated `EthVersion` of the session.
2046    pub const fn version(&self) -> EthVersion {
2047        self.version
2048    }
2049
2050    /// Returns a reference to the peer's client version string.
2051    pub fn client_version(&self) -> &str {
2052        &self.client_version
2053    }
2054
2055    /// Returns the peer's kind.
2056    pub const fn peer_kind(&self) -> PeerKind {
2057        self.peer_kind
2058    }
2059}
2060
2061/// Commands to send to the [`TransactionsManager`]
2062#[derive(Debug)]
2063enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2064    /// Propagate a transaction hash to the network.
2065    PropagateHash(B256),
2066    /// Propagate transaction hashes to a specific peer.
2067    PropagateHashesTo(Vec<B256>, PeerId),
2068    /// Request the list of active peer IDs from the [`TransactionsManager`].
2069    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2070    /// Propagate a collection of full transactions to a specific peer.
2071    PropagateTransactionsTo(Vec<TxHash>, PeerId),
2072    /// Propagate a collection of hashes to all peers.
2073    PropagateTransactions(Vec<TxHash>),
2074    /// Propagate a collection of broadcastable transactions in full to all peers.
2075    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2076    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
2077    GetTransactionHashes {
2078        peers: Vec<PeerId>,
2079        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2080    },
2081    /// Requests a clone of the sender channel to the peer.
2082    GetPeerSender {
2083        peer_id: PeerId,
2084        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2085    },
2086}
2087
2088/// All events related to transactions emitted by the network.
2089#[derive(Debug)]
2090pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2091    /// Represents the event of receiving a list of transactions from a peer.
2092    ///
2093    /// This indicates transactions that were broadcasted to us from the peer.
2094    IncomingTransactions {
2095        /// The ID of the peer from which the transactions were received.
2096        peer_id: PeerId,
2097        /// The received transactions.
2098        msg: Transactions<N::BroadcastedTransaction>,
2099    },
2100    /// Represents the event of receiving a list of transaction hashes from a peer.
2101    IncomingPooledTransactionHashes {
2102        /// The ID of the peer from which the transaction hashes were received.
2103        peer_id: PeerId,
2104        /// The received new pooled transaction hashes.
2105        msg: NewPooledTransactionHashes,
2106    },
2107    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
2108    GetPooledTransactions {
2109        /// The ID of the peer from which the request was received.
2110        peer_id: PeerId,
2111        /// The received `GetPooledTransactions` request.
2112        request: GetPooledTransactions,
2113        /// The sender for responding to the request with a result of `PooledTransactions`.
2114        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2115    },
2116    /// Represents the event of receiving a `GetTransactionsHandle` request.
2117    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2118}
2119
2120/// Tracks stats about the [`TransactionsManager`].
2121#[derive(Debug)]
2122pub struct PendingPoolImportsInfo {
2123    /// Number of transactions about to be inserted into the pool.
2124    pending_pool_imports: Arc<AtomicUsize>,
2125    /// Max number of transactions allowed to be imported concurrently.
2126    max_pending_pool_imports: usize,
2127}
2128
2129impl PendingPoolImportsInfo {
2130    /// Returns a new [`PendingPoolImportsInfo`].
2131    pub fn new(max_pending_pool_imports: usize) -> Self {
2132        Self { pending_pool_imports: Arc::new(AtomicUsize::default()), max_pending_pool_imports }
2133    }
2134
2135    /// Returns `true` if the number of pool imports is under a given tolerated max.
2136    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
2137        self.pending_pool_imports.load(Ordering::Relaxed) < max_pending_pool_imports
2138    }
2139}
2140
2141impl Default for PendingPoolImportsInfo {
2142    fn default() -> Self {
2143        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2144    }
2145}
2146
2147#[derive(Debug, Default)]
2148struct TxManagerPollDurations {
2149    acc_network_events: Duration,
2150    acc_pending_imports: Duration,
2151    acc_tx_events: Duration,
2152    acc_imported_txns: Duration,
2153    acc_fetch_events: Duration,
2154    acc_pending_fetch: Duration,
2155    acc_cmds: Duration,
2156}
2157
2158#[cfg(test)]
2159mod tests {
2160    use super::*;
2161    use crate::{
2162        test_utils::{
2163            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2164            Testnet,
2165        },
2166        transactions::config::RelaxedEthAnnouncementFilter,
2167        NetworkConfigBuilder, NetworkManager,
2168    };
2169    use alloy_consensus::{TxEip1559, TxLegacy};
2170    use alloy_primitives::{hex, Signature, TxKind, U256};
2171    use alloy_rlp::Decodable;
2172    use futures::FutureExt;
2173    use reth_chainspec::MIN_TRANSACTION_GAS;
2174    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2175    use reth_network_api::{NetworkInfo, PeerKind};
2176    use reth_network_p2p::{
2177        error::{RequestError, RequestResult},
2178        sync::{NetworkSyncUpdater, SyncState},
2179    };
2180    use reth_storage_api::noop::NoopProvider;
2181    use reth_tasks::Runtime;
2182    use reth_transaction_pool::test_utils::{
2183        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2184    };
2185    use secp256k1::SecretKey;
2186    use std::{
2187        future::poll_fn,
2188        net::{IpAddr, Ipv4Addr, SocketAddr},
2189        str::FromStr,
2190    };
2191    use tracing::error;
2192
2193    #[tokio::test(flavor = "multi_thread")]
2194    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2195        reth_tracing::init_test_tracing();
2196        let net = Testnet::create(3).await;
2197
2198        let mut handles = net.handles();
2199        let handle0 = handles.next().unwrap();
2200        let handle1 = handles.next().unwrap();
2201
2202        drop(handles);
2203        let handle = net.spawn();
2204
2205        let listener0 = handle0.event_listener();
2206        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2207        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2208
2209        let client = NoopProvider::default();
2210        let pool = testing_pool();
2211        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2212            .disable_discovery()
2213            .listener_port(0)
2214            .build(client);
2215        let transactions_manager_config = config.transactions_manager_config.clone();
2216        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2217            .await
2218            .unwrap()
2219            .into_builder()
2220            .transactions(pool.clone(), transactions_manager_config)
2221            .split_with_handle();
2222
2223        tokio::task::spawn(network);
2224
2225        // go to syncing (pipeline sync)
2226        network_handle.update_sync_state(SyncState::Syncing);
2227        assert!(NetworkInfo::is_syncing(&network_handle));
2228        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2229
2230        // wait for all initiator connections
2231        let mut established = listener0.take(2);
2232        while let Some(ev) = established.next().await {
2233            match ev {
2234                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2235                    // to insert a new peer in transactions peerset
2236                    transactions
2237                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2238                }
2239                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2240                ev => {
2241                    error!("unexpected event {ev:?}")
2242                }
2243            }
2244        }
2245        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2246        let input = hex!(
2247            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2248        );
2249        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2250        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2251            peer_id: *handle1.peer_id(),
2252            msg: Transactions(vec![signed_tx.clone()]),
2253        });
2254        poll_fn(|cx| {
2255            let _ = transactions.poll_unpin(cx);
2256            Poll::Ready(())
2257        })
2258        .await;
2259        assert!(pool.is_empty());
2260        handle.terminate().await;
2261    }
2262
2263    #[tokio::test(flavor = "multi_thread")]
2264    async fn test_tx_broadcasts_through_two_syncs() {
2265        reth_tracing::init_test_tracing();
2266        let net = Testnet::create(3).await;
2267
2268        let mut handles = net.handles();
2269        let handle0 = handles.next().unwrap();
2270        let handle1 = handles.next().unwrap();
2271
2272        drop(handles);
2273        let handle = net.spawn();
2274
2275        let listener0 = handle0.event_listener();
2276        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2277        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2278
2279        let client = NoopProvider::default();
2280        let pool = testing_pool();
2281        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2282            .disable_discovery()
2283            .listener_port(0)
2284            .build(client);
2285        let transactions_manager_config = config.transactions_manager_config.clone();
2286        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2287            .await
2288            .unwrap()
2289            .into_builder()
2290            .transactions(pool.clone(), transactions_manager_config)
2291            .split_with_handle();
2292
2293        tokio::task::spawn(network);
2294
2295        // go to syncing (pipeline sync) to idle and then to syncing (live)
2296        network_handle.update_sync_state(SyncState::Syncing);
2297        assert!(NetworkInfo::is_syncing(&network_handle));
2298        network_handle.update_sync_state(SyncState::Idle);
2299        assert!(!NetworkInfo::is_syncing(&network_handle));
2300        network_handle.update_sync_state(SyncState::Syncing);
2301        assert!(NetworkInfo::is_syncing(&network_handle));
2302
2303        // wait for all initiator connections
2304        let mut established = listener0.take(2);
2305        while let Some(ev) = established.next().await {
2306            match ev {
2307                NetworkEvent::ActivePeerSession { .. } |
2308                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2309                    // to insert a new peer in transactions peerset
2310                    transactions.on_network_event(ev);
2311                }
2312                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2313                _ => {
2314                    error!("unexpected event {ev:?}")
2315                }
2316            }
2317        }
2318        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2319        let input = hex!(
2320            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2321        );
2322        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2323        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2324            peer_id: *handle1.peer_id(),
2325            msg: Transactions(vec![signed_tx.clone()]),
2326        });
2327        poll_fn(|cx| {
2328            let _ = transactions.poll_unpin(cx);
2329            Poll::Ready(())
2330        })
2331        .await;
2332        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2333        assert!(NetworkInfo::is_syncing(&network_handle));
2334        assert!(!pool.is_empty());
2335        handle.terminate().await;
2336    }
2337
2338    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2339    // event and is able to retrieve the corresponding transactions.
2340    #[tokio::test(flavor = "multi_thread")]
2341    async fn test_handle_incoming_transactions_hashes() {
2342        reth_tracing::init_test_tracing();
2343
2344        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2345        let client = NoopProvider::default();
2346
2347        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2348            // let OS choose port
2349            .listener_port(0)
2350            .disable_discovery()
2351            .build(client);
2352
2353        let pool = testing_pool();
2354
2355        let transactions_manager_config = config.transactions_manager_config.clone();
2356        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2357            .await
2358            .unwrap()
2359            .into_builder()
2360            .transactions(pool.clone(), transactions_manager_config)
2361            .split_with_handle();
2362
2363        let peer_id_1 = PeerId::new([1; 64]);
2364        let eth_version = EthVersion::Eth66;
2365
2366        let txs = vec![TransactionSigned::new_unhashed(
2367            Transaction::Legacy(TxLegacy {
2368                chain_id: Some(4),
2369                nonce: 15u64,
2370                gas_price: 2200000000,
2371                gas_limit: 34811,
2372                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2373                value: U256::from(1234u64),
2374                input: Default::default(),
2375            }),
2376            Signature::new(
2377                U256::from_str(
2378                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2379                )
2380                .unwrap(),
2381                U256::from_str(
2382                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2383                )
2384                .unwrap(),
2385                true,
2386            ),
2387        )];
2388
2389        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2390
2391        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2392        tx_manager.peers.insert(peer_id_1, peer_1);
2393
2394        assert!(pool.is_empty());
2395
2396        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2397            peer_id: peer_id_1,
2398            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2399                txs_hashes.clone(),
2400            )),
2401        });
2402
2403        // mock session of peer_1 receives request
2404        let req = to_mock_session_rx
2405            .recv()
2406            .await
2407            .expect("peer_1 session should receive request with buffered hashes");
2408        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2409        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2410
2411        let message: Vec<PooledTransactionVariant> = txs
2412            .into_iter()
2413            .map(|tx| {
2414                PooledTransactionVariant::try_from(tx)
2415                    .expect("Failed to convert MockTransaction to PooledTransaction")
2416            })
2417            .collect();
2418
2419        // return the transactions corresponding to the transaction hashes.
2420        response
2421            .send(Ok(PooledTransactions(message)))
2422            .expect("should send peer_1 response to tx manager");
2423
2424        // adance the transaction manager future
2425        poll_fn(|cx| {
2426            let _ = tx_manager.poll_unpin(cx);
2427            Poll::Ready(())
2428        })
2429        .await;
2430
2431        // ensure that the transactions corresponding to the transaction hashes have been
2432        // successfully retrieved and stored in the Pool.
2433        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2434    }
2435
2436    #[tokio::test(flavor = "multi_thread")]
2437    async fn test_handle_incoming_transactions() {
2438        reth_tracing::init_test_tracing();
2439        let net = Testnet::create(3).await;
2440
2441        let mut handles = net.handles();
2442        let handle0 = handles.next().unwrap();
2443        let handle1 = handles.next().unwrap();
2444
2445        drop(handles);
2446        let handle = net.spawn();
2447
2448        let listener0 = handle0.event_listener();
2449
2450        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2451        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2452
2453        let client = NoopProvider::default();
2454        let pool = testing_pool();
2455        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2456            .disable_discovery()
2457            .listener_port(0)
2458            .build(client);
2459        let transactions_manager_config = config.transactions_manager_config.clone();
2460        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2461            .await
2462            .unwrap()
2463            .into_builder()
2464            .transactions(pool.clone(), transactions_manager_config)
2465            .split_with_handle();
2466        tokio::task::spawn(network);
2467
2468        network_handle.update_sync_state(SyncState::Idle);
2469
2470        assert!(!NetworkInfo::is_syncing(&network_handle));
2471
2472        // wait for all initiator connections
2473        let mut established = listener0.take(2);
2474        while let Some(ev) = established.next().await {
2475            match ev {
2476                NetworkEvent::ActivePeerSession { .. } |
2477                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2478                    // to insert a new peer in transactions peerset
2479                    transactions.on_network_event(ev);
2480                }
2481                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2482                ev => {
2483                    error!("unexpected event {ev:?}")
2484                }
2485            }
2486        }
2487        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2488        let input = hex!(
2489            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2490        );
2491        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2492        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2493            peer_id: *handle1.peer_id(),
2494            msg: Transactions(vec![signed_tx.clone()]),
2495        });
2496        assert!(transactions
2497            .transactions_by_peers
2498            .get(signed_tx.tx_hash())
2499            .unwrap()
2500            .contains(handle1.peer_id()));
2501
2502        // advance the transaction manager future
2503        poll_fn(|cx| {
2504            let _ = transactions.poll_unpin(cx);
2505            Poll::Ready(())
2506        })
2507        .await;
2508
2509        assert!(!pool.is_empty());
2510        assert!(pool.get(signed_tx.tx_hash()).is_some());
2511        handle.terminate().await;
2512    }
2513
2514    #[tokio::test(flavor = "multi_thread")]
2515    async fn test_on_get_pooled_transactions_network() {
2516        reth_tracing::init_test_tracing();
2517        let net = Testnet::create(2).await;
2518
2519        let mut handles = net.handles();
2520        let handle0 = handles.next().unwrap();
2521        let handle1 = handles.next().unwrap();
2522
2523        drop(handles);
2524        let handle = net.spawn();
2525
2526        let listener0 = handle0.event_listener();
2527
2528        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2529        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2530
2531        let client = NoopProvider::default();
2532        let pool = testing_pool();
2533        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2534            .disable_discovery()
2535            .listener_port(0)
2536            .build(client);
2537        let transactions_manager_config = config.transactions_manager_config.clone();
2538        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2539            .await
2540            .unwrap()
2541            .into_builder()
2542            .transactions(pool.clone(), transactions_manager_config)
2543            .split_with_handle();
2544        tokio::task::spawn(network);
2545
2546        network_handle.update_sync_state(SyncState::Idle);
2547
2548        assert!(!NetworkInfo::is_syncing(&network_handle));
2549
2550        // wait for all initiator connections
2551        let mut established = listener0.take(2);
2552        while let Some(ev) = established.next().await {
2553            match ev {
2554                NetworkEvent::ActivePeerSession { .. } |
2555                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2556                    transactions.on_network_event(ev);
2557                }
2558                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2559                ev => {
2560                    error!("unexpected event {ev:?}")
2561                }
2562            }
2563        }
2564        handle.terminate().await;
2565
2566        let tx = MockTransaction::eip1559();
2567        let _ = transactions
2568            .pool
2569            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2570            .await;
2571
2572        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2573
2574        let (send, receive) =
2575            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2576
2577        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2578            peer_id: *handle1.peer_id(),
2579            request,
2580            response: send,
2581        });
2582
2583        match receive.await.unwrap() {
2584            Ok(PooledTransactions(transactions)) => {
2585                assert_eq!(transactions.len(), 1);
2586            }
2587            Err(e) => {
2588                panic!("error: {e:?}");
2589            }
2590        }
2591    }
2592
2593    // Ensure that when the remote peer only returns part of the requested transactions, the
2594    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2595    // re-buffered.
2596    #[tokio::test]
2597    async fn test_partially_tx_response() {
2598        reth_tracing::init_test_tracing();
2599
2600        let mut tx_manager = new_tx_manager().await.0;
2601        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2602
2603        let peer_id_1 = PeerId::new([1; 64]);
2604        let eth_version = EthVersion::Eth66;
2605
2606        let txs = vec![
2607            TransactionSigned::new_unhashed(
2608                Transaction::Legacy(TxLegacy {
2609                    chain_id: Some(4),
2610                    nonce: 15u64,
2611                    gas_price: 2200000000,
2612                    gas_limit: 34811,
2613                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2614                    value: U256::from(1234u64),
2615                    input: Default::default(),
2616                }),
2617                Signature::new(
2618                    U256::from_str(
2619                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2620                    )
2621                    .unwrap(),
2622                    U256::from_str(
2623                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2624                    )
2625                    .unwrap(),
2626                    true,
2627                ),
2628            ),
2629            TransactionSigned::new_unhashed(
2630                Transaction::Eip1559(TxEip1559 {
2631                    chain_id: 4,
2632                    nonce: 26u64,
2633                    max_priority_fee_per_gas: 1500000000,
2634                    max_fee_per_gas: 1500000013,
2635                    gas_limit: MIN_TRANSACTION_GAS,
2636                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2637                    value: U256::from(3000000000000000000u64),
2638                    input: Default::default(),
2639                    access_list: Default::default(),
2640                }),
2641                Signature::new(
2642                    U256::from_str(
2643                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2644                    )
2645                    .unwrap(),
2646                    U256::from_str(
2647                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2648                    )
2649                    .unwrap(),
2650                    true,
2651                ),
2652            ),
2653        ];
2654
2655        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2656
2657        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2658        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2659        // fetch
2660        peer_1.seen_transactions.insert(txs_hashes[0]);
2661        peer_1.seen_transactions.insert(txs_hashes[1]);
2662        tx_manager.peers.insert(peer_id_1, peer_1);
2663
2664        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2665        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2666
2667        // peer_1 is idle
2668        assert!(tx_fetcher.is_idle(&peer_id_1));
2669        assert_eq!(tx_fetcher.active_peers.len(), 0);
2670
2671        // sends requests for buffered hashes to peer_1
2672        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2673
2674        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2675        // as long as request is in flight peer_1 is not idle
2676        assert!(!tx_fetcher.is_idle(&peer_id_1));
2677        assert_eq!(tx_fetcher.active_peers.len(), 1);
2678
2679        // mock session of peer_1 receives request
2680        let req = to_mock_session_rx
2681            .recv()
2682            .await
2683            .expect("peer_1 session should receive request with buffered hashes");
2684        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2685
2686        let message: Vec<PooledTransactionVariant> = txs
2687            .into_iter()
2688            .take(1)
2689            .map(|tx| {
2690                PooledTransactionVariant::try_from(tx)
2691                    .expect("Failed to convert MockTransaction to PooledTransaction")
2692            })
2693            .collect();
2694        // response partial request
2695        response
2696            .send(Ok(PooledTransactions(message)))
2697            .expect("should send peer_1 response to tx manager");
2698        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2699            unreachable!()
2700        };
2701
2702        // request has resolved, peer_1 is idle again
2703        assert!(tx_fetcher.is_idle(&peer_id));
2704        assert_eq!(tx_fetcher.active_peers.len(), 0);
2705        // failing peer_1's request buffers requested hashes for retry.
2706        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2707    }
2708
2709    #[tokio::test]
2710    async fn test_max_retries_tx_request() {
2711        reth_tracing::init_test_tracing();
2712
2713        let mut tx_manager = new_tx_manager().await.0;
2714        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2715
2716        let peer_id_1 = PeerId::new([1; 64]);
2717        let peer_id_2 = PeerId::new([2; 64]);
2718        let eth_version = EthVersion::Eth66;
2719        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2720
2721        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2722        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2723        // fetch
2724        peer_1.seen_transactions.insert(seen_hashes[0]);
2725        peer_1.seen_transactions.insert(seen_hashes[1]);
2726        tx_manager.peers.insert(peer_id_1, peer_1);
2727
2728        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2729        // for first retry in reverse order to make index 0 lru
2730        let retries = 1;
2731        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2732        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2733
2734        // peer_1 is idle
2735        assert!(tx_fetcher.is_idle(&peer_id_1));
2736        assert_eq!(tx_fetcher.active_peers.len(), 0);
2737
2738        // sends request for buffered hashes to peer_1
2739        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2740
2741        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2742
2743        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2744        // as long as request is in inflight peer_1 is not idle
2745        assert!(!tx_fetcher.is_idle(&peer_id_1));
2746        assert_eq!(tx_fetcher.active_peers.len(), 1);
2747
2748        // mock session of peer_1 receives request
2749        let req = to_mock_session_rx
2750            .recv()
2751            .await
2752            .expect("peer_1 session should receive request with buffered hashes");
2753        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2754        let GetPooledTransactions(hashes) = request;
2755
2756        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2757
2758        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2759
2760        // fail request to peer_1
2761        response
2762            .send(Err(RequestError::BadResponse))
2763            .expect("should send peer_1 response to tx manager");
2764        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2765            unreachable!()
2766        };
2767
2768        // request has resolved, peer_1 is idle again
2769        assert!(tx_fetcher.is_idle(&peer_id));
2770        assert_eq!(tx_fetcher.active_peers.len(), 0);
2771        // failing peer_1's request buffers requested hashes for retry
2772        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2773
2774        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2775        tx_manager.peers.insert(peer_id_2, peer_2);
2776
2777        // peer_2 announces same hashes as peer_1
2778        let msg =
2779            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2780        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2781
2782        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2783
2784        // peer_2 should be in active_peers.
2785        assert_eq!(tx_fetcher.active_peers.len(), 1);
2786
2787        // since hashes are already seen, no changes to length of unknown hashes
2788        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2789        // but hashes are taken out of buffer and packed into request to peer_2
2790        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2791
2792        // mock session of peer_2 receives request
2793        let req = to_mock_session_rx
2794            .recv()
2795            .await
2796            .expect("peer_2 session should receive request with buffered hashes");
2797        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2798
2799        // report failed request to tx manager
2800        response
2801            .send(Err(RequestError::BadResponse))
2802            .expect("should send peer_2 response to tx manager");
2803        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2804
2805        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2806        // for retry
2807        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2808        assert_eq!(tx_fetcher.active_peers.len(), 0);
2809    }
2810
2811    #[test]
2812    fn test_transaction_builder_empty() {
2813        let mut builder =
2814            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2815        assert!(builder.is_empty());
2816
2817        let mut factory = MockTransactionFactory::default();
2818        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2819        builder.push(&tx);
2820        assert!(!builder.is_empty());
2821
2822        let txs = builder.build();
2823        assert!(txs.full.is_none());
2824        let txs = txs.pooled.unwrap();
2825        assert_eq!(txs.len(), 1);
2826    }
2827
2828    #[test]
2829    fn test_transaction_builder_large() {
2830        let mut builder =
2831            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2832        assert!(builder.is_empty());
2833
2834        let mut factory = MockTransactionFactory::default();
2835        let mut tx = factory.create_eip1559();
2836        // create a transaction that still fits
2837        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2838        let tx = Arc::new(tx);
2839        let tx = PropagateTransaction::pool_tx(tx);
2840        builder.push(&tx);
2841        assert!(!builder.is_empty());
2842
2843        let txs = builder.clone().build();
2844        assert!(txs.pooled.is_none());
2845        let txs = txs.full.unwrap();
2846        assert_eq!(txs.len(), 1);
2847
2848        builder.push(&tx);
2849
2850        let txs = builder.clone().build();
2851        let pooled = txs.pooled.unwrap();
2852        assert_eq!(pooled.len(), 1);
2853        let txs = txs.full.unwrap();
2854        assert_eq!(txs.len(), 1);
2855    }
2856
2857    #[test]
2858    fn test_transaction_builder_eip4844() {
2859        let mut builder =
2860            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2861        assert!(builder.is_empty());
2862
2863        let mut factory = MockTransactionFactory::default();
2864        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2865        builder.push(&tx);
2866        assert!(!builder.is_empty());
2867
2868        let txs = builder.clone().build();
2869        assert!(txs.full.is_none());
2870        let txs = txs.pooled.unwrap();
2871        assert_eq!(txs.len(), 1);
2872
2873        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2874        builder.push(&tx);
2875
2876        let txs = builder.clone().build();
2877        let pooled = txs.pooled.unwrap();
2878        assert_eq!(pooled.len(), 1);
2879        let txs = txs.full.unwrap();
2880        assert_eq!(txs.len(), 1);
2881    }
2882
2883    #[tokio::test]
2884    async fn test_propagate_full() {
2885        reth_tracing::init_test_tracing();
2886
2887        let (mut tx_manager, network) = new_tx_manager().await;
2888        let peer_id = PeerId::random();
2889
2890        // ensure not syncing
2891        network.handle().update_sync_state(SyncState::Idle);
2892
2893        // mock a peer
2894        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2895
2896        let session_info = SessionInfo {
2897            peer_id,
2898            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2899            client_version: Arc::from(""),
2900            capabilities: Arc::new(vec![].into()),
2901            status: Arc::new(Default::default()),
2902            version: EthVersion::Eth68,
2903            peer_kind: PeerKind::Basic,
2904        };
2905        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2906        tx_manager
2907            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2908        let mut propagate = vec![];
2909        let mut factory = MockTransactionFactory::default();
2910        let eip1559_tx = Arc::new(factory.create_eip1559());
2911        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2912        let eip4844_tx = Arc::new(factory.create_eip4844());
2913        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2914
2915        let propagated =
2916            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2917        assert_eq!(propagated.len(), 2);
2918        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
2919        assert_eq!(prop_txs.len(), 1);
2920        assert!(prop_txs[0].is_full());
2921
2922        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
2923        assert_eq!(prop_txs.len(), 1);
2924        assert!(prop_txs[0].is_hash());
2925
2926        let peer = tx_manager.peers.get(&peer_id).unwrap();
2927        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2928        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2929        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2930
2931        // propagate again
2932        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2933        assert!(propagated.is_empty());
2934    }
2935
2936    #[tokio::test]
2937    async fn test_propagate_pending_txs_while_initially_syncing() {
2938        reth_tracing::init_test_tracing();
2939
2940        let (mut tx_manager, network) = new_tx_manager().await;
2941        let peer_id = PeerId::random();
2942
2943        // Keep the node in initial sync mode.
2944        network.handle().update_sync_state(SyncState::Syncing);
2945        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
2946
2947        // Add a peer so propagation has a destination.
2948        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
2949        tx_manager.peers.insert(peer_id, peer);
2950
2951        let tx = MockTransaction::eip1559();
2952        tx_manager
2953            .pool
2954            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2955            .await
2956            .expect("transaction should be accepted into the pool");
2957
2958        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
2959
2960        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
2961        assert!(peer.seen_transactions.contains(tx.get_hash()));
2962    }
2963
2964    #[tokio::test]
2965    async fn test_relaxed_filter_ignores_unknown_tx_types() {
2966        reth_tracing::init_test_tracing();
2967
2968        let transactions_manager_config = TransactionsManagerConfig::default();
2969
2970        let propagation_policy = TransactionPropagationKind::default();
2971        let announcement_policy = RelaxedEthAnnouncementFilter::default();
2972
2973        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2974
2975        let pool = testing_pool();
2976        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2977        let client = NoopProvider::default();
2978
2979        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2980            .listener_port(0)
2981            .disable_discovery()
2982            .build(client.clone());
2983
2984        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2985        let (to_tx_manager_tx, from_network_rx) =
2986            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2987        network_manager.set_transactions(to_tx_manager_tx);
2988        let network_handle = network_manager.handle().clone();
2989        let network_service_handle = tokio::spawn(network_manager);
2990
2991        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2992            network_handle.clone(),
2993            pool.clone(),
2994            from_network_rx,
2995            transactions_manager_config,
2996            policy_bundle,
2997        );
2998
2999        let peer_id = PeerId::random();
3000        let eth_version = EthVersion::Eth68;
3001        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3002        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3003
3004        let mut tx_factory = MockTransactionFactory::default();
3005
3006        let valid_known_tx = tx_factory.create_eip1559();
3007        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3008
3009        let known_tx_hash = *known_tx_signed.hash();
3010        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3011        let known_tx_size = known_tx_signed.encoded_length();
3012
3013        let unknown_tx_hash = B256::random();
3014        let unknown_tx_type_byte = 0xff_u8;
3015        let unknown_tx_size = 150;
3016
3017        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3018            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3019            sizes: vec![known_tx_size, unknown_tx_size],
3020            hashes: vec![known_tx_hash, unknown_tx_hash],
3021        });
3022
3023        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3024
3025        poll_fn(|cx| {
3026            let _ = tx_manager.poll_unpin(cx);
3027            Poll::Ready(())
3028        })
3029        .await;
3030
3031        let mut requested_hashes_in_getpooled = HashSet::new();
3032        let mut unexpected_request_received = false;
3033
3034        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3035            .await
3036        {
3037            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3038                let GetPooledTransactions(hashes) = request;
3039                for hash in hashes {
3040                    requested_hashes_in_getpooled.insert(hash);
3041                }
3042                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3043            }
3044            Ok(Some(other_request)) => {
3045                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3046                unexpected_request_received = true;
3047            }
3048            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3049            Err(_timeout_err) => {
3050                tracing::info!("Timeout: No GetPooledTransactions request received.")
3051            }
3052        }
3053
3054        assert!(
3055            requested_hashes_in_getpooled.contains(&known_tx_hash),
3056            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3057        );
3058        assert!(
3059            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3060            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3061        );
3062        assert!(
3063            !unexpected_request_received,
3064            "An unexpected P2P request was received by the mock peer."
3065        );
3066
3067        network_service_handle.abort();
3068    }
3069}