Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use itertools::Itertools;
5use rayon::iter::{IntoParallelIterator, ParallelIterator};
6
7/// Aggregation on configurable parameters for [`TransactionsManager`].
8pub mod config;
9/// Default and spec'd bounds.
10pub mod constants;
11/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
12pub mod fetcher;
13/// Defines the traits for transaction-related policies.
14pub mod policy;
15
16pub use self::constants::{
17    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31    budget::{
32        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34    },
35    cache::LruCache,
36    duration_metered_exec, metered_poll_nested_stream_with_budget,
37    metrics::{
38        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
39    },
40    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
41    NetworkHandle, TxTypesCounter,
42};
43use alloy_primitives::{TxHash, B256};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
50    RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
54use reth_network_api::{
55    events::{PeerEvent, SessionInfo},
56    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59    error::{RequestError, RequestResult},
60    sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::SignedTransaction;
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67    error::{PoolError, PoolResult},
68    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72    collections::{hash_map::Entry, HashMap, HashSet},
73    pin::Pin,
74    sync::{
75        atomic::{AtomicUsize, Ordering},
76        Arc,
77    },
78    task::{Context, Poll},
79    time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
/// The future for importing transactions into the pool.
///
/// Resolves with the result of each transaction import: one [`PoolResult`] per transaction,
/// carrying the [`AddedTransactionOutcome`] on success.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
/// API to interact with the [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer.
///
/// The handle only wraps the sending half of an unbounded command channel, so every clone talks
/// to the same manager.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`].
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105    fn send(&self, cmd: TransactionsCommand<N>) {
106        let _ = self.manager_tx.send(cmd);
107    }
108
109    /// Fetch the [`PeerRequestSender`] for the given peer.
110    async fn peer_handle(
111        &self,
112        peer_id: PeerId,
113    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114        let (tx, rx) = oneshot::channel();
115        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116        rx.await
117    }
118
119    /// Manually propagate the transaction that belongs to the hash.
120    pub fn propagate(&self, hash: TxHash) {
121        self.send(TransactionsCommand::PropagateHash(hash))
122    }
123
124    /// Manually propagate the transaction hash to a specific peer.
125    ///
126    /// Note: this only propagates if the pool contains the transaction.
127    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128        self.propagate_hashes_to(Some(hash), peer)
129    }
130
131    /// Manually propagate the transaction hashes to a specific peer.
132    ///
133    /// Note: this only propagates the transactions that are known to the pool.
134    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135        let hashes = hash.into_iter().collect::<Vec<_>>();
136        if hashes.is_empty() {
137            return
138        }
139        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140    }
141
142    /// Request the active peer IDs from the [`TransactionsManager`].
143    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144        let (tx, rx) = oneshot::channel();
145        self.send(TransactionsCommand::GetActivePeers(tx));
146        rx.await
147    }
148
149    /// Manually propagate full transaction hashes to a specific peer.
150    ///
151    /// Do nothing if transactions are empty.
152    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153        if transactions.is_empty() {
154            return
155        }
156        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157    }
158
159    /// Manually propagate the given transaction hashes to all peers.
160    ///
161    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
162    /// full.
163    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164        if transactions.is_empty() {
165            return
166        }
167        self.send(TransactionsCommand::PropagateTransactions(transactions))
168    }
169
170    /// Manually propagate the given transactions to all peers.
171    ///
172    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
173    /// full.
174    pub fn broadcast_transactions(
175        &self,
176        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177    ) {
178        let transactions =
179            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180        if transactions.is_empty() {
181            return
182        }
183        self.send(TransactionsCommand::BroadcastTransactions(transactions))
184    }
185
186    /// Request the transaction hashes known by specific peers.
187    pub async fn get_transaction_hashes(
188        &self,
189        peers: Vec<PeerId>,
190    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191        if peers.is_empty() {
192            return Ok(Default::default())
193        }
194        let (tx, rx) = oneshot::channel();
195        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196        rx.await
197    }
198
199    /// Request the transaction hashes known by a specific peer.
200    pub async fn get_peer_transaction_hashes(
201        &self,
202        peer: PeerId,
203    ) -> Result<HashSet<TxHash>, RecvError> {
204        let res = self.get_transaction_hashes(vec![peer]).await?;
205        Ok(res.into_values().next().unwrap_or_default())
206    }
207
208    /// Requests the transactions directly from the given peer.
209    ///
210    /// Returns `None` if the peer is not connected.
211    ///
212    /// **Note**: this returns the response from the peer as received.
213    pub async fn get_pooled_transactions_from(
214        &self,
215        peer_id: PeerId,
216        hashes: Vec<B256>,
217    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220        let (tx, rx) = oneshot::channel();
221        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222        peer.try_send(request).ok();
223
224        rx.await?.map(|res| Some(res.0))
225    }
226}
227
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as a background service.
/// [`TransactionsHandle`] can be used as a frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All transactions currently pending import, keyed by transaction hash and mapped to the
    /// peers that sent them.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction. The entry is removed again once the import finished, and the recorded peers
    /// are the ones penalized if the import turns out to be a bad transaction.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose pool import failed with a bad-transaction error.
    ///
    /// Cached so that these transactions are not fetched or imported again.
    bad_imports: LruCache<TxHash>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies for transaction propagation and announcement filtering.
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
347
348impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
349    /// Sets up a new instance.
350    ///
351    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
352    pub fn new(
353        network: NetworkHandle<N>,
354        pool: Pool,
355        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
356        transactions_manager_config: TransactionsManagerConfig,
357    ) -> Self {
358        Self::with_policy(
359            network,
360            pool,
361            from_network,
362            transactions_manager_config,
363            NetworkPolicies::new(
364                TransactionPropagationKind::default(),
365                StrictEthAnnouncementFilter::default(),
366            ),
367        )
368    }
369}
370
371impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
372    /// Sets up a new instance with given the settings.
373    ///
374    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
375    pub fn with_policy(
376        network: NetworkHandle<N>,
377        pool: Pool,
378        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
379        transactions_manager_config: TransactionsManagerConfig,
380        policies: NetworkPolicies<N>,
381    ) -> Self {
382        let network_events = network.event_listener();
383
384        let (command_tx, command_rx) = mpsc::unbounded_channel();
385
386        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
387            &transactions_manager_config.transaction_fetcher_config,
388        );
389
390        // install a listener for new __pending__ transactions that are allowed to be propagated
391        // over the network
392        let pending = pool.pending_transactions_listener();
393        let pending_pool_imports_info =
394            PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
395        let metrics = TransactionsManagerMetrics::default();
396        metrics
397            .capacity_pending_pool_imports
398            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
399
400        Self {
401            pool,
402            network,
403            network_events,
404            transaction_fetcher,
405            transactions_by_peers: Default::default(),
406            pool_imports: Default::default(),
407            pending_pool_imports_info,
408            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409            peers: Default::default(),
410            command_tx,
411            command_rx: UnboundedReceiverStream::new(command_rx),
412            pending_transactions: pending,
413            transaction_events: UnboundedMeteredReceiver::new(
414                from_network,
415                NETWORK_POOL_TRANSACTIONS_SCOPE,
416            ),
417            config: transactions_manager_config,
418            policies,
419            metrics,
420            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421        }
422    }
423
424    /// Returns a new handle that can send commands to this type.
425    pub fn handle(&self) -> TransactionsHandle<N> {
426        TransactionsHandle { manager_tx: self.command_tx.clone() }
427    }
428
429    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
430    /// `false` if [`TransactionsManager`] is operating close to full capacity.
431    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432        self.has_capacity_for_pending_pool_imports() &&
433            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434    }
435
436    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
437    fn has_capacity_for_pending_pool_imports(&self) -> bool {
438        self.remaining_pool_import_capacity() > 0
439    }
440
441    /// Returns the remaining capacity for pending pool imports.
442    fn remaining_pool_import_capacity(&self) -> usize {
443        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445        )
446    }
447
448    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450        self.metrics.reported_bad_transactions.increment(1);
451    }
452
453    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455        self.network.reputation_change(peer_id, kind);
456    }
457
458    fn report_already_seen(&self, peer_id: PeerId) {
459        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461    }
462
463    /// Handles a closed peer session, removing the peer from transaction-local tracking state.
464    fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
465        if let Some(mut peer) = self.peers.remove(peer_id) {
466            self.policies.propagation_policy_mut().on_session_closed(&mut peer);
467        }
468        self.transaction_fetcher.remove_peer(peer_id);
469    }
470
471    /// Clear the transaction
472    fn on_good_import(&mut self, hash: TxHash) {
473        self.transactions_by_peers.remove(&hash);
474    }
475
476    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
477    /// fetching or importing it again.
478    ///
479    /// Errors that count as bad transactions are:
480    ///
481    /// - intrinsic gas too low
482    /// - exceeds gas limit
483    /// - gas uint overflow
484    /// - exceeds max init code size
485    /// - oversized data
486    /// - signer account has bytecode
487    /// - chain id mismatch
488    /// - old legacy chain id
489    /// - tx type not supported
490    ///
491    /// (and additionally for blobs txns...)
492    ///
493    /// - no blobs
494    /// - too many blobs
495    /// - invalid kzg proof
496    /// - kzg error
497    /// - not blob transaction (tx type mismatch)
498    /// - wrong versioned kzg commitment hash
499    fn on_bad_import(&mut self, err: PoolError) {
500        let peers = self.transactions_by_peers.remove(&err.hash);
501
502        // if we're _currently_ syncing, we ignore a bad transaction
503        if !err.is_bad_transaction() || self.network.is_syncing() {
504            return
505        }
506        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
507        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
508        if let Some(peers) = peers {
509            for peer_id in peers {
510                self.report_peer_bad_transactions(peer_id);
511            }
512        }
513        self.metrics.bad_imports.increment(1);
514        self.bad_imports.insert(err.hash);
515    }
516
517    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
518    ///
519    /// Returns `true` if a request was sent.
520    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
521        // try drain transaction hashes pending fetch
522        let info = &self.pending_pool_imports_info;
523        let max_pending_pool_imports = info.max_pending_pool_imports;
524        let has_capacity_wrt_pending_pool_imports =
525            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
526
527        self.transaction_fetcher
528            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
529    }
530
531    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
532        let kind = match req_err {
533            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
534            RequestError::Timeout => ReputationChangeKind::Timeout,
535            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
536                // peer is already disconnected
537                return
538            }
539            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
540        };
541        self.report_peer(peer_id, kind);
542    }
543
544    #[inline]
545    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
546        let metrics = &self.metrics;
547
548        let TxManagerPollDurations {
549            acc_network_events,
550            acc_pending_imports,
551            acc_tx_events,
552            acc_imported_txns,
553            acc_fetch_events,
554            acc_pending_fetch,
555            acc_cmds,
556        } = poll_durations;
557
558        // update metrics for whole poll function
559        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
560        // update metrics for nested expressions
561        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
562        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
563        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
564        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
565        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
566        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
567        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
568    }
569}
570
571impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
572    /// Processes a batch import results.
573    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
574        for res in batch_results {
575            match res {
576                Ok(AddedTransactionOutcome { hash, .. }) => {
577                    self.on_good_import(hash);
578                }
579                Err(err) => {
580                    self.on_bad_import(err);
581                }
582            }
583        }
584    }
585
586    /// Request handler for an incoming `NewPooledTransactionHashes`
587    fn on_new_pooled_transaction_hashes(
588        &mut self,
589        peer_id: PeerId,
590        msg: NewPooledTransactionHashes,
591    ) {
592        // If the node is initially syncing, ignore transactions
593        if self.network.is_initially_syncing() {
594            return
595        }
596        if self.network.tx_gossip_disabled() {
597            return
598        }
599
600        // get handle to peer's session, if the session is still active
601        let Some(peer) = self.peers.get_mut(&peer_id) else {
602            trace!(
603                peer_id = format!("{peer_id:#}"),
604                ?msg,
605                "discarding announcement from inactive peer"
606            );
607
608            return
609        };
610        let client = peer.client_version.clone();
611
612        // keep track of the transactions the peer knows
613        let mut count_txns_already_seen_by_peer = 0;
614        for tx in msg.iter_hashes().copied() {
615            if !peer.seen_transactions.insert(tx) {
616                count_txns_already_seen_by_peer += 1;
617            }
618        }
619        if count_txns_already_seen_by_peer > 0 {
620            // this may occur if transactions are sent or announced to a peer, at the same time as
621            // the peer sends/announces those hashes to us. this is because, marking
622            // txns as seen by a peer is done optimistically upon sending them to the
623            // peer.
624            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
625            self.metrics
626                .occurrences_hash_already_seen_by_peer
627                .increment(count_txns_already_seen_by_peer);
628
629            trace!(target: "net::tx",
630                %count_txns_already_seen_by_peer,
631                peer_id=format!("{peer_id:#}"),
632                ?client,
633                "Peer sent hashes that have already been marked as seen by peer"
634            );
635
636            self.report_already_seen(peer_id);
637        }
638
639        // 1. filter out spam
640        if msg.is_empty() {
641            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
642            return;
643        }
644
645        let original_len = msg.len();
646        let mut partially_valid_msg = msg.dedup();
647
648        if partially_valid_msg.len() != original_len {
649            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
650        }
651
652        // 2. filter out transactions pending import to pool
653        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
654
655        // 3. filter out invalid entries (spam)
656        //
657        // validates messages with respect to the given network, e.g. allowed tx types.
658        // done before the pool lookup since these are cheap in-memory checks that shrink
659        // the set before acquiring the pool lock.
660        //
661        let mut should_report_peer = false;
662        let mut tx_types_counter = TxTypesCounter::default();
663
664        let is_eth68_message = partially_valid_msg
665            .msg_version()
666            .expect("partially valid announcement should have a version")
667            .is_eth68();
668
669        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
670            let (ty_byte, size_val) = match *metadata_ref_mut {
671                Some((ty, size)) => {
672                    if !is_eth68_message {
673                        should_report_peer = true;
674                    }
675                    (ty, size)
676                }
677                None => {
678                    if is_eth68_message {
679                        should_report_peer = true;
680                        return false;
681                    }
682                    (0u8, 0)
683                }
684            };
685
686            if is_eth68_message &&
687                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
688                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
689            {
690                tx_types_counter.increase_by_tx_type(parsed_tx_type);
691            }
692
693            let decision = self
694                .policies
695                .announcement_filter()
696                .decide_on_announcement(ty_byte, tx_hash, size_val);
697
698            match decision {
699                AnnouncementAcceptance::Accept => true,
700                AnnouncementAcceptance::Ignore => false,
701                AnnouncementAcceptance::Reject { penalize_peer } => {
702                    if penalize_peer {
703                        should_report_peer = true;
704                    }
705                    false
706                }
707            }
708        });
709
710        if is_eth68_message {
711            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
712        }
713
714        if should_report_peer {
715            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
716        }
717
718        // 4. filter out known hashes
719        //
720        // known txns have already been successfully fetched or received over gossip.
721        //
722        // most hashes will be filtered out here since the mempool protocol is a gossip
723        // protocol, healthy peers will send many of the same hashes.
724        //
725        let hashes_count_pre_pool_filter = partially_valid_msg.len();
726        self.pool.retain_unknown(&mut partially_valid_msg);
727        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
728            let already_known_hashes_count =
729                hashes_count_pre_pool_filter - partially_valid_msg.len();
730            self.metrics
731                .occurrences_hashes_already_in_pool
732                .increment(already_known_hashes_count as u64);
733        }
734
735        if partially_valid_msg.is_empty() {
736            // nothing to request
737            return
738        }
739
740        let mut valid_announcement_data =
741            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
742
743        if valid_announcement_data.is_empty() {
744            // no valid announcement data
745            return
746        }
747
748        // 5. filter out already seen unknown hashes
749        //
750        // seen hashes are already in the tx fetcher, pending fetch.
751        //
752        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
753        // fetcher, hence they should be valid at this point.
754        let bad_imports = &self.bad_imports;
755        self.transaction_fetcher.filter_unseen_and_pending_hashes(
756            &mut valid_announcement_data,
757            |hash| bad_imports.contains(hash),
758            &peer_id,
759            &client,
760        );
761
762        if valid_announcement_data.is_empty() {
763            // nothing to request
764            return
765        }
766
767        trace!(target: "net::tx::propagation",
768            peer_id=format!("{peer_id:#}"),
769            hashes_len=valid_announcement_data.len(),
770            hashes=%valid_announcement_data.keys().format(", "),
771            msg_version=%valid_announcement_data.msg_version(),
772            client_version=%client,
773            "received previously unseen and pending hashes in announcement from peer"
774        );
775
776        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
777        // fallback
778        if !self.transaction_fetcher.is_idle(&peer_id) {
779            // load message version before announcement data is destructed in packing
780            let msg_version = valid_announcement_data.msg_version();
781            let (hashes, _version) = valid_announcement_data.into_request_hashes();
782
783            trace!(target: "net::tx",
784                peer_id=format!("{peer_id:#}"),
785                hashes=?*hashes,
786                %msg_version,
787                %client,
788                "buffering hashes announced by busy peer"
789            );
790
791            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
792
793            return
794        }
795
796        let mut hashes_to_request =
797            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
798        let surplus_hashes =
799            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
800
801        if !surplus_hashes.is_empty() {
802            trace!(target: "net::tx",
803                peer_id=format!("{peer_id:#}"),
804                surplus_hashes=?*surplus_hashes,
805                %client,
806                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
807            );
808
809            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
810        }
811
812        trace!(target: "net::tx",
813            peer_id=format!("{peer_id:#}"),
814            hashes=?*hashes_to_request,
815            %client,
816            "sending hashes in `GetPooledTransactions` request to peer's session"
817        );
818
819        // request the missing transactions
820        //
821        // get handle to peer's session again, at this point we know it exists
822        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
823        if let Some(failed_to_request_hashes) =
824            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
825        {
826            let conn_eth_version = peer.version;
827
828            trace!(target: "net::tx",
829                peer_id=format!("{peer_id:#}"),
830                failed_to_request_hashes=?*failed_to_request_hashes,
831                %conn_eth_version,
832                %client,
833                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
834            );
835            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
836        }
837    }
838}
839
840impl<Pool, N> TransactionsManager<Pool, N>
841where
842    Pool: TransactionPool + Unpin + 'static,
843    N: NetworkPrimitives<
844            BroadcastedTransaction: SignedTransaction,
845            PooledTransaction: SignedTransaction,
846        > + Unpin,
847    Pool::Transaction:
848        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
849{
850    /// Invoked when transactions in the local mempool are considered __pending__.
851    ///
852    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
853    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
854    /// messages. The Transactions message relays complete transaction objects and is typically
855    /// sent to a small, random fraction of connected peers.
856    ///
857    /// All other peers receive a notification of the transaction hash and can request the
858    /// complete transaction object if it is unknown to them. The dissemination of complete
859    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
860    /// and won't need to request it.
861    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
862        // We intentionally do not gate this on initial sync.
863        // During initial sync we skip importing tx announcements from peers in
864        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
865        if self.network.tx_gossip_disabled() {
866            return
867        }
868
869        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
870
871        self.propagate_all(hashes);
872    }
873
874    /// Propagate the full transactions to a specific peer.
875    ///
876    /// Returns the propagated transactions.
877    fn propagate_full_transactions_to_peer(
878        &mut self,
879        txs: Vec<TxHash>,
880        peer_id: PeerId,
881        propagation_mode: PropagationMode,
882    ) -> Option<PropagatedTransactions> {
883        let peer = self.peers.get_mut(&peer_id)?;
884        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
885        let mut propagated = PropagatedTransactions::default();
886
887        // filter all transactions unknown to the peer
888        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
889
890        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
891
892        if propagation_mode.is_forced() {
893            // skip cache check if forced
894            full_transactions.extend(to_propagate);
895        } else {
896            // Iterate through the transactions to propagate and fill the hashes and full
897            // transaction
898            for tx in to_propagate {
899                if !peer.seen_transactions.contains(tx.tx_hash()) {
900                    // Only include if the peer hasn't seen the transaction
901                    full_transactions.push(&tx);
902                }
903            }
904        }
905
906        if full_transactions.is_empty() {
907            // nothing to propagate
908            return None
909        }
910
911        let PropagateTransactions { pooled, full } = full_transactions.build();
912
913        // send hashes if any
914        if let Some(new_pooled_hashes) = pooled {
915            for hash in new_pooled_hashes.iter_hashes().copied() {
916                propagated.record(hash, PropagateKind::Hash(peer_id));
917                // mark transaction as seen by peer
918                peer.seen_transactions.insert(hash);
919            }
920
921            // send hashes of transactions
922            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
923        }
924
925        // send full transactions, if any
926        if let Some(new_full_transactions) = full {
927            for tx in &new_full_transactions {
928                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
929                // mark transaction as seen by peer
930                peer.seen_transactions.insert(*tx.tx_hash());
931            }
932
933            // send full transactions
934            self.network.send_transactions(peer_id, new_full_transactions);
935        }
936
937        // Update propagated transactions metrics
938        self.metrics.propagated_transactions.increment(propagated.len() as u64);
939
940        Some(propagated)
941    }
942
943    /// Propagate the transaction hashes to the given peer
944    ///
945    /// Note: This will only send the hashes for transactions that exist in the pool.
946    fn propagate_hashes_to(
947        &mut self,
948        hashes: Vec<TxHash>,
949        peer_id: PeerId,
950        propagation_mode: PropagationMode,
951    ) {
952        trace!(target: "net::tx", "Start propagating transactions as hashes");
953
954        // This fetches a transactions from the pool, including the blob transactions, which are
955        // only ever sent as hashes.
956        let propagated = {
957            let Some(peer) = self.peers.get_mut(&peer_id) else {
958                // no such peer
959                return
960            };
961
962            let to_propagate =
963                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
964
965            let mut propagated = PropagatedTransactions::default();
966
967            // check if transaction is known to peer
968            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
969
970            if propagation_mode.is_forced() {
971                hashes.extend(to_propagate)
972            } else {
973                for tx in to_propagate {
974                    if !peer.seen_transactions.contains(tx.tx_hash()) {
975                        // Include if the peer hasn't seen it
976                        hashes.push(&tx);
977                    }
978                }
979            }
980
981            let new_pooled_hashes = hashes.build();
982
983            if new_pooled_hashes.is_empty() {
984                // nothing to propagate
985                return
986            }
987
988            if let Some(peer) = self.peers.get_mut(&peer_id) {
989                for hash in new_pooled_hashes.iter_hashes().copied() {
990                    propagated.record(hash, PropagateKind::Hash(peer_id));
991                    peer.seen_transactions.insert(hash);
992                }
993            }
994
995            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
996
997            // send hashes of transactions
998            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
999
1000            // Update propagated transactions metrics
1001            self.metrics.propagated_transactions.increment(propagated.len() as u64);
1002
1003            propagated
1004        };
1005
1006        // notify pool so events get fired
1007        self.pool.on_propagated(propagated);
1008    }
1009
1010    /// Propagate the transactions to all connected peers either as full objects or hashes.
1011    ///
1012    /// The message for new pooled hashes depends on the negotiated version of the stream.
1013    /// See [`NewPooledTransactionHashes`]
1014    ///
1015    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1016    fn propagate_transactions(
1017        &mut self,
1018        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1019        propagation_mode: PropagationMode,
1020    ) -> PropagatedTransactions {
1021        let mut propagated = PropagatedTransactions::default();
1022        if self.network.tx_gossip_disabled() {
1023            return propagated
1024        }
1025
1026        // send full transactions to a set of the connected peers based on the configured mode
1027        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1028
1029        // Note: Assuming ~random~ order due to random state of the peers map hasher
1030        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1031            if !self.policies.propagation_policy().can_propagate(peer) {
1032                // skip peers we should not propagate to
1033                continue
1034            }
1035            // determine whether to send full tx objects or hashes.
1036            let mut builder = if peer_idx > max_num_full {
1037                PropagateTransactionsBuilder::pooled(peer.version)
1038            } else {
1039                PropagateTransactionsBuilder::full(peer.version)
1040            };
1041
1042            if propagation_mode.is_forced() {
1043                builder.extend(to_propagate.iter());
1044            } else {
1045                // Iterate through the transactions to propagate and fill the hashes and full
1046                // transaction lists, before deciding whether or not to send full transactions to
1047                // the peer.
1048                for tx in &to_propagate {
1049                    // Only proceed if the transaction is not in the peer's list of seen
1050                    // transactions
1051                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1052                        builder.push(tx);
1053                    }
1054                }
1055            }
1056
1057            if builder.is_empty() {
1058                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1059                continue
1060            }
1061
1062            let PropagateTransactions { pooled, full } = builder.build();
1063
1064            // send hashes if any
1065            if let Some(mut new_pooled_hashes) = pooled {
1066                // enforce tx soft limit per message for the (unlikely) event the number of
1067                // hashes exceeds it
1068                new_pooled_hashes
1069                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1070
1071                for hash in new_pooled_hashes.iter_hashes().copied() {
1072                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1073                    // mark transaction as seen by peer
1074                    peer.seen_transactions.insert(hash);
1075                }
1076
1077                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1078
1079                // send hashes of transactions
1080                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1081            }
1082
1083            // send full transactions, if any
1084            if let Some(new_full_transactions) = full {
1085                for tx in &new_full_transactions {
1086                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1087                    // mark transaction as seen by peer
1088                    peer.seen_transactions.insert(*tx.tx_hash());
1089                }
1090
1091                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1092
1093                // send full transactions
1094                self.network.send_transactions(*peer_id, new_full_transactions);
1095            }
1096        }
1097
1098        // Update propagated transactions metrics
1099        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1100
1101        propagated
1102    }
1103
1104    /// Propagates the given transactions to the peers
1105    ///
1106    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1107    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1108    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1109        if self.peers.is_empty() {
1110            // nothing to propagate
1111            return
1112        }
1113        let propagated = self.propagate_transactions(
1114            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1115            PropagationMode::Basic,
1116        );
1117
1118        // notify pool so events get fired
1119        self.pool.on_propagated(propagated);
1120    }
1121
1122    /// Request handler for an incoming request for transactions
1123    fn on_get_pooled_transactions(
1124        &mut self,
1125        peer_id: PeerId,
1126        request: GetPooledTransactions,
1127        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1128    ) {
1129        // fast exit if gossip is disabled
1130        if self.network.tx_gossip_disabled() {
1131            let _ = response.send(Ok(PooledTransactions::default()));
1132            return
1133        }
1134        if let Some(peer) = self.peers.get_mut(&peer_id) {
1135            let transactions = self.pool.get_pooled_transaction_elements(
1136                request.0,
1137                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1138                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1139                ),
1140            );
1141            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1142
1143            // we sent a response at which point we assume that the peer is aware of the
1144            // transactions
1145            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1146
1147            let resp = PooledTransactions(transactions);
1148            let _ = response.send(Ok(resp));
1149        }
1150    }
1151
1152    /// Handles a command received from a detached [`TransactionsHandle`]
1153    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1154        match cmd {
1155            TransactionsCommand::PropagateHash(hash) => {
1156                self.on_new_pending_transactions(vec![hash])
1157            }
1158            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1159                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1160            }
1161            TransactionsCommand::GetActivePeers(tx) => {
1162                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1163                tx.send(peers).ok();
1164            }
1165            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1166                if let Some(propagated) =
1167                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1168                {
1169                    self.pool.on_propagated(propagated);
1170                }
1171            }
1172            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1173            TransactionsCommand::BroadcastTransactions(txs) => {
1174                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1175                self.pool.on_propagated(propagated);
1176            }
1177            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1178                let mut res = HashMap::with_capacity(peers.len());
1179                for peer_id in peers {
1180                    let hashes = self
1181                        .peers
1182                        .get(&peer_id)
1183                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1184                        .unwrap_or_default();
1185                    res.insert(peer_id, hashes);
1186                }
1187                tx.send(res).ok();
1188            }
1189            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1190                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1191                peer_request_sender.send(sender).ok();
1192            }
1193        }
1194    }
1195
1196    /// Handles session establishment and peer transactions initialization.
1197    ///
1198    /// This is invoked when a new session is established.
1199    fn handle_peer_session(
1200        &mut self,
1201        info: SessionInfo,
1202        messages: PeerRequestSender<PeerRequest<N>>,
1203    ) {
1204        let SessionInfo { peer_id, client_version, version, .. } = info;
1205
1206        // Insert a new peer into the peerset.
1207        let peer = PeerMetadata::<N>::new(
1208            messages,
1209            version,
1210            client_version,
1211            self.config.max_transactions_seen_by_peer_history,
1212            info.peer_kind,
1213        );
1214        let peer = match self.peers.entry(peer_id) {
1215            Entry::Occupied(mut entry) => {
1216                entry.insert(peer);
1217                entry.into_mut()
1218            }
1219            Entry::Vacant(entry) => entry.insert(peer),
1220        };
1221
1222        self.policies.propagation_policy_mut().on_session_established(peer);
1223
1224        // Send a `NewPooledTransactionHashes` to the peer with up to
1225        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1226        // transactions in the pool.
1227        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1228            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1229            return
1230        }
1231
1232        // Get transactions to broadcast
1233        let pooled_txs = self.pool.pooled_transactions_max(
1234            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1235        );
1236        if pooled_txs.is_empty() {
1237            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1238            return;
1239        }
1240
1241        // Build and send transaction hashes message
1242        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1243        for pooled_tx in pooled_txs {
1244            peer.seen_transactions.insert(*pooled_tx.hash());
1245            msg_builder.push_pooled(pooled_tx);
1246        }
1247
1248        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1249        let msg = msg_builder.build();
1250        self.network.send_transactions_hashes(peer_id, msg);
1251    }
1252
1253    /// Handles a received event related to common network events.
1254    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1255        match event_result {
1256            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1257                self.on_peer_session_closed(&peer_id);
1258            }
1259            NetworkEvent::ActivePeerSession { info, messages } => {
1260                // process active peer session and broadcast available transaction from the pool
1261                self.handle_peer_session(info, messages);
1262            }
1263            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1264                let peer_id = info.peer_id;
1265                // get messages from existing peer
1266                let messages = match self.peers.get(&peer_id) {
1267                    Some(p) => p.request_tx.clone(),
1268                    None => {
1269                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1270                        return;
1271                    }
1272                };
1273                self.handle_peer_session(info, messages);
1274            }
1275            _ => {}
1276        }
1277    }
1278
1279    /// Returns true if the ingress policy allows processing messages from the given peer.
1280    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1281        if self.config.ingress_policy.allows_all() {
1282            return true;
1283        }
1284        let Some(peer) = self.peers.get(peer_id) else {
1285            return false;
1286        };
1287        self.config.ingress_policy.allows(peer.peer_kind())
1288    }
1289
1290    /// Handles dedicated transaction events related to the `eth` protocol.
1291    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1292        match event {
1293            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1294                if !self.accepts_incoming_from(&peer_id) {
1295                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1296                    return;
1297                }
1298
1299                // ensure we didn't receive any blob transactions as these are disallowed to be
1300                // broadcasted in full
1301
1302                let has_blob_txs = msg.has_eip4844();
1303
1304                let non_blob_txs = msg
1305                    .into_iter()
1306                    .map(N::PooledTransaction::try_from)
1307                    .filter_map(Result::ok)
1308                    .collect();
1309
1310                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1311
1312                if has_blob_txs {
1313                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1314                    self.report_peer_bad_transactions(peer_id);
1315                }
1316            }
1317            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1318                if !self.accepts_incoming_from(&peer_id) {
1319                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1320                    return;
1321                }
1322                self.on_new_pooled_transaction_hashes(peer_id, msg)
1323            }
1324            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1325                self.on_get_pooled_transactions(peer_id, request, response)
1326            }
1327            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1328                let _ = response.send(Some(self.handle()));
1329            }
1330        }
1331    }
1332
    /// Starts the import process for the given transactions.
    ///
    /// The batch is dropped entirely while the node is initially syncing, when transaction
    /// gossip is disabled, when there is no capacity left for pending pool imports, or when
    /// `peer_id` is not a tracked peer.
    ///
    /// Otherwise the batch is truncated to the remaining import capacity and filtered down to
    /// transactions that are not yet tracked in `transactions_by_peers`, not in `bad_imports`
    /// and unknown to the pool. The remaining transactions are recovered in parallel and queued
    /// as a single pool import future on `pool_imports`.
    ///
    /// The peer is reported if it sent a known bad transaction or one whose recovery failed.
    /// For broadcasts (`source.is_broadcast()`), it is also reported if it sent transactions it
    /// was already recorded as having seen.
    fn import_transactions(
        &mut self,
        peer_id: PeerId,
        transactions: PooledTransactions<N::PooledTransaction>,
        source: TransactionSource,
    ) {
        // If the node is pipeline syncing, ignore transactions
        if self.network.is_initially_syncing() {
            return
        }
        if self.network.tx_gossip_disabled() {
            return
        }

        // Early return if we don't have capacity for any imports
        if !self.has_capacity_for_pending_pool_imports() {
            return
        }

        let mut transactions = transactions.0;

        // Truncate to remaining capacity early to bound work on all subsequent processing.
        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
        // malicious and we avoid wasting CPU on them.
        //
        // Note that this runs before the peer lookup below, so the skipped-transactions metric
        // is also updated for batches from peers that turn out not to be tracked.
        let capacity = self.remaining_pool_import_capacity();
        if transactions.len() > capacity {
            let skipped = transactions.len() - capacity;
            transactions.truncate(capacity);
            self.metrics
                .skipped_transactions_pending_pool_imports_at_capacity
                .increment(skipped as u64);
            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
        }

        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
        let client_version = peer.client_version.clone();

        // measures the preparation work of this call, recorded at the very end
        let start = Instant::now();

        // mark the transactions as received
        self.transaction_fetcher
            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));

        // track that the peer knows these transactions, but only if this is a new broadcast.
        // If we received the transactions as the response to our `GetPooledTransactions`
        // requests (based on received `NewPooledTransactionHashes`) then we already
        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
        //
        // `insert` returning `false` is counted as "already seen by this peer".
        let mut num_already_seen_by_peer = 0;
        for tx in &transactions {
            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
                num_already_seen_by_peer += 1;
            }
        }

        // tracks the quality of the given transactions
        let mut has_bad_transactions = false;

        // 1. Remove known, already-tracked, and invalid transactions first since these are
        // cheap in-memory checks against local maps
        transactions.retain(|tx| {
            // already tracked: only add this peer to the transaction's peer set
            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
                entry.get_mut().insert(peer_id);
                return false
            }
            if self.bad_imports.contains(tx.tx_hash()) {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    hash=%tx.tx_hash(),
                    %client_version,
                    "received a known bad transaction from peer"
                );
                has_bad_transactions = true;
                return false;
            }
            true
        });

        // 2. filter out txns already inserted into pool
        let txns_count_pre_pool_filter = transactions.len();
        self.pool.retain_unknown(&mut transactions);
        if txns_count_pre_pool_filter > transactions.len() {
            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
            self.metrics
                .occurrences_transactions_already_in_pool
                .increment(already_known_txns_count as u64);
        }

        // number of candidates before recovery, used to detect recovery failures below
        let txs_len = transactions.len();

        // recover the remaining transactions in parallel, dropping those that fail
        let new_txs = transactions
            .into_par_iter()
            .filter_map(|tx| match tx.try_into_recovered() {
                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
                Err(badtx) => {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        hash=%badtx.tx_hash(),
                        client_version=%client_version,
                        "failed ecrecovery for transaction"
                    );
                    None
                }
            })
            .collect::<Vec<_>>();

        // any transaction dropped during recovery counts as a bad transaction
        has_bad_transactions |= new_txs.len() != txs_len;

        // Record the transactions as seen by the peer
        for tx in &new_txs {
            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
        }

        // 3. import new transactions as a batch to minimize lock contention on the underlying
        // pool
        if !new_txs.is_empty() {
            let pool = self.pool.clone();
            // update metrics
            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
            metric_pending_pool_imports.increment(new_txs.len() as f64);

            // update self-monitoring info
            self.pending_pool_imports_info
                .pending_pool_imports
                .fetch_add(new_txs.len(), Ordering::Relaxed);
            let tx_manager_info_pending_pool_imports =
                self.pending_pool_imports_info.pending_pool_imports.clone();

            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
            // The future undoes the pending-import accounting above once the pool has
            // processed the batch.
            let import = Box::pin(async move {
                let added = new_txs.len();
                let res = pool.add_external_transactions(new_txs).await;

                // update metrics
                metric_pending_pool_imports.decrement(added as f64);
                // update self-monitoring info
                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);

                res
            });

            self.pool_imports.push(import);
        }

        if num_already_seen_by_peer > 0 {
            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
            self.metrics
                .occurrences_of_transaction_already_seen_by_peer
                .increment(num_already_seen_by_peer);
            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
        }

        if has_bad_transactions {
            // peer sent us invalid transactions
            self.report_peer_bad_transactions(peer_id)
        }

        // reported after the bad-transactions report above
        if num_already_seen_by_peer > 0 {
            self.report_already_seen(peer_id);
        }

        self.metrics.pool_import_prepare_duration.record(start.elapsed());
    }
1496
1497    /// Processes a [`FetchEvent`].
1498    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1499        match fetch_event {
1500            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1501                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1502                if report_peer {
1503                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1504                }
1505            }
1506            FetchEvent::FetchError { peer_id, error } => {
1507                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1508                self.on_request_error(peer_id, error);
1509            }
1510            FetchEvent::EmptyResponse { peer_id } => {
1511                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1512            }
1513        }
1514    }
1515}
1516
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    /// The future never resolves, so the output is never produced.
    type Output = ();

    /// Drives the manager by draining each of its event sources up to a per-source budget.
    ///
    /// Always returns [`Poll::Pending`]. If any source may still have work left after its budget
    /// was used up, the waker is notified right away so the task is polled again; in that case
    /// the poll metrics are not updated for this round.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advances new __pending__ transactions, transactions that were successfully inserted into
        // pending set in pool (are valid), and propagates them (inform peers which
        // transactions we have seen).
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // we filled the entire buffer capacity and need to try again on the next poll
                    // immediately
                    true
                } else {
                    // Try once more: most likely the channel is now empty, in which case this
                    // second poll returns pending and registers the waker. If it yields
                    // additional items instead, we poll again on the next iteration.
                    //
                    // NOTE(review): if the channel is closed, `poll_recv_many` presumably keeps
                    // returning `Ready(0)`, which would request a wake-up on every poll — confirm.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast message or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    // a request went out, so the fetcher stream should be polled again
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // At least one source may have more work queued: yield to the runtime, but ask to be
        // polled again right away.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // No source reported outstanding work; record the durations of this poll.
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1702
/// Selects which peers a transaction is propagated to.
///
/// The mode decides whether peers that already know a transaction are skipped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// Transactions are only sent to peers that haven't seen them yet.
    Basic,
    /// Forced propagation mode.
    ///
    /// Transactions are sent to all peers regardless of whether they have been sent or received
    /// before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` only for [`Self::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1725
1726/// A transaction that's about to be propagated to multiple peers.
1727#[derive(Debug, Clone)]
1728struct PropagateTransaction<T = TransactionSigned> {
1729    size: usize,
1730    transaction: Arc<T>,
1731}
1732
1733impl<T: SignedTransaction> PropagateTransaction<T> {
1734    /// Create a new instance from a transaction.
1735    pub fn new(transaction: T) -> Self {
1736        let size = transaction.length();
1737        Self { size, transaction: Arc::new(transaction) }
1738    }
1739
1740    /// Create a new instance from a pooled transaction
1741    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1742    where
1743        P: PoolTransaction<Consensus = T>,
1744    {
1745        let size = tx.encoded_length();
1746        let transaction = tx.transaction.clone_into_consensus();
1747        let transaction = Arc::new(transaction.into_inner());
1748        Self { size, transaction }
1749    }
1750
1751    fn tx_hash(&self) -> &TxHash {
1752        self.transaction.tx_hash()
1753    }
1754}
1755
1756/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1757/// should receive them in full or as pooled
1758#[derive(Debug, Clone)]
1759enum PropagateTransactionsBuilder<T> {
1760    Pooled(PooledTransactionsHashesBuilder),
1761    Full(FullTransactionsBuilder<T>),
1762}
1763
1764impl<T> PropagateTransactionsBuilder<T> {
1765    /// Create a builder for pooled transactions
1766    fn pooled(version: EthVersion) -> Self {
1767        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1768    }
1769
1770    /// Create a builder that sends transactions in full and records transactions that don't fit.
1771    fn full(version: EthVersion) -> Self {
1772        Self::Full(FullTransactionsBuilder::new(version))
1773    }
1774
1775    /// Returns true if no transactions are recorded.
1776    fn is_empty(&self) -> bool {
1777        match self {
1778            Self::Pooled(builder) => builder.is_empty(),
1779            Self::Full(builder) => builder.is_empty(),
1780        }
1781    }
1782
1783    /// Consumes the type and returns the built messages that should be sent to the peer.
1784    fn build(self) -> PropagateTransactions<T> {
1785        match self {
1786            Self::Pooled(pooled) => {
1787                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1788            }
1789            Self::Full(full) => full.build(),
1790        }
1791    }
1792}
1793
1794impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1795    /// Appends all transactions
1796    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1797        for tx in txs {
1798            self.push(tx);
1799        }
1800    }
1801
1802    /// Appends a transaction to the list.
1803    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1804        match self {
1805            Self::Pooled(builder) => builder.push(transaction),
1806            Self::Full(builder) => builder.push(transaction),
1807        }
1808    }
1809}
1810
/// Represents how the transactions should be sent to a peer if any.
///
/// Either part may be absent; when both are `None` there is nothing to send.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send, i.e. the announcement message.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    full: Option<Vec<Arc<T>>>,
}
1818
1819/// Helper type for constructing the full transaction message that enforces the
1820/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1821/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1822/// broadcasted in full.
1823#[derive(Debug, Clone)]
1824struct FullTransactionsBuilder<T> {
1825    /// The soft limit to enforce for a single broadcast message of full transactions.
1826    total_size: usize,
1827    /// All transactions to be broadcasted.
1828    transactions: Vec<Arc<T>>,
1829    /// Transactions that didn't fit into the broadcast message
1830    pooled: PooledTransactionsHashesBuilder,
1831}
1832
1833impl<T> FullTransactionsBuilder<T> {
1834    /// Create a builder for the negotiated version of the peer's session
1835    fn new(version: EthVersion) -> Self {
1836        Self {
1837            total_size: 0,
1838            pooled: PooledTransactionsHashesBuilder::new(version),
1839            transactions: vec![],
1840        }
1841    }
1842
1843    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1844    fn is_empty(&self) -> bool {
1845        self.transactions.is_empty() && self.pooled.is_empty()
1846    }
1847
1848    /// Returns the messages that should be propagated to the peer.
1849    fn build(self) -> PropagateTransactions<T> {
1850        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1851        let full = Some(self.transactions).filter(|full| !full.is_empty());
1852        PropagateTransactions { pooled, full }
1853    }
1854}
1855
1856impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1857    /// Appends all transactions.
1858    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1859        for tx in txs {
1860            self.push(&tx)
1861        }
1862    }
1863
1864    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1865    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1866    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1867    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1868    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1869    ///
1870    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1871    /// to list of pooled transactions, (e.g. 4844 transactions).
1872    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1873    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1874        // Do not send full 4844 transaction hashes to peers.
1875        //
1876        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1877        //  Instead, those transactions are only announced using
1878        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1879        //  via `GetPooledTransactions`.
1880        //
1881        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1882        if !transaction.transaction.is_broadcastable_in_full() {
1883            self.pooled.push(transaction);
1884            return
1885        }
1886
1887        let new_size = self.total_size + transaction.size;
1888        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1889            self.total_size > 0
1890        {
1891            // transaction does not fit into the message
1892            self.pooled.push(transaction);
1893            return
1894        }
1895
1896        self.total_size = new_size;
1897        self.transactions.push(Arc::clone(&transaction.transaction));
1898    }
1899}
1900
1901/// A helper type to create the pooled transactions message based on the negotiated version of the
1902/// session with the peer
1903#[derive(Debug, Clone)]
1904enum PooledTransactionsHashesBuilder {
1905    Eth66(NewPooledTransactionHashes66),
1906    Eth68(NewPooledTransactionHashes68),
1907}
1908
1909// === impl PooledTransactionsHashesBuilder ===
1910
1911impl PooledTransactionsHashesBuilder {
1912    /// Push a transaction from the pool to the list.
1913    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1914        match self {
1915            Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1916            Self::Eth68(msg) => {
1917                msg.hashes.push(*pooled_tx.hash());
1918                msg.sizes.push(pooled_tx.encoded_length());
1919                msg.types.push(pooled_tx.transaction.ty());
1920            }
1921        }
1922    }
1923
1924    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1925    fn is_empty(&self) -> bool {
1926        match self {
1927            Self::Eth66(hashes) => hashes.is_empty(),
1928            Self::Eth68(hashes) => hashes.is_empty(),
1929        }
1930    }
1931
1932    /// Returns the number of transactions in the builder.
1933    fn len(&self) -> usize {
1934        match self {
1935            Self::Eth66(hashes) => hashes.len(),
1936            Self::Eth68(hashes) => hashes.len(),
1937        }
1938    }
1939
1940    /// Appends all hashes
1941    fn extend<T: SignedTransaction>(
1942        &mut self,
1943        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1944    ) {
1945        for tx in txs {
1946            self.push(&tx);
1947        }
1948    }
1949
1950    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1951        match self {
1952            Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1953            Self::Eth68(msg) => {
1954                msg.hashes.push(*tx.tx_hash());
1955                msg.sizes.push(tx.size);
1956                msg.types.push(tx.transaction.ty());
1957            }
1958        }
1959    }
1960
1961    /// Create a builder for the negotiated version of the peer's session
1962    fn new(version: EthVersion) -> Self {
1963        match version {
1964            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1965            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1966                Self::Eth68(Default::default())
1967            }
1968        }
1969    }
1970
1971    fn build(self) -> NewPooledTransactionHashes {
1972        match self {
1973            Self::Eth66(mut msg) => {
1974                msg.shrink_to_fit();
1975                msg.into()
1976            }
1977            Self::Eth68(mut msg) => {
1978                msg.shrink_to_fit();
1979                msg.into()
1980            }
1981        }
1982    }
1983}
1984
/// The way a batch of transactions reached us.
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2001
2002/// Tracks a single peer in the context of [`TransactionsManager`].
2003#[derive(Debug)]
2004pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2005    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
2006    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
2007    /// the peer.
2008    seen_transactions: LruCache<TxHash>,
2009    /// A communication channel directly to the peer's session task.
2010    request_tx: PeerRequestSender<PeerRequest<N>>,
2011    /// negotiated version of the session.
2012    version: EthVersion,
2013    /// The peer's client version.
2014    client_version: Arc<str>,
2015    /// The kind of peer.
2016    peer_kind: PeerKind,
2017}
2018
2019impl<N: NetworkPrimitives> PeerMetadata<N> {
2020    /// Returns a new instance of [`PeerMetadata`].
2021    pub fn new(
2022        request_tx: PeerRequestSender<PeerRequest<N>>,
2023        version: EthVersion,
2024        client_version: Arc<str>,
2025        max_transactions_seen_by_peer: u32,
2026        peer_kind: PeerKind,
2027    ) -> Self {
2028        Self {
2029            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2030            request_tx,
2031            version,
2032            client_version,
2033            peer_kind,
2034        }
2035    }
2036
2037    /// Returns a reference to the peer's request sender channel.
2038    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2039        &self.request_tx
2040    }
2041
2042    /// Returns a mutable reference to the seen transactions LRU cache.
2043    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2044        &mut self.seen_transactions
2045    }
2046
2047    /// Returns the negotiated `EthVersion` of the session.
2048    pub const fn version(&self) -> EthVersion {
2049        self.version
2050    }
2051
2052    /// Returns a reference to the peer's client version string.
2053    pub fn client_version(&self) -> &str {
2054        &self.client_version
2055    }
2056
2057    /// Returns the peer's kind.
2058    pub const fn peer_kind(&self) -> PeerKind {
2059        self.peer_kind
2060    }
2061}
2062
/// Commands to send to the [`TransactionsManager`]
///
/// Variants that carry a `oneshot::Sender` are requests; the answer is delivered through that
/// sender.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer. The transactions are
    /// identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to look up.
        peers: Vec<PeerId>,
        /// Channel for the answer, a map from peer to transaction hashes.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose sender is requested.
        peer_id: PeerId,
        /// Channel for the answer; the answer is an `Option`, so it may be `None`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2089
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents a request for a [`TransactionsHandle`]. The answer is delivered through the
    /// given sender and is an `Option`, so it may be `None`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2121
/// Bookkeeping for the pool imports of the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates the bookkeeping with a pending count of zero and the given maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the given
    /// `max_pending_pool_imports`.
    ///
    /// Note that the comparison uses the argument, not the maximum stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let currently_pending = self.pending_pool_imports.load(Ordering::Relaxed);
        currently_pending < max_pending_pool_imports
    }
}
2142
impl Default for PendingPoolImportsInfo {
    /// Returns an instance that uses [`DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS`] as the maximum
    /// number of pending pool imports.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2148
/// Durations recorded for the individual sections of one [`TransactionsManager`] poll.
///
/// A zeroed instance is created at the start of every poll and passed to `update_poll_metrics`
/// when the poll finishes without requesting an immediate wake-up.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Section that drains the network events stream.
    acc_network_events: Duration,
    /// Section that drains the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Section that drains the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the poll implementation in this part of the file —
    /// presumably the time spent on imported transactions; confirm against the metrics code.
    acc_imported_txns: Duration,
    /// Section that drains the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Section that tries to send a request for hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Section that drains the commands channel.
    acc_cmds: Duration,
}
2159
2160#[cfg(test)]
2161mod tests {
2162    use super::*;
2163    use crate::{
2164        test_utils::{
2165            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2166            Testnet,
2167        },
2168        transactions::config::RelaxedEthAnnouncementFilter,
2169        NetworkConfigBuilder, NetworkManager,
2170    };
2171    use alloy_consensus::{TxEip1559, TxLegacy};
2172    use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2173    use alloy_rlp::Decodable;
2174    use futures::FutureExt;
2175    use reth_chainspec::MIN_TRANSACTION_GAS;
2176    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2177    use reth_network_api::{NetworkInfo, PeerKind};
2178    use reth_network_p2p::{
2179        error::{RequestError, RequestResult},
2180        sync::{NetworkSyncUpdater, SyncState},
2181    };
2182    use reth_storage_api::noop::NoopProvider;
2183    use reth_tasks::Runtime;
2184    use reth_transaction_pool::test_utils::{
2185        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2186    };
2187    use secp256k1::SecretKey;
2188    use std::{
2189        future::poll_fn,
2190        net::{IpAddr, Ipv4Addr, SocketAddr},
2191        str::FromStr,
2192    };
2193    use tracing::error;
2194
    /// Checks that a transaction broadcast received while the node reports that it is initially
    /// syncing does not end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // Set up a separate network with its own transactions manager; this is the manager under
        // test.
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // wait for all initiator connections (only the first two events of the listener are
        // consumed)
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    // to insert a new peer in transactions peerset
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // Feed the transaction to the manager as if it had been broadcast by the peer.
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // Poll the manager exactly once; its result is ignored since the manager future never
        // completes.
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // the broadcast must not have been imported
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2264
2265    #[tokio::test(flavor = "multi_thread")]
2266    async fn test_tx_broadcasts_through_two_syncs() {
2267        reth_tracing::init_test_tracing();
2268        let net = Testnet::create(3).await;
2269
2270        let mut handles = net.handles();
2271        let handle0 = handles.next().unwrap();
2272        let handle1 = handles.next().unwrap();
2273
2274        drop(handles);
2275        let handle = net.spawn();
2276
2277        let listener0 = handle0.event_listener();
2278        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2279        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2280
2281        let client = NoopProvider::default();
2282        let pool = testing_pool();
2283        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2284            .disable_discovery()
2285            .listener_port(0)
2286            .build(client);
2287        let transactions_manager_config = config.transactions_manager_config.clone();
2288        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2289            .await
2290            .unwrap()
2291            .into_builder()
2292            .transactions(pool.clone(), transactions_manager_config)
2293            .split_with_handle();
2294
2295        tokio::task::spawn(network);
2296
2297        // go to syncing (pipeline sync) to idle and then to syncing (live)
2298        network_handle.update_sync_state(SyncState::Syncing);
2299        assert!(NetworkInfo::is_syncing(&network_handle));
2300        network_handle.update_sync_state(SyncState::Idle);
2301        assert!(!NetworkInfo::is_syncing(&network_handle));
2302        network_handle.update_sync_state(SyncState::Syncing);
2303        assert!(NetworkInfo::is_syncing(&network_handle));
2304
2305        // wait for all initiator connections
2306        let mut established = listener0.take(2);
2307        while let Some(ev) = established.next().await {
2308            match ev {
2309                NetworkEvent::ActivePeerSession { .. } |
2310                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2311                    // to insert a new peer in transactions peerset
2312                    transactions.on_network_event(ev);
2313                }
2314                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2315                _ => {
2316                    error!("unexpected event {ev:?}")
2317                }
2318            }
2319        }
2320        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2321        let input = hex!(
2322            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2323        );
2324        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2325        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2326            peer_id: *handle1.peer_id(),
2327            msg: Transactions(vec![signed_tx.clone()]),
2328        });
2329        poll_fn(|cx| {
2330            let _ = transactions.poll_unpin(cx);
2331            Poll::Ready(())
2332        })
2333        .await;
2334        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2335        assert!(NetworkInfo::is_syncing(&network_handle));
2336        assert!(!pool.is_empty());
2337        handle.terminate().await;
2338    }
2339
2340    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2341    // event and is able to retrieve the corresponding transactions.
2342    #[tokio::test(flavor = "multi_thread")]
2343    async fn test_handle_incoming_transactions_hashes() {
2344        reth_tracing::init_test_tracing();
2345
2346        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2347        let client = NoopProvider::default();
2348
2349        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2350            // let OS choose port
2351            .listener_port(0)
2352            .disable_discovery()
2353            .build(client);
2354
2355        let pool = testing_pool();
2356
2357        let transactions_manager_config = config.transactions_manager_config.clone();
2358        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2359            .await
2360            .unwrap()
2361            .into_builder()
2362            .transactions(pool.clone(), transactions_manager_config)
2363            .split_with_handle();
2364
2365        let peer_id_1 = PeerId::new([1; 64]);
2366        let eth_version = EthVersion::Eth66;
2367
2368        let txs = vec![TransactionSigned::new_unhashed(
2369            Transaction::Legacy(TxLegacy {
2370                chain_id: Some(4),
2371                nonce: 15u64,
2372                gas_price: 2200000000,
2373                gas_limit: 34811,
2374                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2375                value: U256::from(1234u64),
2376                input: Default::default(),
2377            }),
2378            Signature::new(
2379                U256::from_str(
2380                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2381                )
2382                .unwrap(),
2383                U256::from_str(
2384                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2385                )
2386                .unwrap(),
2387                true,
2388            ),
2389        )];
2390
2391        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2392
2393        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2394        tx_manager.peers.insert(peer_id_1, peer_1);
2395
2396        assert!(pool.is_empty());
2397
2398        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2399            peer_id: peer_id_1,
2400            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2401                txs_hashes.clone(),
2402            )),
2403        });
2404
2405        // mock session of peer_1 receives request
2406        let req = to_mock_session_rx
2407            .recv()
2408            .await
2409            .expect("peer_1 session should receive request with buffered hashes");
2410        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2411        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2412
2413        let message: Vec<PooledTransactionVariant> = txs
2414            .into_iter()
2415            .map(|tx| {
2416                PooledTransactionVariant::try_from(tx)
2417                    .expect("Failed to convert MockTransaction to PooledTransaction")
2418            })
2419            .collect();
2420
2421        // return the transactions corresponding to the transaction hashes.
2422        response
2423            .send(Ok(PooledTransactions(message)))
2424            .expect("should send peer_1 response to tx manager");
2425
2426        // adance the transaction manager future
2427        poll_fn(|cx| {
2428            let _ = tx_manager.poll_unpin(cx);
2429            Poll::Ready(())
2430        })
2431        .await;
2432
2433        // ensure that the transactions corresponding to the transaction hashes have been
2434        // successfully retrieved and stored in the Pool.
2435        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2436    }
2437
2438    #[tokio::test(flavor = "multi_thread")]
2439    async fn test_handle_incoming_transactions() {
2440        reth_tracing::init_test_tracing();
2441        let net = Testnet::create(3).await;
2442
2443        let mut handles = net.handles();
2444        let handle0 = handles.next().unwrap();
2445        let handle1 = handles.next().unwrap();
2446
2447        drop(handles);
2448        let handle = net.spawn();
2449
2450        let listener0 = handle0.event_listener();
2451
2452        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2453        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2454
2455        let client = NoopProvider::default();
2456        let pool = testing_pool();
2457        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2458            .disable_discovery()
2459            .listener_port(0)
2460            .build(client);
2461        let transactions_manager_config = config.transactions_manager_config.clone();
2462        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2463            .await
2464            .unwrap()
2465            .into_builder()
2466            .transactions(pool.clone(), transactions_manager_config)
2467            .split_with_handle();
2468        tokio::task::spawn(network);
2469
2470        network_handle.update_sync_state(SyncState::Idle);
2471
2472        assert!(!NetworkInfo::is_syncing(&network_handle));
2473
2474        // wait for all initiator connections
2475        let mut established = listener0.take(2);
2476        while let Some(ev) = established.next().await {
2477            match ev {
2478                NetworkEvent::ActivePeerSession { .. } |
2479                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2480                    // to insert a new peer in transactions peerset
2481                    transactions.on_network_event(ev);
2482                }
2483                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2484                ev => {
2485                    error!("unexpected event {ev:?}")
2486                }
2487            }
2488        }
2489        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2490        let input = hex!(
2491            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2492        );
2493        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2494        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2495            peer_id: *handle1.peer_id(),
2496            msg: Transactions(vec![signed_tx.clone()]),
2497        });
2498        assert!(transactions
2499            .transactions_by_peers
2500            .get(signed_tx.tx_hash())
2501            .unwrap()
2502            .contains(handle1.peer_id()));
2503
2504        // advance the transaction manager future
2505        poll_fn(|cx| {
2506            let _ = transactions.poll_unpin(cx);
2507            Poll::Ready(())
2508        })
2509        .await;
2510
2511        assert!(!pool.is_empty());
2512        assert!(pool.get(signed_tx.tx_hash()).is_some());
2513        handle.terminate().await;
2514    }
2515
2516    #[tokio::test(flavor = "multi_thread")]
2517    async fn test_session_closed_cleans_transaction_peer_state() {
2518        let (mut tx_manager, _network) = new_tx_manager().await;
2519        let peer_id = PeerId::new([1; 64]);
2520        let fallback_peer = PeerId::new([2; 64]);
2521        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2522        let hash_shared = B256::from_slice(&[1; 32]);
2523
2524        tx_manager.peers.insert(peer_id, peer);
2525        buffer_hash_to_tx_fetcher(
2526            &mut tx_manager.transaction_fetcher,
2527            hash_shared,
2528            peer_id,
2529            0,
2530            None,
2531        );
2532        buffer_hash_to_tx_fetcher(
2533            &mut tx_manager.transaction_fetcher,
2534            hash_shared,
2535            fallback_peer,
2536            0,
2537            None,
2538        );
2539        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2540
2541        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2542            peer_id,
2543            reason: None,
2544        }));
2545
2546        // peer removed from peers map and active_peers
2547        assert!(!tx_manager.peers.contains_key(&peer_id));
2548        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2549        // fallback peer is still available for the hash
2550        assert_eq!(
2551            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2552            Some(&fallback_peer)
2553        );
2554    }
2555
2556    #[tokio::test(flavor = "multi_thread")]
2557    async fn test_on_get_pooled_transactions_network() {
2558        reth_tracing::init_test_tracing();
2559        let net = Testnet::create(2).await;
2560
2561        let mut handles = net.handles();
2562        let handle0 = handles.next().unwrap();
2563        let handle1 = handles.next().unwrap();
2564
2565        drop(handles);
2566        let handle = net.spawn();
2567
2568        let listener0 = handle0.event_listener();
2569
2570        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2571        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2572
2573        let client = NoopProvider::default();
2574        let pool = testing_pool();
2575        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2576            .disable_discovery()
2577            .listener_port(0)
2578            .build(client);
2579        let transactions_manager_config = config.transactions_manager_config.clone();
2580        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2581            .await
2582            .unwrap()
2583            .into_builder()
2584            .transactions(pool.clone(), transactions_manager_config)
2585            .split_with_handle();
2586        tokio::task::spawn(network);
2587
2588        network_handle.update_sync_state(SyncState::Idle);
2589
2590        assert!(!NetworkInfo::is_syncing(&network_handle));
2591
2592        // wait for all initiator connections
2593        let mut established = listener0.take(2);
2594        while let Some(ev) = established.next().await {
2595            match ev {
2596                NetworkEvent::ActivePeerSession { .. } |
2597                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2598                    transactions.on_network_event(ev);
2599                }
2600                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2601                ev => {
2602                    error!("unexpected event {ev:?}")
2603                }
2604            }
2605        }
2606        handle.terminate().await;
2607
2608        let tx = MockTransaction::eip1559();
2609        let _ = transactions
2610            .pool
2611            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2612            .await;
2613
2614        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2615
2616        let (send, receive) =
2617            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2618
2619        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2620            peer_id: *handle1.peer_id(),
2621            request,
2622            response: send,
2623        });
2624
2625        match receive.await.unwrap() {
2626            Ok(PooledTransactions(transactions)) => {
2627                assert_eq!(transactions.len(), 1);
2628            }
2629            Err(e) => {
2630                panic!("error: {e:?}");
2631            }
2632        }
2633    }
2634
2635    // Ensure that when the remote peer only returns part of the requested transactions, the
2636    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2637    // re-buffered.
2638    #[tokio::test]
2639    async fn test_partially_tx_response() {
2640        reth_tracing::init_test_tracing();
2641
2642        let mut tx_manager = new_tx_manager().await.0;
2643        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2644
2645        let peer_id_1 = PeerId::new([1; 64]);
2646        let eth_version = EthVersion::Eth66;
2647
2648        let txs = vec![
2649            TransactionSigned::new_unhashed(
2650                Transaction::Legacy(TxLegacy {
2651                    chain_id: Some(4),
2652                    nonce: 15u64,
2653                    gas_price: 2200000000,
2654                    gas_limit: 34811,
2655                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2656                    value: U256::from(1234u64),
2657                    input: Default::default(),
2658                }),
2659                Signature::new(
2660                    U256::from_str(
2661                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2662                    )
2663                    .unwrap(),
2664                    U256::from_str(
2665                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2666                    )
2667                    .unwrap(),
2668                    true,
2669                ),
2670            ),
2671            TransactionSigned::new_unhashed(
2672                Transaction::Eip1559(TxEip1559 {
2673                    chain_id: 4,
2674                    nonce: 26u64,
2675                    max_priority_fee_per_gas: 1500000000,
2676                    max_fee_per_gas: 1500000013,
2677                    gas_limit: MIN_TRANSACTION_GAS,
2678                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2679                    value: U256::from(3000000000000000000u64),
2680                    input: Default::default(),
2681                    access_list: Default::default(),
2682                }),
2683                Signature::new(
2684                    U256::from_str(
2685                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2686                    )
2687                    .unwrap(),
2688                    U256::from_str(
2689                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2690                    )
2691                    .unwrap(),
2692                    true,
2693                ),
2694            ),
2695        ];
2696
2697        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2698
2699        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2700        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2701        // fetch
2702        peer_1.seen_transactions.insert(txs_hashes[0]);
2703        peer_1.seen_transactions.insert(txs_hashes[1]);
2704        tx_manager.peers.insert(peer_id_1, peer_1);
2705
2706        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2707        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2708
2709        // peer_1 is idle
2710        assert!(tx_fetcher.is_idle(&peer_id_1));
2711        assert_eq!(tx_fetcher.active_peers.len(), 0);
2712
2713        // sends requests for buffered hashes to peer_1
2714        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2715
2716        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2717        // as long as request is in flight peer_1 is not idle
2718        assert!(!tx_fetcher.is_idle(&peer_id_1));
2719        assert_eq!(tx_fetcher.active_peers.len(), 1);
2720
2721        // mock session of peer_1 receives request
2722        let req = to_mock_session_rx
2723            .recv()
2724            .await
2725            .expect("peer_1 session should receive request with buffered hashes");
2726        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2727
2728        let message: Vec<PooledTransactionVariant> = txs
2729            .into_iter()
2730            .take(1)
2731            .map(|tx| {
2732                PooledTransactionVariant::try_from(tx)
2733                    .expect("Failed to convert MockTransaction to PooledTransaction")
2734            })
2735            .collect();
2736        // response partial request
2737        response
2738            .send(Ok(PooledTransactions(message)))
2739            .expect("should send peer_1 response to tx manager");
2740        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2741            unreachable!()
2742        };
2743
2744        // request has resolved, peer_1 is idle again
2745        assert!(tx_fetcher.is_idle(&peer_id));
2746        assert_eq!(tx_fetcher.active_peers.len(), 0);
2747        // failing peer_1's request buffers requested hashes for retry.
2748        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2749    }
2750
2751    #[tokio::test]
2752    async fn test_max_retries_tx_request() {
2753        reth_tracing::init_test_tracing();
2754
2755        let mut tx_manager = new_tx_manager().await.0;
2756        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2757
2758        let peer_id_1 = PeerId::new([1; 64]);
2759        let peer_id_2 = PeerId::new([2; 64]);
2760        let eth_version = EthVersion::Eth66;
2761        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2762
2763        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2764        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2765        // fetch
2766        peer_1.seen_transactions.insert(seen_hashes[0]);
2767        peer_1.seen_transactions.insert(seen_hashes[1]);
2768        tx_manager.peers.insert(peer_id_1, peer_1);
2769
2770        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2771        // for first retry in reverse order to make index 0 lru
2772        let retries = 1;
2773        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2774        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2775
2776        // peer_1 is idle
2777        assert!(tx_fetcher.is_idle(&peer_id_1));
2778        assert_eq!(tx_fetcher.active_peers.len(), 0);
2779
2780        // sends request for buffered hashes to peer_1
2781        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2782
2783        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2784
2785        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2786        // as long as request is in inflight peer_1 is not idle
2787        assert!(!tx_fetcher.is_idle(&peer_id_1));
2788        assert_eq!(tx_fetcher.active_peers.len(), 1);
2789
2790        // mock session of peer_1 receives request
2791        let req = to_mock_session_rx
2792            .recv()
2793            .await
2794            .expect("peer_1 session should receive request with buffered hashes");
2795        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2796        let GetPooledTransactions(hashes) = request;
2797
2798        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2799
2800        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2801
2802        // fail request to peer_1
2803        response
2804            .send(Err(RequestError::BadResponse))
2805            .expect("should send peer_1 response to tx manager");
2806        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2807            unreachable!()
2808        };
2809
2810        // request has resolved, peer_1 is idle again
2811        assert!(tx_fetcher.is_idle(&peer_id));
2812        assert_eq!(tx_fetcher.active_peers.len(), 0);
2813        // failing peer_1's request buffers requested hashes for retry
2814        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2815
2816        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2817        tx_manager.peers.insert(peer_id_2, peer_2);
2818
2819        // peer_2 announces same hashes as peer_1
2820        let msg =
2821            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2822        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2823
2824        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2825
2826        // peer_2 should be in active_peers.
2827        assert_eq!(tx_fetcher.active_peers.len(), 1);
2828
2829        // since hashes are already seen, no changes to length of unknown hashes
2830        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2831        // but hashes are taken out of buffer and packed into request to peer_2
2832        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2833
2834        // mock session of peer_2 receives request
2835        let req = to_mock_session_rx
2836            .recv()
2837            .await
2838            .expect("peer_2 session should receive request with buffered hashes");
2839        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2840
2841        // report failed request to tx manager
2842        response
2843            .send(Err(RequestError::BadResponse))
2844            .expect("should send peer_2 response to tx manager");
2845        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2846
2847        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2848        // for retry
2849        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2850        assert_eq!(tx_fetcher.active_peers.len(), 0);
2851    }
2852
2853    #[test]
2854    fn test_transaction_builder_empty() {
2855        let mut builder =
2856            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2857        assert!(builder.is_empty());
2858
2859        let mut factory = MockTransactionFactory::default();
2860        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2861        builder.push(&tx);
2862        assert!(!builder.is_empty());
2863
2864        let txs = builder.build();
2865        assert!(txs.full.is_none());
2866        let txs = txs.pooled.unwrap();
2867        assert_eq!(txs.len(), 1);
2868    }
2869
2870    #[test]
2871    fn test_transaction_builder_large() {
2872        let mut builder =
2873            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2874        assert!(builder.is_empty());
2875
2876        let mut factory = MockTransactionFactory::default();
2877        let mut tx = factory.create_eip1559();
2878        // create a transaction that still fits
2879        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2880        let tx = Arc::new(tx);
2881        let tx = PropagateTransaction::pool_tx(tx);
2882        builder.push(&tx);
2883        assert!(!builder.is_empty());
2884
2885        let txs = builder.clone().build();
2886        assert!(txs.pooled.is_none());
2887        let txs = txs.full.unwrap();
2888        assert_eq!(txs.len(), 1);
2889
2890        builder.push(&tx);
2891
2892        let txs = builder.clone().build();
2893        let pooled = txs.pooled.unwrap();
2894        assert_eq!(pooled.len(), 1);
2895        let txs = txs.full.unwrap();
2896        assert_eq!(txs.len(), 1);
2897    }
2898
2899    #[test]
2900    fn test_transaction_builder_eip4844() {
2901        let mut builder =
2902            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2903        assert!(builder.is_empty());
2904
2905        let mut factory = MockTransactionFactory::default();
2906        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2907        builder.push(&tx);
2908        assert!(!builder.is_empty());
2909
2910        let txs = builder.clone().build();
2911        assert!(txs.full.is_none());
2912        let txs = txs.pooled.unwrap();
2913        assert_eq!(txs.len(), 1);
2914
2915        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2916        builder.push(&tx);
2917
2918        let txs = builder.clone().build();
2919        let pooled = txs.pooled.unwrap();
2920        assert_eq!(pooled.len(), 1);
2921        let txs = txs.full.unwrap();
2922        assert_eq!(txs.len(), 1);
2923    }
2924
2925    #[tokio::test]
2926    async fn test_propagate_full() {
2927        reth_tracing::init_test_tracing();
2928
2929        let (mut tx_manager, network) = new_tx_manager().await;
2930        let peer_id = PeerId::random();
2931
2932        // ensure not syncing
2933        network.handle().update_sync_state(SyncState::Idle);
2934
2935        // mock a peer
2936        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2937
2938        let session_info = SessionInfo {
2939            peer_id,
2940            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2941            client_version: Arc::from(""),
2942            capabilities: Arc::new(vec![].into()),
2943            status: Arc::new(Default::default()),
2944            version: EthVersion::Eth68,
2945            peer_kind: PeerKind::Basic,
2946        };
2947        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2948        tx_manager
2949            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2950        let mut propagate = vec![];
2951        let mut factory = MockTransactionFactory::default();
2952        let eip1559_tx = Arc::new(factory.create_eip1559());
2953        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2954        let eip4844_tx = Arc::new(factory.create_eip4844());
2955        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2956
2957        let propagated =
2958            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2959        assert_eq!(propagated.len(), 2);
2960        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
2961        assert_eq!(prop_txs.len(), 1);
2962        assert!(prop_txs[0].is_full());
2963
2964        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
2965        assert_eq!(prop_txs.len(), 1);
2966        assert!(prop_txs[0].is_hash());
2967
2968        let peer = tx_manager.peers.get(&peer_id).unwrap();
2969        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2970        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2971        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2972
2973        // propagate again
2974        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2975        assert!(propagated.is_empty());
2976    }
2977
2978    #[tokio::test]
2979    async fn test_propagate_pending_txs_while_initially_syncing() {
2980        reth_tracing::init_test_tracing();
2981
2982        let (mut tx_manager, network) = new_tx_manager().await;
2983        let peer_id = PeerId::random();
2984
2985        // Keep the node in initial sync mode.
2986        network.handle().update_sync_state(SyncState::Syncing);
2987        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
2988
2989        // Add a peer so propagation has a destination.
2990        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
2991        tx_manager.peers.insert(peer_id, peer);
2992
2993        let tx = MockTransaction::eip1559();
2994        tx_manager
2995            .pool
2996            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2997            .await
2998            .expect("transaction should be accepted into the pool");
2999
3000        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3001
3002        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3003        assert!(peer.seen_transactions.contains(tx.get_hash()));
3004    }
3005
3006    #[tokio::test]
3007    async fn test_relaxed_filter_ignores_unknown_tx_types() {
3008        reth_tracing::init_test_tracing();
3009
3010        let transactions_manager_config = TransactionsManagerConfig::default();
3011
3012        let propagation_policy = TransactionPropagationKind::default();
3013        let announcement_policy = RelaxedEthAnnouncementFilter::default();
3014
3015        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3016
3017        let pool = testing_pool();
3018        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3019        let client = NoopProvider::default();
3020
3021        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3022            .listener_port(0)
3023            .disable_discovery()
3024            .build(client.clone());
3025
3026        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3027        let (to_tx_manager_tx, from_network_rx) =
3028            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3029        network_manager.set_transactions(to_tx_manager_tx);
3030        let network_handle = network_manager.handle().clone();
3031        let network_service_handle = tokio::spawn(network_manager);
3032
3033        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3034            network_handle.clone(),
3035            pool.clone(),
3036            from_network_rx,
3037            transactions_manager_config,
3038            policy_bundle,
3039        );
3040
3041        let peer_id = PeerId::random();
3042        let eth_version = EthVersion::Eth68;
3043        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3044        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3045
3046        let mut tx_factory = MockTransactionFactory::default();
3047
3048        let valid_known_tx = tx_factory.create_eip1559();
3049        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3050
3051        let known_tx_hash = *known_tx_signed.hash();
3052        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3053        let known_tx_size = known_tx_signed.encoded_length();
3054
3055        let unknown_tx_hash = B256::random();
3056        let unknown_tx_type_byte = 0xff_u8;
3057        let unknown_tx_size = 150;
3058
3059        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3060            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3061            sizes: vec![known_tx_size, unknown_tx_size],
3062            hashes: vec![known_tx_hash, unknown_tx_hash],
3063        });
3064
3065        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3066
3067        poll_fn(|cx| {
3068            let _ = tx_manager.poll_unpin(cx);
3069            Poll::Ready(())
3070        })
3071        .await;
3072
3073        let mut requested_hashes_in_getpooled = HashSet::new();
3074        let mut unexpected_request_received = false;
3075
3076        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3077            .await
3078        {
3079            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3080                let GetPooledTransactions(hashes) = request;
3081                for hash in hashes {
3082                    requested_hashes_in_getpooled.insert(hash);
3083                }
3084                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3085            }
3086            Ok(Some(other_request)) => {
3087                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3088                unexpected_request_received = true;
3089            }
3090            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3091            Err(_timeout_err) => {
3092                tracing::info!("Timeout: No GetPooledTransactions request received.")
3093            }
3094        }
3095
3096        assert!(
3097            requested_hashes_in_getpooled.contains(&known_tx_hash),
3098            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3099        );
3100        assert!(
3101            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3102            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3103        );
3104        assert!(
3105            !unexpected_request_received,
3106            "An unexpected P2P request was received by the mock peer."
3107        );
3108
3109        network_service_handle.abort();
3110    }
3111}