Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use itertools::Itertools;
5use rayon::iter::{IntoParallelIterator, ParallelIterator};
6
7/// Aggregation on configurable parameters for [`TransactionsManager`].
8pub mod config;
9/// Default and spec'd bounds.
10pub mod constants;
11/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
12pub mod fetcher;
13/// Defines the traits for transaction-related policies.
14pub mod policy;
15
16pub use self::constants::{
17    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31    budget::{
32        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34    },
35    cache::LruCache,
36    duration_metered_exec, metered_poll_nested_stream_with_budget,
37    metrics::{
38        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
39    },
40    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
41    NetworkHandle, TxTypesCounter,
42};
43use alloy_primitives::{TxHash, B256};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
50    RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
54use reth_network_api::{
55    events::{PeerEvent, SessionInfo},
56    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59    error::{RequestError, RequestResult},
60    sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::SignedTransaction;
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67    error::{PoolError, PoolResult},
68    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72    collections::{hash_map::Entry, HashMap, HashSet},
73    pin::Pin,
74    sync::{
75        atomic::{AtomicUsize, Ordering},
76        Arc,
77    },
78    task::{Context, Poll},
79    time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
/// The future for importing transactions into the pool.
///
/// This is a pinned, boxed, `Send + 'static` future. It resolves to a `Vec` holding one
/// [`PoolResult`] per transaction import: `Ok` carries the [`AddedTransactionOutcome`], `Err`
/// carries the pool error.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
/// Api to interact with [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer.
///
/// The handle only wraps the sending half of an unbounded command channel and derives `Clone`.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`].
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105    fn send(&self, cmd: TransactionsCommand<N>) {
106        let _ = self.manager_tx.send(cmd);
107    }
108
109    /// Fetch the [`PeerRequestSender`] for the given peer.
110    async fn peer_handle(
111        &self,
112        peer_id: PeerId,
113    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114        let (tx, rx) = oneshot::channel();
115        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116        rx.await
117    }
118
119    /// Manually propagate the transaction that belongs to the hash.
120    pub fn propagate(&self, hash: TxHash) {
121        self.send(TransactionsCommand::PropagateHash(hash))
122    }
123
124    /// Manually propagate the transaction hash to a specific peer.
125    ///
126    /// Note: this only propagates if the pool contains the transaction.
127    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128        self.propagate_hashes_to(Some(hash), peer)
129    }
130
131    /// Manually propagate the transaction hashes to a specific peer.
132    ///
133    /// Note: this only propagates the transactions that are known to the pool.
134    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135        let hashes = hash.into_iter().collect::<Vec<_>>();
136        if hashes.is_empty() {
137            return
138        }
139        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140    }
141
142    /// Request the active peer IDs from the [`TransactionsManager`].
143    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144        let (tx, rx) = oneshot::channel();
145        self.send(TransactionsCommand::GetActivePeers(tx));
146        rx.await
147    }
148
149    /// Manually propagate full transaction hashes to a specific peer.
150    ///
151    /// Do nothing if transactions are empty.
152    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153        if transactions.is_empty() {
154            return
155        }
156        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157    }
158
159    /// Manually propagate the given transaction hashes to all peers.
160    ///
161    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
162    /// full.
163    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164        if transactions.is_empty() {
165            return
166        }
167        self.send(TransactionsCommand::PropagateTransactions(transactions))
168    }
169
170    /// Manually propagate the given transactions to all peers.
171    ///
172    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
173    /// full.
174    pub fn broadcast_transactions(
175        &self,
176        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177    ) {
178        let transactions =
179            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180        if transactions.is_empty() {
181            return
182        }
183        self.send(TransactionsCommand::BroadcastTransactions(transactions))
184    }
185
186    /// Request the transaction hashes known by specific peers.
187    pub async fn get_transaction_hashes(
188        &self,
189        peers: Vec<PeerId>,
190    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191        if peers.is_empty() {
192            return Ok(Default::default())
193        }
194        let (tx, rx) = oneshot::channel();
195        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196        rx.await
197    }
198
199    /// Request the transaction hashes known by a specific peer.
200    pub async fn get_peer_transaction_hashes(
201        &self,
202        peer: PeerId,
203    ) -> Result<HashSet<TxHash>, RecvError> {
204        let res = self.get_transaction_hashes(vec![peer]).await?;
205        Ok(res.into_values().next().unwrap_or_default())
206    }
207
208    /// Requests the transactions directly from the given peer.
209    ///
210    /// Returns `None` if the peer is not connected.
211    ///
212    /// **Note**: this returns the response from the peer as received.
213    pub async fn get_pooled_transactions_from(
214        &self,
215        peer_id: PeerId,
216        hashes: Vec<B256>,
217    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220        let (tx, rx) = oneshot::channel();
221        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222        peer.try_send(request).ok();
223
224        rx.await?.map(|res| Some(res.0))
225    }
226}
227
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as a background service.
/// [`TransactionsHandle`] can be used as a frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All transactions currently pending import, keyed by hash and mapped to the set of peers
    /// associated with that hash.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction. The entry is removed once the import finishes; on a bad import the recorded
    /// peers are the ones that get penalized.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose import was classified as bad.
    ///
    /// Kept in an LRU cache so these hashes are not fetched or imported again.
    bad_imports: LruCache<TxHash>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies (transaction propagation and announcement filtering).
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics.
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics.
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
347
348impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
349    /// Sets up a new instance.
350    ///
351    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
352    pub fn new(
353        network: NetworkHandle<N>,
354        pool: Pool,
355        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
356        transactions_manager_config: TransactionsManagerConfig,
357    ) -> Self {
358        Self::with_policy(
359            network,
360            pool,
361            from_network,
362            transactions_manager_config,
363            NetworkPolicies::new(
364                TransactionPropagationKind::default(),
365                StrictEthAnnouncementFilter::default(),
366            ),
367        )
368    }
369}
370
371impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
372    /// Sets up a new instance with given the settings.
373    ///
374    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
375    pub fn with_policy(
376        network: NetworkHandle<N>,
377        pool: Pool,
378        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
379        transactions_manager_config: TransactionsManagerConfig,
380        policies: NetworkPolicies<N>,
381    ) -> Self {
382        let network_events = network.event_listener();
383
384        let (command_tx, command_rx) = mpsc::unbounded_channel();
385
386        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
387            &transactions_manager_config.transaction_fetcher_config,
388        );
389
390        // install a listener for new __pending__ transactions that are allowed to be propagated
391        // over the network
392        let pending = pool.pending_transactions_listener();
393        let pending_pool_imports_info =
394            PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
395        let metrics = TransactionsManagerMetrics::default();
396        metrics
397            .capacity_pending_pool_imports
398            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
399
400        Self {
401            pool,
402            network,
403            network_events,
404            transaction_fetcher,
405            transactions_by_peers: Default::default(),
406            pool_imports: Default::default(),
407            pending_pool_imports_info,
408            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
409            peers: Default::default(),
410            command_tx,
411            command_rx: UnboundedReceiverStream::new(command_rx),
412            pending_transactions: pending,
413            transaction_events: UnboundedMeteredReceiver::new(
414                from_network,
415                NETWORK_POOL_TRANSACTIONS_SCOPE,
416            ),
417            config: transactions_manager_config,
418            policies,
419            metrics,
420            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
421        }
422    }
423
424    /// Returns a new handle that can send commands to this type.
425    pub fn handle(&self) -> TransactionsHandle<N> {
426        TransactionsHandle { manager_tx: self.command_tx.clone() }
427    }
428
429    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
430    /// `false` if [`TransactionsManager`] is operating close to full capacity.
431    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
432        self.has_capacity_for_pending_pool_imports() &&
433            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434    }
435
436    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
437    fn has_capacity_for_pending_pool_imports(&self) -> bool {
438        self.remaining_pool_import_capacity() > 0
439    }
440
441    /// Returns the remaining capacity for pending pool imports.
442    fn remaining_pool_import_capacity(&self) -> usize {
443        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
444            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
445        )
446    }
447
448    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450        self.metrics.reported_bad_transactions.increment(1);
451    }
452
453    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455        self.network.reputation_change(peer_id, kind);
456    }
457
458    fn report_already_seen(&self, peer_id: PeerId) {
459        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461    }
462
463    /// Handles a closed peer session, removing the peer from transaction-local tracking state.
464    fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
465        if let Some(mut peer) = self.peers.remove(peer_id) {
466            self.policies.propagation_policy_mut().on_session_closed(&mut peer);
467        }
468        self.transaction_fetcher.remove_peer(peer_id);
469    }
470
471    /// Clear the transaction
472    fn on_good_import(&mut self, hash: TxHash) {
473        self.transactions_by_peers.remove(&hash);
474    }
475
476    /// Handles a failed transaction import.
477    ///
478    /// Blob sidecar errors (e.g. invalid proof, missing sidecar) are penalized via
479    /// `report_peer_bad_transactions` but NOT cached in `bad_imports` — the transaction itself
480    /// may be valid when fetched from another peer with correct sidecar data.
481    ///
482    /// Other bad transactions are penalized and cached in `bad_imports` to avoid fetching or
483    /// importing them again.
484    ///
485    /// Errors that count as bad transactions are:
486    ///
487    /// - intrinsic gas too low
488    /// - exceeds gas limit
489    /// - gas uint overflow
490    /// - exceeds max init code size
491    /// - oversized data
492    /// - signer account has bytecode
493    /// - chain id mismatch
494    /// - old legacy chain id
495    /// - tx type not supported
496    ///
497    /// (and additionally for blobs txns...)
498    ///
499    /// - no blobs
500    /// - too many blobs
501    /// - invalid kzg proof
502    /// - kzg error
503    /// - not blob transaction (tx type mismatch)
504    /// - wrong versioned kzg commitment hash
505    fn on_bad_import(&mut self, err: PoolError) {
506        let peers = self.transactions_by_peers.remove(&err.hash);
507
508        if err.is_bad_blob_sidecar() {
509            // Blob sidecar errors: penalize but do NOT cache the hash as bad.
510            // The transaction may be valid — only the sidecar from this peer was wrong.
511            // Using regular penalties means repeated offenders still get disconnected.
512            if let Some(peers) = peers {
513                for peer_id in peers {
514                    self.report_peer_bad_transactions(peer_id);
515                }
516            }
517            return
518        }
519
520        // if we're _currently_ syncing, we ignore a bad transaction
521        if !err.is_bad_transaction() || self.network.is_syncing() {
522            return
523        }
524        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
525        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
526        if let Some(peers) = peers {
527            for peer_id in peers {
528                self.report_peer_bad_transactions(peer_id);
529            }
530        }
531        self.metrics.bad_imports.increment(1);
532        self.bad_imports.insert(err.hash);
533    }
534
535    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
536    ///
537    /// Returns `true` if a request was sent.
538    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
539        // try drain transaction hashes pending fetch
540        let info = &self.pending_pool_imports_info;
541        let max_pending_pool_imports = info.max_pending_pool_imports;
542        let has_capacity_wrt_pending_pool_imports =
543            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
544
545        self.transaction_fetcher
546            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
547    }
548
549    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
550        let kind = match req_err {
551            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
552            RequestError::Timeout => ReputationChangeKind::Timeout,
553            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
554                // peer is already disconnected
555                return
556            }
557            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
558        };
559        self.report_peer(peer_id, kind);
560    }
561
562    #[inline]
563    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
564        let metrics = &self.metrics;
565
566        let TxManagerPollDurations {
567            acc_network_events,
568            acc_pending_imports,
569            acc_tx_events,
570            acc_imported_txns,
571            acc_fetch_events,
572            acc_pending_fetch,
573            acc_cmds,
574        } = poll_durations;
575
576        // update metrics for whole poll function
577        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
578        // update metrics for nested expressions
579        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
580        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
581        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
582        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
583        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
584        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
585        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
586    }
587}
588
589impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
590    /// Processes a batch import results.
591    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
592        for res in batch_results {
593            match res {
594                Ok(AddedTransactionOutcome { hash, .. }) => {
595                    self.on_good_import(hash);
596                }
597                Err(err) => {
598                    self.on_bad_import(err);
599                }
600            }
601        }
602    }
603
604    /// Request handler for an incoming `NewPooledTransactionHashes`
605    fn on_new_pooled_transaction_hashes(
606        &mut self,
607        peer_id: PeerId,
608        msg: NewPooledTransactionHashes,
609    ) {
610        // If the node is initially syncing, ignore transactions
611        if self.network.is_initially_syncing() {
612            return
613        }
614        if self.network.tx_gossip_disabled() {
615            return
616        }
617
618        // get handle to peer's session, if the session is still active
619        let Some(peer) = self.peers.get_mut(&peer_id) else {
620            trace!(
621                peer_id = format!("{peer_id:#}"),
622                ?msg,
623                "discarding announcement from inactive peer"
624            );
625
626            return
627        };
628        let client = peer.client_version.clone();
629
630        // keep track of the transactions the peer knows
631        let mut count_txns_already_seen_by_peer = 0;
632        for tx in msg.iter_hashes().copied() {
633            if !peer.seen_transactions.insert(tx) {
634                count_txns_already_seen_by_peer += 1;
635            }
636        }
637        if count_txns_already_seen_by_peer > 0 {
638            // this may occur if transactions are sent or announced to a peer, at the same time as
639            // the peer sends/announces those hashes to us. this is because, marking
640            // txns as seen by a peer is done optimistically upon sending them to the
641            // peer.
642            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
643            self.metrics
644                .occurrences_hash_already_seen_by_peer
645                .increment(count_txns_already_seen_by_peer);
646
647            trace!(target: "net::tx",
648                %count_txns_already_seen_by_peer,
649                peer_id=format!("{peer_id:#}"),
650                ?client,
651                "Peer sent hashes that have already been marked as seen by peer"
652            );
653
654            self.report_already_seen(peer_id);
655        }
656
657        // 1. filter out spam
658        if msg.is_empty() {
659            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
660            return;
661        }
662
663        let original_len = msg.len();
664        let mut partially_valid_msg = msg.dedup();
665
666        if partially_valid_msg.len() != original_len {
667            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
668        }
669
670        // 2. filter out transactions pending import to pool
671        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
672
673        // 3. filter out invalid entries (spam)
674        //
675        // validates messages with respect to the given network, e.g. allowed tx types.
676        // done before the pool lookup since these are cheap in-memory checks that shrink
677        // the set before acquiring the pool lock.
678        //
679        let mut should_report_peer = false;
680        let mut tx_types_counter = TxTypesCounter::default();
681
682        let is_eth68_message = partially_valid_msg
683            .msg_version()
684            .expect("partially valid announcement should have a version")
685            .is_eth68();
686
687        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
688            let (ty_byte, size_val) = match *metadata_ref_mut {
689                Some((ty, size)) => {
690                    if !is_eth68_message {
691                        should_report_peer = true;
692                    }
693                    (ty, size)
694                }
695                None => {
696                    if is_eth68_message {
697                        should_report_peer = true;
698                        return false;
699                    }
700                    (0u8, 0)
701                }
702            };
703
704            if is_eth68_message &&
705                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
706                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
707            {
708                tx_types_counter.increase_by_tx_type(parsed_tx_type);
709            }
710
711            let decision = self
712                .policies
713                .announcement_filter()
714                .decide_on_announcement(ty_byte, tx_hash, size_val);
715
716            match decision {
717                AnnouncementAcceptance::Accept => true,
718                AnnouncementAcceptance::Ignore => false,
719                AnnouncementAcceptance::Reject { penalize_peer } => {
720                    if penalize_peer {
721                        should_report_peer = true;
722                    }
723                    false
724                }
725            }
726        });
727
728        if is_eth68_message {
729            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
730        }
731
732        if should_report_peer {
733            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
734        }
735
736        // 4. filter out known hashes
737        //
738        // known txns have already been successfully fetched or received over gossip.
739        //
740        // most hashes will be filtered out here since the mempool protocol is a gossip
741        // protocol, healthy peers will send many of the same hashes.
742        //
743        let hashes_count_pre_pool_filter = partially_valid_msg.len();
744        self.pool.retain_unknown(&mut partially_valid_msg);
745        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
746            let already_known_hashes_count =
747                hashes_count_pre_pool_filter - partially_valid_msg.len();
748            self.metrics
749                .occurrences_hashes_already_in_pool
750                .increment(already_known_hashes_count as u64);
751        }
752
753        if partially_valid_msg.is_empty() {
754            // nothing to request
755            return
756        }
757
758        let mut valid_announcement_data =
759            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
760
761        if valid_announcement_data.is_empty() {
762            // no valid announcement data
763            return
764        }
765
766        // 5. filter out already seen unknown hashes
767        //
768        // seen hashes are already in the tx fetcher, pending fetch.
769        //
770        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
771        // fetcher, hence they should be valid at this point.
772        let bad_imports = &self.bad_imports;
773        self.transaction_fetcher.filter_unseen_and_pending_hashes(
774            &mut valid_announcement_data,
775            |hash| bad_imports.contains(hash),
776            &peer_id,
777            &client,
778        );
779
780        if valid_announcement_data.is_empty() {
781            // nothing to request
782            return
783        }
784
785        trace!(target: "net::tx::propagation",
786            peer_id=format!("{peer_id:#}"),
787            hashes_len=valid_announcement_data.len(),
788            hashes=%valid_announcement_data.keys().format(", "),
789            msg_version=%valid_announcement_data.msg_version(),
790            client_version=%client,
791            "received previously unseen and pending hashes in announcement from peer"
792        );
793
794        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
795        // fallback
796        if !self.transaction_fetcher.is_idle(&peer_id) {
797            // load message version before announcement data is destructed in packing
798            let msg_version = valid_announcement_data.msg_version();
799            let (hashes, _version) = valid_announcement_data.into_request_hashes();
800
801            trace!(target: "net::tx",
802                peer_id=format!("{peer_id:#}"),
803                hashes=?*hashes,
804                %msg_version,
805                %client,
806                "buffering hashes announced by busy peer"
807            );
808
809            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
810
811            return
812        }
813
814        let mut hashes_to_request =
815            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
816        let surplus_hashes =
817            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
818
819        if !surplus_hashes.is_empty() {
820            trace!(target: "net::tx",
821                peer_id=format!("{peer_id:#}"),
822                surplus_hashes=?*surplus_hashes,
823                %client,
824                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
825            );
826
827            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
828        }
829
830        trace!(target: "net::tx",
831            peer_id=format!("{peer_id:#}"),
832            hashes=?*hashes_to_request,
833            %client,
834            "sending hashes in `GetPooledTransactions` request to peer's session"
835        );
836
837        // request the missing transactions
838        //
839        // get handle to peer's session again, at this point we know it exists
840        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
841        if let Some(failed_to_request_hashes) =
842            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
843        {
844            let conn_eth_version = peer.version;
845
846            trace!(target: "net::tx",
847                peer_id=format!("{peer_id:#}"),
848                failed_to_request_hashes=?*failed_to_request_hashes,
849                %conn_eth_version,
850                %client,
851                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
852            );
853            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
854        }
855    }
856}
857
858impl<Pool, N> TransactionsManager<Pool, N>
859where
860    Pool: TransactionPool + Unpin + 'static,
861    N: NetworkPrimitives<
862            BroadcastedTransaction: SignedTransaction,
863            PooledTransaction: SignedTransaction,
864        > + Unpin,
865    Pool::Transaction:
866        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
867{
868    /// Invoked when transactions in the local mempool are considered __pending__.
869    ///
870    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
871    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
872    /// messages. The Transactions message relays complete transaction objects and is typically
873    /// sent to a small, random fraction of connected peers.
874    ///
875    /// All other peers receive a notification of the transaction hash and can request the
876    /// complete transaction object if it is unknown to them. The dissemination of complete
877    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
878    /// and won't need to request it.
879    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
880        // We intentionally do not gate this on initial sync.
881        // During initial sync we skip importing tx announcements from peers in
882        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
883        if self.network.tx_gossip_disabled() {
884            return
885        }
886
887        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
888
889        self.propagate_all(hashes);
890    }
891
892    /// Propagate the full transactions to a specific peer.
893    ///
894    /// Returns the propagated transactions.
895    fn propagate_full_transactions_to_peer(
896        &mut self,
897        txs: Vec<TxHash>,
898        peer_id: PeerId,
899        propagation_mode: PropagationMode,
900    ) -> Option<PropagatedTransactions> {
901        let peer = self.peers.get_mut(&peer_id)?;
902        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
903        let mut propagated = PropagatedTransactions::default();
904
905        // filter all transactions unknown to the peer
906        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
907
908        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
909
910        if propagation_mode.is_forced() {
911            // skip cache check if forced
912            full_transactions.extend(to_propagate);
913        } else {
914            // Iterate through the transactions to propagate and fill the hashes and full
915            // transaction
916            for tx in to_propagate {
917                if !peer.seen_transactions.contains(tx.tx_hash()) {
918                    // Only include if the peer hasn't seen the transaction
919                    full_transactions.push(&tx);
920                }
921            }
922        }
923
924        if full_transactions.is_empty() {
925            // nothing to propagate
926            return None
927        }
928
929        let PropagateTransactions { pooled, full } = full_transactions.build();
930
931        // send hashes if any
932        if let Some(new_pooled_hashes) = pooled {
933            for hash in new_pooled_hashes.iter_hashes().copied() {
934                propagated.record(hash, PropagateKind::Hash(peer_id));
935                // mark transaction as seen by peer
936                peer.seen_transactions.insert(hash);
937            }
938
939            // send hashes of transactions
940            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
941        }
942
943        // send full transactions, if any
944        if let Some(new_full_transactions) = full {
945            for tx in &new_full_transactions {
946                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
947                // mark transaction as seen by peer
948                peer.seen_transactions.insert(*tx.tx_hash());
949            }
950
951            // send full transactions
952            self.network.send_transactions(peer_id, new_full_transactions);
953        }
954
955        // Update propagated transactions metrics
956        self.metrics.propagated_transactions.increment(propagated.len() as u64);
957
958        Some(propagated)
959    }
960
961    /// Propagate the transaction hashes to the given peer
962    ///
963    /// Note: This will only send the hashes for transactions that exist in the pool.
964    fn propagate_hashes_to(
965        &mut self,
966        hashes: Vec<TxHash>,
967        peer_id: PeerId,
968        propagation_mode: PropagationMode,
969    ) {
970        trace!(target: "net::tx", "Start propagating transactions as hashes");
971
972        // This fetches a transactions from the pool, including the blob transactions, which are
973        // only ever sent as hashes.
974        let propagated = {
975            let Some(peer) = self.peers.get_mut(&peer_id) else {
976                // no such peer
977                return
978            };
979
980            let to_propagate =
981                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
982
983            let mut propagated = PropagatedTransactions::default();
984
985            // check if transaction is known to peer
986            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
987
988            if propagation_mode.is_forced() {
989                hashes.extend(to_propagate)
990            } else {
991                for tx in to_propagate {
992                    if !peer.seen_transactions.contains(tx.tx_hash()) {
993                        // Include if the peer hasn't seen it
994                        hashes.push(&tx);
995                    }
996                }
997            }
998
999            let new_pooled_hashes = hashes.build();
1000
1001            if new_pooled_hashes.is_empty() {
1002                // nothing to propagate
1003                return
1004            }
1005
1006            if let Some(peer) = self.peers.get_mut(&peer_id) {
1007                for hash in new_pooled_hashes.iter_hashes().copied() {
1008                    propagated.record(hash, PropagateKind::Hash(peer_id));
1009                    peer.seen_transactions.insert(hash);
1010                }
1011            }
1012
1013            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1014
1015            // send hashes of transactions
1016            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1017
1018            // Update propagated transactions metrics
1019            self.metrics.propagated_transactions.increment(propagated.len() as u64);
1020
1021            propagated
1022        };
1023
1024        // notify pool so events get fired
1025        self.pool.on_propagated(propagated);
1026    }
1027
1028    /// Propagate the transactions to all connected peers either as full objects or hashes.
1029    ///
1030    /// The message for new pooled hashes depends on the negotiated version of the stream.
1031    /// See [`NewPooledTransactionHashes`]
1032    ///
1033    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1034    fn propagate_transactions(
1035        &mut self,
1036        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1037        propagation_mode: PropagationMode,
1038    ) -> PropagatedTransactions {
1039        let mut propagated = PropagatedTransactions::default();
1040        if self.network.tx_gossip_disabled() {
1041            return propagated
1042        }
1043
1044        // send full transactions to a set of the connected peers based on the configured mode
1045        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1046
1047        // Note: Assuming ~random~ order due to random state of the peers map hasher
1048        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1049            if !self.policies.propagation_policy().can_propagate(peer) {
1050                // skip peers we should not propagate to
1051                continue
1052            }
1053            // determine whether to send full tx objects or hashes.
1054            let mut builder = if peer_idx > max_num_full {
1055                PropagateTransactionsBuilder::pooled(peer.version)
1056            } else {
1057                PropagateTransactionsBuilder::full(peer.version)
1058            };
1059
1060            if propagation_mode.is_forced() {
1061                builder.extend(to_propagate.iter());
1062            } else {
1063                // Iterate through the transactions to propagate and fill the hashes and full
1064                // transaction lists, before deciding whether or not to send full transactions to
1065                // the peer.
1066                for tx in &to_propagate {
1067                    // Only proceed if the transaction is not in the peer's list of seen
1068                    // transactions
1069                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1070                        builder.push(tx);
1071                    }
1072                }
1073            }
1074
1075            if builder.is_empty() {
1076                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1077                continue
1078            }
1079
1080            let PropagateTransactions { pooled, full } = builder.build();
1081
1082            // send hashes if any
1083            if let Some(mut new_pooled_hashes) = pooled {
1084                // enforce tx soft limit per message for the (unlikely) event the number of
1085                // hashes exceeds it
1086                new_pooled_hashes
1087                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1088
1089                for hash in new_pooled_hashes.iter_hashes().copied() {
1090                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1091                    // mark transaction as seen by peer
1092                    peer.seen_transactions.insert(hash);
1093                }
1094
1095                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1096
1097                // send hashes of transactions
1098                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1099            }
1100
1101            // send full transactions, if any
1102            if let Some(new_full_transactions) = full {
1103                for tx in &new_full_transactions {
1104                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1105                    // mark transaction as seen by peer
1106                    peer.seen_transactions.insert(*tx.tx_hash());
1107                }
1108
1109                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1110
1111                // send full transactions
1112                self.network.send_transactions(*peer_id, new_full_transactions);
1113            }
1114        }
1115
1116        // Update propagated transactions metrics
1117        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1118
1119        propagated
1120    }
1121
1122    /// Propagates the given transactions to the peers
1123    ///
1124    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1125    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1126    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1127        if self.peers.is_empty() {
1128            // nothing to propagate
1129            return
1130        }
1131        let propagated = self.propagate_transactions(
1132            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1133            PropagationMode::Basic,
1134        );
1135
1136        // notify pool so events get fired
1137        self.pool.on_propagated(propagated);
1138    }
1139
1140    /// Request handler for an incoming request for transactions
1141    fn on_get_pooled_transactions(
1142        &mut self,
1143        peer_id: PeerId,
1144        request: GetPooledTransactions,
1145        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1146    ) {
1147        // fast exit if gossip is disabled
1148        if self.network.tx_gossip_disabled() {
1149            let _ = response.send(Ok(PooledTransactions::default()));
1150            return
1151        }
1152        if let Some(peer) = self.peers.get_mut(&peer_id) {
1153            let transactions = self.pool.get_pooled_transaction_elements(
1154                request.0,
1155                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1156                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1157                ),
1158            );
1159            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1160
1161            // we sent a response at which point we assume that the peer is aware of the
1162            // transactions
1163            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1164
1165            let resp = PooledTransactions(transactions);
1166            let _ = response.send(Ok(resp));
1167        }
1168    }
1169
1170    /// Handles a command received from a detached [`TransactionsHandle`]
1171    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1172        match cmd {
1173            TransactionsCommand::PropagateHash(hash) => {
1174                self.on_new_pending_transactions(vec![hash])
1175            }
1176            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1177                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1178            }
1179            TransactionsCommand::GetActivePeers(tx) => {
1180                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1181                tx.send(peers).ok();
1182            }
1183            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1184                if let Some(propagated) =
1185                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1186                {
1187                    self.pool.on_propagated(propagated);
1188                }
1189            }
1190            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1191            TransactionsCommand::BroadcastTransactions(txs) => {
1192                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1193                self.pool.on_propagated(propagated);
1194            }
1195            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1196                let mut res = HashMap::with_capacity(peers.len());
1197                for peer_id in peers {
1198                    let hashes = self
1199                        .peers
1200                        .get(&peer_id)
1201                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1202                        .unwrap_or_default();
1203                    res.insert(peer_id, hashes);
1204                }
1205                tx.send(res).ok();
1206            }
1207            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1208                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1209                peer_request_sender.send(sender).ok();
1210            }
1211        }
1212    }
1213
1214    /// Handles session establishment and peer transactions initialization.
1215    ///
1216    /// This is invoked when a new session is established.
1217    fn handle_peer_session(
1218        &mut self,
1219        info: SessionInfo,
1220        messages: PeerRequestSender<PeerRequest<N>>,
1221    ) {
1222        let SessionInfo { peer_id, client_version, version, .. } = info;
1223
1224        // Insert a new peer into the peerset.
1225        let peer = PeerMetadata::<N>::new(
1226            messages,
1227            version,
1228            client_version,
1229            self.config.max_transactions_seen_by_peer_history,
1230            info.peer_kind,
1231        );
1232        let peer = match self.peers.entry(peer_id) {
1233            Entry::Occupied(mut entry) => {
1234                entry.insert(peer);
1235                entry.into_mut()
1236            }
1237            Entry::Vacant(entry) => entry.insert(peer),
1238        };
1239
1240        self.policies.propagation_policy_mut().on_session_established(peer);
1241
1242        // Send a `NewPooledTransactionHashes` to the peer with up to
1243        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1244        // transactions in the pool.
1245        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1246            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1247            return
1248        }
1249
1250        // Get transactions to broadcast
1251        let pooled_txs = self.pool.pooled_transactions_max(
1252            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1253        );
1254        if pooled_txs.is_empty() {
1255            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1256            return;
1257        }
1258
1259        // Build and send transaction hashes message
1260        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1261        for pooled_tx in pooled_txs {
1262            peer.seen_transactions.insert(*pooled_tx.hash());
1263            msg_builder.push_pooled(pooled_tx);
1264        }
1265
1266        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1267        let msg = msg_builder.build();
1268        self.network.send_transactions_hashes(peer_id, msg);
1269    }
1270
1271    /// Handles a received event related to common network events.
1272    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1273        match event_result {
1274            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1275                self.on_peer_session_closed(&peer_id);
1276            }
1277            NetworkEvent::ActivePeerSession { info, messages } => {
1278                // process active peer session and broadcast available transaction from the pool
1279                self.handle_peer_session(info, messages);
1280            }
1281            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1282                let peer_id = info.peer_id;
1283                // get messages from existing peer
1284                let messages = match self.peers.get(&peer_id) {
1285                    Some(p) => p.request_tx.clone(),
1286                    None => {
1287                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1288                        return;
1289                    }
1290                };
1291                self.handle_peer_session(info, messages);
1292            }
1293            _ => {}
1294        }
1295    }
1296
1297    /// Returns true if the ingress policy allows processing messages from the given peer.
1298    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1299        if self.config.ingress_policy.allows_all() {
1300            return true;
1301        }
1302        let Some(peer) = self.peers.get(peer_id) else {
1303            return false;
1304        };
1305        self.config.ingress_policy.allows(peer.peer_kind())
1306    }
1307
1308    /// Handles dedicated transaction events related to the `eth` protocol.
1309    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1310        match event {
1311            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1312                if !self.accepts_incoming_from(&peer_id) {
1313                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1314                    return;
1315                }
1316
1317                // ensure we didn't receive any blob transactions as these are disallowed to be
1318                // broadcasted in full
1319
1320                let has_blob_txs = msg.has_eip4844();
1321
1322                let non_blob_txs = msg
1323                    .into_iter()
1324                    .map(N::PooledTransaction::try_from)
1325                    .filter_map(Result::ok)
1326                    .collect();
1327
1328                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1329
1330                if has_blob_txs {
1331                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1332                    self.report_peer_bad_transactions(peer_id);
1333                }
1334            }
1335            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1336                if !self.accepts_incoming_from(&peer_id) {
1337                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1338                    return;
1339                }
1340                self.on_new_pooled_transaction_hashes(peer_id, msg)
1341            }
1342            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1343                self.on_get_pooled_transactions(peer_id, request, response)
1344            }
1345            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1346                let _ = response.send(Some(self.handle()));
1347            }
1348        }
1349    }
1350
1351    /// Starts the import process for the given transactions.
1352    fn import_transactions(
1353        &mut self,
1354        peer_id: PeerId,
1355        transactions: PooledTransactions<N::PooledTransaction>,
1356        source: TransactionSource,
1357    ) {
1358        // If the node is pipeline syncing, ignore transactions
1359        if self.network.is_initially_syncing() {
1360            return
1361        }
1362        if self.network.tx_gossip_disabled() {
1363            return
1364        }
1365
1366        // Early return if we don't have capacity for any imports
1367        if !self.has_capacity_for_pending_pool_imports() {
1368            return
1369        }
1370
1371        let mut transactions = transactions.0;
1372
1373        // Truncate to remaining capacity early to bound work on all subsequent processing.
1374        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
1375        // malicious and we avoid wasting CPU on them.
1376        let capacity = self.remaining_pool_import_capacity();
1377        if transactions.len() > capacity {
1378            let skipped = transactions.len() - capacity;
1379            transactions.truncate(capacity);
1380            self.metrics
1381                .skipped_transactions_pending_pool_imports_at_capacity
1382                .increment(skipped as u64);
1383            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1384        }
1385
1386        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1387        let client_version = peer.client_version.clone();
1388
1389        let start = Instant::now();
1390
1391        // mark the transactions as received
1392        self.transaction_fetcher
1393            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1394
1395        // track that the peer knows these transaction, but only if this is a new broadcast.
1396        // If we received the transactions as the response to our `GetPooledTransactions``
1397        // requests (based on received `NewPooledTransactionHashes`) then we already
1398        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1399        let mut num_already_seen_by_peer = 0;
1400        for tx in &transactions {
1401            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1402                num_already_seen_by_peer += 1;
1403            }
1404        }
1405
1406        // tracks the quality of the given transactions
1407        let mut has_bad_transactions = false;
1408
1409        // 1. Remove known, already-tracked, and invalid transactions first since these are
1410        // cheap in-memory checks against local maps
1411        transactions.retain(|tx| {
1412            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1413                entry.get_mut().insert(peer_id);
1414                return false
1415            }
1416            if self.bad_imports.contains(tx.tx_hash()) {
1417                trace!(target: "net::tx",
1418                    peer_id=format!("{peer_id:#}"),
1419                    hash=%tx.tx_hash(),
1420                    %client_version,
1421                    "received a known bad transaction from peer"
1422                );
1423                has_bad_transactions = true;
1424                return false;
1425            }
1426            true
1427        });
1428
1429        // 2. filter out txns already inserted into pool
1430        let txns_count_pre_pool_filter = transactions.len();
1431        self.pool.retain_unknown(&mut transactions);
1432        if txns_count_pre_pool_filter > transactions.len() {
1433            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1434            self.metrics
1435                .occurrences_transactions_already_in_pool
1436                .increment(already_known_txns_count as u64);
1437        }
1438
1439        let txs_len = transactions.len();
1440
1441        let new_txs = transactions
1442            .into_par_iter()
1443            .filter_map(|tx| match tx.try_into_recovered() {
1444                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1445                Err(badtx) => {
1446                    trace!(target: "net::tx",
1447                        peer_id=format!("{peer_id:#}"),
1448                        hash=%badtx.tx_hash(),
1449                        client_version=%client_version,
1450                        "failed ecrecovery for transaction"
1451                    );
1452                    None
1453                }
1454            })
1455            .collect::<Vec<_>>();
1456
1457        has_bad_transactions |= new_txs.len() != txs_len;
1458
1459        // Record the transactions as seen by the peer
1460        for tx in &new_txs {
1461            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1462        }
1463
1464        // 3. import new transactions as a batch to minimize lock contention on the underlying
1465        // pool
1466        if !new_txs.is_empty() {
1467            let pool = self.pool.clone();
1468            // update metrics
1469            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1470            metric_pending_pool_imports.increment(new_txs.len() as f64);
1471
1472            // update self-monitoring info
1473            self.pending_pool_imports_info
1474                .pending_pool_imports
1475                .fetch_add(new_txs.len(), Ordering::Relaxed);
1476            let tx_manager_info_pending_pool_imports =
1477                self.pending_pool_imports_info.pending_pool_imports.clone();
1478
1479            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1480            let import = Box::pin(async move {
1481                let added = new_txs.len();
1482                let res = pool.add_external_transactions(new_txs).await;
1483
1484                // update metrics
1485                metric_pending_pool_imports.decrement(added as f64);
1486                // update self-monitoring info
1487                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1488
1489                res
1490            });
1491
1492            self.pool_imports.push(import);
1493        }
1494
1495        if num_already_seen_by_peer > 0 {
1496            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1497            self.metrics
1498                .occurrences_of_transaction_already_seen_by_peer
1499                .increment(num_already_seen_by_peer);
1500            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1501        }
1502
1503        if has_bad_transactions {
1504            // peer sent us invalid transactions
1505            self.report_peer_bad_transactions(peer_id)
1506        }
1507
1508        if num_already_seen_by_peer > 0 {
1509            self.report_already_seen(peer_id);
1510        }
1511
1512        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1513    }
1514
1515    /// Processes a [`FetchEvent`].
1516    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1517        match fetch_event {
1518            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1519                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1520                if report_peer {
1521                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1522                }
1523            }
1524            FetchEvent::FetchError { peer_id, error } => {
1525                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1526                self.on_request_error(peer_id, error);
1527            }
1528            FetchEvent::EmptyResponse { peer_id } => {
1529                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1530            }
1531        }
1532    }
1533}
1534
/// An endless future: `poll` never returns [`Poll::Ready`]. Preemption ensures that the future
/// is non-blocking, nonetheless. See [`crate::NetworkManager`] for more context on the design
/// pattern.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    /// Drives every event source of the manager once, each within its own budget.
    ///
    /// Sources are polled in a fixed order: network events, new pending transactions from the
    /// pool, incoming transaction events, fetcher events, pending pool imports, hashes pending
    /// fetch and finally commands. If any source reports that it may have more work, the task
    /// wakes itself and yields; otherwise the poll metrics are recorded. Both paths return
    /// [`Poll::Pending`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advances new __pending__ transactions, i.e. transactions that were successfully
        // inserted into the pending set in the pool (are valid), and propagates them (informs
        // peers which transactions we have seen).
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        //
        // NOTE(review): assuming `pending_transactions` is a tokio mpsc receiver, tokio documents
        // `poll_recv_many` as also returning `Ready(0)` once the channel is closed; in that case
        // the second call below would keep reporting "maybe more" — confirm the sender side
        // cannot be dropped while the manager is alive.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // we filled the entire buffer capacity and need to try again on the next poll
                    // immediately
                    true
                } else {
                    // Try once more with the remaining capacity: most likely the channel is now
                    // empty, in which case this call returns pending and registers the waker. If
                    // it instead delivers additional hashes, we poll again on the next
                    // iteration.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        //
        // Declared `mut` because the pending-fetch step further down may set it to `true`.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast message or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages are smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    // Flag the fetcher stream so that the wake-up check below re-schedules
                    // this task.
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // If any source may still have work left (its budget ran out or it reported more), yield
        // but schedule an immediate re-poll. Note that the poll metrics are not recorded on this
        // path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // All channels are fully drained and import futures are pending: record how long this
        // poll took and wait for the next wake-up.
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1720
/// The strategy used when propagating transactions to peers.
///
/// It decides whether peers that have already seen a transaction are skipped or not.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Forced propagation mode.
    ///
    /// A transaction is sent to every peer, regardless of whether it has been sent to or
    /// received from that peer before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for
    /// [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1743
1744/// A transaction that's about to be propagated to multiple peers.
1745#[derive(Debug, Clone)]
1746struct PropagateTransaction<T = TransactionSigned> {
1747    size: usize,
1748    transaction: Arc<T>,
1749}
1750
1751impl<T: SignedTransaction> PropagateTransaction<T> {
1752    /// Create a new instance from a transaction.
1753    pub fn new(transaction: T) -> Self {
1754        let size = transaction.length();
1755        Self { size, transaction: Arc::new(transaction) }
1756    }
1757
1758    /// Create a new instance from a pooled transaction
1759    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1760    where
1761        P: PoolTransaction<Consensus = T>,
1762    {
1763        let size = tx.encoded_length();
1764        let transaction = tx.transaction.clone_into_consensus();
1765        let transaction = Arc::new(transaction.into_inner());
1766        Self { size, transaction }
1767    }
1768
1769    fn tx_hash(&self) -> &TxHash {
1770        self.transaction.tx_hash()
1771    }
1772}
1773
1774/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1775/// should receive them in full or as pooled
1776#[derive(Debug, Clone)]
1777enum PropagateTransactionsBuilder<T> {
1778    Pooled(PooledTransactionsHashesBuilder),
1779    Full(FullTransactionsBuilder<T>),
1780}
1781
1782impl<T> PropagateTransactionsBuilder<T> {
1783    /// Create a builder for pooled transactions
1784    fn pooled(version: EthVersion) -> Self {
1785        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1786    }
1787
1788    /// Create a builder that sends transactions in full and records transactions that don't fit.
1789    fn full(version: EthVersion) -> Self {
1790        Self::Full(FullTransactionsBuilder::new(version))
1791    }
1792
1793    /// Returns true if no transactions are recorded.
1794    fn is_empty(&self) -> bool {
1795        match self {
1796            Self::Pooled(builder) => builder.is_empty(),
1797            Self::Full(builder) => builder.is_empty(),
1798        }
1799    }
1800
1801    /// Consumes the type and returns the built messages that should be sent to the peer.
1802    fn build(self) -> PropagateTransactions<T> {
1803        match self {
1804            Self::Pooled(pooled) => {
1805                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1806            }
1807            Self::Full(full) => full.build(),
1808        }
1809    }
1810}
1811
1812impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1813    /// Appends all transactions
1814    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1815        for tx in txs {
1816            self.push(tx);
1817        }
1818    }
1819
1820    /// Appends a transaction to the list.
1821    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1822        match self {
1823            Self::Pooled(builder) => builder.push(transaction),
1824            Self::Full(builder) => builder.push(transaction),
1825        }
1826    }
1827}
1828
/// Represents how the transactions should be sent to a peer if any.
///
/// Either field may be `None`, in which case there is nothing to send in that form.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to announce, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full, if any.
    full: Option<Vec<Arc<T>>>,
}
1836
1837/// Helper type for constructing the full transaction message that enforces the
1838/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1839/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1840/// broadcasted in full.
1841#[derive(Debug, Clone)]
1842struct FullTransactionsBuilder<T> {
1843    /// The soft limit to enforce for a single broadcast message of full transactions.
1844    total_size: usize,
1845    /// All transactions to be broadcasted.
1846    transactions: Vec<Arc<T>>,
1847    /// Transactions that didn't fit into the broadcast message
1848    pooled: PooledTransactionsHashesBuilder,
1849}
1850
1851impl<T> FullTransactionsBuilder<T> {
1852    /// Create a builder for the negotiated version of the peer's session
1853    fn new(version: EthVersion) -> Self {
1854        Self {
1855            total_size: 0,
1856            pooled: PooledTransactionsHashesBuilder::new(version),
1857            transactions: vec![],
1858        }
1859    }
1860
1861    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1862    fn is_empty(&self) -> bool {
1863        self.transactions.is_empty() && self.pooled.is_empty()
1864    }
1865
1866    /// Returns the messages that should be propagated to the peer.
1867    fn build(self) -> PropagateTransactions<T> {
1868        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1869        let full = Some(self.transactions).filter(|full| !full.is_empty());
1870        PropagateTransactions { pooled, full }
1871    }
1872}
1873
1874impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1875    /// Appends all transactions.
1876    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1877        for tx in txs {
1878            self.push(&tx)
1879        }
1880    }
1881
1882    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1883    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1884    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1885    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1886    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1887    ///
1888    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1889    /// to list of pooled transactions, (e.g. 4844 transactions).
1890    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1891    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1892        // Do not send full 4844 transaction hashes to peers.
1893        //
1894        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1895        //  Instead, those transactions are only announced using
1896        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1897        //  via `GetPooledTransactions`.
1898        //
1899        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1900        if !transaction.transaction.is_broadcastable_in_full() {
1901            self.pooled.push(transaction);
1902            return
1903        }
1904
1905        let new_size = self.total_size + transaction.size;
1906        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1907            self.total_size > 0
1908        {
1909            // transaction does not fit into the message
1910            self.pooled.push(transaction);
1911            return
1912        }
1913
1914        self.total_size = new_size;
1915        self.transactions.push(Arc::clone(&transaction.transaction));
1916    }
1917}
1918
1919/// A helper type to create the pooled transactions message based on the negotiated version of the
1920/// session with the peer
1921#[derive(Debug, Clone)]
1922enum PooledTransactionsHashesBuilder {
1923    Eth66(NewPooledTransactionHashes66),
1924    Eth68(NewPooledTransactionHashes68),
1925}
1926
1927// === impl PooledTransactionsHashesBuilder ===
1928
1929impl PooledTransactionsHashesBuilder {
1930    /// Push a transaction from the pool to the list.
1931    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1932        match self {
1933            Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1934            Self::Eth68(msg) => {
1935                msg.hashes.push(*pooled_tx.hash());
1936                msg.sizes.push(pooled_tx.encoded_length());
1937                msg.types.push(pooled_tx.transaction.ty());
1938            }
1939        }
1940    }
1941
1942    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1943    fn is_empty(&self) -> bool {
1944        match self {
1945            Self::Eth66(hashes) => hashes.is_empty(),
1946            Self::Eth68(hashes) => hashes.is_empty(),
1947        }
1948    }
1949
1950    /// Returns the number of transactions in the builder.
1951    fn len(&self) -> usize {
1952        match self {
1953            Self::Eth66(hashes) => hashes.len(),
1954            Self::Eth68(hashes) => hashes.len(),
1955        }
1956    }
1957
1958    /// Appends all hashes
1959    fn extend<T: SignedTransaction>(
1960        &mut self,
1961        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1962    ) {
1963        for tx in txs {
1964            self.push(&tx);
1965        }
1966    }
1967
1968    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1969        match self {
1970            Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1971            Self::Eth68(msg) => {
1972                msg.hashes.push(*tx.tx_hash());
1973                msg.sizes.push(tx.size);
1974                msg.types.push(tx.transaction.ty());
1975            }
1976        }
1977    }
1978
1979    /// Create a builder for the negotiated version of the peer's session
1980    fn new(version: EthVersion) -> Self {
1981        match version {
1982            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1983            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1984                Self::Eth68(Default::default())
1985            }
1986        }
1987    }
1988
1989    fn build(self) -> NewPooledTransactionHashes {
1990        match self {
1991            Self::Eth66(mut msg) => {
1992                msg.shrink_to_fit();
1993                msg.into()
1994            }
1995            Self::Eth68(mut msg) => {
1996                msg.shrink_to_fit();
1997                msg.into()
1998            }
1999        }
2000    }
2001}
2002
/// Describes the way a batch of transactions reached us.
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast, `false` if they are
    /// the response to one of our requests.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2019
2020/// Tracks a single peer in the context of [`TransactionsManager`].
2021#[derive(Debug)]
2022pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
2023    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
2024    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
2025    /// the peer.
2026    seen_transactions: LruCache<TxHash>,
2027    /// A communication channel directly to the peer's session task.
2028    request_tx: PeerRequestSender<PeerRequest<N>>,
2029    /// negotiated version of the session.
2030    version: EthVersion,
2031    /// The peer's client version.
2032    client_version: Arc<str>,
2033    /// The kind of peer.
2034    peer_kind: PeerKind,
2035}
2036
2037impl<N: NetworkPrimitives> PeerMetadata<N> {
2038    /// Returns a new instance of [`PeerMetadata`].
2039    pub fn new(
2040        request_tx: PeerRequestSender<PeerRequest<N>>,
2041        version: EthVersion,
2042        client_version: Arc<str>,
2043        max_transactions_seen_by_peer: u32,
2044        peer_kind: PeerKind,
2045    ) -> Self {
2046        Self {
2047            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
2048            request_tx,
2049            version,
2050            client_version,
2051            peer_kind,
2052        }
2053    }
2054
2055    /// Returns a reference to the peer's request sender channel.
2056    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2057        &self.request_tx
2058    }
2059
2060    /// Returns a mutable reference to the seen transactions LRU cache.
2061    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
2062        &mut self.seen_transactions
2063    }
2064
2065    /// Returns the negotiated `EthVersion` of the session.
2066    pub const fn version(&self) -> EthVersion {
2067        self.version
2068    }
2069
2070    /// Returns a reference to the peer's client version string.
2071    pub fn client_version(&self) -> &str {
2072        &self.client_version
2073    }
2074
2075    /// Returns the peer's kind.
2076    pub const fn peer_kind(&self) -> PeerKind {
2077        self.peer_kind
2078    }
2079}
2080
/// Commands to send to the [`TransactionsManager`]
///
/// Variants that expect an answer carry a `oneshot` sender on which the result is returned.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions, identified by their hashes, to a specific
    /// peer.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers whose known transaction hashes are requested.
        peers: Vec<PeerId>,
        /// Channel on which the hashes, keyed by peer, are sent back.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose request sender is requested.
        peer_id: PeerId,
        /// Channel on which the sender is returned; the payload is an `Option`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2107
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The answer, an optional [`TransactionsHandle`], is returned through the carried sender.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2139
/// Tracks stats about the [`TransactionsManager`].
///
/// The counter lives behind an [`Arc`] so that it can be shared with other holders.
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Returns a new [`PendingPoolImportsInfo`] whose counter starts at zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pool imports is under a given tolerated max.
    ///
    /// The comparison is strict and uses the `max_pending_pool_imports` argument, not the
    /// limit stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2160
2161impl Default for PendingPoolImportsInfo {
2162    fn default() -> Self {
2163        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2164    }
2165}
2166
/// Time spent in the individual steps of a single `poll` of the [`TransactionsManager`].
///
/// A fresh, zeroed instance is created at the start of every poll and handed to
/// `update_poll_metrics` once the poll has drained all of its sources.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Time spent advancing the network events stream.
    acc_network_events: Duration,
    /// Time spent advancing the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Time spent advancing the network transaction events stream.
    acc_tx_events: Duration,
    /// Not accumulated by the `Future::poll` implementation of the manager; presumably
    /// retained for the metrics update — TODO confirm.
    acc_imported_txns: Duration,
    /// Time spent advancing the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Time spent trying to request hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Time spent advancing the commands channel.
    acc_cmds: Duration,
}
2177
2178#[cfg(test)]
2179mod tests {
2180    use super::*;
2181    use crate::{
2182        test_utils::{
2183            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2184            Testnet,
2185        },
2186        transactions::config::RelaxedEthAnnouncementFilter,
2187        NetworkConfigBuilder, NetworkManager,
2188    };
2189    use alloy_consensus::{TxEip1559, TxLegacy};
2190    use alloy_eips::eip4844::BlobTransactionValidationError;
2191    use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2192    use alloy_rlp::Decodable;
2193    use futures::FutureExt;
2194    use reth_chainspec::MIN_TRANSACTION_GAS;
2195    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2196    use reth_network_api::{NetworkInfo, PeerKind};
2197    use reth_network_p2p::{
2198        error::{RequestError, RequestResult},
2199        sync::{NetworkSyncUpdater, SyncState},
2200    };
2201    use reth_storage_api::noop::NoopProvider;
2202    use reth_tasks::Runtime;
2203    use reth_transaction_pool::{
2204        error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2205        test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2206    };
2207    use secp256k1::SecretKey;
2208    use std::{
2209        collections::HashSet,
2210        future::poll_fn,
2211        net::{IpAddr, Ipv4Addr, SocketAddr},
2212        str::FromStr,
2213    };
2214    use tracing::error;
2215
2216    #[tokio::test(flavor = "multi_thread")]
2217    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2218        reth_tracing::init_test_tracing();
2219        let net = Testnet::create(3).await;
2220
2221        let mut handles = net.handles();
2222        let handle0 = handles.next().unwrap();
2223        let handle1 = handles.next().unwrap();
2224
2225        drop(handles);
2226        let handle = net.spawn();
2227
2228        let listener0 = handle0.event_listener();
2229        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2230        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2231
2232        let client = NoopProvider::default();
2233        let pool = testing_pool();
2234        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2235            .disable_discovery()
2236            .listener_port(0)
2237            .build(client);
2238        let transactions_manager_config = config.transactions_manager_config.clone();
2239        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2240            .await
2241            .unwrap()
2242            .into_builder()
2243            .transactions(pool.clone(), transactions_manager_config)
2244            .split_with_handle();
2245
2246        tokio::task::spawn(network);
2247
2248        // go to syncing (pipeline sync)
2249        network_handle.update_sync_state(SyncState::Syncing);
2250        assert!(NetworkInfo::is_syncing(&network_handle));
2251        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2252
2253        // wait for all initiator connections
2254        let mut established = listener0.take(2);
2255        while let Some(ev) = established.next().await {
2256            match ev {
2257                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2258                    // to insert a new peer in transactions peerset
2259                    transactions
2260                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2261                }
2262                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2263                ev => {
2264                    error!("unexpected event {ev:?}")
2265                }
2266            }
2267        }
2268        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2269        let input = hex!(
2270            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2271        );
2272        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2273        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2274            peer_id: *handle1.peer_id(),
2275            msg: Transactions(vec![signed_tx.clone()]),
2276        });
2277        poll_fn(|cx| {
2278            let _ = transactions.poll_unpin(cx);
2279            Poll::Ready(())
2280        })
2281        .await;
2282        assert!(pool.is_empty());
2283        handle.terminate().await;
2284    }
2285
2286    #[tokio::test(flavor = "multi_thread")]
2287    async fn test_tx_broadcasts_through_two_syncs() {
2288        reth_tracing::init_test_tracing();
2289        let net = Testnet::create(3).await;
2290
2291        let mut handles = net.handles();
2292        let handle0 = handles.next().unwrap();
2293        let handle1 = handles.next().unwrap();
2294
2295        drop(handles);
2296        let handle = net.spawn();
2297
2298        let listener0 = handle0.event_listener();
2299        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2300        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2301
2302        let client = NoopProvider::default();
2303        let pool = testing_pool();
2304        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2305            .disable_discovery()
2306            .listener_port(0)
2307            .build(client);
2308        let transactions_manager_config = config.transactions_manager_config.clone();
2309        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2310            .await
2311            .unwrap()
2312            .into_builder()
2313            .transactions(pool.clone(), transactions_manager_config)
2314            .split_with_handle();
2315
2316        tokio::task::spawn(network);
2317
2318        // go to syncing (pipeline sync) to idle and then to syncing (live)
2319        network_handle.update_sync_state(SyncState::Syncing);
2320        assert!(NetworkInfo::is_syncing(&network_handle));
2321        network_handle.update_sync_state(SyncState::Idle);
2322        assert!(!NetworkInfo::is_syncing(&network_handle));
2323        network_handle.update_sync_state(SyncState::Syncing);
2324        assert!(NetworkInfo::is_syncing(&network_handle));
2325
2326        // wait for all initiator connections
2327        let mut established = listener0.take(2);
2328        while let Some(ev) = established.next().await {
2329            match ev {
2330                NetworkEvent::ActivePeerSession { .. } |
2331                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2332                    // to insert a new peer in transactions peerset
2333                    transactions.on_network_event(ev);
2334                }
2335                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2336                _ => {
2337                    error!("unexpected event {ev:?}")
2338                }
2339            }
2340        }
2341        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2342        let input = hex!(
2343            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2344        );
2345        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2346        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2347            peer_id: *handle1.peer_id(),
2348            msg: Transactions(vec![signed_tx.clone()]),
2349        });
2350        poll_fn(|cx| {
2351            let _ = transactions.poll_unpin(cx);
2352            Poll::Ready(())
2353        })
2354        .await;
2355        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2356        assert!(NetworkInfo::is_syncing(&network_handle));
2357        assert!(!pool.is_empty());
2358        handle.terminate().await;
2359    }
2360
2361    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2362    // event and is able to retrieve the corresponding transactions.
2363    #[tokio::test(flavor = "multi_thread")]
2364    async fn test_handle_incoming_transactions_hashes() {
2365        reth_tracing::init_test_tracing();
2366
2367        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2368        let client = NoopProvider::default();
2369
2370        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2371            // let OS choose port
2372            .listener_port(0)
2373            .disable_discovery()
2374            .build(client);
2375
2376        let pool = testing_pool();
2377
2378        let transactions_manager_config = config.transactions_manager_config.clone();
2379        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2380            .await
2381            .unwrap()
2382            .into_builder()
2383            .transactions(pool.clone(), transactions_manager_config)
2384            .split_with_handle();
2385
2386        let peer_id_1 = PeerId::new([1; 64]);
2387        let eth_version = EthVersion::Eth66;
2388
2389        let txs = vec![TransactionSigned::new_unhashed(
2390            Transaction::Legacy(TxLegacy {
2391                chain_id: Some(4),
2392                nonce: 15u64,
2393                gas_price: 2200000000,
2394                gas_limit: 34811,
2395                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2396                value: U256::from(1234u64),
2397                input: Default::default(),
2398            }),
2399            Signature::new(
2400                U256::from_str(
2401                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2402                )
2403                .unwrap(),
2404                U256::from_str(
2405                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2406                )
2407                .unwrap(),
2408                true,
2409            ),
2410        )];
2411
2412        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2413
2414        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2415        tx_manager.peers.insert(peer_id_1, peer_1);
2416
2417        assert!(pool.is_empty());
2418
2419        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2420            peer_id: peer_id_1,
2421            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2422                txs_hashes.clone(),
2423            )),
2424        });
2425
2426        // mock session of peer_1 receives request
2427        let req = to_mock_session_rx
2428            .recv()
2429            .await
2430            .expect("peer_1 session should receive request with buffered hashes");
2431        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2432        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2433
2434        let message: Vec<PooledTransactionVariant> = txs
2435            .into_iter()
2436            .map(|tx| {
2437                PooledTransactionVariant::try_from(tx)
2438                    .expect("Failed to convert MockTransaction to PooledTransaction")
2439            })
2440            .collect();
2441
2442        // return the transactions corresponding to the transaction hashes.
2443        response
2444            .send(Ok(PooledTransactions(message)))
2445            .expect("should send peer_1 response to tx manager");
2446
2447        // adance the transaction manager future
2448        poll_fn(|cx| {
2449            let _ = tx_manager.poll_unpin(cx);
2450            Poll::Ready(())
2451        })
2452        .await;
2453
2454        // ensure that the transactions corresponding to the transaction hashes have been
2455        // successfully retrieved and stored in the Pool.
2456        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2457    }
2458
2459    #[tokio::test(flavor = "multi_thread")]
2460    async fn test_handle_incoming_transactions() {
2461        reth_tracing::init_test_tracing();
2462        let net = Testnet::create(3).await;
2463
2464        let mut handles = net.handles();
2465        let handle0 = handles.next().unwrap();
2466        let handle1 = handles.next().unwrap();
2467
2468        drop(handles);
2469        let handle = net.spawn();
2470
2471        let listener0 = handle0.event_listener();
2472
2473        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2474        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2475
2476        let client = NoopProvider::default();
2477        let pool = testing_pool();
2478        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2479            .disable_discovery()
2480            .listener_port(0)
2481            .build(client);
2482        let transactions_manager_config = config.transactions_manager_config.clone();
2483        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2484            .await
2485            .unwrap()
2486            .into_builder()
2487            .transactions(pool.clone(), transactions_manager_config)
2488            .split_with_handle();
2489        tokio::task::spawn(network);
2490
2491        network_handle.update_sync_state(SyncState::Idle);
2492
2493        assert!(!NetworkInfo::is_syncing(&network_handle));
2494
2495        // wait for all initiator connections
2496        let mut established = listener0.take(2);
2497        while let Some(ev) = established.next().await {
2498            match ev {
2499                NetworkEvent::ActivePeerSession { .. } |
2500                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2501                    // to insert a new peer in transactions peerset
2502                    transactions.on_network_event(ev);
2503                }
2504                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2505                ev => {
2506                    error!("unexpected event {ev:?}")
2507                }
2508            }
2509        }
2510        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2511        let input = hex!(
2512            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2513        );
2514        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2515        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2516            peer_id: *handle1.peer_id(),
2517            msg: Transactions(vec![signed_tx.clone()]),
2518        });
2519        assert!(transactions
2520            .transactions_by_peers
2521            .get(signed_tx.tx_hash())
2522            .unwrap()
2523            .contains(handle1.peer_id()));
2524
2525        // advance the transaction manager future
2526        poll_fn(|cx| {
2527            let _ = transactions.poll_unpin(cx);
2528            Poll::Ready(())
2529        })
2530        .await;
2531
2532        assert!(!pool.is_empty());
2533        assert!(pool.get(signed_tx.tx_hash()).is_some());
2534        handle.terminate().await;
2535    }
2536
2537    #[tokio::test(flavor = "multi_thread")]
2538    async fn test_session_closed_cleans_transaction_peer_state() {
2539        let (mut tx_manager, _network) = new_tx_manager().await;
2540        let peer_id = PeerId::new([1; 64]);
2541        let fallback_peer = PeerId::new([2; 64]);
2542        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2543        let hash_shared = B256::from_slice(&[1; 32]);
2544
2545        tx_manager.peers.insert(peer_id, peer);
2546        buffer_hash_to_tx_fetcher(
2547            &mut tx_manager.transaction_fetcher,
2548            hash_shared,
2549            peer_id,
2550            0,
2551            None,
2552        );
2553        buffer_hash_to_tx_fetcher(
2554            &mut tx_manager.transaction_fetcher,
2555            hash_shared,
2556            fallback_peer,
2557            0,
2558            None,
2559        );
2560        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2561
2562        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2563            peer_id,
2564            reason: None,
2565        }));
2566
2567        // peer removed from peers map and active_peers
2568        assert!(!tx_manager.peers.contains_key(&peer_id));
2569        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2570        // fallback peer is still available for the hash
2571        assert_eq!(
2572            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2573            Some(&fallback_peer)
2574        );
2575    }
2576
2577    #[tokio::test(flavor = "multi_thread")]
2578    async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2579        let (mut tx_manager, _network) = new_tx_manager().await;
2580        let peer_id = PeerId::new([1; 64]);
2581        let hash = B256::from_slice(&[1; 32]);
2582
2583        tx_manager.network.update_sync_state(SyncState::Idle);
2584        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2585
2586        let err = PoolError::new(
2587            hash,
2588            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2589                BlobTransactionValidationError::InvalidProof,
2590            )),
2591        );
2592
2593        tx_manager.on_bad_import(err);
2594
2595        assert!(!tx_manager.bad_imports.contains(&hash));
2596    }
2597
2598    #[tokio::test(flavor = "multi_thread")]
2599    async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2600        let (mut tx_manager, _network) = new_tx_manager().await;
2601        let peer_id = PeerId::new([1; 64]);
2602        let hash = B256::from_slice(&[3; 32]);
2603
2604        tx_manager.network.update_sync_state(SyncState::Idle);
2605        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2606
2607        let err = PoolError::new(
2608            hash,
2609            InvalidPoolTransactionError::Eip4844(
2610                Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2611            ),
2612        );
2613
2614        tx_manager.on_bad_import(err);
2615
2616        assert!(!tx_manager.bad_imports.contains(&hash));
2617    }
2618
2619    #[tokio::test(flavor = "multi_thread")]
2620    async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2621        let (mut tx_manager, _network) = new_tx_manager().await;
2622        let peer_id = PeerId::new([1; 64]);
2623        let hash = B256::from_slice(&[2; 32]);
2624
2625        tx_manager.network.update_sync_state(SyncState::Idle);
2626        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2627
2628        let err = PoolError::new(
2629            hash,
2630            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2631        );
2632
2633        tx_manager.on_bad_import(err);
2634
2635        assert!(tx_manager.bad_imports.contains(&hash));
2636    }
2637
2638    #[tokio::test(flavor = "multi_thread")]
2639    async fn test_on_get_pooled_transactions_network() {
2640        reth_tracing::init_test_tracing();
2641        let net = Testnet::create(2).await;
2642
2643        let mut handles = net.handles();
2644        let handle0 = handles.next().unwrap();
2645        let handle1 = handles.next().unwrap();
2646
2647        drop(handles);
2648        let handle = net.spawn();
2649
2650        let listener0 = handle0.event_listener();
2651
2652        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2653        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2654
2655        let client = NoopProvider::default();
2656        let pool = testing_pool();
2657        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2658            .disable_discovery()
2659            .listener_port(0)
2660            .build(client);
2661        let transactions_manager_config = config.transactions_manager_config.clone();
2662        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2663            .await
2664            .unwrap()
2665            .into_builder()
2666            .transactions(pool.clone(), transactions_manager_config)
2667            .split_with_handle();
2668        tokio::task::spawn(network);
2669
2670        network_handle.update_sync_state(SyncState::Idle);
2671
2672        assert!(!NetworkInfo::is_syncing(&network_handle));
2673
2674        // wait for all initiator connections
2675        let mut established = listener0.take(2);
2676        while let Some(ev) = established.next().await {
2677            match ev {
2678                NetworkEvent::ActivePeerSession { .. } |
2679                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2680                    transactions.on_network_event(ev);
2681                }
2682                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2683                ev => {
2684                    error!("unexpected event {ev:?}")
2685                }
2686            }
2687        }
2688        handle.terminate().await;
2689
2690        let tx = MockTransaction::eip1559();
2691        let _ = transactions
2692            .pool
2693            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2694            .await;
2695
2696        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2697
2698        let (send, receive) =
2699            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2700
2701        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2702            peer_id: *handle1.peer_id(),
2703            request,
2704            response: send,
2705        });
2706
2707        match receive.await.unwrap() {
2708            Ok(PooledTransactions(transactions)) => {
2709                assert_eq!(transactions.len(), 1);
2710            }
2711            Err(e) => {
2712                panic!("error: {e:?}");
2713            }
2714        }
2715    }
2716
2717    // Ensure that when the remote peer only returns part of the requested transactions, the
2718    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2719    // re-buffered.
2720    #[tokio::test]
2721    async fn test_partially_tx_response() {
2722        reth_tracing::init_test_tracing();
2723
2724        let mut tx_manager = new_tx_manager().await.0;
2725        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2726
2727        let peer_id_1 = PeerId::new([1; 64]);
2728        let eth_version = EthVersion::Eth66;
2729
2730        let txs = vec![
2731            TransactionSigned::new_unhashed(
2732                Transaction::Legacy(TxLegacy {
2733                    chain_id: Some(4),
2734                    nonce: 15u64,
2735                    gas_price: 2200000000,
2736                    gas_limit: 34811,
2737                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2738                    value: U256::from(1234u64),
2739                    input: Default::default(),
2740                }),
2741                Signature::new(
2742                    U256::from_str(
2743                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2744                    )
2745                    .unwrap(),
2746                    U256::from_str(
2747                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2748                    )
2749                    .unwrap(),
2750                    true,
2751                ),
2752            ),
2753            TransactionSigned::new_unhashed(
2754                Transaction::Eip1559(TxEip1559 {
2755                    chain_id: 4,
2756                    nonce: 26u64,
2757                    max_priority_fee_per_gas: 1500000000,
2758                    max_fee_per_gas: 1500000013,
2759                    gas_limit: MIN_TRANSACTION_GAS,
2760                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2761                    value: U256::from(3000000000000000000u64),
2762                    input: Default::default(),
2763                    access_list: Default::default(),
2764                }),
2765                Signature::new(
2766                    U256::from_str(
2767                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2768                    )
2769                    .unwrap(),
2770                    U256::from_str(
2771                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2772                    )
2773                    .unwrap(),
2774                    true,
2775                ),
2776            ),
2777        ];
2778
2779        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2780
2781        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2782        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2783        // fetch
2784        peer_1.seen_transactions.insert(txs_hashes[0]);
2785        peer_1.seen_transactions.insert(txs_hashes[1]);
2786        tx_manager.peers.insert(peer_id_1, peer_1);
2787
2788        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2789        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2790
2791        // peer_1 is idle
2792        assert!(tx_fetcher.is_idle(&peer_id_1));
2793        assert_eq!(tx_fetcher.active_peers.len(), 0);
2794
2795        // sends requests for buffered hashes to peer_1
2796        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2797
2798        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2799        // as long as request is in flight peer_1 is not idle
2800        assert!(!tx_fetcher.is_idle(&peer_id_1));
2801        assert_eq!(tx_fetcher.active_peers.len(), 1);
2802
2803        // mock session of peer_1 receives request
2804        let req = to_mock_session_rx
2805            .recv()
2806            .await
2807            .expect("peer_1 session should receive request with buffered hashes");
2808        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2809
2810        let message: Vec<PooledTransactionVariant> = txs
2811            .into_iter()
2812            .take(1)
2813            .map(|tx| {
2814                PooledTransactionVariant::try_from(tx)
2815                    .expect("Failed to convert MockTransaction to PooledTransaction")
2816            })
2817            .collect();
2818        // response partial request
2819        response
2820            .send(Ok(PooledTransactions(message)))
2821            .expect("should send peer_1 response to tx manager");
2822        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2823            unreachable!()
2824        };
2825
2826        // request has resolved, peer_1 is idle again
2827        assert!(tx_fetcher.is_idle(&peer_id));
2828        assert_eq!(tx_fetcher.active_peers.len(), 0);
2829        // failing peer_1's request buffers requested hashes for retry.
2830        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2831    }
2832
2833    #[tokio::test]
2834    async fn test_max_retries_tx_request() {
2835        reth_tracing::init_test_tracing();
2836
2837        let mut tx_manager = new_tx_manager().await.0;
2838        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2839
2840        let peer_id_1 = PeerId::new([1; 64]);
2841        let peer_id_2 = PeerId::new([2; 64]);
2842        let eth_version = EthVersion::Eth66;
2843        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2844
2845        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2846        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2847        // fetch
2848        peer_1.seen_transactions.insert(seen_hashes[0]);
2849        peer_1.seen_transactions.insert(seen_hashes[1]);
2850        tx_manager.peers.insert(peer_id_1, peer_1);
2851
2852        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2853        // for first retry in reverse order to make index 0 lru
2854        let retries = 1;
2855        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2856        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2857
2858        // peer_1 is idle
2859        assert!(tx_fetcher.is_idle(&peer_id_1));
2860        assert_eq!(tx_fetcher.active_peers.len(), 0);
2861
2862        // sends request for buffered hashes to peer_1
2863        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2864
2865        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2866
2867        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2868        // as long as request is in inflight peer_1 is not idle
2869        assert!(!tx_fetcher.is_idle(&peer_id_1));
2870        assert_eq!(tx_fetcher.active_peers.len(), 1);
2871
2872        // mock session of peer_1 receives request
2873        let req = to_mock_session_rx
2874            .recv()
2875            .await
2876            .expect("peer_1 session should receive request with buffered hashes");
2877        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2878        let GetPooledTransactions(hashes) = request;
2879
2880        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2881
2882        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2883
2884        // fail request to peer_1
2885        response
2886            .send(Err(RequestError::BadResponse))
2887            .expect("should send peer_1 response to tx manager");
2888        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2889            unreachable!()
2890        };
2891
2892        // request has resolved, peer_1 is idle again
2893        assert!(tx_fetcher.is_idle(&peer_id));
2894        assert_eq!(tx_fetcher.active_peers.len(), 0);
2895        // failing peer_1's request buffers requested hashes for retry
2896        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2897
2898        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2899        tx_manager.peers.insert(peer_id_2, peer_2);
2900
2901        // peer_2 announces same hashes as peer_1
2902        let msg =
2903            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2904        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2905
2906        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2907
2908        // peer_2 should be in active_peers.
2909        assert_eq!(tx_fetcher.active_peers.len(), 1);
2910
2911        // since hashes are already seen, no changes to length of unknown hashes
2912        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2913        // but hashes are taken out of buffer and packed into request to peer_2
2914        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2915
2916        // mock session of peer_2 receives request
2917        let req = to_mock_session_rx
2918            .recv()
2919            .await
2920            .expect("peer_2 session should receive request with buffered hashes");
2921        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2922
2923        // report failed request to tx manager
2924        response
2925            .send(Err(RequestError::BadResponse))
2926            .expect("should send peer_2 response to tx manager");
2927        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2928
2929        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2930        // for retry
2931        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2932        assert_eq!(tx_fetcher.active_peers.len(), 0);
2933    }
2934
2935    #[test]
2936    fn test_transaction_builder_empty() {
2937        let mut builder =
2938            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2939        assert!(builder.is_empty());
2940
2941        let mut factory = MockTransactionFactory::default();
2942        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2943        builder.push(&tx);
2944        assert!(!builder.is_empty());
2945
2946        let txs = builder.build();
2947        assert!(txs.full.is_none());
2948        let txs = txs.pooled.unwrap();
2949        assert_eq!(txs.len(), 1);
2950    }
2951
2952    #[test]
2953    fn test_transaction_builder_large() {
2954        let mut builder =
2955            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2956        assert!(builder.is_empty());
2957
2958        let mut factory = MockTransactionFactory::default();
2959        let mut tx = factory.create_eip1559();
2960        // create a transaction that still fits
2961        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2962        let tx = Arc::new(tx);
2963        let tx = PropagateTransaction::pool_tx(tx);
2964        builder.push(&tx);
2965        assert!(!builder.is_empty());
2966
2967        let txs = builder.clone().build();
2968        assert!(txs.pooled.is_none());
2969        let txs = txs.full.unwrap();
2970        assert_eq!(txs.len(), 1);
2971
2972        builder.push(&tx);
2973
2974        let txs = builder.clone().build();
2975        let pooled = txs.pooled.unwrap();
2976        assert_eq!(pooled.len(), 1);
2977        let txs = txs.full.unwrap();
2978        assert_eq!(txs.len(), 1);
2979    }
2980
2981    #[test]
2982    fn test_transaction_builder_eip4844() {
2983        let mut builder =
2984            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2985        assert!(builder.is_empty());
2986
2987        let mut factory = MockTransactionFactory::default();
2988        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2989        builder.push(&tx);
2990        assert!(!builder.is_empty());
2991
2992        let txs = builder.clone().build();
2993        assert!(txs.full.is_none());
2994        let txs = txs.pooled.unwrap();
2995        assert_eq!(txs.len(), 1);
2996
2997        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2998        builder.push(&tx);
2999
3000        let txs = builder.clone().build();
3001        let pooled = txs.pooled.unwrap();
3002        assert_eq!(pooled.len(), 1);
3003        let txs = txs.full.unwrap();
3004        assert_eq!(txs.len(), 1);
3005    }
3006
3007    #[tokio::test]
3008    async fn test_propagate_full() {
3009        reth_tracing::init_test_tracing();
3010
3011        let (mut tx_manager, network) = new_tx_manager().await;
3012        let peer_id = PeerId::random();
3013
3014        // ensure not syncing
3015        network.handle().update_sync_state(SyncState::Idle);
3016
3017        // mock a peer
3018        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3019
3020        let session_info = SessionInfo {
3021            peer_id,
3022            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3023            client_version: Arc::from(""),
3024            capabilities: Arc::new(vec![].into()),
3025            status: Arc::new(Default::default()),
3026            version: EthVersion::Eth68,
3027            peer_kind: PeerKind::Basic,
3028        };
3029        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3030        tx_manager
3031            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3032        let mut propagate = vec![];
3033        let mut factory = MockTransactionFactory::default();
3034        let eip1559_tx = Arc::new(factory.create_eip1559());
3035        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3036        let eip4844_tx = Arc::new(factory.create_eip4844());
3037        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3038
3039        let propagated =
3040            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3041        assert_eq!(propagated.len(), 2);
3042        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3043        assert_eq!(prop_txs.len(), 1);
3044        assert!(prop_txs[0].is_full());
3045
3046        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3047        assert_eq!(prop_txs.len(), 1);
3048        assert!(prop_txs[0].is_hash());
3049
3050        let peer = tx_manager.peers.get(&peer_id).unwrap();
3051        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3052        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3053        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3054
3055        // propagate again
3056        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3057        assert!(propagated.is_empty());
3058    }
3059
3060    #[tokio::test]
3061    async fn test_propagate_pending_txs_while_initially_syncing() {
3062        reth_tracing::init_test_tracing();
3063
3064        let (mut tx_manager, network) = new_tx_manager().await;
3065        let peer_id = PeerId::random();
3066
3067        // Keep the node in initial sync mode.
3068        network.handle().update_sync_state(SyncState::Syncing);
3069        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3070
3071        // Add a peer so propagation has a destination.
3072        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3073        tx_manager.peers.insert(peer_id, peer);
3074
3075        let tx = MockTransaction::eip1559();
3076        tx_manager
3077            .pool
3078            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3079            .await
3080            .expect("transaction should be accepted into the pool");
3081
3082        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3083
3084        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3085        assert!(peer.seen_transactions.contains(tx.get_hash()));
3086    }
3087
3088    #[tokio::test]
3089    async fn test_relaxed_filter_ignores_unknown_tx_types() {
3090        reth_tracing::init_test_tracing();
3091
3092        let transactions_manager_config = TransactionsManagerConfig::default();
3093
3094        let propagation_policy = TransactionPropagationKind::default();
3095        let announcement_policy = RelaxedEthAnnouncementFilter::default();
3096
3097        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3098
3099        let pool = testing_pool();
3100        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3101        let client = NoopProvider::default();
3102
3103        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3104            .listener_port(0)
3105            .disable_discovery()
3106            .build(client.clone());
3107
3108        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3109        let (to_tx_manager_tx, from_network_rx) =
3110            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
3111        network_manager.set_transactions(to_tx_manager_tx);
3112        let network_handle = network_manager.handle().clone();
3113        let network_service_handle = tokio::spawn(network_manager);
3114
3115        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3116            network_handle.clone(),
3117            pool.clone(),
3118            from_network_rx,
3119            transactions_manager_config,
3120            policy_bundle,
3121        );
3122
3123        let peer_id = PeerId::random();
3124        let eth_version = EthVersion::Eth68;
3125        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3126        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3127
3128        let mut tx_factory = MockTransactionFactory::default();
3129
3130        let valid_known_tx = tx_factory.create_eip1559();
3131        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3132
3133        let known_tx_hash = *known_tx_signed.hash();
3134        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3135        let known_tx_size = known_tx_signed.encoded_length();
3136
3137        let unknown_tx_hash = B256::random();
3138        let unknown_tx_type_byte = 0xff_u8;
3139        let unknown_tx_size = 150;
3140
3141        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3142            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3143            sizes: vec![known_tx_size, unknown_tx_size],
3144            hashes: vec![known_tx_hash, unknown_tx_hash],
3145        });
3146
3147        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3148
3149        poll_fn(|cx| {
3150            let _ = tx_manager.poll_unpin(cx);
3151            Poll::Ready(())
3152        })
3153        .await;
3154
3155        let mut requested_hashes_in_getpooled = HashSet::new();
3156        let mut unexpected_request_received = false;
3157
3158        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3159            .await
3160        {
3161            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3162                let GetPooledTransactions(hashes) = request;
3163                for hash in hashes {
3164                    requested_hashes_in_getpooled.insert(hash);
3165                }
3166                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3167            }
3168            Ok(Some(other_request)) => {
3169                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3170                unexpected_request_received = true;
3171            }
3172            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3173            Err(_timeout_err) => {
3174                tracing::info!("Timeout: No GetPooledTransactions request received.")
3175            }
3176        }
3177
3178        assert!(
3179            requested_hashes_in_getpooled.contains(&known_tx_hash),
3180            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3181        );
3182        assert!(
3183            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3184            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3185        );
3186        assert!(
3187            !unexpected_request_received,
3188            "An unexpected P2P request was received by the mock peer."
3189        );
3190
3191        network_service_handle.abort();
3192    }
3193}