Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
6/// Aggregation on configurable parameters for [`TransactionsManager`].
7pub mod config;
8/// Default and spec'd bounds.
9pub mod constants;
10/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
11pub mod fetcher;
12/// Defines the traits for transaction-related policies.
13pub mod policy;
14
15pub use self::constants::{
16    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30    budget::{
31        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33    },
34    cache::LruCache,
35    duration_metered_exec, metered_poll_nested_stream_with_budget,
36    metrics::{
37        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38    },
39    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
40    NetworkHandle, TxTypesCounter,
41};
42use alloy_primitives::{TxHash, B256};
43use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
44use futures::{stream::FuturesUnordered, Future, StreamExt};
45use reth_eth_wire::{
46    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
47    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
48    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
49    RequestTxHashes, Transactions, ValidAnnouncementData,
50};
51use reth_ethereum_primitives::{TransactionSigned, TxType};
52use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
53use reth_network_api::{
54    events::{PeerEvent, SessionInfo},
55    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
56};
57use reth_network_p2p::{
58    error::{RequestError, RequestResult},
59    sync::SyncStateProvider,
60};
61use reth_network_peers::PeerId;
62use reth_network_types::ReputationChangeKind;
63use reth_primitives_traits::SignedTransaction;
64use reth_tokio_util::EventStream;
65use reth_transaction_pool::{
66    error::{PoolError, PoolResult},
67    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
68    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
69};
70use std::{
71    collections::{hash_map::Entry, HashMap, HashSet},
72    pin::Pin,
73    sync::{
74        atomic::{AtomicUsize, Ordering},
75        Arc,
76    },
77    task::{Context, Poll},
78    time::{Duration, Instant},
79};
80use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
81use tokio_stream::wrappers::UnboundedReceiverStream;
82use tracing::{debug, trace};
83
84/// The future for importing transactions into the pool.
85///
86/// Resolves with the result of each transaction import.
87pub type PoolImportFuture =
88    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
89
90/// Api to interact with [`TransactionsManager`] task.
91///
92/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
93/// with the [`TransactionsManager`] task once it is spawned.
94///
95/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
96/// known by a specific peer.
97#[derive(Debug, Clone)]
98pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
99    /// Command channel to the [`TransactionsManager`]
100    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
101}
102
103impl<N: NetworkPrimitives> TransactionsHandle<N> {
104    fn send(&self, cmd: TransactionsCommand<N>) {
105        let _ = self.manager_tx.send(cmd);
106    }
107
108    /// Fetch the [`PeerRequestSender`] for the given peer.
109    async fn peer_handle(
110        &self,
111        peer_id: PeerId,
112    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
113        let (tx, rx) = oneshot::channel();
114        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
115        rx.await
116    }
117
118    /// Manually propagate the transaction that belongs to the hash.
119    pub fn propagate(&self, hash: TxHash) {
120        self.send(TransactionsCommand::PropagateHash(hash))
121    }
122
123    /// Manually propagate the transaction hash to a specific peer.
124    ///
125    /// Note: this only propagates if the pool contains the transaction.
126    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
127        self.propagate_hashes_to(Some(hash), peer)
128    }
129
130    /// Manually propagate the transaction hashes to a specific peer.
131    ///
132    /// Note: this only propagates the transactions that are known to the pool.
133    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
134        let hashes = hash.into_iter().collect::<Vec<_>>();
135        if hashes.is_empty() {
136            return
137        }
138        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
139    }
140
141    /// Request the active peer IDs from the [`TransactionsManager`].
142    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
143        let (tx, rx) = oneshot::channel();
144        self.send(TransactionsCommand::GetActivePeers(tx));
145        rx.await
146    }
147
148    /// Manually propagate full transaction hashes to a specific peer.
149    ///
150    /// Do nothing if transactions are empty.
151    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
152        if transactions.is_empty() {
153            return
154        }
155        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
156    }
157
158    /// Manually propagate the given transaction hashes to all peers.
159    ///
160    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
161    /// full.
162    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
163        if transactions.is_empty() {
164            return
165        }
166        self.send(TransactionsCommand::PropagateTransactions(transactions))
167    }
168
169    /// Manually propagate the given transactions to all peers.
170    ///
171    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
172    /// full.
173    pub fn broadcast_transactions(
174        &self,
175        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
176    ) {
177        let transactions =
178            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
179        if transactions.is_empty() {
180            return
181        }
182        self.send(TransactionsCommand::BroadcastTransactions(transactions))
183    }
184
185    /// Request the transaction hashes known by specific peers.
186    pub async fn get_transaction_hashes(
187        &self,
188        peers: Vec<PeerId>,
189    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
190        if peers.is_empty() {
191            return Ok(Default::default())
192        }
193        let (tx, rx) = oneshot::channel();
194        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
195        rx.await
196    }
197
198    /// Request the transaction hashes known by a specific peer.
199    pub async fn get_peer_transaction_hashes(
200        &self,
201        peer: PeerId,
202    ) -> Result<HashSet<TxHash>, RecvError> {
203        let res = self.get_transaction_hashes(vec![peer]).await?;
204        Ok(res.into_values().next().unwrap_or_default())
205    }
206
207    /// Requests the transactions directly from the given peer.
208    ///
209    /// Returns `None` if the peer is not connected.
210    ///
211    /// **Note**: this returns the response from the peer as received.
212    pub async fn get_pooled_transactions_from(
213        &self,
214        peer_id: PeerId,
215        hashes: Vec<B256>,
216    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
217        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
218
219        let (tx, rx) = oneshot::channel();
220        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
221        peer.try_send(request).ok();
222
223        rx.await?.map(|res| Some(res.0))
224    }
225}
226
227/// Manages transactions on top of the p2p network.
228///
229/// This can be spawned to another task and is supposed to be run as background service.
230/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
231/// interact with it.
232///
233/// The [`TransactionsManager`] is responsible for:
234///    - handling incoming eth messages for transactions.
235///    - serving transaction requests.
236///    - propagate transactions
237///
238/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
239///   - receives incoming network messages.
240///   - sends messages to dispatch (responses, propagate tx)
241///
242/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
243/// propagate new transactions over the network.
244///
245/// It can be configured with different policies for transaction propagation and announcement
246/// filtering. See [`NetworkPolicies`] for more details.
247///
248/// ## Network Transaction Processing
249///
250/// ### Message Types
251///
252/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
253/// - **`NewPooledTransactionHashes`**: Hash announcements
254///
255/// ### Peer Tracking
256///
257/// - Maintains per-peer transaction cache (default: 10,240 entries)
258/// - Prevents duplicate imports and enables efficient propagation
259///
260/// ### Bad Transaction Handling
261///
262/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
263/// Penalizes peers sending invalid transactions.
264///
265/// ### Import Management
266///
267/// Limits concurrent pool imports and backs off when approaching capacity.
268///
269/// ### Transaction Fetching
270///
271/// For announced transactions: filters known → queues unknown → fetches → imports
272///
273/// ### Propagation Rules
274///
275/// Based on: origin (Local/External/Private), peer capabilities, and network state.
276/// Disabled during initial sync.
277///
278/// ### Security
279///
280/// Rate limiting via reputation, bad transaction isolation, peer scoring.
281#[derive(Debug)]
282#[must_use = "Manager does nothing unless polled."]
283pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
284    /// Access to the transaction pool.
285    pool: Pool,
286    /// Network access.
287    network: NetworkHandle<N>,
288    /// Subscriptions to all network related events.
289    ///
290    /// From which we get all new incoming transaction related messages.
291    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
292    /// Transaction fetcher to handle inflight and missing transaction requests.
293    transaction_fetcher: TransactionFetcher<N>,
294    /// All currently pending transactions grouped by peers.
295    ///
296    /// This way we can track incoming transactions and prevent multiple pool imports for the same
297    /// transaction
298    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
299    /// Transactions that are currently imported into the `Pool`.
300    ///
301    /// The import process includes:
302    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
303    ///    valid, or for 4844 transaction the blobs are valid. See also
304    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
305    /// - if the transaction is valid, it is added into the pool.
306    ///
307    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
308    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
309    /// receiver.
310    pool_imports: FuturesUnordered<PoolImportFuture>,
311    /// Stats on pending pool imports that help the node self-monitor.
312    pending_pool_imports_info: PendingPoolImportsInfo,
313    /// Bad imports.
314    bad_imports: LruCache<TxHash>,
315    /// All the connected peers.
316    peers: HashMap<PeerId, PeerMetadata<N>>,
317    /// Send half for the command channel.
318    ///
319    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
320    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
321    /// Incoming commands from [`TransactionsHandle`].
322    ///
323    /// This will only receive commands if a user manually sends a command to the manager through
324    /// the [`TransactionsHandle`] to interact with this type directly.
325    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
326    /// A stream that yields new __pending__ transactions.
327    ///
328    /// A transaction is considered __pending__ if it is executable on the current state of the
329    /// chain. In other words, this only yields transactions that satisfy all consensus
330    /// requirements, these include:
331    ///   - no nonce gaps
332    ///   - all dynamic fee requirements are (currently) met
333    ///   - account has enough balance to cover the transaction's gas
334    pending_transactions: mpsc::Receiver<TxHash>,
335    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
336    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
337    /// How the `TransactionsManager` is configured.
338    config: TransactionsManagerConfig,
339    /// Network Policies
340    policies: NetworkPolicies<N>,
341    /// `TransactionsManager` metrics
342    metrics: TransactionsManagerMetrics,
343    /// `AnnouncedTxTypes` metrics
344    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
345}
346
347impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
348    /// Sets up a new instance.
349    ///
350    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
351    pub fn new(
352        network: NetworkHandle<N>,
353        pool: Pool,
354        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
355        transactions_manager_config: TransactionsManagerConfig,
356    ) -> Self {
357        Self::with_policy(
358            network,
359            pool,
360            from_network,
361            transactions_manager_config,
362            NetworkPolicies::new(
363                TransactionPropagationKind::default(),
364                StrictEthAnnouncementFilter::default(),
365            ),
366        )
367    }
368}
369
370impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
371    /// Sets up a new instance with given the settings.
372    ///
373    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
374    pub fn with_policy(
375        network: NetworkHandle<N>,
376        pool: Pool,
377        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
378        transactions_manager_config: TransactionsManagerConfig,
379        policies: NetworkPolicies<N>,
380    ) -> Self {
381        let network_events = network.event_listener();
382
383        let (command_tx, command_rx) = mpsc::unbounded_channel();
384
385        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
386            &transactions_manager_config.transaction_fetcher_config,
387        );
388
389        // install a listener for new __pending__ transactions that are allowed to be propagated
390        // over the network
391        let pending = pool.pending_transactions_listener();
392        let pending_pool_imports_info =
393            PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
394        let metrics = TransactionsManagerMetrics::default();
395        metrics
396            .capacity_pending_pool_imports
397            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
398
399        Self {
400            pool,
401            network,
402            network_events,
403            transaction_fetcher,
404            transactions_by_peers: Default::default(),
405            pool_imports: Default::default(),
406            pending_pool_imports_info,
407            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
408            peers: Default::default(),
409            command_tx,
410            command_rx: UnboundedReceiverStream::new(command_rx),
411            pending_transactions: pending,
412            transaction_events: UnboundedMeteredReceiver::new(
413                from_network,
414                NETWORK_POOL_TRANSACTIONS_SCOPE,
415            ),
416            config: transactions_manager_config,
417            policies,
418            metrics,
419            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
420        }
421    }
422
423    /// Returns a new handle that can send commands to this type.
424    pub fn handle(&self) -> TransactionsHandle<N> {
425        TransactionsHandle { manager_tx: self.command_tx.clone() }
426    }
427
428    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
429    /// `false` if [`TransactionsManager`] is operating close to full capacity.
430    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
431        self.has_capacity_for_pending_pool_imports() &&
432            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
433    }
434
435    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
436    fn has_capacity_for_pending_pool_imports(&self) -> bool {
437        self.remaining_pool_import_capacity() > 0
438    }
439
440    /// Returns the remaining capacity for pending pool imports.
441    fn remaining_pool_import_capacity(&self) -> usize {
442        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
443            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
444        )
445    }
446
447    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
448        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
449        self.metrics.reported_bad_transactions.increment(1);
450    }
451
452    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
453        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
454        self.network.reputation_change(peer_id, kind);
455    }
456
457    fn report_already_seen(&self, peer_id: PeerId) {
458        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
459        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
460    }
461
462    /// Handles a closed peer session, removing the peer from transaction-local tracking state.
463    fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
464        if let Some(mut peer) = self.peers.remove(peer_id) {
465            self.policies.propagation_policy_mut().on_session_closed(&mut peer);
466        }
467        self.transaction_fetcher.remove_peer(peer_id);
468    }
469
470    /// Clear the transaction
471    fn on_good_import(&mut self, hash: TxHash) {
472        self.transactions_by_peers.remove(&hash);
473    }
474
475    /// Handles a failed transaction import.
476    ///
477    /// Blob sidecar errors (e.g. invalid proof, missing sidecar) are penalized via
478    /// `report_peer_bad_transactions` but NOT cached in `bad_imports` — the transaction itself
479    /// may be valid when fetched from another peer with correct sidecar data.
480    ///
481    /// Other bad transactions are penalized and cached in `bad_imports` to avoid fetching or
482    /// importing them again.
483    ///
484    /// Errors that count as bad transactions are:
485    ///
486    /// - intrinsic gas too low
487    /// - exceeds gas limit
488    /// - gas uint overflow
489    /// - exceeds max init code size
490    /// - oversized data
491    /// - signer account has bytecode
492    /// - chain id mismatch
493    /// - old legacy chain id
494    /// - tx type not supported
495    ///
496    /// (and additionally for blobs txns...)
497    ///
498    /// - no blobs
499    /// - too many blobs
500    /// - invalid kzg proof
501    /// - kzg error
502    /// - not blob transaction (tx type mismatch)
503    /// - wrong versioned kzg commitment hash
504    fn on_bad_import(&mut self, err: PoolError) {
505        let peers = self.transactions_by_peers.remove(&err.hash);
506
507        if err.is_bad_blob_sidecar() {
508            // Blob sidecar errors: penalize but do NOT cache the hash as bad.
509            // The transaction may be valid — only the sidecar from this peer was wrong.
510            // Using regular penalties means repeated offenders still get disconnected.
511            if let Some(peers) = peers {
512                for peer_id in peers {
513                    self.report_peer_bad_transactions(peer_id);
514                }
515            }
516            return
517        }
518
519        // if we're _currently_ syncing, we ignore a bad transaction
520        if !err.is_bad_transaction() || self.network.is_syncing() {
521            return
522        }
523        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
524        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
525        if let Some(peers) = peers {
526            for peer_id in peers {
527                self.report_peer_bad_transactions(peer_id);
528            }
529        }
530        self.metrics.bad_imports.increment(1);
531        self.bad_imports.insert(err.hash);
532    }
533
534    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
535    ///
536    /// Returns `true` if a request was sent.
537    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
538        // try drain transaction hashes pending fetch
539        let info = &self.pending_pool_imports_info;
540        let max_pending_pool_imports = info.max_pending_pool_imports;
541        let has_capacity_wrt_pending_pool_imports =
542            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
543
544        self.transaction_fetcher
545            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
546    }
547
548    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
549        let kind = match req_err {
550            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
551            RequestError::Timeout => ReputationChangeKind::Timeout,
552            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
553                // peer is already disconnected
554                return
555            }
556            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
557        };
558        self.report_peer(peer_id, kind);
559    }
560
561    #[inline]
562    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
563        let metrics = &self.metrics;
564
565        let TxManagerPollDurations {
566            acc_network_events,
567            acc_pending_imports,
568            acc_tx_events,
569            acc_imported_txns,
570            acc_fetch_events,
571            acc_pending_fetch,
572            acc_cmds,
573        } = poll_durations;
574
575        // update metrics for whole poll function
576        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
577        // update metrics for nested expressions
578        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
579        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
580        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
581        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
582        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
583        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
584        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
585    }
586}
587
588impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
589    /// Processes a batch import results.
590    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
591        for res in batch_results {
592            match res {
593                Ok(AddedTransactionOutcome { hash, .. }) => {
594                    self.on_good_import(hash);
595                }
596                Err(err) => {
597                    self.on_bad_import(err);
598                }
599            }
600        }
601    }
602
603    /// Request handler for an incoming `NewPooledTransactionHashes`
604    fn on_new_pooled_transaction_hashes(
605        &mut self,
606        peer_id: PeerId,
607        msg: NewPooledTransactionHashes,
608    ) {
609        // If the node is initially syncing, ignore transactions
610        if self.network.is_initially_syncing() {
611            return
612        }
613        if self.network.tx_gossip_disabled() {
614            return
615        }
616
617        // get handle to peer's session, if the session is still active
618        let Some(peer) = self.peers.get_mut(&peer_id) else {
619            trace!(
620                peer_id = format!("{peer_id:#}"),
621                ?msg,
622                "discarding announcement from inactive peer"
623            );
624
625            return
626        };
627        let client = peer.client_version.clone();
628
629        // keep track of the transactions the peer knows
630        let mut count_txns_already_seen_by_peer = 0;
631        for tx in msg.iter_hashes().copied() {
632            if !peer.seen_transactions.insert(tx) {
633                count_txns_already_seen_by_peer += 1;
634            }
635        }
636        if count_txns_already_seen_by_peer > 0 {
637            // this may occur if transactions are sent or announced to a peer, at the same time as
638            // the peer sends/announces those hashes to us. this is because, marking
639            // txns as seen by a peer is done optimistically upon sending them to the
640            // peer.
641            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
642            self.metrics
643                .occurrences_hash_already_seen_by_peer
644                .increment(count_txns_already_seen_by_peer);
645
646            trace!(target: "net::tx",
647                %count_txns_already_seen_by_peer,
648                peer_id=format!("{peer_id:#}"),
649                ?client,
650                "Peer sent hashes that have already been marked as seen by peer"
651            );
652
653            self.report_already_seen(peer_id);
654        }
655
656        // 1. filter out spam
657        if msg.is_empty() {
658            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
659            return;
660        }
661
662        let original_len = msg.len();
663        let mut partially_valid_msg = msg.dedup();
664
665        if partially_valid_msg.len() != original_len {
666            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
667        }
668
669        // 2. filter out transactions pending import to pool
670        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
671
672        // 3. filter out invalid entries (spam)
673        //
674        // validates messages with respect to the given network, e.g. allowed tx types.
675        // done before the pool lookup since these are cheap in-memory checks that shrink
676        // the set before acquiring the pool lock.
677        //
678        let mut should_report_peer = false;
679        let mut tx_types_counter = TxTypesCounter::default();
680
681        let is_eth68_message = partially_valid_msg
682            .msg_version()
683            .expect("partially valid announcement should have a version")
684            .is_eth68();
685
686        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
687            let (ty_byte, size_val) = match *metadata_ref_mut {
688                Some((ty, size)) => {
689                    if !is_eth68_message {
690                        should_report_peer = true;
691                    }
692                    (ty, size)
693                }
694                None => {
695                    if is_eth68_message {
696                        should_report_peer = true;
697                        return false;
698                    }
699                    (0u8, 0)
700                }
701            };
702
703            if is_eth68_message &&
704                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
705                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
706            {
707                tx_types_counter.increase_by_tx_type(parsed_tx_type);
708            }
709
710            let decision = self
711                .policies
712                .announcement_filter()
713                .decide_on_announcement(ty_byte, tx_hash, size_val);
714
715            match decision {
716                AnnouncementAcceptance::Accept => true,
717                AnnouncementAcceptance::Ignore => false,
718                AnnouncementAcceptance::Reject { penalize_peer } => {
719                    if penalize_peer {
720                        should_report_peer = true;
721                    }
722                    false
723                }
724            }
725        });
726
727        if is_eth68_message {
728            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
729        }
730
731        if should_report_peer {
732            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
733        }
734
735        // 4. filter out known hashes
736        //
737        // known txns have already been successfully fetched or received over gossip.
738        //
739        // most hashes will be filtered out here since the mempool protocol is a gossip
740        // protocol, healthy peers will send many of the same hashes.
741        //
742        let hashes_count_pre_pool_filter = partially_valid_msg.len();
743        self.pool.retain_unknown(&mut partially_valid_msg);
744        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
745            let already_known_hashes_count =
746                hashes_count_pre_pool_filter - partially_valid_msg.len();
747            self.metrics
748                .occurrences_hashes_already_in_pool
749                .increment(already_known_hashes_count as u64);
750        }
751
752        if partially_valid_msg.is_empty() {
753            // nothing to request
754            return
755        }
756
757        let mut valid_announcement_data =
758            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
759
760        if valid_announcement_data.is_empty() {
761            // no valid announcement data
762            return
763        }
764
765        // 5. filter out already seen unknown hashes
766        //
767        // seen hashes are already in the tx fetcher, pending fetch.
768        //
769        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
770        // fetcher, hence they should be valid at this point.
771        let bad_imports = &self.bad_imports;
772        self.transaction_fetcher.filter_unseen_and_pending_hashes(
773            &mut valid_announcement_data,
774            |hash| bad_imports.contains(hash),
775            &peer_id,
776            &client,
777        );
778
779        if valid_announcement_data.is_empty() {
780            // nothing to request
781            return
782        }
783
784        trace!(target: "net::tx::propagation",
785            peer_id=format!("{peer_id:#}"),
786            hashes_len=valid_announcement_data.len(),
787            hashes=?valid_announcement_data.keys(),
788            msg_version=%valid_announcement_data.msg_version(),
789            client_version=%client,
790            "received previously unseen and pending hashes in announcement from peer"
791        );
792
793        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
794        // fallback
795        if !self.transaction_fetcher.is_idle(&peer_id) {
796            // load message version before announcement data is destructed in packing
797            let msg_version = valid_announcement_data.msg_version();
798            let (hashes, _version) = valid_announcement_data.into_request_hashes();
799
800            trace!(target: "net::tx",
801                peer_id=format!("{peer_id:#}"),
802                hashes=?*hashes,
803                %msg_version,
804                %client,
805                "buffering hashes announced by busy peer"
806            );
807
808            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
809
810            return
811        }
812
813        let mut hashes_to_request =
814            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
815        let surplus_hashes =
816            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
817
818        if !surplus_hashes.is_empty() {
819            trace!(target: "net::tx",
820                peer_id=format!("{peer_id:#}"),
821                surplus_hashes=?*surplus_hashes,
822                %client,
823                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
824            );
825
826            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
827        }
828
829        trace!(target: "net::tx",
830            peer_id=format!("{peer_id:#}"),
831            hashes=?*hashes_to_request,
832            %client,
833            "sending hashes in `GetPooledTransactions` request to peer's session"
834        );
835
836        // request the missing transactions
837        //
838        // get handle to peer's session again, at this point we know it exists
839        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
840        if let Some(failed_to_request_hashes) =
841            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
842        {
843            let conn_eth_version = peer.version;
844
845            trace!(target: "net::tx",
846                peer_id=format!("{peer_id:#}"),
847                failed_to_request_hashes=?*failed_to_request_hashes,
848                %conn_eth_version,
849                %client,
850                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
851            );
852            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
853        }
854    }
855}
856
857impl<Pool, N> TransactionsManager<Pool, N>
858where
859    Pool: TransactionPool + Unpin + 'static,
860    N: NetworkPrimitives<
861            BroadcastedTransaction: SignedTransaction,
862            PooledTransaction: SignedTransaction,
863        > + Unpin,
864    Pool::Transaction:
865        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
866{
867    /// Invoked when transactions in the local mempool are considered __pending__.
868    ///
869    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
870    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
871    /// messages. The Transactions message relays complete transaction objects and is typically
872    /// sent to a small, random fraction of connected peers.
873    ///
874    /// All other peers receive a notification of the transaction hash and can request the
875    /// complete transaction object if it is unknown to them. The dissemination of complete
876    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
877    /// and won't need to request it.
878    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
879        // We intentionally do not gate this on initial sync.
880        // During initial sync we skip importing tx announcements from peers in
881        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
882        if self.network.tx_gossip_disabled() {
883            return
884        }
885
886        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
887
888        self.propagate_all(hashes);
889    }
890
891    /// Propagate the full transactions to a specific peer.
892    ///
893    /// Returns the propagated transactions.
894    fn propagate_full_transactions_to_peer(
895        &mut self,
896        txs: Vec<TxHash>,
897        peer_id: PeerId,
898        propagation_mode: PropagationMode,
899    ) -> Option<PropagatedTransactions> {
900        let peer = self.peers.get_mut(&peer_id)?;
901        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
902        let mut propagated = PropagatedTransactions::default();
903
904        // filter all transactions unknown to the peer
905        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
906
907        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
908
909        if propagation_mode.is_forced() {
910            // skip cache check if forced
911            full_transactions.extend(to_propagate);
912        } else {
913            // Iterate through the transactions to propagate and fill the hashes and full
914            // transaction
915            for tx in to_propagate {
916                if !peer.seen_transactions.contains(tx.tx_hash()) {
917                    // Only include if the peer hasn't seen the transaction
918                    full_transactions.push(&tx);
919                }
920            }
921        }
922
923        if full_transactions.is_empty() {
924            // nothing to propagate
925            return None
926        }
927
928        let PropagateTransactions { pooled, full } = full_transactions.build();
929
930        // send hashes if any
931        if let Some(new_pooled_hashes) = pooled {
932            for hash in new_pooled_hashes.iter_hashes().copied() {
933                propagated.record(hash, PropagateKind::Hash(peer_id));
934                // mark transaction as seen by peer
935                peer.seen_transactions.insert(hash);
936            }
937
938            // send hashes of transactions
939            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
940        }
941
942        // send full transactions, if any
943        if let Some(new_full_transactions) = full {
944            for tx in &new_full_transactions {
945                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
946                // mark transaction as seen by peer
947                peer.seen_transactions.insert(*tx.tx_hash());
948            }
949
950            // send full transactions
951            self.network.send_transactions(peer_id, new_full_transactions);
952        }
953
954        // Update propagated transactions metrics
955        self.metrics.propagated_transactions.increment(propagated.len() as u64);
956
957        Some(propagated)
958    }
959
960    /// Propagate the transaction hashes to the given peer
961    ///
962    /// Note: This will only send the hashes for transactions that exist in the pool.
963    fn propagate_hashes_to(
964        &mut self,
965        hashes: Vec<TxHash>,
966        peer_id: PeerId,
967        propagation_mode: PropagationMode,
968    ) {
969        trace!(target: "net::tx", "Start propagating transactions as hashes");
970
971        // This fetches a transactions from the pool, including the blob transactions, which are
972        // only ever sent as hashes.
973        let propagated = {
974            let Some(peer) = self.peers.get_mut(&peer_id) else {
975                // no such peer
976                return
977            };
978
979            let to_propagate =
980                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
981
982            let mut propagated = PropagatedTransactions::default();
983
984            // check if transaction is known to peer
985            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
986
987            if propagation_mode.is_forced() {
988                hashes.extend(to_propagate)
989            } else {
990                for tx in to_propagate {
991                    if !peer.seen_transactions.contains(tx.tx_hash()) {
992                        // Include if the peer hasn't seen it
993                        hashes.push(&tx);
994                    }
995                }
996            }
997
998            let new_pooled_hashes = hashes.build();
999
1000            if new_pooled_hashes.is_empty() {
1001                // nothing to propagate
1002                return
1003            }
1004
1005            if let Some(peer) = self.peers.get_mut(&peer_id) {
1006                for hash in new_pooled_hashes.iter_hashes().copied() {
1007                    propagated.record(hash, PropagateKind::Hash(peer_id));
1008                    peer.seen_transactions.insert(hash);
1009                }
1010            }
1011
1012            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1013
1014            // send hashes of transactions
1015            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1016
1017            // Update propagated transactions metrics
1018            self.metrics.propagated_transactions.increment(propagated.len() as u64);
1019
1020            propagated
1021        };
1022
1023        // notify pool so events get fired
1024        self.pool.on_propagated(propagated);
1025    }
1026
1027    /// Propagate the transactions to all connected peers either as full objects or hashes.
1028    ///
1029    /// The message for new pooled hashes depends on the negotiated version of the stream.
1030    /// See [`NewPooledTransactionHashes`]
1031    ///
1032    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1033    fn propagate_transactions(
1034        &mut self,
1035        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1036        propagation_mode: PropagationMode,
1037    ) -> PropagatedTransactions {
1038        let mut propagated = PropagatedTransactions::default();
1039        if self.network.tx_gossip_disabled() {
1040            return propagated
1041        }
1042
1043        // send full transactions to a set of the connected peers based on the configured mode
1044        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1045
1046        // Note: Assuming ~random~ order due to random state of the peers map hasher
1047        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1048            if !self.policies.propagation_policy().can_propagate(peer) {
1049                // skip peers we should not propagate to
1050                continue
1051            }
1052            // determine whether to send full tx objects or hashes.
1053            let mut builder = if peer_idx > max_num_full {
1054                PropagateTransactionsBuilder::pooled(peer.version)
1055            } else {
1056                PropagateTransactionsBuilder::full(peer.version)
1057            };
1058
1059            if propagation_mode.is_forced() {
1060                builder.extend(to_propagate.iter());
1061            } else {
1062                // Iterate through the transactions to propagate and fill the hashes and full
1063                // transaction lists, before deciding whether or not to send full transactions to
1064                // the peer.
1065                for tx in &to_propagate {
1066                    // Only proceed if the transaction is not in the peer's list of seen
1067                    // transactions
1068                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1069                        builder.push(tx);
1070                    }
1071                }
1072            }
1073
1074            if builder.is_empty() {
1075                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1076                continue
1077            }
1078
1079            let PropagateTransactions { pooled, full } = builder.build();
1080
1081            // send hashes if any
1082            if let Some(mut new_pooled_hashes) = pooled {
1083                // enforce tx soft limit per message for the (unlikely) event the number of
1084                // hashes exceeds it
1085                new_pooled_hashes
1086                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1087
1088                for hash in new_pooled_hashes.iter_hashes().copied() {
1089                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1090                    // mark transaction as seen by peer
1091                    peer.seen_transactions.insert(hash);
1092                }
1093
1094                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1095
1096                // send hashes of transactions
1097                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1098            }
1099
1100            // send full transactions, if any
1101            if let Some(new_full_transactions) = full {
1102                for tx in &new_full_transactions {
1103                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1104                    // mark transaction as seen by peer
1105                    peer.seen_transactions.insert(*tx.tx_hash());
1106                }
1107
1108                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1109
1110                // send full transactions
1111                self.network.send_transactions(*peer_id, new_full_transactions);
1112            }
1113        }
1114
1115        // Update propagated transactions metrics
1116        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1117
1118        propagated
1119    }
1120
1121    /// Propagates the given transactions to the peers
1122    ///
1123    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1124    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1125    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1126        if self.peers.is_empty() {
1127            // nothing to propagate
1128            return
1129        }
1130        let propagated = self.propagate_transactions(
1131            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1132            PropagationMode::Basic,
1133        );
1134
1135        // notify pool so events get fired
1136        self.pool.on_propagated(propagated);
1137    }
1138
1139    /// Request handler for an incoming request for transactions
1140    fn on_get_pooled_transactions(
1141        &mut self,
1142        peer_id: PeerId,
1143        request: GetPooledTransactions,
1144        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1145    ) {
1146        // fast exit if gossip is disabled
1147        if self.network.tx_gossip_disabled() {
1148            let _ = response.send(Ok(PooledTransactions::default()));
1149            return
1150        }
1151        if let Some(peer) = self.peers.get_mut(&peer_id) {
1152            let transactions = self.pool.get_pooled_transaction_elements(
1153                request.0,
1154                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1155                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1156                ),
1157            );
1158            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1159
1160            // we sent a response at which point we assume that the peer is aware of the
1161            // transactions
1162            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1163
1164            let resp = PooledTransactions(transactions);
1165            let _ = response.send(Ok(resp));
1166        }
1167    }
1168
1169    /// Handles a command received from a detached [`TransactionsHandle`]
1170    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1171        match cmd {
1172            TransactionsCommand::PropagateHash(hash) => {
1173                self.on_new_pending_transactions(vec![hash])
1174            }
1175            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1176                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1177            }
1178            TransactionsCommand::GetActivePeers(tx) => {
1179                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1180                tx.send(peers).ok();
1181            }
1182            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1183                if let Some(propagated) =
1184                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1185                {
1186                    self.pool.on_propagated(propagated);
1187                }
1188            }
1189            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1190            TransactionsCommand::BroadcastTransactions(txs) => {
1191                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1192                self.pool.on_propagated(propagated);
1193            }
1194            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1195                let mut res = HashMap::with_capacity(peers.len());
1196                for peer_id in peers {
1197                    let hashes = self
1198                        .peers
1199                        .get(&peer_id)
1200                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1201                        .unwrap_or_default();
1202                    res.insert(peer_id, hashes);
1203                }
1204                tx.send(res).ok();
1205            }
1206            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1207                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1208                peer_request_sender.send(sender).ok();
1209            }
1210        }
1211    }
1212
1213    /// Handles session establishment and peer transactions initialization.
1214    ///
1215    /// This is invoked when a new session is established.
1216    fn handle_peer_session(
1217        &mut self,
1218        info: SessionInfo,
1219        messages: PeerRequestSender<PeerRequest<N>>,
1220    ) {
1221        let SessionInfo { peer_id, client_version, version, .. } = info;
1222
1223        // Insert a new peer into the peerset.
1224        let peer = PeerMetadata::<N>::new(
1225            messages,
1226            version,
1227            client_version,
1228            self.config.max_transactions_seen_by_peer_history,
1229            info.peer_kind,
1230        );
1231        let peer = match self.peers.entry(peer_id) {
1232            Entry::Occupied(mut entry) => {
1233                entry.insert(peer);
1234                entry.into_mut()
1235            }
1236            Entry::Vacant(entry) => entry.insert(peer),
1237        };
1238
1239        self.policies.propagation_policy_mut().on_session_established(peer);
1240
1241        // Send a `NewPooledTransactionHashes` to the peer with up to
1242        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1243        // transactions in the pool.
1244        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1245            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1246            return
1247        }
1248
1249        // Get transactions to broadcast
1250        let pooled_txs = self.pool.pooled_transactions_max(
1251            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1252        );
1253        if pooled_txs.is_empty() {
1254            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1255            return;
1256        }
1257
1258        // Build and send transaction hashes message
1259        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1260        for pooled_tx in pooled_txs {
1261            peer.seen_transactions.insert(*pooled_tx.hash());
1262            msg_builder.push_pooled(pooled_tx);
1263        }
1264
1265        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1266        let msg = msg_builder.build();
1267        self.network.send_transactions_hashes(peer_id, msg);
1268    }
1269
1270    /// Handles a received event related to common network events.
1271    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1272        match event_result {
1273            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1274                self.on_peer_session_closed(&peer_id);
1275            }
1276            NetworkEvent::ActivePeerSession { info, messages } => {
1277                // process active peer session and broadcast available transaction from the pool
1278                self.handle_peer_session(info, messages);
1279            }
1280            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1281                let peer_id = info.peer_id;
1282                // get messages from existing peer
1283                let messages = match self.peers.get(&peer_id) {
1284                    Some(p) => p.request_tx.clone(),
1285                    None => {
1286                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1287                        return;
1288                    }
1289                };
1290                self.handle_peer_session(info, messages);
1291            }
1292            _ => {}
1293        }
1294    }
1295
1296    /// Returns true if the ingress policy allows processing messages from the given peer.
1297    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1298        if self.config.ingress_policy.allows_all() {
1299            return true;
1300        }
1301        let Some(peer) = self.peers.get(peer_id) else {
1302            return false;
1303        };
1304        self.config.ingress_policy.allows(peer.peer_kind())
1305    }
1306
1307    /// Handles dedicated transaction events related to the `eth` protocol.
1308    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1309        match event {
1310            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1311                if !self.accepts_incoming_from(&peer_id) {
1312                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1313                    return;
1314                }
1315
1316                // ensure we didn't receive any blob transactions as these are disallowed to be
1317                // broadcasted in full
1318
1319                let has_blob_txs = msg.has_eip4844();
1320
1321                let non_blob_txs = msg
1322                    .into_iter()
1323                    .map(N::PooledTransaction::try_from)
1324                    .filter_map(Result::ok)
1325                    .collect();
1326
1327                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1328
1329                if has_blob_txs {
1330                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1331                    self.report_peer_bad_transactions(peer_id);
1332                }
1333            }
1334            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1335                if !self.accepts_incoming_from(&peer_id) {
1336                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1337                    return;
1338                }
1339                self.on_new_pooled_transaction_hashes(peer_id, msg)
1340            }
1341            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1342                self.on_get_pooled_transactions(peer_id, request, response)
1343            }
1344            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1345                let _ = response.send(Some(self.handle()));
1346            }
1347        }
1348    }
1349
1350    /// Starts the import process for the given transactions.
1351    fn import_transactions(
1352        &mut self,
1353        peer_id: PeerId,
1354        transactions: PooledTransactions<N::PooledTransaction>,
1355        source: TransactionSource,
1356    ) {
1357        // If the node is pipeline syncing, ignore transactions
1358        if self.network.is_initially_syncing() {
1359            return
1360        }
1361        if self.network.tx_gossip_disabled() {
1362            return
1363        }
1364
1365        // Early return if we don't have capacity for any imports
1366        if !self.has_capacity_for_pending_pool_imports() {
1367            return
1368        }
1369
1370        let mut transactions = transactions.0;
1371
1372        // Truncate to remaining capacity early to bound work on all subsequent processing.
1373        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
1374        // malicious and we avoid wasting CPU on them.
1375        let capacity = self.remaining_pool_import_capacity();
1376        if transactions.len() > capacity {
1377            let skipped = transactions.len() - capacity;
1378            transactions.truncate(capacity);
1379            self.metrics
1380                .skipped_transactions_pending_pool_imports_at_capacity
1381                .increment(skipped as u64);
1382            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1383        }
1384
1385        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1386        let client_version = peer.client_version.clone();
1387
1388        let start = Instant::now();
1389
1390        // mark the transactions as received
1391        self.transaction_fetcher
1392            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1393
1394        // track that the peer knows these transaction, but only if this is a new broadcast.
1395        // If we received the transactions as the response to our `GetPooledTransactions``
1396        // requests (based on received `NewPooledTransactionHashes`) then we already
1397        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1398        let mut num_already_seen_by_peer = 0;
1399        for tx in &transactions {
1400            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1401                num_already_seen_by_peer += 1;
1402            }
1403        }
1404
1405        // tracks the quality of the given transactions
1406        let mut has_bad_transactions = false;
1407
1408        // 1. Remove known, already-tracked, and invalid transactions first since these are
1409        // cheap in-memory checks against local maps
1410        transactions.retain(|tx| {
1411            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1412                entry.get_mut().insert(peer_id);
1413                return false
1414            }
1415            if self.bad_imports.contains(tx.tx_hash()) {
1416                trace!(target: "net::tx",
1417                    peer_id=format!("{peer_id:#}"),
1418                    hash=%tx.tx_hash(),
1419                    %client_version,
1420                    "received a known bad transaction from peer"
1421                );
1422                has_bad_transactions = true;
1423                return false;
1424            }
1425            true
1426        });
1427
1428        // 2. filter out txns already inserted into pool
1429        let txns_count_pre_pool_filter = transactions.len();
1430        self.pool.retain_unknown(&mut transactions);
1431        if txns_count_pre_pool_filter > transactions.len() {
1432            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1433            self.metrics
1434                .occurrences_transactions_already_in_pool
1435                .increment(already_known_txns_count as u64);
1436        }
1437
1438        let txs_len = transactions.len();
1439
1440        let new_txs = transactions
1441            .into_par_iter()
1442            .filter_map(|tx| match tx.try_into_recovered() {
1443                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1444                Err(badtx) => {
1445                    trace!(target: "net::tx",
1446                        peer_id=format!("{peer_id:#}"),
1447                        hash=%badtx.tx_hash(),
1448                        client_version=%client_version,
1449                        "failed ecrecovery for transaction"
1450                    );
1451                    None
1452                }
1453            })
1454            .collect::<Vec<_>>();
1455
1456        has_bad_transactions |= new_txs.len() != txs_len;
1457
1458        // Record the transactions as seen by the peer
1459        for tx in &new_txs {
1460            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1461        }
1462
1463        // 3. import new transactions as a batch to minimize lock contention on the underlying
1464        // pool
1465        if !new_txs.is_empty() {
1466            let pool = self.pool.clone();
1467            // update metrics
1468            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1469            metric_pending_pool_imports.increment(new_txs.len() as f64);
1470
1471            // update self-monitoring info
1472            self.pending_pool_imports_info
1473                .pending_pool_imports
1474                .fetch_add(new_txs.len(), Ordering::Relaxed);
1475            let tx_manager_info_pending_pool_imports =
1476                self.pending_pool_imports_info.pending_pool_imports.clone();
1477
1478            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1479            let import = Box::pin(async move {
1480                let added = new_txs.len();
1481                let res = pool.add_external_transactions(new_txs).await;
1482
1483                // update metrics
1484                metric_pending_pool_imports.decrement(added as f64);
1485                // update self-monitoring info
1486                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1487
1488                res
1489            });
1490
1491            self.pool_imports.push(import);
1492        }
1493
1494        if num_already_seen_by_peer > 0 {
1495            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1496            self.metrics
1497                .occurrences_of_transaction_already_seen_by_peer
1498                .increment(num_already_seen_by_peer);
1499            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1500        }
1501
1502        if has_bad_transactions {
1503            // peer sent us invalid transactions
1504            self.report_peer_bad_transactions(peer_id)
1505        }
1506
1507        if num_already_seen_by_peer > 0 {
1508            self.report_already_seen(peer_id);
1509        }
1510
1511        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1512    }
1513
1514    /// Processes a [`FetchEvent`].
1515    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1516        match fetch_event {
1517            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1518                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1519                if report_peer {
1520                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1521                }
1522            }
1523            FetchEvent::FetchError { peer_id, error } => {
1524                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1525                self.on_request_error(peer_id, error);
1526            }
1527            FetchEvent::EmptyResponse { peer_id } => {
1528                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1529            }
1530        }
1531    }
1532}
1533
1534/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1535/// [`crate::NetworkManager`] for more context on the design pattern.
1536///
1537/// This should be spawned or used as part of `tokio::select!`.
1538//
1539// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1540// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1541impl<
1542        Pool: TransactionPool + Unpin + 'static,
1543        N: NetworkPrimitives<
1544                BroadcastedTransaction: SignedTransaction,
1545                PooledTransaction: SignedTransaction,
1546            > + Unpin,
1547    > Future for TransactionsManager<Pool, N>
1548where
1549    Pool::Transaction:
1550        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1551{
1552    type Output = ();
1553
1554    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1555        let start = Instant::now();
1556        let mut poll_durations = TxManagerPollDurations::default();
1557
1558        let this = self.get_mut();
1559
1560        // All streams are polled until their corresponding budget is exhausted, then we manually
1561        // yield back control to tokio. See `NetworkManager` for more context on the design
1562        // pattern.
1563
1564        // Advance network/peer related events (update peers map).
1565        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1566            poll_durations.acc_network_events,
1567            "net::tx",
1568            "Network events stream",
1569            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1570            this.network_events.poll_next_unpin(cx),
1571            |event| this.on_network_event(event)
1572        );
1573
1574        // Advances new __pending__ transactions, transactions that were successfully inserted into
1575        // pending set in pool (are valid), and propagates them (inform peers which
1576        // transactions we have seen).
1577        //
1578        // We try to drain this to batch the transactions in a single message.
1579        //
1580        // We don't expect this buffer to be large, since only pending transactions are
1581        // emitted here.
1582        let mut new_txs = Vec::new();
1583        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
1584            cx,
1585            &mut new_txs,
1586            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1587        ) {
1588            Poll::Ready(count) => {
1589                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
1590                    // we filled the entire buffer capacity and need to try again on the next poll
1591                    // immediately
1592                    true
1593                } else {
1594                    // try once more, because mostlikely the channel is now empty and the waker is
1595                    // registered if this is pending, if we filled additional hashes, we poll again
1596                    // on the next iteration
1597                    let limit =
1598                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
1599                            new_txs.len();
1600                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
1601                }
1602            }
1603            Poll::Pending => false,
1604        };
1605        if !new_txs.is_empty() {
1606            this.on_new_pending_transactions(new_txs);
1607        }
1608
1609        // Advance incoming transaction events (stream new txns/announcements from
1610        // network manager and queue for import to pool/fetch txns).
1611        //
1612        // This will potentially remove hashes from hashes pending fetch, it the event
1613        // is an announcement (if same hashes are announced that didn't fit into a
1614        // previous request).
1615        //
1616        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1617        // (128 KiB / 10 bytes > 13k transactions).
1618        //
1619        // If this is an event with `Transactions` message, since transactions aren't
1620        // validated until they are inserted into the pool, this can potentially queue
1621        // >13k transactions for insertion to pool. More if the message size is bigger
1622        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1623        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1624            poll_durations.acc_tx_events,
1625            "net::tx",
1626            "Network transaction events stream",
1627            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1628            this.transaction_events.poll_next_unpin(cx),
1629            |event| this.on_network_tx_event(event),
1630        );
1631
1632        // Advance inflight fetch requests (flush transaction fetcher and queue for
1633        // import to pool).
1634        //
1635        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1636        // (2 MiB / 10 bytes > 200k transactions).
1637        //
1638        // Since transactions aren't validated until they are inserted into the pool,
1639        // this can potentially queue >200k transactions for insertion to pool. More
1640        // if the message size is bigger than the soft limit on a `PooledTransactions`
1641        // response which is 2 MiB.
1642        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1643            poll_durations.acc_fetch_events,
1644            "net::tx",
1645            "Transaction fetch events stream",
1646            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1647            this.transaction_fetcher.poll_next_unpin(cx),
1648            |event| this.on_fetch_event(event),
1649        );
1650
1651        // Advance pool imports (flush txns to pool).
1652        //
1653        // Note, this is done in batches. A batch is filled from one `Transactions`
1654        // broadcast messages or one `PooledTransactions` response at a time. The
1655        // minimum batch size is 1 transaction (and might often be the case with blob
1656        // transactions).
1657        //
1658        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1659        // (2 MiB / 10 bytes > 200k transactions).
1660        //
1661        // Since transactions aren't validated until they are inserted into the pool,
1662        // this can potentially validate >200k transactions. More if the message size
1663        // is bigger than the soft limit on a `PooledTransactions` response which is
1664        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1665        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1666            poll_durations.acc_pending_imports,
1667            "net::tx",
1668            "Batched pool imports stream",
1669            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1670            this.pool_imports.poll_next_unpin(cx),
1671            |batch_results| this.on_batch_import_result(batch_results)
1672        );
1673
1674        // Tries to drain hashes pending fetch cache if the tx manager currently has
1675        // capacity for this (fetch txns).
1676        //
1677        // Sends at most one request.
1678        duration_metered_exec!(
1679            {
1680                if this.has_capacity_for_fetching_pending_hashes() &&
1681                    this.on_fetch_hashes_pending_fetch()
1682                {
1683                    maybe_more_tx_fetch_events = true;
1684                }
1685            },
1686            poll_durations.acc_pending_fetch
1687        );
1688
1689        // Advance commands (propagate/fetch/serve txns).
1690        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1691            poll_durations.acc_cmds,
1692            "net::tx",
1693            "Commands channel",
1694            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1695            this.command_rx.poll_next_unpin(cx),
1696            |cmd| this.on_command(cmd)
1697        );
1698
1699        this.transaction_fetcher.update_metrics();
1700
1701        // all channels are fully drained and import futures pending
1702        if maybe_more_network_events ||
1703            maybe_more_commands ||
1704            maybe_more_tx_events ||
1705            maybe_more_tx_fetch_events ||
1706            maybe_more_pool_imports ||
1707            maybe_more_pending_txns
1708        {
1709            // make sure we're woken up again
1710            cx.waker().wake_by_ref();
1711            return Poll::Pending
1712        }
1713
1714        this.update_poll_metrics(start, poll_durations);
1715
1716        Poll::Pending
1717    }
1718}
1719
/// Selects how transactions are propagated to peers in the network.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// Only peers that haven't seen a transaction yet receive it.
    Basic,
    /// Forced propagation mode.
    ///
    /// Every peer receives the transactions, regardless of whether they have been sent or
    /// received before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` if the propagation mode is [`Self::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1742
1743/// A transaction that's about to be propagated to multiple peers.
1744#[derive(Debug, Clone)]
1745struct PropagateTransaction<T = TransactionSigned> {
1746    size: usize,
1747    transaction: Arc<T>,
1748}
1749
1750impl<T: SignedTransaction> PropagateTransaction<T> {
1751    /// Create a new instance from a transaction.
1752    pub fn new(transaction: T) -> Self {
1753        let size = transaction.length();
1754        Self { size, transaction: Arc::new(transaction) }
1755    }
1756
1757    /// Create a new instance from a pooled transaction
1758    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1759    where
1760        P: PoolTransaction<Consensus = T>,
1761    {
1762        let size = tx.encoded_length();
1763        let transaction = tx.transaction.clone_into_consensus();
1764        let transaction = Arc::new(transaction.into_inner());
1765        Self { size, transaction }
1766    }
1767
1768    fn tx_hash(&self) -> &TxHash {
1769        self.transaction.tx_hash()
1770    }
1771}
1772
1773/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1774/// should receive them in full or as pooled
1775#[derive(Debug, Clone)]
1776enum PropagateTransactionsBuilder<T> {
1777    Pooled(PooledTransactionsHashesBuilder),
1778    Full(FullTransactionsBuilder<T>),
1779}
1780
1781impl<T> PropagateTransactionsBuilder<T> {
1782    /// Create a builder for pooled transactions
1783    fn pooled(version: EthVersion) -> Self {
1784        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1785    }
1786
1787    /// Create a builder that sends transactions in full and records transactions that don't fit.
1788    fn full(version: EthVersion) -> Self {
1789        Self::Full(FullTransactionsBuilder::new(version))
1790    }
1791
1792    /// Returns true if no transactions are recorded.
1793    fn is_empty(&self) -> bool {
1794        match self {
1795            Self::Pooled(builder) => builder.is_empty(),
1796            Self::Full(builder) => builder.is_empty(),
1797        }
1798    }
1799
1800    /// Consumes the type and returns the built messages that should be sent to the peer.
1801    fn build(self) -> PropagateTransactions<T> {
1802        match self {
1803            Self::Pooled(pooled) => {
1804                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1805            }
1806            Self::Full(full) => full.build(),
1807        }
1808    }
1809}
1810
1811impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1812    /// Appends all transactions
1813    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1814        for tx in txs {
1815            self.push(tx);
1816        }
1817    }
1818
1819    /// Appends a transaction to the list.
1820    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1821        match self {
1822            Self::Pooled(builder) => builder.push(transaction),
1823            Self::Full(builder) => builder.push(transaction),
1824        }
1825    }
1826}
1827
1828/// Represents how the transactions should be sent to a peer if any.
1829struct PropagateTransactions<T> {
1830    /// The pooled transaction hashes to send.
1831    pooled: Option<NewPooledTransactionHashes>,
1832    /// The transactions to send in full.
1833    full: Option<Vec<Arc<T>>>,
1834}
1835
1836/// Helper type for constructing the full transaction message that enforces the
1837/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1838/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1839/// broadcasted in full.
1840#[derive(Debug, Clone)]
1841struct FullTransactionsBuilder<T> {
1842    /// The soft limit to enforce for a single broadcast message of full transactions.
1843    total_size: usize,
1844    /// All transactions to be broadcasted.
1845    transactions: Vec<Arc<T>>,
1846    /// Transactions that didn't fit into the broadcast message
1847    pooled: PooledTransactionsHashesBuilder,
1848}
1849
1850impl<T> FullTransactionsBuilder<T> {
1851    /// Create a builder for the negotiated version of the peer's session
1852    fn new(version: EthVersion) -> Self {
1853        Self {
1854            total_size: 0,
1855            pooled: PooledTransactionsHashesBuilder::new(version),
1856            transactions: vec![],
1857        }
1858    }
1859
1860    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1861    fn is_empty(&self) -> bool {
1862        self.transactions.is_empty() && self.pooled.is_empty()
1863    }
1864
1865    /// Returns the messages that should be propagated to the peer.
1866    fn build(self) -> PropagateTransactions<T> {
1867        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1868        let full = Some(self.transactions).filter(|full| !full.is_empty());
1869        PropagateTransactions { pooled, full }
1870    }
1871}
1872
1873impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1874    /// Appends all transactions.
1875    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1876        for tx in txs {
1877            self.push(&tx)
1878        }
1879    }
1880
1881    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1882    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1883    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1884    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1885    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1886    ///
1887    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1888    /// to list of pooled transactions, (e.g. 4844 transactions).
1889    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1890    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1891        // Do not send full 4844 transaction hashes to peers.
1892        //
1893        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1894        //  Instead, those transactions are only announced using
1895        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1896        //  via `GetPooledTransactions`.
1897        //
1898        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1899        if !transaction.transaction.is_broadcastable_in_full() {
1900            self.pooled.push(transaction);
1901            return
1902        }
1903
1904        let new_size = self.total_size + transaction.size;
1905        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1906            self.total_size > 0
1907        {
1908            // transaction does not fit into the message
1909            self.pooled.push(transaction);
1910            return
1911        }
1912
1913        self.total_size = new_size;
1914        self.transactions.push(Arc::clone(&transaction.transaction));
1915    }
1916}
1917
1918/// A helper type to create the pooled transactions message based on the negotiated version of the
1919/// session with the peer
1920#[derive(Debug, Clone)]
1921enum PooledTransactionsHashesBuilder {
1922    Eth66(NewPooledTransactionHashes66),
1923    Eth68(NewPooledTransactionHashes68),
1924}
1925
1926// === impl PooledTransactionsHashesBuilder ===
1927
1928impl PooledTransactionsHashesBuilder {
1929    /// Push a transaction from the pool to the list.
1930    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1931        match self {
1932            Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1933            Self::Eth68(msg) => {
1934                msg.hashes.push(*pooled_tx.hash());
1935                msg.sizes.push(pooled_tx.encoded_length());
1936                msg.types.push(pooled_tx.transaction.ty());
1937            }
1938        }
1939    }
1940
1941    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1942    fn is_empty(&self) -> bool {
1943        match self {
1944            Self::Eth66(hashes) => hashes.is_empty(),
1945            Self::Eth68(hashes) => hashes.is_empty(),
1946        }
1947    }
1948
1949    /// Returns the number of transactions in the builder.
1950    fn len(&self) -> usize {
1951        match self {
1952            Self::Eth66(hashes) => hashes.len(),
1953            Self::Eth68(hashes) => hashes.len(),
1954        }
1955    }
1956
1957    /// Appends all hashes
1958    fn extend<T: SignedTransaction>(
1959        &mut self,
1960        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1961    ) {
1962        for tx in txs {
1963            self.push(&tx);
1964        }
1965    }
1966
1967    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1968        match self {
1969            Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1970            Self::Eth68(msg) => {
1971                msg.hashes.push(*tx.tx_hash());
1972                msg.sizes.push(tx.size);
1973                msg.types.push(tx.transaction.ty());
1974            }
1975        }
1976    }
1977
1978    /// Create a builder for the negotiated version of the peer's session
1979    fn new(version: EthVersion) -> Self {
1980        match version {
1981            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1982            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1983                Self::Eth68(Default::default())
1984            }
1985        }
1986    }
1987
1988    fn build(self) -> NewPooledTransactionHashes {
1989        match self {
1990            Self::Eth66(mut msg) => {
1991                msg.shrink_to_fit();
1992                msg.into()
1993            }
1994            Self::Eth68(mut msg) => {
1995                msg.shrink_to_fit();
1996                msg.into()
1997            }
1998        }
1999    }
2000}
2001
/// How we received the transactions.
///
/// Derives the usual value-type traits, like its sibling `PropagationMode`, so a source can be
/// logged and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were sent as a broadcast, `false` if they arrived as
    /// the response to one of our requests.
    const fn is_broadcast(&self) -> bool {
        matches!(self, Self::Broadcast)
    }
}
2018
2019/// Tracks a single peer in the context of [`TransactionsManager`].
2020#[derive(Debug)]
2021pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2022    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
2023    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
2024    /// the peer.
2025    seen_transactions: LruCache<TxHash>,
2026    /// A communication channel directly to the peer's session task.
2027    request_tx: PeerRequestSender<PeerRequest<N>>,
2028    /// negotiated version of the session.
2029    version: EthVersion,
2030    /// The peer's client version.
2031    client_version: Arc<str>,
2032    /// The kind of peer.
2033    peer_kind: PeerKind,
2034}
2035
2036impl<N: NetworkPrimitives> PeerMetadata<N> {
2037    /// Returns a new instance of [`PeerMetadata`].
2038    pub fn new(
2039        request_tx: PeerRequestSender<PeerRequest<N>>,
2040        version: EthVersion,
2041        client_version: Arc<str>,
2042        max_transactions_seen_by_peer: u32,
2043        peer_kind: PeerKind,
2044    ) -> Self {
2045        Self {
2046            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2047            request_tx,
2048            version,
2049            client_version,
2050            peer_kind,
2051        }
2052    }
2053
2054    /// Returns a reference to the peer's request sender channel.
2055    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2056        &self.request_tx
2057    }
2058
2059    /// Returns a mutable reference to the seen transactions LRU cache.
2060    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2061        &mut self.seen_transactions
2062    }
2063
2064    /// Returns the negotiated `EthVersion` of the session.
2065    pub const fn version(&self) -> EthVersion {
2066        self.version
2067    }
2068
2069    /// Returns a reference to the peer's client version string.
2070    pub fn client_version(&self) -> &str {
2071        &self.client_version
2072    }
2073
2074    /// Returns the peer's kind.
2075    pub const fn peer_kind(&self) -> PeerKind {
2076        self.peer_kind
2077    }
2078}
2079
2080/// Commands to send to the [`TransactionsManager`]
2081#[derive(Debug)]
2082enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2083    /// Propagate a transaction hash to the network.
2084    PropagateHash(B256),
2085    /// Propagate transaction hashes to a specific peer.
2086    PropagateHashesTo(Vec<B256>, PeerId),
2087    /// Request the list of active peer IDs from the [`TransactionsManager`].
2088    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2089    /// Propagate a collection of full transactions to a specific peer.
2090    PropagateTransactionsTo(Vec<TxHash>, PeerId),
2091    /// Propagate a collection of hashes to all peers.
2092    PropagateTransactions(Vec<TxHash>),
2093    /// Propagate a collection of broadcastable transactions in full to all peers.
2094    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2095    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
2096    GetTransactionHashes {
2097        peers: Vec<PeerId>,
2098        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2099    },
2100    /// Requests a clone of the sender channel to the peer.
2101    GetPeerSender {
2102        peer_id: PeerId,
2103        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2104    },
2105}
2106
2107/// All events related to transactions emitted by the network.
2108#[derive(Debug)]
2109pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2110    /// Represents the event of receiving a list of transactions from a peer.
2111    ///
2112    /// This indicates transactions that were broadcasted to us from the peer.
2113    IncomingTransactions {
2114        /// The ID of the peer from which the transactions were received.
2115        peer_id: PeerId,
2116        /// The received transactions.
2117        msg: Transactions<N::BroadcastedTransaction>,
2118    },
2119    /// Represents the event of receiving a list of transaction hashes from a peer.
2120    IncomingPooledTransactionHashes {
2121        /// The ID of the peer from which the transaction hashes were received.
2122        peer_id: PeerId,
2123        /// The received new pooled transaction hashes.
2124        msg: NewPooledTransactionHashes,
2125    },
2126    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
2127    GetPooledTransactions {
2128        /// The ID of the peer from which the request was received.
2129        peer_id: PeerId,
2130        /// The received `GetPooledTransactions` request.
2131        request: GetPooledTransactions,
2132        /// The sender for responding to the request with a result of `PooledTransactions`.
2133        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2134    },
2135    /// Represents the event of receiving a `GetTransactionsHandle` request.
2136    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2137}
2138
/// Self-monitoring information about the pool imports of the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool. The counter is shared, so
    /// clones of the [`Arc`] observe and update the same value.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Returns a new [`PendingPoolImportsInfo`] whose counter starts at zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pending pool imports is strictly below the given
    /// tolerated max.
    ///
    /// Note that the comparison uses the `max_pending_pool_imports` argument, not the value
    /// stored in this struct.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2159
2160impl Default for PendingPoolImportsInfo {
2161    fn default() -> Self {
2162        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2163    }
2164}
2165
/// Time accumulated per stage during a single poll of the [`TransactionsManager`].
///
/// All durations start at zero.
#[derive(Default, Debug)]
struct TxManagerPollDurations {
    /// Time spent on the network events stream.
    acc_network_events: Duration,
    /// Time spent on the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time spent on the network transaction events stream.
    acc_tx_events: Duration,
    /// Presumably time spent on imported transactions; not written by `poll` itself.
    acc_imported_txns: Duration,
    /// Time spent on the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time spent trying to fetch hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent on the commands channel.
    acc_cmds: Duration,
}
2176
2177#[cfg(test)]
2178mod tests {
2179    use super::*;
2180    use crate::{
2181        test_utils::{
2182            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2183            Testnet,
2184        },
2185        transactions::config::RelaxedEthAnnouncementFilter,
2186        NetworkConfigBuilder, NetworkManager,
2187    };
2188    use alloy_consensus::{TxEip1559, TxLegacy};
2189    use alloy_eips::eip4844::BlobTransactionValidationError;
2190    use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2191    use alloy_rlp::Decodable;
2192    use futures::FutureExt;
2193    use reth_chainspec::MIN_TRANSACTION_GAS;
2194    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2195    use reth_network_api::{NetworkInfo, PeerKind};
2196    use reth_network_p2p::{
2197        error::{RequestError, RequestResult},
2198        sync::{NetworkSyncUpdater, SyncState},
2199    };
2200    use reth_storage_api::noop::NoopProvider;
2201    use reth_tasks::Runtime;
2202    use reth_transaction_pool::{
2203        error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2204        test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2205    };
2206    use secp256k1::SecretKey;
2207    use std::{
2208        collections::HashSet,
2209        future::poll_fn,
2210        net::{IpAddr, Ipv4Addr, SocketAddr},
2211        str::FromStr,
2212    };
2213    use tracing::error;
2214
2215    #[tokio::test(flavor = "multi_thread")]
2216    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2217        reth_tracing::init_test_tracing();
2218        let net = Testnet::create(3).await;
2219
2220        let mut handles = net.handles();
2221        let handle0 = handles.next().unwrap();
2222        let handle1 = handles.next().unwrap();
2223
2224        drop(handles);
2225        let handle = net.spawn();
2226
2227        let listener0 = handle0.event_listener();
2228        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2229        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2230
2231        let client = NoopProvider::default();
2232        let pool = testing_pool();
2233        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2234            .disable_discovery()
2235            .listener_port(0)
2236            .build(client);
2237        let transactions_manager_config = config.transactions_manager_config.clone();
2238        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2239            .await
2240            .unwrap()
2241            .into_builder()
2242            .transactions(pool.clone(), transactions_manager_config)
2243            .split_with_handle();
2244
2245        tokio::task::spawn(network);
2246
2247        // go to syncing (pipeline sync)
2248        network_handle.update_sync_state(SyncState::Syncing);
2249        assert!(NetworkInfo::is_syncing(&network_handle));
2250        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2251
2252        // wait for all initiator connections
2253        let mut established = listener0.take(2);
2254        while let Some(ev) = established.next().await {
2255            match ev {
2256                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2257                    // to insert a new peer in transactions peerset
2258                    transactions
2259                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2260                }
2261                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2262                ev => {
2263                    error!("unexpected event {ev:?}")
2264                }
2265            }
2266        }
2267        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2268        let input = hex!(
2269            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2270        );
2271        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2272        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2273            peer_id: *handle1.peer_id(),
2274            msg: Transactions(vec![signed_tx.clone()]),
2275        });
2276        poll_fn(|cx| {
2277            let _ = transactions.poll_unpin(cx);
2278            Poll::Ready(())
2279        })
2280        .await;
2281        assert!(pool.is_empty());
2282        handle.terminate().await;
2283    }
2284
2285    #[tokio::test(flavor = "multi_thread")]
2286    async fn test_tx_broadcasts_through_two_syncs() {
2287        reth_tracing::init_test_tracing();
2288        let net = Testnet::create(3).await;
2289
2290        let mut handles = net.handles();
2291        let handle0 = handles.next().unwrap();
2292        let handle1 = handles.next().unwrap();
2293
2294        drop(handles);
2295        let handle = net.spawn();
2296
2297        let listener0 = handle0.event_listener();
2298        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2299        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2300
2301        let client = NoopProvider::default();
2302        let pool = testing_pool();
2303        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2304            .disable_discovery()
2305            .listener_port(0)
2306            .build(client);
2307        let transactions_manager_config = config.transactions_manager_config.clone();
2308        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2309            .await
2310            .unwrap()
2311            .into_builder()
2312            .transactions(pool.clone(), transactions_manager_config)
2313            .split_with_handle();
2314
2315        tokio::task::spawn(network);
2316
2317        // go to syncing (pipeline sync) to idle and then to syncing (live)
2318        network_handle.update_sync_state(SyncState::Syncing);
2319        assert!(NetworkInfo::is_syncing(&network_handle));
2320        network_handle.update_sync_state(SyncState::Idle);
2321        assert!(!NetworkInfo::is_syncing(&network_handle));
2322        network_handle.update_sync_state(SyncState::Syncing);
2323        assert!(NetworkInfo::is_syncing(&network_handle));
2324
2325        // wait for all initiator connections
2326        let mut established = listener0.take(2);
2327        while let Some(ev) = established.next().await {
2328            match ev {
2329                NetworkEvent::ActivePeerSession { .. } |
2330                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2331                    // to insert a new peer in transactions peerset
2332                    transactions.on_network_event(ev);
2333                }
2334                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2335                _ => {
2336                    error!("unexpected event {ev:?}")
2337                }
2338            }
2339        }
2340        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2341        let input = hex!(
2342            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2343        );
2344        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2345        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2346            peer_id: *handle1.peer_id(),
2347            msg: Transactions(vec![signed_tx.clone()]),
2348        });
2349        poll_fn(|cx| {
2350            let _ = transactions.poll_unpin(cx);
2351            Poll::Ready(())
2352        })
2353        .await;
2354        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2355        assert!(NetworkInfo::is_syncing(&network_handle));
2356        assert!(!pool.is_empty());
2357        handle.terminate().await;
2358    }
2359
2360    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2361    // event and is able to retrieve the corresponding transactions.
2362    #[tokio::test(flavor = "multi_thread")]
2363    async fn test_handle_incoming_transactions_hashes() {
2364        reth_tracing::init_test_tracing();
2365
2366        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2367        let client = NoopProvider::default();
2368
2369        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2370            // let OS choose port
2371            .listener_port(0)
2372            .disable_discovery()
2373            .build(client);
2374
2375        let pool = testing_pool();
2376
2377        let transactions_manager_config = config.transactions_manager_config.clone();
2378        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2379            .await
2380            .unwrap()
2381            .into_builder()
2382            .transactions(pool.clone(), transactions_manager_config)
2383            .split_with_handle();
2384
2385        let peer_id_1 = PeerId::new([1; 64]);
2386        let eth_version = EthVersion::Eth66;
2387
2388        let txs = vec![TransactionSigned::new_unhashed(
2389            Transaction::Legacy(TxLegacy {
2390                chain_id: Some(4),
2391                nonce: 15u64,
2392                gas_price: 2200000000,
2393                gas_limit: 34811,
2394                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2395                value: U256::from(1234u64),
2396                input: Default::default(),
2397            }),
2398            Signature::new(
2399                U256::from_str(
2400                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2401                )
2402                .unwrap(),
2403                U256::from_str(
2404                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2405                )
2406                .unwrap(),
2407                true,
2408            ),
2409        )];
2410
2411        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2412
2413        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2414        tx_manager.peers.insert(peer_id_1, peer_1);
2415
2416        assert!(pool.is_empty());
2417
2418        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2419            peer_id: peer_id_1,
2420            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2421                txs_hashes.clone(),
2422            )),
2423        });
2424
2425        // mock session of peer_1 receives request
2426        let req = to_mock_session_rx
2427            .recv()
2428            .await
2429            .expect("peer_1 session should receive request with buffered hashes");
2430        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2431        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2432
2433        let message: Vec<PooledTransactionVariant> = txs
2434            .into_iter()
2435            .map(|tx| {
2436                PooledTransactionVariant::try_from(tx)
2437                    .expect("Failed to convert MockTransaction to PooledTransaction")
2438            })
2439            .collect();
2440
2441        // return the transactions corresponding to the transaction hashes.
2442        response
2443            .send(Ok(PooledTransactions(message)))
2444            .expect("should send peer_1 response to tx manager");
2445
2446        // adance the transaction manager future
2447        poll_fn(|cx| {
2448            let _ = tx_manager.poll_unpin(cx);
2449            Poll::Ready(())
2450        })
2451        .await;
2452
2453        // ensure that the transactions corresponding to the transaction hashes have been
2454        // successfully retrieved and stored in the Pool.
2455        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2456    }
2457
2458    #[tokio::test(flavor = "multi_thread")]
2459    async fn test_handle_incoming_transactions() {
2460        reth_tracing::init_test_tracing();
2461        let net = Testnet::create(3).await;
2462
2463        let mut handles = net.handles();
2464        let handle0 = handles.next().unwrap();
2465        let handle1 = handles.next().unwrap();
2466
2467        drop(handles);
2468        let handle = net.spawn();
2469
2470        let listener0 = handle0.event_listener();
2471
2472        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2473        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2474
2475        let client = NoopProvider::default();
2476        let pool = testing_pool();
2477        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2478            .disable_discovery()
2479            .listener_port(0)
2480            .build(client);
2481        let transactions_manager_config = config.transactions_manager_config.clone();
2482        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2483            .await
2484            .unwrap()
2485            .into_builder()
2486            .transactions(pool.clone(), transactions_manager_config)
2487            .split_with_handle();
2488        tokio::task::spawn(network);
2489
2490        network_handle.update_sync_state(SyncState::Idle);
2491
2492        assert!(!NetworkInfo::is_syncing(&network_handle));
2493
2494        // wait for all initiator connections
2495        let mut established = listener0.take(2);
2496        while let Some(ev) = established.next().await {
2497            match ev {
2498                NetworkEvent::ActivePeerSession { .. } |
2499                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2500                    // to insert a new peer in transactions peerset
2501                    transactions.on_network_event(ev);
2502                }
2503                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2504                ev => {
2505                    error!("unexpected event {ev:?}")
2506                }
2507            }
2508        }
2509        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2510        let input = hex!(
2511            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2512        );
2513        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2514        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2515            peer_id: *handle1.peer_id(),
2516            msg: Transactions(vec![signed_tx.clone()]),
2517        });
2518        assert!(transactions
2519            .transactions_by_peers
2520            .get(signed_tx.tx_hash())
2521            .unwrap()
2522            .contains(handle1.peer_id()));
2523
2524        // advance the transaction manager future
2525        poll_fn(|cx| {
2526            let _ = transactions.poll_unpin(cx);
2527            Poll::Ready(())
2528        })
2529        .await;
2530
2531        assert!(!pool.is_empty());
2532        assert!(pool.get(signed_tx.tx_hash()).is_some());
2533        handle.terminate().await;
2534    }
2535
2536    #[tokio::test(flavor = "multi_thread")]
2537    async fn test_session_closed_cleans_transaction_peer_state() {
2538        let (mut tx_manager, _network) = new_tx_manager().await;
2539        let peer_id = PeerId::new([1; 64]);
2540        let fallback_peer = PeerId::new([2; 64]);
2541        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2542        let hash_shared = B256::from_slice(&[1; 32]);
2543
2544        tx_manager.peers.insert(peer_id, peer);
2545        buffer_hash_to_tx_fetcher(
2546            &mut tx_manager.transaction_fetcher,
2547            hash_shared,
2548            peer_id,
2549            0,
2550            None,
2551        );
2552        buffer_hash_to_tx_fetcher(
2553            &mut tx_manager.transaction_fetcher,
2554            hash_shared,
2555            fallback_peer,
2556            0,
2557            None,
2558        );
2559        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2560
2561        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2562            peer_id,
2563            reason: None,
2564        }));
2565
2566        // peer removed from peers map and active_peers
2567        assert!(!tx_manager.peers.contains_key(&peer_id));
2568        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2569        // fallback peer is still available for the hash
2570        assert_eq!(
2571            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2572            Some(&fallback_peer)
2573        );
2574    }
2575
2576    #[tokio::test(flavor = "multi_thread")]
2577    async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2578        let (mut tx_manager, _network) = new_tx_manager().await;
2579        let peer_id = PeerId::new([1; 64]);
2580        let hash = B256::from_slice(&[1; 32]);
2581
2582        tx_manager.network.update_sync_state(SyncState::Idle);
2583        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2584
2585        let err = PoolError::new(
2586            hash,
2587            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2588                BlobTransactionValidationError::InvalidProof,
2589            )),
2590        );
2591
2592        tx_manager.on_bad_import(err);
2593
2594        assert!(!tx_manager.bad_imports.contains(&hash));
2595    }
2596
2597    #[tokio::test(flavor = "multi_thread")]
2598    async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2599        let (mut tx_manager, _network) = new_tx_manager().await;
2600        let peer_id = PeerId::new([1; 64]);
2601        let hash = B256::from_slice(&[3; 32]);
2602
2603        tx_manager.network.update_sync_state(SyncState::Idle);
2604        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2605
2606        let err = PoolError::new(
2607            hash,
2608            InvalidPoolTransactionError::Eip4844(
2609                Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2610            ),
2611        );
2612
2613        tx_manager.on_bad_import(err);
2614
2615        assert!(!tx_manager.bad_imports.contains(&hash));
2616    }
2617
2618    #[tokio::test(flavor = "multi_thread")]
2619    async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2620        let (mut tx_manager, _network) = new_tx_manager().await;
2621        let peer_id = PeerId::new([1; 64]);
2622        let hash = B256::from_slice(&[2; 32]);
2623
2624        tx_manager.network.update_sync_state(SyncState::Idle);
2625        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2626
2627        let err = PoolError::new(
2628            hash,
2629            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2630        );
2631
2632        tx_manager.on_bad_import(err);
2633
2634        assert!(tx_manager.bad_imports.contains(&hash));
2635    }
2636
2637    #[tokio::test(flavor = "multi_thread")]
2638    async fn test_on_get_pooled_transactions_network() {
2639        reth_tracing::init_test_tracing();
2640        let net = Testnet::create(2).await;
2641
2642        let mut handles = net.handles();
2643        let handle0 = handles.next().unwrap();
2644        let handle1 = handles.next().unwrap();
2645
2646        drop(handles);
2647        let handle = net.spawn();
2648
2649        let listener0 = handle0.event_listener();
2650
2651        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2652        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2653
2654        let client = NoopProvider::default();
2655        let pool = testing_pool();
2656        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2657            .disable_discovery()
2658            .listener_port(0)
2659            .build(client);
2660        let transactions_manager_config = config.transactions_manager_config.clone();
2661        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2662            .await
2663            .unwrap()
2664            .into_builder()
2665            .transactions(pool.clone(), transactions_manager_config)
2666            .split_with_handle();
2667        tokio::task::spawn(network);
2668
2669        network_handle.update_sync_state(SyncState::Idle);
2670
2671        assert!(!NetworkInfo::is_syncing(&network_handle));
2672
2673        // wait for all initiator connections
2674        let mut established = listener0.take(2);
2675        while let Some(ev) = established.next().await {
2676            match ev {
2677                NetworkEvent::ActivePeerSession { .. } |
2678                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2679                    transactions.on_network_event(ev);
2680                }
2681                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2682                ev => {
2683                    error!("unexpected event {ev:?}")
2684                }
2685            }
2686        }
2687        handle.terminate().await;
2688
2689        let tx = MockTransaction::eip1559();
2690        let _ = transactions
2691            .pool
2692            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2693            .await;
2694
2695        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2696
2697        let (send, receive) =
2698            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2699
2700        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2701            peer_id: *handle1.peer_id(),
2702            request,
2703            response: send,
2704        });
2705
2706        match receive.await.unwrap() {
2707            Ok(PooledTransactions(transactions)) => {
2708                assert_eq!(transactions.len(), 1);
2709            }
2710            Err(e) => {
2711                panic!("error: {e:?}");
2712            }
2713        }
2714    }
2715
2716    // Ensure that when the remote peer only returns part of the requested transactions, the
2717    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2718    // re-buffered.
2719    #[tokio::test]
2720    async fn test_partially_tx_response() {
2721        reth_tracing::init_test_tracing();
2722
2723        let mut tx_manager = new_tx_manager().await.0;
2724        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2725
2726        let peer_id_1 = PeerId::new([1; 64]);
2727        let eth_version = EthVersion::Eth66;
2728
2729        let txs = vec![
2730            TransactionSigned::new_unhashed(
2731                Transaction::Legacy(TxLegacy {
2732                    chain_id: Some(4),
2733                    nonce: 15u64,
2734                    gas_price: 2200000000,
2735                    gas_limit: 34811,
2736                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2737                    value: U256::from(1234u64),
2738                    input: Default::default(),
2739                }),
2740                Signature::new(
2741                    U256::from_str(
2742                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2743                    )
2744                    .unwrap(),
2745                    U256::from_str(
2746                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2747                    )
2748                    .unwrap(),
2749                    true,
2750                ),
2751            ),
2752            TransactionSigned::new_unhashed(
2753                Transaction::Eip1559(TxEip1559 {
2754                    chain_id: 4,
2755                    nonce: 26u64,
2756                    max_priority_fee_per_gas: 1500000000,
2757                    max_fee_per_gas: 1500000013,
2758                    gas_limit: MIN_TRANSACTION_GAS,
2759                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2760                    value: U256::from(3000000000000000000u64),
2761                    input: Default::default(),
2762                    access_list: Default::default(),
2763                }),
2764                Signature::new(
2765                    U256::from_str(
2766                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2767                    )
2768                    .unwrap(),
2769                    U256::from_str(
2770                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2771                    )
2772                    .unwrap(),
2773                    true,
2774                ),
2775            ),
2776        ];
2777
2778        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2779
2780        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2781        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2782        // fetch
2783        peer_1.seen_transactions.insert(txs_hashes[0]);
2784        peer_1.seen_transactions.insert(txs_hashes[1]);
2785        tx_manager.peers.insert(peer_id_1, peer_1);
2786
2787        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2788        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2789
2790        // peer_1 is idle
2791        assert!(tx_fetcher.is_idle(&peer_id_1));
2792        assert_eq!(tx_fetcher.active_peers.len(), 0);
2793
2794        // sends requests for buffered hashes to peer_1
2795        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2796
2797        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2798        // as long as request is in flight peer_1 is not idle
2799        assert!(!tx_fetcher.is_idle(&peer_id_1));
2800        assert_eq!(tx_fetcher.active_peers.len(), 1);
2801
2802        // mock session of peer_1 receives request
2803        let req = to_mock_session_rx
2804            .recv()
2805            .await
2806            .expect("peer_1 session should receive request with buffered hashes");
2807        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2808
2809        let message: Vec<PooledTransactionVariant> = txs
2810            .into_iter()
2811            .take(1)
2812            .map(|tx| {
2813                PooledTransactionVariant::try_from(tx)
2814                    .expect("Failed to convert MockTransaction to PooledTransaction")
2815            })
2816            .collect();
2817        // response partial request
2818        response
2819            .send(Ok(PooledTransactions(message)))
2820            .expect("should send peer_1 response to tx manager");
2821        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2822            unreachable!()
2823        };
2824
2825        // request has resolved, peer_1 is idle again
2826        assert!(tx_fetcher.is_idle(&peer_id));
2827        assert_eq!(tx_fetcher.active_peers.len(), 0);
2828        // failing peer_1's request buffers requested hashes for retry.
2829        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2830    }
2831
2832    #[tokio::test]
2833    async fn test_max_retries_tx_request() {
2834        reth_tracing::init_test_tracing();
2835
2836        let mut tx_manager = new_tx_manager().await.0;
2837        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2838
2839        let peer_id_1 = PeerId::new([1; 64]);
2840        let peer_id_2 = PeerId::new([2; 64]);
2841        let eth_version = EthVersion::Eth66;
2842        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2843
2844        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2845        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2846        // fetch
2847        peer_1.seen_transactions.insert(seen_hashes[0]);
2848        peer_1.seen_transactions.insert(seen_hashes[1]);
2849        tx_manager.peers.insert(peer_id_1, peer_1);
2850
2851        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2852        // for first retry in reverse order to make index 0 lru
2853        let retries = 1;
2854        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2855        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2856
2857        // peer_1 is idle
2858        assert!(tx_fetcher.is_idle(&peer_id_1));
2859        assert_eq!(tx_fetcher.active_peers.len(), 0);
2860
2861        // sends request for buffered hashes to peer_1
2862        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2863
2864        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2865
2866        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2867        // as long as request is in inflight peer_1 is not idle
2868        assert!(!tx_fetcher.is_idle(&peer_id_1));
2869        assert_eq!(tx_fetcher.active_peers.len(), 1);
2870
2871        // mock session of peer_1 receives request
2872        let req = to_mock_session_rx
2873            .recv()
2874            .await
2875            .expect("peer_1 session should receive request with buffered hashes");
2876        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2877        let GetPooledTransactions(hashes) = request;
2878
2879        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2880
2881        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2882
2883        // fail request to peer_1
2884        response
2885            .send(Err(RequestError::BadResponse))
2886            .expect("should send peer_1 response to tx manager");
2887        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2888            unreachable!()
2889        };
2890
2891        // request has resolved, peer_1 is idle again
2892        assert!(tx_fetcher.is_idle(&peer_id));
2893        assert_eq!(tx_fetcher.active_peers.len(), 0);
2894        // failing peer_1's request buffers requested hashes for retry
2895        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2896
2897        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2898        tx_manager.peers.insert(peer_id_2, peer_2);
2899
2900        // peer_2 announces same hashes as peer_1
2901        let msg =
2902            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2903        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2904
2905        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2906
2907        // peer_2 should be in active_peers.
2908        assert_eq!(tx_fetcher.active_peers.len(), 1);
2909
2910        // since hashes are already seen, no changes to length of unknown hashes
2911        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2912        // but hashes are taken out of buffer and packed into request to peer_2
2913        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2914
2915        // mock session of peer_2 receives request
2916        let req = to_mock_session_rx
2917            .recv()
2918            .await
2919            .expect("peer_2 session should receive request with buffered hashes");
2920        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2921
2922        // report failed request to tx manager
2923        response
2924            .send(Err(RequestError::BadResponse))
2925            .expect("should send peer_2 response to tx manager");
2926        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2927
2928        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2929        // for retry
2930        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2931        assert_eq!(tx_fetcher.active_peers.len(), 0);
2932    }
2933
// A `pooled` builder starts out empty; after one push it builds a single pooled entry and no
// full transactions.
#[test]
fn test_transaction_builder_empty() {
    let mut builder =
        PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
    assert!(builder.is_empty());

    let mut factory = MockTransactionFactory::default();
    let pool_tx = Arc::new(factory.create_eip1559());
    let tx = PropagateTransaction::pool_tx(pool_tx);
    builder.push(&tx);
    assert!(!builder.is_empty());

    let built = builder.build();
    assert!(built.full.is_none());
    let pooled = built.pooled.unwrap();
    assert_eq!(pooled.len(), 1);
}
2950
2951    #[test]
2952    fn test_transaction_builder_large() {
2953        let mut builder =
2954            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2955        assert!(builder.is_empty());
2956
2957        let mut factory = MockTransactionFactory::default();
2958        let mut tx = factory.create_eip1559();
2959        // create a transaction that still fits
2960        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2961        let tx = Arc::new(tx);
2962        let tx = PropagateTransaction::pool_tx(tx);
2963        builder.push(&tx);
2964        assert!(!builder.is_empty());
2965
2966        let txs = builder.clone().build();
2967        assert!(txs.pooled.is_none());
2968        let txs = txs.full.unwrap();
2969        assert_eq!(txs.len(), 1);
2970
2971        builder.push(&tx);
2972
2973        let txs = builder.clone().build();
2974        let pooled = txs.pooled.unwrap();
2975        assert_eq!(pooled.len(), 1);
2976        let txs = txs.full.unwrap();
2977        assert_eq!(txs.len(), 1);
2978    }
2979
2980    #[test]
2981    fn test_transaction_builder_eip4844() {
2982        let mut builder =
2983            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2984        assert!(builder.is_empty());
2985
2986        let mut factory = MockTransactionFactory::default();
2987        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2988        builder.push(&tx);
2989        assert!(!builder.is_empty());
2990
2991        let txs = builder.clone().build();
2992        assert!(txs.full.is_none());
2993        let txs = txs.pooled.unwrap();
2994        assert_eq!(txs.len(), 1);
2995
2996        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2997        builder.push(&tx);
2998
2999        let txs = builder.clone().build();
3000        let pooled = txs.pooled.unwrap();
3001        assert_eq!(pooled.len(), 1);
3002        let txs = txs.full.unwrap();
3003        assert_eq!(txs.len(), 1);
3004    }
3005
3006    #[tokio::test]
3007    async fn test_propagate_full() {
3008        reth_tracing::init_test_tracing();
3009
3010        let (mut tx_manager, network) = new_tx_manager().await;
3011        let peer_id = PeerId::random();
3012
3013        // ensure not syncing
3014        network.handle().update_sync_state(SyncState::Idle);
3015
3016        // mock a peer
3017        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3018
3019        let session_info = SessionInfo {
3020            peer_id,
3021            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3022            client_version: Arc::from(""),
3023            capabilities: Arc::new(vec![].into()),
3024            status: Arc::new(Default::default()),
3025            version: EthVersion::Eth68,
3026            peer_kind: PeerKind::Basic,
3027        };
3028        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3029        tx_manager
3030            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3031        let mut propagate = vec![];
3032        let mut factory = MockTransactionFactory::default();
3033        let eip1559_tx = Arc::new(factory.create_eip1559());
3034        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3035        let eip4844_tx = Arc::new(factory.create_eip4844());
3036        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3037
3038        let propagated =
3039            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3040        assert_eq!(propagated.len(), 2);
3041        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3042        assert_eq!(prop_txs.len(), 1);
3043        assert!(prop_txs[0].is_full());
3044
3045        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3046        assert_eq!(prop_txs.len(), 1);
3047        assert!(prop_txs[0].is_hash());
3048
3049        let peer = tx_manager.peers.get(&peer_id).unwrap();
3050        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3051        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3052        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3053
3054        // propagate again
3055        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3056        assert!(propagated.is_empty());
3057    }
3058
3059    #[tokio::test]
3060    async fn test_propagate_pending_txs_while_initially_syncing() {
3061        reth_tracing::init_test_tracing();
3062
3063        let (mut tx_manager, network) = new_tx_manager().await;
3064        let peer_id = PeerId::random();
3065
3066        // Keep the node in initial sync mode.
3067        network.handle().update_sync_state(SyncState::Syncing);
3068        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3069
3070        // Add a peer so propagation has a destination.
3071        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3072        tx_manager.peers.insert(peer_id, peer);
3073
3074        let tx = MockTransaction::eip1559();
3075        tx_manager
3076            .pool
3077            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3078            .await
3079            .expect("transaction should be accepted into the pool");
3080
3081        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3082
3083        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3084        assert!(peer.seen_transactions.contains(tx.get_hash()));
3085    }
3086
3087    #[tokio::test]
3088    async fn test_relaxed_filter_ignores_unknown_tx_types() {
3089        reth_tracing::init_test_tracing();
3090
3091        let transactions_manager_config = TransactionsManagerConfig::default();
3092
3093        let propagation_policy = TransactionPropagationKind::default();
3094        let announcement_policy = RelaxedEthAnnouncementFilter::default();
3095
3096        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3097
3098        let pool = testing_pool();
3099        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3100        let client = NoopProvider::default();
3101
3102        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3103            .listener_port(0)
3104            .disable_discovery()
3105            .build(client.clone());
3106
3107        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3108        let (to_tx_manager_tx, from_network_rx) =
3109            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3110        network_manager.set_transactions(to_tx_manager_tx);
3111        let network_handle = network_manager.handle().clone();
3112        let network_service_handle = tokio::spawn(network_manager);
3113
3114        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3115            network_handle.clone(),
3116            pool.clone(),
3117            from_network_rx,
3118            transactions_manager_config,
3119            policy_bundle,
3120        );
3121
3122        let peer_id = PeerId::random();
3123        let eth_version = EthVersion::Eth68;
3124        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3125        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3126
3127        let mut tx_factory = MockTransactionFactory::default();
3128
3129        let valid_known_tx = tx_factory.create_eip1559();
3130        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3131
3132        let known_tx_hash = *known_tx_signed.hash();
3133        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3134        let known_tx_size = known_tx_signed.encoded_length();
3135
3136        let unknown_tx_hash = B256::random();
3137        let unknown_tx_type_byte = 0xff_u8;
3138        let unknown_tx_size = 150;
3139
3140        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3141            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3142            sizes: vec![known_tx_size, unknown_tx_size],
3143            hashes: vec![known_tx_hash, unknown_tx_hash],
3144        });
3145
3146        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3147
3148        poll_fn(|cx| {
3149            let _ = tx_manager.poll_unpin(cx);
3150            Poll::Ready(())
3151        })
3152        .await;
3153
3154        let mut requested_hashes_in_getpooled = HashSet::new();
3155        let mut unexpected_request_received = false;
3156
3157        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3158            .await
3159        {
3160            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3161                let GetPooledTransactions(hashes) = request;
3162                for hash in hashes {
3163                    requested_hashes_in_getpooled.insert(hash);
3164                }
3165                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3166            }
3167            Ok(Some(other_request)) => {
3168                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3169                unexpected_request_received = true;
3170            }
3171            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3172            Err(_timeout_err) => {
3173                tracing::info!("Timeout: No GetPooledTransactions request received.")
3174            }
3175        }
3176
3177        assert!(
3178            requested_hashes_in_getpooled.contains(&known_tx_hash),
3179            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3180        );
3181        assert!(
3182            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3183            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3184        );
3185        assert!(
3186            !unexpected_request_received,
3187            "An unexpected P2P request was received by the mock peer."
3188        );
3189
3190        network_service_handle.abort();
3191    }
3192}