Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5
/// Aggregation of configurable parameters for [`TransactionsManager`].
7pub mod config;
8/// Default and spec'd bounds.
9pub mod constants;
10/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
11pub mod fetcher;
12/// Defines the traits for transaction-related policies.
13pub mod policy;
14
15pub use self::constants::{
16    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
17    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
18};
19use config::AnnouncementAcceptance;
20pub use config::{
21    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
22    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
23};
24use policy::NetworkPolicies;
25
26pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
27
28use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
29use crate::{
30    budget::{
31        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
32        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33    },
34    cache::LruCache,
35    duration_metered_exec, metered_poll_nested_stream_with_budget,
36    metrics::{AnnouncedTxTypesMetrics, TransactionsManagerMetrics},
37    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
38    NetworkHandle, TxTypesCounter,
39};
40use alloy_primitives::{
41    map::{B256Map, B256Set, FbBuildHasher},
42    TxHash, B256,
43};
44use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
45use futures::{stream::FuturesUnordered, Future, StreamExt};
46use reth_eth_wire::{
47    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
48    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
49    NewPooledTransactionHashes66, NewPooledTransactionHashes68, NewPooledTransactionHashes72,
50    PooledTransactions, RequestTxHashes, Transactions, ValidAnnouncementData,
51};
52use reth_ethereum_primitives::{TransactionSigned, TxType};
53use reth_metrics::common::mpsc::MemoryBoundedReceiver;
54use reth_network_api::{
55    events::{PeerEvent, SessionInfo},
56    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
57};
58use reth_network_p2p::{
59    error::{RequestError, RequestResult},
60    sync::SyncStateProvider,
61};
62use reth_network_peers::PeerId;
63use reth_network_types::ReputationChangeKind;
64use reth_primitives_traits::{InMemorySize, SignedTransaction};
65use reth_tokio_util::EventStream;
66use reth_transaction_pool::{
67    error::{PoolError, PoolResult},
68    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
69    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
70};
71use std::{
72    collections::{hash_map::Entry, HashMap, HashSet},
73    pin::Pin,
74    sync::{
75        atomic::{AtomicUsize, Ordering},
76        Arc,
77    },
78    task::{Context, Poll},
79    time::{Duration, Instant},
80};
81use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
82use tokio_stream::wrappers::UnboundedReceiverStream;
83use tracing::{debug, trace};
84
/// The future for importing transactions into the pool.
///
/// This is a pinned, boxed, `Send + 'static` trait object, so futures of different concrete
/// types can be stored side by side.
///
/// Resolves with a `Vec` holding the result of each transaction import.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
90
/// API to interact with the [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example, [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction
/// hashes known by a specific peer.
///
/// The handle only wraps the sending half of a command channel, so cloning it yields another
/// sender to the same manager.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Sending half of the unbounded command channel to the [`TransactionsManager`].
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
103
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105    fn send(&self, cmd: TransactionsCommand<N>) {
106        let _ = self.manager_tx.send(cmd);
107    }
108
109    /// Fetch the [`PeerRequestSender`] for the given peer.
110    async fn peer_handle(
111        &self,
112        peer_id: PeerId,
113    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114        let (tx, rx) = oneshot::channel();
115        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116        rx.await
117    }
118
119    /// Manually propagate the transaction that belongs to the hash.
120    pub fn propagate(&self, hash: TxHash) {
121        self.send(TransactionsCommand::PropagateHash(hash))
122    }
123
124    /// Manually propagate the transaction hash to a specific peer.
125    ///
126    /// Note: this only propagates if the pool contains the transaction.
127    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128        self.propagate_hashes_to(Some(hash), peer)
129    }
130
131    /// Manually propagate the transaction hashes to a specific peer.
132    ///
133    /// Note: this only propagates the transactions that are known to the pool.
134    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135        let hashes = hash.into_iter().collect::<Vec<_>>();
136        if hashes.is_empty() {
137            return
138        }
139        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140    }
141
142    /// Request the active peer IDs from the [`TransactionsManager`].
143    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144        let (tx, rx) = oneshot::channel();
145        self.send(TransactionsCommand::GetActivePeers(tx));
146        rx.await
147    }
148
149    /// Manually propagate full transaction hashes to a specific peer.
150    ///
151    /// Do nothing if transactions are empty.
152    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153        if transactions.is_empty() {
154            return
155        }
156        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157    }
158
159    /// Manually propagate the given transaction hashes to all peers.
160    ///
161    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
162    /// full.
163    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164        if transactions.is_empty() {
165            return
166        }
167        self.send(TransactionsCommand::PropagateTransactions(transactions))
168    }
169
170    /// Manually propagate the given transactions to all peers.
171    ///
172    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
173    /// full.
174    pub fn broadcast_transactions(
175        &self,
176        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177    ) {
178        let transactions =
179            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180        if transactions.is_empty() {
181            return
182        }
183        self.send(TransactionsCommand::BroadcastTransactions(transactions))
184    }
185
186    /// Request the transaction hashes known by specific peers.
187    pub async fn get_transaction_hashes(
188        &self,
189        peers: Vec<PeerId>,
190    ) -> Result<HashMap<PeerId, B256Set>, RecvError> {
191        if peers.is_empty() {
192            return Ok(Default::default())
193        }
194        let (tx, rx) = oneshot::channel();
195        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196        rx.await
197    }
198
199    /// Request the transaction hashes known by a specific peer.
200    pub async fn get_peer_transaction_hashes(&self, peer: PeerId) -> Result<B256Set, RecvError> {
201        let res = self.get_transaction_hashes(vec![peer]).await?;
202        Ok(res.into_values().next().unwrap_or_default())
203    }
204
205    /// Requests the transactions directly from the given peer.
206    ///
207    /// Returns `None` if the peer is not connected.
208    ///
209    /// **Note**: this returns the response from the peer as received.
210    pub async fn get_pooled_transactions_from(
211        &self,
212        peer_id: PeerId,
213        hashes: Vec<B256>,
214    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
215        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
216
217        let (tx, rx) = oneshot::channel();
218        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
219        peer.try_send(request).ok();
220
221        rx.await?.map(|res| Some(res.0))
222    }
223}
224
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as a background service.
/// [`TransactionsHandle`] can be used as a frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All currently pending transactions grouped by peers.
    ///
    /// Keyed by transaction hash; the value is the set of peers associated with that hash. The
    /// entry is removed again once the import of the transaction has finished, successfully or
    /// not.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction.
    transactions_by_peers: B256Map<HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose pool import failed because the transaction itself is bad.
    ///
    /// Kept in a bounded LRU cache so that these transactions are not fetched or imported again.
    bad_imports: LruCache<TxHash, FbBuildHasher<32>>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies for transaction propagation and announcement filtering.
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
344
345impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
346    /// Sets up a new instance.
347    ///
348    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
349    pub fn new(
350        network: NetworkHandle<N>,
351        pool: Pool,
352        from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
353        transactions_manager_config: TransactionsManagerConfig,
354    ) -> Self {
355        Self::with_policy(
356            network,
357            pool,
358            from_network,
359            transactions_manager_config,
360            NetworkPolicies::new(
361                TransactionPropagationKind::default(),
362                StrictEthAnnouncementFilter::default(),
363            ),
364        )
365    }
366}
367
368impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
369    /// Sets up a new instance with given the settings.
370    ///
371    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
372    pub fn with_policy(
373        network: NetworkHandle<N>,
374        pool: Pool,
375        from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
376        transactions_manager_config: TransactionsManagerConfig,
377        policies: NetworkPolicies<N>,
378    ) -> Self {
379        let network_events = network.event_listener();
380
381        let (command_tx, command_rx) = mpsc::unbounded_channel();
382
383        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
384            &transactions_manager_config.transaction_fetcher_config,
385        );
386
387        // install a listener for new __pending__ transactions that are allowed to be propagated
388        // over the network
389        let pending = pool.pending_transactions_listener();
390        let pending_pool_imports_info =
391            PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
392        let metrics = TransactionsManagerMetrics::default();
393        metrics
394            .capacity_pending_pool_imports
395            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
396
397        Self {
398            pool,
399            network,
400            network_events,
401            transaction_fetcher,
402            transactions_by_peers: Default::default(),
403            pool_imports: Default::default(),
404            pending_pool_imports_info,
405            bad_imports: LruCache::with_hasher(DEFAULT_MAX_COUNT_BAD_IMPORTS, Default::default()),
406            peers: Default::default(),
407            command_tx,
408            command_rx: UnboundedReceiverStream::new(command_rx),
409            pending_transactions: pending,
410            transaction_events: from_network,
411            config: transactions_manager_config,
412            policies,
413            metrics,
414            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
415        }
416    }
417
418    /// Returns a new handle that can send commands to this type.
419    pub fn handle(&self) -> TransactionsHandle<N> {
420        TransactionsHandle { manager_tx: self.command_tx.clone() }
421    }
422
423    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
424    /// `false` if [`TransactionsManager`] is operating close to full capacity.
425    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
426        self.has_capacity_for_pending_pool_imports() &&
427            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
428    }
429
430    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
431    fn has_capacity_for_pending_pool_imports(&self) -> bool {
432        self.remaining_pool_import_capacity() > 0
433    }
434
435    /// Returns the remaining capacity for pending pool imports.
436    fn remaining_pool_import_capacity(&self) -> usize {
437        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
438            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
439        )
440    }
441
442    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
443        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
444        self.metrics.reported_bad_transactions.increment(1);
445    }
446
447    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
448        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
449        self.network.reputation_change(peer_id, kind);
450    }
451
452    fn report_already_seen(&self, peer_id: PeerId) {
453        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
454        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
455    }
456
457    /// Handles a closed peer session, removing the peer from transaction-local tracking state.
458    fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
459        if let Some(mut peer) = self.peers.remove(peer_id) {
460            self.policies.propagation_policy_mut().on_session_closed(&mut peer);
461        }
462        self.transaction_fetcher.remove_peer(peer_id);
463    }
464
465    /// Clear the transaction
466    fn on_good_import(&mut self, hash: TxHash) {
467        self.transactions_by_peers.remove(&hash);
468    }
469
470    /// Handles a failed transaction import.
471    ///
472    /// Blob sidecar errors (e.g. invalid proof, missing sidecar) are penalized via
473    /// `report_peer_bad_transactions` but NOT cached in `bad_imports` — the transaction itself
474    /// may be valid when fetched from another peer with correct sidecar data.
475    ///
476    /// Other bad transactions are penalized and cached in `bad_imports` to avoid fetching or
477    /// importing them again.
478    ///
479    /// Errors that count as bad transactions are:
480    ///
481    /// - intrinsic gas too low
482    /// - exceeds gas limit
483    /// - gas uint overflow
484    /// - exceeds max init code size
485    /// - oversized data
486    /// - signer account has bytecode
487    /// - chain id mismatch
488    /// - old legacy chain id
489    /// - tx type not supported
490    ///
491    /// (and additionally for blobs txns...)
492    ///
493    /// - no blobs
494    /// - too many blobs
495    /// - invalid kzg proof
496    /// - kzg error
497    /// - not blob transaction (tx type mismatch)
498    /// - wrong versioned kzg commitment hash
499    fn on_bad_import(&mut self, err: PoolError) {
500        let peers = self.transactions_by_peers.remove(&err.hash);
501
502        if err.is_bad_blob_sidecar() {
503            // Blob sidecar errors: penalize but do NOT cache the hash as bad.
504            // The transaction may be valid — only the sidecar from this peer was wrong.
505            // Using regular penalties means repeated offenders still get disconnected.
506            if let Some(peers) = peers {
507                for peer_id in peers {
508                    self.report_peer_bad_transactions(peer_id);
509                }
510            }
511            return
512        }
513
514        // if we're _currently_ syncing, we ignore a bad transaction
515        if !err.is_bad_transaction() || self.network.is_syncing() {
516            return
517        }
518        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
519        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
520        if let Some(peers) = peers {
521            for peer_id in peers {
522                self.report_peer_bad_transactions(peer_id);
523            }
524        }
525        self.metrics.bad_imports.increment(1);
526        self.bad_imports.insert(err.hash);
527    }
528
529    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
530    ///
531    /// Returns `true` if a request was sent.
532    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
533        // try drain transaction hashes pending fetch
534        let info = &self.pending_pool_imports_info;
535        let max_pending_pool_imports = info.max_pending_pool_imports;
536        let has_capacity_wrt_pending_pool_imports =
537            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
538
539        self.transaction_fetcher
540            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
541    }
542
543    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
544        let kind = match req_err {
545            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
546            RequestError::Timeout => ReputationChangeKind::Timeout,
547            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
548                // peer is already disconnected
549                return
550            }
551            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
552        };
553        self.report_peer(peer_id, kind);
554    }
555
556    #[inline]
557    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
558        let metrics = &self.metrics;
559
560        let TxManagerPollDurations {
561            acc_network_events,
562            acc_pending_imports,
563            acc_tx_events,
564            acc_imported_txns,
565            acc_fetch_events,
566            acc_pending_fetch,
567            acc_cmds,
568        } = poll_durations;
569
570        // update metrics for whole poll function
571        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
572        // update metrics for nested expressions
573        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
574        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
575        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
576        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
577        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
578        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
579        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
580    }
581}
582
583impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
584    /// Processes a batch import results.
585    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
586        for res in batch_results {
587            match res {
588                Ok(AddedTransactionOutcome { hash, .. }) => {
589                    self.on_good_import(hash);
590                }
591                Err(err) => {
592                    self.on_bad_import(err);
593                }
594            }
595        }
596    }
597
598    /// Request handler for an incoming `NewPooledTransactionHashes`
599    fn on_new_pooled_transaction_hashes(
600        &mut self,
601        peer_id: PeerId,
602        msg: NewPooledTransactionHashes,
603    ) {
604        // If the node is initially syncing, ignore transactions
605        if self.network.is_initially_syncing() {
606            return
607        }
608        if self.network.tx_gossip_disabled() {
609            return
610        }
611
612        // get handle to peer's session, if the session is still active
613        let Some(peer) = self.peers.get_mut(&peer_id) else {
614            trace!(
615                peer_id = format!("{peer_id:#}"),
616                ?msg,
617                "discarding announcement from inactive peer"
618            );
619
620            return
621        };
622        let client = peer.client_version.clone();
623
624        // keep track of the transactions the peer knows
625        let mut count_txns_already_seen_by_peer = 0;
626        for tx in msg.iter_hashes().copied() {
627            if !peer.seen_transactions.insert(tx) {
628                count_txns_already_seen_by_peer += 1;
629            }
630        }
631        if count_txns_already_seen_by_peer > 0 {
632            // this may occur if transactions are sent or announced to a peer, at the same time as
633            // the peer sends/announces those hashes to us. this is because, marking
634            // txns as seen by a peer is done optimistically upon sending them to the
635            // peer.
636            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
637            self.metrics
638                .occurrences_hash_already_seen_by_peer
639                .increment(count_txns_already_seen_by_peer);
640
641            trace!(target: "net::tx",
642                %count_txns_already_seen_by_peer,
643                peer_id=format!("{peer_id:#}"),
644                ?client,
645                "Peer sent hashes that have already been marked as seen by peer"
646            );
647
648            self.report_already_seen(peer_id);
649        }
650
651        // 1. filter out spam
652        if msg.is_empty() {
653            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
654            return;
655        }
656
657        let original_len = msg.len();
658        let mut partially_valid_msg = msg.dedup();
659
660        if partially_valid_msg.len() != original_len {
661            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
662        }
663
664        // 2. filter out transactions pending import to pool
665        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
666
667        // 3. filter out invalid entries (spam)
668        //
669        // validates messages with respect to the given network, e.g. allowed tx types.
670        // done before the pool lookup since these are cheap in-memory checks that shrink
671        // the set before acquiring the pool lock.
672        //
673        let mut should_report_peer = false;
674        let mut tx_types_counter = TxTypesCounter::default();
675
676        let has_eth68_metadata = partially_valid_msg
677            .msg_version()
678            .expect("partially valid announcement should have a version")
679            .has_eth68_metadata();
680
681        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
682            let (ty_byte, size_val) = match *metadata_ref_mut {
683                Some((ty, size)) => {
684                    if !has_eth68_metadata {
685                        should_report_peer = true;
686                    }
687                    (ty, size)
688                }
689                None => {
690                    if has_eth68_metadata {
691                        should_report_peer = true;
692                        return false;
693                    }
694                    (0u8, 0)
695                }
696            };
697
698            if has_eth68_metadata && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
699                match TxType::try_from(actual_ty_byte) {
700                    Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
701                    Err(_) => tx_types_counter.increase_other(),
702                }
703            }
704
705            let decision = self
706                .policies
707                .announcement_filter()
708                .decide_on_announcement(ty_byte, tx_hash, size_val);
709
710            match decision {
711                AnnouncementAcceptance::Accept => true,
712                AnnouncementAcceptance::Ignore => false,
713                AnnouncementAcceptance::Reject { penalize_peer } => {
714                    if penalize_peer {
715                        should_report_peer = true;
716                    }
717                    false
718                }
719            }
720        });
721
722        if has_eth68_metadata {
723            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
724        }
725
726        if should_report_peer {
727            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
728        }
729
730        // 4. filter out known hashes
731        //
732        // known txns have already been successfully fetched or received over gossip.
733        //
734        // most hashes will be filtered out here since the mempool protocol is a gossip
735        // protocol, healthy peers will send many of the same hashes.
736        //
737        let hashes_count_pre_pool_filter = partially_valid_msg.len();
738        self.pool.retain_unknown(&mut partially_valid_msg);
739        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
740            let already_known_hashes_count =
741                hashes_count_pre_pool_filter - partially_valid_msg.len();
742            self.metrics
743                .occurrences_hashes_already_in_pool
744                .increment(already_known_hashes_count as u64);
745        }
746
747        if partially_valid_msg.is_empty() {
748            // nothing to request
749            return
750        }
751
752        let mut valid_announcement_data =
753            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
754
755        if valid_announcement_data.is_empty() {
756            // no valid announcement data
757            return
758        }
759
760        // 5. filter out already seen unknown hashes
761        //
762        // seen hashes are already in the tx fetcher, pending fetch.
763        //
764        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
765        // fetcher, hence they should be valid at this point.
766        let bad_imports = &self.bad_imports;
767        self.transaction_fetcher.filter_unseen_and_pending_hashes(
768            &mut valid_announcement_data,
769            |hash| bad_imports.contains(hash),
770            &peer_id,
771            &client,
772        );
773
774        if valid_announcement_data.is_empty() {
775            // nothing to request
776            return
777        }
778
779        trace!(target: "net::tx::propagation",
780            peer_id=format!("{peer_id:#}"),
781            hashes_len=valid_announcement_data.len(),
782            hashes=?valid_announcement_data.keys(),
783            msg_version=%valid_announcement_data.msg_version(),
784            client_version=%client,
785            "received previously unseen and pending hashes in announcement from peer"
786        );
787
788        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
789        // fallback
790        if !self.transaction_fetcher.is_idle(&peer_id) {
791            // load message version before announcement data is destructed in packing
792            let msg_version = valid_announcement_data.msg_version();
793            let (hashes, _version) = valid_announcement_data.into_request_hashes();
794
795            trace!(target: "net::tx",
796                peer_id=format!("{peer_id:#}"),
797                hashes=?*hashes,
798                %msg_version,
799                %client,
800                "buffering hashes announced by busy peer"
801            );
802
803            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
804
805            return
806        }
807
808        let mut hashes_to_request =
809            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
810        let surplus_hashes =
811            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
812
813        if !surplus_hashes.is_empty() {
814            trace!(target: "net::tx",
815                peer_id=format!("{peer_id:#}"),
816                surplus_hashes=?*surplus_hashes,
817                %client,
818                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
819            );
820
821            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
822        }
823
824        trace!(target: "net::tx",
825            peer_id=format!("{peer_id:#}"),
826            hashes=?*hashes_to_request,
827            %client,
828            "sending hashes in `GetPooledTransactions` request to peer's session"
829        );
830
831        // request the missing transactions
832        //
833        // get handle to peer's session again, at this point we know it exists
834        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
835        if let Some(failed_to_request_hashes) =
836            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
837        {
838            let conn_eth_version = peer.version;
839
840            trace!(target: "net::tx",
841                peer_id=format!("{peer_id:#}"),
842                failed_to_request_hashes=?*failed_to_request_hashes,
843                %conn_eth_version,
844                %client,
845                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
846            );
847            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
848        }
849    }
850}
851
852impl<Pool, N> TransactionsManager<Pool, N>
853where
854    Pool: TransactionPool + Unpin + 'static,
855    N: NetworkPrimitives<
856            BroadcastedTransaction: SignedTransaction,
857            PooledTransaction: SignedTransaction,
858        > + Unpin,
859    Pool::Transaction:
860        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
861{
862    /// Invoked when transactions in the local mempool are considered __pending__.
863    ///
864    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
865    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
866    /// messages. The Transactions message relays complete transaction objects and is typically
867    /// sent to a small, random fraction of connected peers.
868    ///
869    /// All other peers receive a notification of the transaction hash and can request the
870    /// complete transaction object if it is unknown to them. The dissemination of complete
871    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
872    /// and won't need to request it.
873    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
874        // We intentionally do not gate this on initial sync.
875        // During initial sync we skip importing tx announcements from peers in
876        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
877        if self.network.tx_gossip_disabled() {
878            return
879        }
880
881        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
882
883        self.propagate_all(hashes);
884    }
885
886    /// Propagate the full transactions to a specific peer.
887    ///
888    /// Returns the propagated transactions.
889    fn propagate_full_transactions_to_peer(
890        &mut self,
891        txs: Vec<TxHash>,
892        peer_id: PeerId,
893        propagation_mode: PropagationMode,
894    ) -> Option<PropagatedTransactions> {
895        let peer = self.peers.get_mut(&peer_id)?;
896        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
897        let mut propagated = PropagatedTransactions::default();
898
899        // filter all transactions unknown to the peer
900        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
901
902        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
903
904        if propagation_mode.is_forced() {
905            // skip cache check if forced
906            full_transactions.extend(to_propagate);
907        } else {
908            // Iterate through the transactions to propagate and fill the hashes and full
909            // transaction
910            for tx in to_propagate {
911                if !peer.seen_transactions.contains(tx.tx_hash()) {
912                    // Only include if the peer hasn't seen the transaction
913                    full_transactions.push(&tx);
914                }
915            }
916        }
917
918        if full_transactions.is_empty() {
919            // nothing to propagate
920            return None
921        }
922
923        let PropagateTransactions { pooled, full } = full_transactions.build();
924
925        // send hashes if any
926        if let Some(new_pooled_hashes) = pooled {
927            for hash in new_pooled_hashes.iter_hashes().copied() {
928                propagated.record(hash, PropagateKind::Hash(peer_id));
929                // mark transaction as seen by peer
930                peer.seen_transactions.insert(hash);
931            }
932
933            // send hashes of transactions
934            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
935        }
936
937        // send full transactions, if any
938        if let Some(new_full_transactions) = full {
939            for tx in &new_full_transactions {
940                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
941                // mark transaction as seen by peer
942                peer.seen_transactions.insert(*tx.tx_hash());
943            }
944
945            // send full transactions
946            self.network.send_transactions(peer_id, new_full_transactions);
947        }
948
949        // Update propagated transactions metrics
950        self.metrics.propagated_transactions.increment(propagated.len() as u64);
951
952        Some(propagated)
953    }
954
955    /// Propagate the transaction hashes to the given peer
956    ///
957    /// Note: This will only send the hashes for transactions that exist in the pool.
958    fn propagate_hashes_to(
959        &mut self,
960        hashes: Vec<TxHash>,
961        peer_id: PeerId,
962        propagation_mode: PropagationMode,
963    ) {
964        trace!(target: "net::tx", "Start propagating transactions as hashes");
965
966        // This fetches a transactions from the pool, including the blob transactions, which are
967        // only ever sent as hashes.
968        let propagated = {
969            let Some(peer) = self.peers.get_mut(&peer_id) else {
970                // no such peer
971                return
972            };
973
974            let to_propagate =
975                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
976
977            let mut propagated = PropagatedTransactions::default();
978
979            // check if transaction is known to peer
980            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
981
982            if propagation_mode.is_forced() {
983                hashes.extend(to_propagate)
984            } else {
985                for tx in to_propagate {
986                    if !peer.seen_transactions.contains(tx.tx_hash()) {
987                        // Include if the peer hasn't seen it
988                        hashes.push(&tx);
989                    }
990                }
991            }
992
993            let new_pooled_hashes = hashes.build();
994
995            if new_pooled_hashes.is_empty() {
996                // nothing to propagate
997                return
998            }
999
1000            if let Some(peer) = self.peers.get_mut(&peer_id) {
1001                for hash in new_pooled_hashes.iter_hashes().copied() {
1002                    propagated.record(hash, PropagateKind::Hash(peer_id));
1003                    peer.seen_transactions.insert(hash);
1004                }
1005            }
1006
1007            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1008
1009            // send hashes of transactions
1010            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1011
1012            // Update propagated transactions metrics
1013            self.metrics.propagated_transactions.increment(propagated.len() as u64);
1014
1015            propagated
1016        };
1017
1018        // notify pool so events get fired
1019        self.pool.on_propagated(propagated);
1020    }
1021
1022    /// Propagate the transactions to all connected peers either as full objects or hashes.
1023    ///
1024    /// The message for new pooled hashes depends on the negotiated version of the stream.
1025    /// See [`NewPooledTransactionHashes`]
1026    ///
1027    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1028    fn propagate_transactions(
1029        &mut self,
1030        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1031        propagation_mode: PropagationMode,
1032    ) -> PropagatedTransactions {
1033        let mut propagated = PropagatedTransactions::default();
1034        if self.network.tx_gossip_disabled() {
1035            return propagated
1036        }
1037
1038        // send full transactions to a set of the connected peers based on the configured mode
1039        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1040
1041        // Note: Assuming ~random~ order due to random state of the peers map hasher
1042        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1043            if !self.policies.propagation_policy().can_propagate(peer) {
1044                // skip peers we should not propagate to
1045                continue
1046            }
1047            // determine whether to send full tx objects or hashes.
1048            let mut builder = if peer_idx > max_num_full {
1049                PropagateTransactionsBuilder::pooled(peer.version)
1050            } else {
1051                PropagateTransactionsBuilder::full(peer.version)
1052            };
1053
1054            if propagation_mode.is_forced() {
1055                builder.extend(to_propagate.iter());
1056            } else {
1057                // Iterate through the transactions to propagate and fill the hashes and full
1058                // transaction lists, before deciding whether or not to send full transactions to
1059                // the peer.
1060                for tx in &to_propagate {
1061                    // Only proceed if the transaction is not in the peer's list of seen
1062                    // transactions
1063                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1064                        builder.push(tx);
1065                    }
1066                }
1067            }
1068
1069            if builder.is_empty() {
1070                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1071                continue
1072            }
1073
1074            let PropagateTransactions { pooled, full } = builder.build();
1075
1076            // send hashes if any
1077            if let Some(mut new_pooled_hashes) = pooled {
1078                // enforce tx soft limit per message for the (unlikely) event the number of
1079                // hashes exceeds it
1080                new_pooled_hashes
1081                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1082
1083                for hash in new_pooled_hashes.iter_hashes().copied() {
1084                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1085                    // mark transaction as seen by peer
1086                    peer.seen_transactions.insert(hash);
1087                }
1088
1089                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1090
1091                // send hashes of transactions
1092                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1093            }
1094
1095            // send full transactions, if any
1096            if let Some(new_full_transactions) = full {
1097                for tx in &new_full_transactions {
1098                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1099                    // mark transaction as seen by peer
1100                    peer.seen_transactions.insert(*tx.tx_hash());
1101                }
1102
1103                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1104
1105                // send full transactions
1106                self.network.send_transactions(*peer_id, new_full_transactions);
1107            }
1108        }
1109
1110        // Update propagated transactions metrics
1111        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1112
1113        propagated
1114    }
1115
1116    /// Propagates the given transactions to the peers
1117    ///
1118    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1119    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1120    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1121        if self.peers.is_empty() {
1122            // nothing to propagate
1123            return
1124        }
1125        let propagated = self.propagate_transactions(
1126            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1127            PropagationMode::Basic,
1128        );
1129
1130        // notify pool so events get fired
1131        self.pool.on_propagated(propagated);
1132    }
1133
1134    /// Request handler for an incoming request for transactions
1135    fn on_get_pooled_transactions(
1136        &mut self,
1137        peer_id: PeerId,
1138        request: GetPooledTransactions,
1139        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1140    ) {
1141        // fast exit if gossip is disabled
1142        if self.network.tx_gossip_disabled() {
1143            let _ = response.send(Ok(PooledTransactions::default()));
1144            return
1145        }
1146        if let Some(peer) = self.peers.get_mut(&peer_id) {
1147            let transactions = self.pool.get_pooled_transaction_elements(
1148                request.0,
1149                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1150                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1151                ),
1152            );
1153            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1154
1155            // we sent a response at which point we assume that the peer is aware of the
1156            // transactions
1157            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1158
1159            let resp = PooledTransactions(transactions);
1160            let _ = response.send(Ok(resp));
1161        }
1162    }
1163
1164    /// Handles a command received from a detached [`TransactionsHandle`]
1165    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1166        match cmd {
1167            TransactionsCommand::PropagateHash(hash) => {
1168                self.on_new_pending_transactions(vec![hash])
1169            }
1170            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1171                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1172            }
1173            TransactionsCommand::GetActivePeers(tx) => {
1174                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1175                tx.send(peers).ok();
1176            }
1177            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1178                if let Some(propagated) =
1179                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1180                {
1181                    self.pool.on_propagated(propagated);
1182                }
1183            }
1184            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1185            TransactionsCommand::BroadcastTransactions(txs) => {
1186                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1187                self.pool.on_propagated(propagated);
1188            }
1189            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1190                let mut res = HashMap::with_capacity(peers.len());
1191                for peer_id in peers {
1192                    let hashes = self
1193                        .peers
1194                        .get(&peer_id)
1195                        .map(|peer| peer.seen_transactions.iter().copied().collect::<B256Set>())
1196                        .unwrap_or_default();
1197                    res.insert(peer_id, hashes);
1198                }
1199                tx.send(res).ok();
1200            }
1201            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1202                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1203                peer_request_sender.send(sender).ok();
1204            }
1205        }
1206    }
1207
1208    /// Handles session establishment and peer transactions initialization.
1209    ///
1210    /// This is invoked when a new session is established.
1211    fn handle_peer_session(
1212        &mut self,
1213        info: SessionInfo,
1214        messages: PeerRequestSender<PeerRequest<N>>,
1215    ) {
1216        let SessionInfo { peer_id, client_version, version, .. } = info;
1217
1218        // Insert a new peer into the peerset.
1219        let peer = PeerMetadata::<N>::new(
1220            messages,
1221            version,
1222            client_version,
1223            self.config.max_transactions_seen_by_peer_history,
1224            info.peer_kind,
1225        );
1226        let peer = match self.peers.entry(peer_id) {
1227            Entry::Occupied(mut entry) => {
1228                entry.insert(peer);
1229                entry.into_mut()
1230            }
1231            Entry::Vacant(entry) => entry.insert(peer),
1232        };
1233
1234        self.policies.propagation_policy_mut().on_session_established(peer);
1235
1236        // Send a `NewPooledTransactionHashes` to the peer with up to
1237        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1238        // transactions in the pool.
1239        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1240            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1241            return
1242        }
1243
1244        // Get transactions to broadcast
1245        let pooled_txs = self.pool.pooled_transactions_max(
1246            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1247        );
1248        if pooled_txs.is_empty() {
1249            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1250            return;
1251        }
1252
1253        // Build and send transaction hashes message
1254        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1255        for pooled_tx in pooled_txs {
1256            peer.seen_transactions.insert(*pooled_tx.hash());
1257            msg_builder.push_pooled(pooled_tx);
1258        }
1259
1260        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1261        let msg = msg_builder.build();
1262        self.network.send_transactions_hashes(peer_id, msg);
1263    }
1264
1265    /// Handles a received event related to common network events.
1266    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1267        match event_result {
1268            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1269                self.on_peer_session_closed(&peer_id);
1270            }
1271            NetworkEvent::ActivePeerSession { info, messages } => {
1272                // process active peer session and broadcast available transaction from the pool
1273                self.handle_peer_session(info, messages);
1274            }
1275            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1276                let peer_id = info.peer_id;
1277                // get messages from existing peer
1278                let messages = match self.peers.get(&peer_id) {
1279                    Some(p) => p.request_tx.clone(),
1280                    None => {
1281                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1282                        return;
1283                    }
1284                };
1285                self.handle_peer_session(info, messages);
1286            }
1287            _ => {}
1288        }
1289    }
1290
1291    /// Returns true if the ingress policy allows processing messages from the given peer.
1292    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1293        if self.config.ingress_policy.allows_all() {
1294            return true;
1295        }
1296        let Some(peer) = self.peers.get(peer_id) else {
1297            return false;
1298        };
1299        self.config.ingress_policy.allows(peer.peer_kind())
1300    }
1301
1302    /// Handles dedicated transaction events related to the `eth` protocol.
1303    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1304        match event {
1305            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1306                if !self.accepts_incoming_from(&peer_id) {
1307                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1308                    return;
1309                }
1310
1311                // ensure we didn't receive any blob transactions as these are disallowed to be
1312                // broadcasted in full
1313
1314                let has_blob_txs = msg.has_eip4844();
1315
1316                let non_blob_txs = msg
1317                    .into_iter()
1318                    .map(N::PooledTransaction::try_from)
1319                    .filter_map(Result::ok)
1320                    .collect();
1321
1322                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1323
1324                if has_blob_txs {
1325                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1326                    self.report_peer_bad_transactions(peer_id);
1327                }
1328            }
1329            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1330                if !self.accepts_incoming_from(&peer_id) {
1331                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1332                    return;
1333                }
1334                self.on_new_pooled_transaction_hashes(peer_id, msg)
1335            }
1336            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1337                self.on_get_pooled_transactions(peer_id, request, response)
1338            }
1339            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1340                let _ = response.send(Some(self.handle()));
1341            }
1342        }
1343    }
1344
1345    /// Starts the import process for the given transactions.
1346    fn import_transactions(
1347        &mut self,
1348        peer_id: PeerId,
1349        transactions: PooledTransactions<N::PooledTransaction>,
1350        source: TransactionSource,
1351    ) {
1352        // If the node is pipeline syncing, ignore transactions
1353        if self.network.is_initially_syncing() {
1354            return
1355        }
1356        if self.network.tx_gossip_disabled() {
1357            return
1358        }
1359
1360        // Early return if we don't have capacity for any imports
1361        if !self.has_capacity_for_pending_pool_imports() {
1362            return
1363        }
1364
1365        let mut transactions = transactions.0;
1366
1367        // Truncate to remaining capacity early to bound work on all subsequent processing.
1368        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
1369        // malicious and we avoid wasting CPU on them.
1370        let capacity = self.remaining_pool_import_capacity();
1371        if transactions.len() > capacity {
1372            let skipped = transactions.len() - capacity;
1373            transactions.truncate(capacity);
1374            self.metrics
1375                .skipped_transactions_pending_pool_imports_at_capacity
1376                .increment(skipped as u64);
1377            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1378        }
1379
1380        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1381        let client_version = peer.client_version.clone();
1382
1383        let start = Instant::now();
1384
1385        // mark the transactions as received
1386        self.transaction_fetcher
1387            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1388
1389        // track that the peer knows these transaction, but only if this is a new broadcast.
1390        // If we received the transactions as the response to our `GetPooledTransactions``
1391        // requests (based on received `NewPooledTransactionHashes`) then we already
1392        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1393        let mut num_already_seen_by_peer = 0;
1394        for tx in &transactions {
1395            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1396                num_already_seen_by_peer += 1;
1397            }
1398        }
1399
1400        // tracks the quality of the given transactions
1401        let mut has_bad_transactions = false;
1402
1403        // 1. Remove known, already-tracked, and invalid transactions first since these are
1404        // cheap in-memory checks against local maps
1405        transactions.retain(|tx| {
1406            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1407                entry.get_mut().insert(peer_id);
1408                return false
1409            }
1410            if self.bad_imports.contains(tx.tx_hash()) {
1411                trace!(target: "net::tx",
1412                    peer_id=format!("{peer_id:#}"),
1413                    hash=%tx.tx_hash(),
1414                    %client_version,
1415                    "received a known bad transaction from peer"
1416                );
1417                has_bad_transactions = true;
1418                return false;
1419            }
1420            true
1421        });
1422
1423        // 2. filter out txns already inserted into pool
1424        let txns_count_pre_pool_filter = transactions.len();
1425        self.pool.retain_unknown(&mut transactions);
1426        if txns_count_pre_pool_filter > transactions.len() {
1427            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1428            self.metrics
1429                .occurrences_transactions_already_in_pool
1430                .increment(already_known_txns_count as u64);
1431        }
1432
1433        let txs_len = transactions.len();
1434
1435        let new_txs = transactions
1436            .into_par_iter()
1437            .filter_map(|tx| match tx.try_into_recovered() {
1438                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1439                Err(badtx) => {
1440                    trace!(target: "net::tx",
1441                        peer_id=format!("{peer_id:#}"),
1442                        hash=%badtx.tx_hash(),
1443                        client_version=%client_version,
1444                        "failed ecrecovery for transaction"
1445                    );
1446                    None
1447                }
1448            })
1449            .collect::<Vec<_>>();
1450
1451        has_bad_transactions |= new_txs.len() != txs_len;
1452
1453        // Record the transactions as seen by the peer
1454        for tx in &new_txs {
1455            self.transactions_by_peers.insert(*tx.hash(), HashSet::from([peer_id]));
1456        }
1457
1458        // 3. import new transactions as a batch to minimize lock contention on the underlying
1459        // pool
1460        if !new_txs.is_empty() {
1461            let pool = self.pool.clone();
1462            // update metrics
1463            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1464            metric_pending_pool_imports.increment(new_txs.len() as f64);
1465
1466            // update self-monitoring info
1467            self.pending_pool_imports_info
1468                .pending_pool_imports
1469                .fetch_add(new_txs.len(), Ordering::Relaxed);
1470            let tx_manager_info_pending_pool_imports =
1471                self.pending_pool_imports_info.pending_pool_imports.clone();
1472
1473            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1474            let import = Box::pin(async move {
1475                let added = new_txs.len();
1476                let res = pool.add_external_transactions(new_txs).await;
1477
1478                // update metrics
1479                metric_pending_pool_imports.decrement(added as f64);
1480                // update self-monitoring info
1481                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1482
1483                res
1484            });
1485
1486            self.pool_imports.push(import);
1487        }
1488
1489        if num_already_seen_by_peer > 0 {
1490            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1491            self.metrics
1492                .occurrences_of_transaction_already_seen_by_peer
1493                .increment(num_already_seen_by_peer);
1494            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1495        }
1496
1497        if has_bad_transactions {
1498            // peer sent us invalid transactions
1499            self.report_peer_bad_transactions(peer_id)
1500        }
1501
1502        if num_already_seen_by_peer > 0 {
1503            self.report_already_seen(peer_id);
1504        }
1505
1506        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1507    }
1508
1509    /// Processes a [`FetchEvent`].
1510    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1511        match fetch_event {
1512            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1513                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1514                if report_peer {
1515                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1516                }
1517            }
1518            FetchEvent::FetchError { peer_id, error } => {
1519                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1520                self.on_request_error(peer_id, error);
1521            }
1522            FetchEvent::EmptyResponse { peer_id } => {
1523                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1524            }
1525        }
1526    }
1527}
1528
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    type Output = ();

    /// Drives the manager by polling each of its event sources once, in a fixed order.
    ///
    /// Always returns [`Poll::Pending`]. If any source reported that it may have more work, the
    /// task's waker is woken before returning so that the future is polled again.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advances new __pending__ transactions, transactions that were successfully inserted into
        // pending set in pool (are valid), and propagates them (inform peers which
        // transactions we have seen).
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // we filled the entire buffer capacity and need to try again on the next poll
                    // immediately
                    true
                } else {
                    // try once more, because most likely the channel is now empty and the waker
                    // is registered if this is pending; if we filled additional hashes, we poll
                    // again on the next iteration
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event: NetworkTransactionEvent<N>| this.on_network_tx_event(event),
        );

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast messages or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // at least one of the sources above reported that it may have more work: request an
        // immediate re-poll. Note that poll metrics are not updated on this path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // all channels are fully drained and import futures pending
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1714
/// Selects which peers a transaction is propagated to.
///
/// The mode decides whether the "peer has already seen this transaction" bookkeeping is
/// honoured when transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Forced propagation mode.
    ///
    /// A transaction is sent to every peer, regardless of whether it has been sent to or
    /// received from that peer before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for
    /// [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1737
/// A transaction that's about to be propagated to multiple peers.
///
/// The transaction itself is kept behind an [`Arc`], so the builders can hand out additional
/// handles to it via `Arc::clone` instead of copying the transaction.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction, captured once when the value is constructed (`length()` in
    /// [`PropagateTransaction::new`], `encoded_length()` in [`PropagateTransaction::pool_tx`]).
    /// It is counted against the soft byte limit of a full broadcast message and reported as
    /// the size in hash announcements.
    size: usize,
    /// The shared transaction to propagate.
    transaction: Arc<T>,
}
1744
1745impl<T: SignedTransaction> PropagateTransaction<T> {
1746    /// Create a new instance from a transaction.
1747    pub fn new(transaction: T) -> Self {
1748        let size = transaction.length();
1749        Self { size, transaction: Arc::new(transaction) }
1750    }
1751
1752    /// Create a new instance from a pooled transaction
1753    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1754    where
1755        P: PoolTransaction<Consensus = T>,
1756    {
1757        let size = tx.encoded_length();
1758        let transaction = tx.transaction.clone_into_consensus();
1759        let transaction = Arc::new(transaction.into_inner());
1760        Self { size, transaction }
1761    }
1762
1763    fn tx_hash(&self) -> &TxHash {
1764        self.transaction.tx_hash()
1765    }
1766}
1767
/// Helper type to construct the appropriate message to send to the peer based on whether the peer
/// should receive them in full or as pooled
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Every pushed transaction is only announced through a pooled transaction hashes message.
    Pooled(PooledTransactionsHashesBuilder),
    /// Pushed transactions are collected for a full broadcast; those the full builder does not
    /// accept are tracked as hashes instead.
    Full(FullTransactionsBuilder<T>),
}
1775
1776impl<T> PropagateTransactionsBuilder<T> {
1777    /// Create a builder for pooled transactions
1778    fn pooled(version: EthVersion) -> Self {
1779        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1780    }
1781
1782    /// Create a builder that sends transactions in full and records transactions that don't fit.
1783    fn full(version: EthVersion) -> Self {
1784        Self::Full(FullTransactionsBuilder::new(version))
1785    }
1786
1787    /// Returns true if no transactions are recorded.
1788    fn is_empty(&self) -> bool {
1789        match self {
1790            Self::Pooled(builder) => builder.is_empty(),
1791            Self::Full(builder) => builder.is_empty(),
1792        }
1793    }
1794
1795    /// Consumes the type and returns the built messages that should be sent to the peer.
1796    fn build(self) -> PropagateTransactions<T> {
1797        match self {
1798            Self::Pooled(pooled) => {
1799                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1800            }
1801            Self::Full(full) => full.build(),
1802        }
1803    }
1804}
1805
1806impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1807    /// Appends all transactions
1808    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1809        for tx in txs {
1810            self.push(tx);
1811        }
1812    }
1813
1814    /// Appends a transaction to the list.
1815    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1816        match self {
1817            Self::Pooled(builder) => builder.push(transaction),
1818            Self::Full(builder) => builder.push(transaction),
1819        }
1820    }
1821}
1822
/// Represents how the transactions should be sent to a peer if any.
///
/// Produced by the `build` methods of the propagation builders.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send, if any.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full, if any.
    full: Option<Vec<Arc<T>>>,
}
1830
/// Helper type for constructing the full transaction message that enforces the
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
/// broadcasted in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Running total of the sizes of all transactions accepted into `transactions` so far.
    ///
    /// Starts at zero and is compared against the soft limit whenever another transaction is
    /// pushed; it is not the limit itself.
    total_size: usize,
    /// All transactions to be broadcasted.
    transactions: Vec<Arc<T>>,
    /// Hashes of the transactions that are not broadcast in full: either they are unsuitable
    /// for full broadcast or they didn't fit into the broadcast message.
    pooled: PooledTransactionsHashesBuilder,
}
1844
1845impl<T> FullTransactionsBuilder<T> {
1846    /// Create a builder for the negotiated version of the peer's session
1847    fn new(version: EthVersion) -> Self {
1848        Self {
1849            total_size: 0,
1850            pooled: PooledTransactionsHashesBuilder::new(version),
1851            transactions: vec![],
1852        }
1853    }
1854
1855    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1856    fn is_empty(&self) -> bool {
1857        self.transactions.is_empty() && self.pooled.is_empty()
1858    }
1859
1860    /// Returns the messages that should be propagated to the peer.
1861    fn build(self) -> PropagateTransactions<T> {
1862        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1863        let full = Some(self.transactions).filter(|full| !full.is_empty());
1864        PropagateTransactions { pooled, full }
1865    }
1866}
1867
1868impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1869    /// Appends all transactions.
1870    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1871        for tx in txs {
1872            self.push(&tx)
1873        }
1874    }
1875
1876    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1877    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1878    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1879    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1880    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1881    ///
1882    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1883    /// to list of pooled transactions, (e.g. 4844 transactions).
1884    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1885    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1886        // Do not send full 4844 transaction hashes to peers.
1887        //
1888        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1889        //  Instead, those transactions are only announced using
1890        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1891        //  via `GetPooledTransactions`.
1892        //
1893        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1894        if !transaction.transaction.is_broadcastable_in_full() {
1895            self.pooled.push(transaction);
1896            return
1897        }
1898
1899        let new_size = self.total_size + transaction.size;
1900        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1901            self.total_size > 0
1902        {
1903            // transaction does not fit into the message
1904            self.pooled.push(transaction);
1905            return
1906        }
1907
1908        self.total_size = new_size;
1909        self.transactions.push(Arc::clone(&transaction.transaction));
1910    }
1911}
1912
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Hashes-only message; selected for `EthVersion::Eth66` and `EthVersion::Eth67`.
    Eth66(NewPooledTransactionHashes66),
    /// Message carrying hashes, sizes and types; selected for `EthVersion::Eth68` through
    /// `EthVersion::Eth71`.
    Eth68(NewPooledTransactionHashes68),
    /// Message carrying hashes, sizes and types; selected for `EthVersion::Eth72`.
    Eth72(NewPooledTransactionHashes72),
}
1921
1922// === impl PooledTransactionsHashesBuilder ===
1923
1924impl PooledTransactionsHashesBuilder {
1925    /// Push a transaction from the pool to the list.
1926    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1927        match self {
1928            Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1929            Self::Eth68(msg) => {
1930                msg.hashes.push(*pooled_tx.hash());
1931                msg.sizes.push(pooled_tx.encoded_length());
1932                msg.types.push(pooled_tx.transaction.ty());
1933            }
1934            Self::Eth72(msg) => {
1935                msg.hashes.push(*pooled_tx.hash());
1936                msg.sizes.push(pooled_tx.encoded_length());
1937                msg.types.push(pooled_tx.transaction.ty());
1938            }
1939        }
1940    }
1941
1942    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1943    fn is_empty(&self) -> bool {
1944        match self {
1945            Self::Eth66(hashes) => hashes.is_empty(),
1946            Self::Eth68(hashes) => hashes.is_empty(),
1947            Self::Eth72(hashes) => hashes.is_empty(),
1948        }
1949    }
1950
1951    /// Returns the number of transactions in the builder.
1952    fn len(&self) -> usize {
1953        match self {
1954            Self::Eth66(hashes) => hashes.len(),
1955            Self::Eth68(hashes) => hashes.len(),
1956            Self::Eth72(hashes) => hashes.len(),
1957        }
1958    }
1959
1960    /// Appends all hashes
1961    fn extend<T: SignedTransaction>(
1962        &mut self,
1963        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1964    ) {
1965        for tx in txs {
1966            self.push(&tx);
1967        }
1968    }
1969
1970    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1971        match self {
1972            Self::Eth66(msg) => msg.push(*tx.tx_hash()),
1973            Self::Eth68(msg) => {
1974                msg.hashes.push(*tx.tx_hash());
1975                msg.sizes.push(tx.size);
1976                msg.types.push(tx.transaction.ty());
1977            }
1978            Self::Eth72(msg) => {
1979                msg.hashes.push(*tx.tx_hash());
1980                msg.sizes.push(tx.size);
1981                msg.types.push(tx.transaction.ty());
1982            }
1983        }
1984    }
1985
1986    /// Create a builder for the negotiated version of the peer's session
1987    fn new(version: EthVersion) -> Self {
1988        match version {
1989            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1990            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
1991                Self::Eth68(Default::default())
1992            }
1993            EthVersion::Eth72 => Self::Eth72(Default::default()),
1994        }
1995    }
1996
1997    fn build(self) -> NewPooledTransactionHashes {
1998        match self {
1999            Self::Eth66(mut msg) => {
2000                msg.shrink_to_fit();
2001                msg.into()
2002            }
2003            Self::Eth68(mut msg) => {
2004                msg.shrink_to_fit();
2005                msg.into()
2006            }
2007            Self::Eth72(mut msg) => {
2008                msg.shrink_to_fit();
2009                msg.into()
2010            }
2011        }
2012    }
2013}
2014
/// Describes the way a batch of transactions reached us.
enum TransactionSource {
    /// The peer broadcast the transactions to us via a [`Transactions`] message.
    Broadcast,
    /// The transactions arrived as the response to a [`fetcher::GetPooledTxRequest`] that we
    /// issued.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2031
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
    /// the peer.
    ///
    /// The cache is created with the `max_transactions_seen_by_peer` limit passed to
    /// [`PeerMetadata::new`].
    seen_transactions: LruCache<TxHash, FbBuildHasher<32>>,
    /// A communication channel directly to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The negotiated version of the session.
    version: EthVersion,
    /// The peer's client version.
    client_version: Arc<str>,
    /// The kind of peer.
    peer_kind: PeerKind,
}
2048
2049impl<N: NetworkPrimitives> PeerMetadata<N> {
2050    /// Returns a new instance of [`PeerMetadata`].
2051    pub fn new(
2052        request_tx: PeerRequestSender<PeerRequest<N>>,
2053        version: EthVersion,
2054        client_version: Arc<str>,
2055        max_transactions_seen_by_peer: u32,
2056        peer_kind: PeerKind,
2057    ) -> Self {
2058        Self {
2059            seen_transactions: LruCache::with_hasher(
2060                max_transactions_seen_by_peer,
2061                Default::default(),
2062            ),
2063            request_tx,
2064            version,
2065            client_version,
2066            peer_kind,
2067        }
2068    }
2069
2070    /// Returns a reference to the peer's request sender channel.
2071    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2072        &self.request_tx
2073    }
2074
2075    /// Returns a mutable reference to the seen transactions LRU cache.
2076    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash, FbBuildHasher<32>> {
2077        &mut self.seen_transactions
2078    }
2079
2080    /// Returns the negotiated `EthVersion` of the session.
2081    pub const fn version(&self) -> EthVersion {
2082        self.version
2083    }
2084
2085    /// Returns a reference to the peer's client version string.
2086    pub fn client_version(&self) -> &str {
2087        &self.client_version
2088    }
2089
2090    /// Returns the peer's kind.
2091    pub const fn peer_kind(&self) -> PeerKind {
2092        self.peer_kind
2093    }
2094}
2095
/// Commands to send to the [`TransactionsManager`]
///
/// Commands that expect an answer carry a `oneshot::Sender` through which the answer is
/// delivered.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    ///
    /// `peers` lists the peers to query; the hashes, keyed by peer, are returned through `tx`.
    GetTransactionHashes { peers: Vec<PeerId>, tx: oneshot::Sender<HashMap<PeerId, B256Set>> },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose request sender is requested.
        peer_id: PeerId,
        /// Channel over which the optional sender is returned.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2119
/// All events related to transactions emitted by the network.
///
/// The [`TransactionsManager`] handles these events in its `poll` implementation via
/// `on_network_tx_event`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The optional [`TransactionsHandle`] is returned through the contained sender.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2151
/// Self-monitoring counters for the pool imports of the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of the transactions about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a [`PendingPoolImportsInfo`] with a zeroed counter and the given maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the
    /// tolerated max passed by the caller.
    ///
    /// The comparison uses the `max_pending_pool_imports` argument, not the maximum stored on
    /// `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2172
impl Default for PendingPoolImportsInfo {
    /// Builds the value via [`PendingPoolImportsInfo::new`], using
    /// `DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS` as the maximum.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2178
/// Per-poll duration accumulators for the individual stages of the [`TransactionsManager`]'s
/// `poll` implementation.
///
/// A fresh, zeroed value is created at the start of every `poll` call and passed to
/// `update_poll_metrics` once all sources are drained.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator handed to the metered poll of the network events stream.
    acc_network_events: Duration,
    /// Accumulator handed to the metered poll of the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Accumulator handed to the metered poll of the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by the `poll` implementation in this file section; presumably
    /// tracks handling of imported transactions — confirm against its users.
    acc_imported_txns: Duration,
    /// Accumulator handed to the metered poll of the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Accumulator handed to the metered execution that fetches hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator handed to the metered poll of the commands channel.
    acc_cmds: Duration,
}
2189
2190impl<N: NetworkPrimitives> InMemorySize for NetworkTransactionEvent<N> {
2191    // `N::BroadcastedTransaction` and `N::PooledTransaction` already implement
2192    // `InMemorySize` via `SignedTransaction: InMemorySize`, so no extra bound is needed.
2193    fn size(&self) -> usize {
2194        match self {
2195            Self::IncomingTransactions { peer_id, msg } => {
2196                core::mem::size_of_val(peer_id) +
2197                    msg.0.iter().map(InMemorySize::size).sum::<usize>()
2198            }
2199            Self::IncomingPooledTransactionHashes { peer_id, msg } => {
2200                core::mem::size_of_val(peer_id) + msg.size()
2201            }
2202            Self::GetPooledTransactions { peer_id, request, response } => {
2203                core::mem::size_of_val(peer_id) +
2204                    request.0.len() * core::mem::size_of::<TxHash>() +
2205                    core::mem::size_of_val(response)
2206            }
2207            Self::GetTransactionsHandle(_) => 0,
2208        }
2209    }
2210}
2211
2212#[cfg(test)]
2213mod tests {
2214    use super::*;
2215    use crate::{
2216        test_utils::{
2217            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2218            Testnet,
2219        },
2220        transactions::config::RelaxedEthAnnouncementFilter,
2221        NetworkConfigBuilder, NetworkManager,
2222    };
2223    use alloy_consensus::{TxEip1559, TxLegacy};
2224    use alloy_eips::eip4844::BlobTransactionValidationError;
2225    use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2226    use alloy_rlp::Decodable;
2227    use futures::FutureExt;
2228    use reth_chainspec::MIN_TRANSACTION_GAS;
2229    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2230    use reth_network_api::{NetworkInfo, PeerKind};
2231    use reth_network_p2p::{
2232        error::{RequestError, RequestResult},
2233        sync::{NetworkSyncUpdater, SyncState},
2234    };
2235    use reth_storage_api::noop::NoopProvider;
2236    use reth_tasks::Runtime;
2237    use reth_transaction_pool::{
2238        error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2239        test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2240    };
2241    use secp256k1::SecretKey;
2242    use std::{
2243        collections::HashSet,
2244        future::poll_fn,
2245        net::{IpAddr, Ipv4Addr, SocketAddr},
2246        str::FromStr,
2247    };
2248    use tracing::error;
2249
2250    #[tokio::test(flavor = "multi_thread")]
2251    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2252        reth_tracing::init_test_tracing();
2253        let net = Testnet::create(3).await;
2254
2255        let mut handles = net.handles();
2256        let handle0 = handles.next().unwrap();
2257        let handle1 = handles.next().unwrap();
2258
2259        drop(handles);
2260        let handle = net.spawn();
2261
2262        let listener0 = handle0.event_listener();
2263        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2264        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2265
2266        let client = NoopProvider::default();
2267        let pool = testing_pool();
2268        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2269            .disable_discovery()
2270            .listener_port(0)
2271            .build(client);
2272        let transactions_manager_config = config.transactions_manager_config.clone();
2273        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2274            .await
2275            .unwrap()
2276            .into_builder()
2277            .transactions(pool.clone(), transactions_manager_config)
2278            .split_with_handle();
2279
2280        tokio::task::spawn(network);
2281
2282        // go to syncing (pipeline sync)
2283        network_handle.update_sync_state(SyncState::Syncing);
2284        assert!(NetworkInfo::is_syncing(&network_handle));
2285        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2286
2287        // wait for all initiator connections
2288        let mut established = listener0.take(2);
2289        while let Some(ev) = established.next().await {
2290            match ev {
2291                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2292                    // to insert a new peer in transactions peerset
2293                    transactions
2294                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2295                }
2296                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2297                ev => {
2298                    error!("unexpected event {ev:?}")
2299                }
2300            }
2301        }
2302        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2303        let input = hex!(
2304            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2305        );
2306        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2307        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2308            peer_id: *handle1.peer_id(),
2309            msg: Transactions(vec![signed_tx.clone()]),
2310        });
2311        poll_fn(|cx| {
2312            let _ = transactions.poll_unpin(cx);
2313            Poll::Ready(())
2314        })
2315        .await;
2316        assert!(pool.is_empty());
2317        handle.terminate().await;
2318    }
2319
2320    #[tokio::test(flavor = "multi_thread")]
2321    async fn test_tx_broadcasts_through_two_syncs() {
2322        reth_tracing::init_test_tracing();
2323        let net = Testnet::create(3).await;
2324
2325        let mut handles = net.handles();
2326        let handle0 = handles.next().unwrap();
2327        let handle1 = handles.next().unwrap();
2328
2329        drop(handles);
2330        let handle = net.spawn();
2331
2332        let listener0 = handle0.event_listener();
2333        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2334        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2335
2336        let client = NoopProvider::default();
2337        let pool = testing_pool();
2338        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2339            .disable_discovery()
2340            .listener_port(0)
2341            .build(client);
2342        let transactions_manager_config = config.transactions_manager_config.clone();
2343        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2344            .await
2345            .unwrap()
2346            .into_builder()
2347            .transactions(pool.clone(), transactions_manager_config)
2348            .split_with_handle();
2349
2350        tokio::task::spawn(network);
2351
2352        // go to syncing (pipeline sync) to idle and then to syncing (live)
2353        network_handle.update_sync_state(SyncState::Syncing);
2354        assert!(NetworkInfo::is_syncing(&network_handle));
2355        network_handle.update_sync_state(SyncState::Idle);
2356        assert!(!NetworkInfo::is_syncing(&network_handle));
2357        network_handle.update_sync_state(SyncState::Syncing);
2358        assert!(NetworkInfo::is_syncing(&network_handle));
2359
2360        // wait for all initiator connections
2361        let mut established = listener0.take(2);
2362        while let Some(ev) = established.next().await {
2363            match ev {
2364                NetworkEvent::ActivePeerSession { .. } |
2365                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2366                    // to insert a new peer in transactions peerset
2367                    transactions.on_network_event(ev);
2368                }
2369                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2370                _ => {
2371                    error!("unexpected event {ev:?}")
2372                }
2373            }
2374        }
2375        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2376        let input = hex!(
2377            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2378        );
2379        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2380        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2381            peer_id: *handle1.peer_id(),
2382            msg: Transactions(vec![signed_tx.clone()]),
2383        });
2384        poll_fn(|cx| {
2385            let _ = transactions.poll_unpin(cx);
2386            Poll::Ready(())
2387        })
2388        .await;
2389        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2390        assert!(NetworkInfo::is_syncing(&network_handle));
2391        assert!(!pool.is_empty());
2392        handle.terminate().await;
2393    }
2394
2395    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2396    // event and is able to retrieve the corresponding transactions.
2397    #[tokio::test(flavor = "multi_thread")]
2398    async fn test_handle_incoming_transactions_hashes() {
2399        reth_tracing::init_test_tracing();
2400
2401        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2402        let client = NoopProvider::default();
2403
2404        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2405            // let OS choose port
2406            .listener_port(0)
2407            .disable_discovery()
2408            .build(client);
2409
2410        let pool = testing_pool();
2411
2412        let transactions_manager_config = config.transactions_manager_config.clone();
2413        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2414            .await
2415            .unwrap()
2416            .into_builder()
2417            .transactions(pool.clone(), transactions_manager_config)
2418            .split_with_handle();
2419
2420        let peer_id_1 = PeerId::new([1; 64]);
2421        let eth_version = EthVersion::Eth66;
2422
2423        let txs = vec![TransactionSigned::new_unhashed(
2424            Transaction::Legacy(TxLegacy {
2425                chain_id: Some(4),
2426                nonce: 15u64,
2427                gas_price: 2200000000,
2428                gas_limit: 34811,
2429                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2430                value: U256::from(1234u64),
2431                input: Default::default(),
2432            }),
2433            Signature::new(
2434                U256::from_str(
2435                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2436                )
2437                .unwrap(),
2438                U256::from_str(
2439                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2440                )
2441                .unwrap(),
2442                true,
2443            ),
2444        )];
2445
2446        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2447
2448        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2449        tx_manager.peers.insert(peer_id_1, peer_1);
2450
2451        assert!(pool.is_empty());
2452
2453        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2454            peer_id: peer_id_1,
2455            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2456                txs_hashes.clone(),
2457            )),
2458        });
2459
2460        // mock session of peer_1 receives request
2461        let req = to_mock_session_rx
2462            .recv()
2463            .await
2464            .expect("peer_1 session should receive request with buffered hashes");
2465        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2466        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2467
2468        let message: Vec<PooledTransactionVariant> = txs
2469            .into_iter()
2470            .map(|tx| {
2471                PooledTransactionVariant::try_from(tx)
2472                    .expect("Failed to convert MockTransaction to PooledTransaction")
2473            })
2474            .collect();
2475
2476        // return the transactions corresponding to the transaction hashes.
2477        response
2478            .send(Ok(PooledTransactions(message)))
2479            .expect("should send peer_1 response to tx manager");
2480
2481        // adance the transaction manager future
2482        poll_fn(|cx| {
2483            let _ = tx_manager.poll_unpin(cx);
2484            Poll::Ready(())
2485        })
2486        .await;
2487
2488        // ensure that the transactions corresponding to the transaction hashes have been
2489        // successfully retrieved and stored in the Pool.
2490        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2491    }
2492
2493    #[tokio::test(flavor = "multi_thread")]
2494    async fn test_handle_incoming_transactions() {
2495        reth_tracing::init_test_tracing();
2496        let net = Testnet::create(3).await;
2497
2498        let mut handles = net.handles();
2499        let handle0 = handles.next().unwrap();
2500        let handle1 = handles.next().unwrap();
2501
2502        drop(handles);
2503        let handle = net.spawn();
2504
2505        let listener0 = handle0.event_listener();
2506
2507        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2508        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2509
2510        let client = NoopProvider::default();
2511        let pool = testing_pool();
2512        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2513            .disable_discovery()
2514            .listener_port(0)
2515            .build(client);
2516        let transactions_manager_config = config.transactions_manager_config.clone();
2517        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2518            .await
2519            .unwrap()
2520            .into_builder()
2521            .transactions(pool.clone(), transactions_manager_config)
2522            .split_with_handle();
2523        tokio::task::spawn(network);
2524
2525        network_handle.update_sync_state(SyncState::Idle);
2526
2527        assert!(!NetworkInfo::is_syncing(&network_handle));
2528
2529        // wait for all initiator connections
2530        let mut established = listener0.take(2);
2531        while let Some(ev) = established.next().await {
2532            match ev {
2533                NetworkEvent::ActivePeerSession { .. } |
2534                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2535                    // to insert a new peer in transactions peerset
2536                    transactions.on_network_event(ev);
2537                }
2538                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2539                ev => {
2540                    error!("unexpected event {ev:?}")
2541                }
2542            }
2543        }
2544        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2545        let input = hex!(
2546            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2547        );
2548        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2549        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2550            peer_id: *handle1.peer_id(),
2551            msg: Transactions(vec![signed_tx.clone()]),
2552        });
2553        assert!(transactions
2554            .transactions_by_peers
2555            .get(signed_tx.tx_hash())
2556            .unwrap()
2557            .contains(handle1.peer_id()));
2558
2559        // advance the transaction manager future
2560        poll_fn(|cx| {
2561            let _ = transactions.poll_unpin(cx);
2562            Poll::Ready(())
2563        })
2564        .await;
2565
2566        assert!(!pool.is_empty());
2567        assert!(pool.get(signed_tx.tx_hash()).is_some());
2568        handle.terminate().await;
2569    }
2570
2571    #[tokio::test(flavor = "multi_thread")]
2572    async fn test_session_closed_cleans_transaction_peer_state() {
2573        let (mut tx_manager, _network) = new_tx_manager().await;
2574        let peer_id = PeerId::new([1; 64]);
2575        let fallback_peer = PeerId::new([2; 64]);
2576        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2577        let hash_shared = B256::from_slice(&[1; 32]);
2578
2579        tx_manager.peers.insert(peer_id, peer);
2580        buffer_hash_to_tx_fetcher(
2581            &mut tx_manager.transaction_fetcher,
2582            hash_shared,
2583            peer_id,
2584            0,
2585            None,
2586        );
2587        buffer_hash_to_tx_fetcher(
2588            &mut tx_manager.transaction_fetcher,
2589            hash_shared,
2590            fallback_peer,
2591            0,
2592            None,
2593        );
2594        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2595
2596        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2597            peer_id,
2598            reason: None,
2599        }));
2600
2601        // peer removed from peers map and active_peers
2602        assert!(!tx_manager.peers.contains_key(&peer_id));
2603        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2604        // fallback peer is still available for the hash
2605        assert_eq!(
2606            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2607            Some(&fallback_peer)
2608        );
2609    }
2610
2611    #[tokio::test(flavor = "multi_thread")]
2612    async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2613        let (mut tx_manager, _network) = new_tx_manager().await;
2614        let peer_id = PeerId::new([1; 64]);
2615        let hash = B256::from_slice(&[1; 32]);
2616
2617        tx_manager.network.update_sync_state(SyncState::Idle);
2618        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2619
2620        let err = PoolError::new(
2621            hash,
2622            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2623                BlobTransactionValidationError::InvalidProof,
2624            )),
2625        );
2626
2627        tx_manager.on_bad_import(err);
2628
2629        assert!(!tx_manager.bad_imports.contains(&hash));
2630    }
2631
2632    #[tokio::test(flavor = "multi_thread")]
2633    async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2634        let (mut tx_manager, _network) = new_tx_manager().await;
2635        let peer_id = PeerId::new([1; 64]);
2636        let hash = B256::from_slice(&[3; 32]);
2637
2638        tx_manager.network.update_sync_state(SyncState::Idle);
2639        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2640
2641        let err = PoolError::new(
2642            hash,
2643            InvalidPoolTransactionError::Eip4844(
2644                Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2645            ),
2646        );
2647
2648        tx_manager.on_bad_import(err);
2649
2650        assert!(!tx_manager.bad_imports.contains(&hash));
2651    }
2652
2653    #[tokio::test(flavor = "multi_thread")]
2654    async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2655        let (mut tx_manager, _network) = new_tx_manager().await;
2656        let peer_id = PeerId::new([1; 64]);
2657        let hash = B256::from_slice(&[2; 32]);
2658
2659        tx_manager.network.update_sync_state(SyncState::Idle);
2660        tx_manager.transactions_by_peers.insert(hash, HashSet::from([peer_id]));
2661
2662        let err = PoolError::new(
2663            hash,
2664            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2665        );
2666
2667        tx_manager.on_bad_import(err);
2668
2669        assert!(tx_manager.bad_imports.contains(&hash));
2670    }
2671
2672    #[tokio::test(flavor = "multi_thread")]
2673    async fn test_on_get_pooled_transactions_network() {
2674        reth_tracing::init_test_tracing();
2675        let net = Testnet::create(2).await;
2676
2677        let mut handles = net.handles();
2678        let handle0 = handles.next().unwrap();
2679        let handle1 = handles.next().unwrap();
2680
2681        drop(handles);
2682        let handle = net.spawn();
2683
2684        let listener0 = handle0.event_listener();
2685
2686        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2687        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2688
2689        let client = NoopProvider::default();
2690        let pool = testing_pool();
2691        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2692            .disable_discovery()
2693            .listener_port(0)
2694            .build(client);
2695        let transactions_manager_config = config.transactions_manager_config.clone();
2696        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2697            .await
2698            .unwrap()
2699            .into_builder()
2700            .transactions(pool.clone(), transactions_manager_config)
2701            .split_with_handle();
2702        tokio::task::spawn(network);
2703
2704        network_handle.update_sync_state(SyncState::Idle);
2705
2706        assert!(!NetworkInfo::is_syncing(&network_handle));
2707
2708        // wait for all initiator connections
2709        let mut established = listener0.take(2);
2710        while let Some(ev) = established.next().await {
2711            match ev {
2712                NetworkEvent::ActivePeerSession { .. } |
2713                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2714                    transactions.on_network_event(ev);
2715                }
2716                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2717                ev => {
2718                    error!("unexpected event {ev:?}")
2719                }
2720            }
2721        }
2722        handle.terminate().await;
2723
2724        let tx = MockTransaction::eip1559();
2725        let _ = transactions
2726            .pool
2727            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2728            .await;
2729
2730        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2731
2732        let (send, receive) =
2733            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2734
2735        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2736            peer_id: *handle1.peer_id(),
2737            request,
2738            response: send,
2739        });
2740
2741        match receive.await.unwrap() {
2742            Ok(PooledTransactions(transactions)) => {
2743                assert_eq!(transactions.len(), 1);
2744            }
2745            Err(e) => {
2746                panic!("error: {e:?}");
2747            }
2748        }
2749    }
2750
2751    // Ensure that when the remote peer only returns part of the requested transactions, the
2752    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2753    // re-buffered.
2754    #[tokio::test]
2755    async fn test_partially_tx_response() {
2756        reth_tracing::init_test_tracing();
2757
2758        let mut tx_manager = new_tx_manager().await.0;
2759        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2760
2761        let peer_id_1 = PeerId::new([1; 64]);
2762        let eth_version = EthVersion::Eth66;
2763
2764        let txs = vec![
2765            TransactionSigned::new_unhashed(
2766                Transaction::Legacy(TxLegacy {
2767                    chain_id: Some(4),
2768                    nonce: 15u64,
2769                    gas_price: 2200000000,
2770                    gas_limit: 34811,
2771                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2772                    value: U256::from(1234u64),
2773                    input: Default::default(),
2774                }),
2775                Signature::new(
2776                    U256::from_str(
2777                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2778                    )
2779                    .unwrap(),
2780                    U256::from_str(
2781                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2782                    )
2783                    .unwrap(),
2784                    true,
2785                ),
2786            ),
2787            TransactionSigned::new_unhashed(
2788                Transaction::Eip1559(TxEip1559 {
2789                    chain_id: 4,
2790                    nonce: 26u64,
2791                    max_priority_fee_per_gas: 1500000000,
2792                    max_fee_per_gas: 1500000013,
2793                    gas_limit: MIN_TRANSACTION_GAS,
2794                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2795                    value: U256::from(3000000000000000000u64),
2796                    input: Default::default(),
2797                    access_list: Default::default(),
2798                }),
2799                Signature::new(
2800                    U256::from_str(
2801                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2802                    )
2803                    .unwrap(),
2804                    U256::from_str(
2805                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2806                    )
2807                    .unwrap(),
2808                    true,
2809                ),
2810            ),
2811        ];
2812
2813        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2814
2815        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2816        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2817        // fetch
2818        peer_1.seen_transactions.insert(txs_hashes[0]);
2819        peer_1.seen_transactions.insert(txs_hashes[1]);
2820        tx_manager.peers.insert(peer_id_1, peer_1);
2821
2822        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2823        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2824
2825        // peer_1 is idle
2826        assert!(tx_fetcher.is_idle(&peer_id_1));
2827        assert_eq!(tx_fetcher.active_peers.len(), 0);
2828
2829        // sends requests for buffered hashes to peer_1
2830        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2831
2832        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2833        // as long as request is in flight peer_1 is not idle
2834        assert!(!tx_fetcher.is_idle(&peer_id_1));
2835        assert_eq!(tx_fetcher.active_peers.len(), 1);
2836
2837        // mock session of peer_1 receives request
2838        let req = to_mock_session_rx
2839            .recv()
2840            .await
2841            .expect("peer_1 session should receive request with buffered hashes");
2842        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2843
2844        let message: Vec<PooledTransactionVariant> = txs
2845            .into_iter()
2846            .take(1)
2847            .map(|tx| {
2848                PooledTransactionVariant::try_from(tx)
2849                    .expect("Failed to convert MockTransaction to PooledTransaction")
2850            })
2851            .collect();
2852        // response partial request
2853        response
2854            .send(Ok(PooledTransactions(message)))
2855            .expect("should send peer_1 response to tx manager");
2856        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2857            unreachable!()
2858        };
2859
2860        // request has resolved, peer_1 is idle again
2861        assert!(tx_fetcher.is_idle(&peer_id));
2862        assert_eq!(tx_fetcher.active_peers.len(), 0);
2863        // failing peer_1's request buffers requested hashes for retry.
2864        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2865    }
2866
2867    #[tokio::test]
2868    async fn test_max_retries_tx_request() {
2869        reth_tracing::init_test_tracing();
2870
2871        let mut tx_manager = new_tx_manager().await.0;
2872        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2873
2874        let peer_id_1 = PeerId::new([1; 64]);
2875        let peer_id_2 = PeerId::new([2; 64]);
2876        let eth_version = EthVersion::Eth66;
2877        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2878
2879        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2880        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2881        // fetch
2882        peer_1.seen_transactions.insert(seen_hashes[0]);
2883        peer_1.seen_transactions.insert(seen_hashes[1]);
2884        tx_manager.peers.insert(peer_id_1, peer_1);
2885
2886        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2887        // for first retry in reverse order to make index 0 lru
2888        let retries = 1;
2889        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2890        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2891
2892        // peer_1 is idle
2893        assert!(tx_fetcher.is_idle(&peer_id_1));
2894        assert_eq!(tx_fetcher.active_peers.len(), 0);
2895
2896        // sends request for buffered hashes to peer_1
2897        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2898
2899        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2900
2901        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2902        // as long as request is in inflight peer_1 is not idle
2903        assert!(!tx_fetcher.is_idle(&peer_id_1));
2904        assert_eq!(tx_fetcher.active_peers.len(), 1);
2905
2906        // mock session of peer_1 receives request
2907        let req = to_mock_session_rx
2908            .recv()
2909            .await
2910            .expect("peer_1 session should receive request with buffered hashes");
2911        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2912        let GetPooledTransactions(hashes) = request;
2913
2914        let hashes = hashes.into_iter().collect::<B256Set>();
2915
2916        assert_eq!(hashes, seen_hashes.into_iter().collect::<B256Set>());
2917
2918        // fail request to peer_1
2919        response
2920            .send(Err(RequestError::BadResponse))
2921            .expect("should send peer_1 response to tx manager");
2922        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2923            unreachable!()
2924        };
2925
2926        // request has resolved, peer_1 is idle again
2927        assert!(tx_fetcher.is_idle(&peer_id));
2928        assert_eq!(tx_fetcher.active_peers.len(), 0);
2929        // failing peer_1's request buffers requested hashes for retry
2930        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2931
2932        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2933        tx_manager.peers.insert(peer_id_2, peer_2);
2934
2935        // peer_2 announces same hashes as peer_1
2936        let msg =
2937            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2938        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2939
2940        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2941
2942        // peer_2 should be in active_peers.
2943        assert_eq!(tx_fetcher.active_peers.len(), 1);
2944
2945        // since hashes are already seen, no changes to length of unknown hashes
2946        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2947        // but hashes are taken out of buffer and packed into request to peer_2
2948        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2949
2950        // mock session of peer_2 receives request
2951        let req = to_mock_session_rx
2952            .recv()
2953            .await
2954            .expect("peer_2 session should receive request with buffered hashes");
2955        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2956
2957        // report failed request to tx manager
2958        response
2959            .send(Err(RequestError::BadResponse))
2960            .expect("should send peer_2 response to tx manager");
2961        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2962
2963        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2964        // for retry
2965        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2966        assert_eq!(tx_fetcher.active_peers.len(), 0);
2967    }
2968
2969    #[test]
2970    fn test_transaction_builder_empty() {
2971        let mut builder =
2972            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2973        assert!(builder.is_empty());
2974
2975        let mut factory = MockTransactionFactory::default();
2976        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2977        builder.push(&tx);
2978        assert!(!builder.is_empty());
2979
2980        let txs = builder.build();
2981        assert!(txs.full.is_none());
2982        let txs = txs.pooled.unwrap();
2983        assert_eq!(txs.len(), 1);
2984    }
2985
2986    #[test]
2987    fn test_transaction_builder_large() {
2988        let mut builder =
2989            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2990        assert!(builder.is_empty());
2991
2992        let mut factory = MockTransactionFactory::default();
2993        let mut tx = factory.create_eip1559();
2994        // create a transaction that still fits
2995        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2996        let tx = Arc::new(tx);
2997        let tx = PropagateTransaction::pool_tx(tx);
2998        builder.push(&tx);
2999        assert!(!builder.is_empty());
3000
3001        let txs = builder.clone().build();
3002        assert!(txs.pooled.is_none());
3003        let txs = txs.full.unwrap();
3004        assert_eq!(txs.len(), 1);
3005
3006        builder.push(&tx);
3007
3008        let txs = builder.clone().build();
3009        let pooled = txs.pooled.unwrap();
3010        assert_eq!(pooled.len(), 1);
3011        let txs = txs.full.unwrap();
3012        assert_eq!(txs.len(), 1);
3013    }
3014
3015    #[test]
3016    fn test_transaction_builder_eip4844() {
3017        let mut builder =
3018            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
3019        assert!(builder.is_empty());
3020
3021        let mut factory = MockTransactionFactory::default();
3022        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
3023        builder.push(&tx);
3024        assert!(!builder.is_empty());
3025
3026        let txs = builder.clone().build();
3027        assert!(txs.full.is_none());
3028        let txs = txs.pooled.unwrap();
3029        assert_eq!(txs.len(), 1);
3030
3031        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3032        builder.push(&tx);
3033
3034        let txs = builder.clone().build();
3035        let pooled = txs.pooled.unwrap();
3036        assert_eq!(pooled.len(), 1);
3037        let txs = txs.full.unwrap();
3038        assert_eq!(txs.len(), 1);
3039    }
3040
3041    #[tokio::test]
3042    async fn test_propagate_full() {
3043        reth_tracing::init_test_tracing();
3044
3045        let (mut tx_manager, network) = new_tx_manager().await;
3046        let peer_id = PeerId::random();
3047
3048        // ensure not syncing
3049        network.handle().update_sync_state(SyncState::Idle);
3050
3051        // mock a peer
3052        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3053
3054        let session_info = SessionInfo {
3055            peer_id,
3056            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3057            client_version: Arc::from(""),
3058            capabilities: Arc::new(vec![].into()),
3059            status: Arc::new(Default::default()),
3060            version: EthVersion::Eth68,
3061            peer_kind: PeerKind::Basic,
3062        };
3063        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3064        tx_manager
3065            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3066        let mut propagate = vec![];
3067        let mut factory = MockTransactionFactory::default();
3068        let eip1559_tx = Arc::new(factory.create_eip1559());
3069        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3070        let eip4844_tx = Arc::new(factory.create_eip4844());
3071        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3072
3073        let propagated =
3074            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3075        assert_eq!(propagated.len(), 2);
3076        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3077        assert_eq!(prop_txs.len(), 1);
3078        assert!(prop_txs[0].is_full());
3079
3080        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3081        assert_eq!(prop_txs.len(), 1);
3082        assert!(prop_txs[0].is_hash());
3083
3084        let peer = tx_manager.peers.get(&peer_id).unwrap();
3085        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3086        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3087        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3088
3089        // propagate again
3090        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3091        assert!(propagated.is_empty());
3092    }
3093
3094    #[tokio::test]
3095    async fn test_propagate_pending_txs_while_initially_syncing() {
3096        reth_tracing::init_test_tracing();
3097
3098        let (mut tx_manager, network) = new_tx_manager().await;
3099        let peer_id = PeerId::random();
3100
3101        // Keep the node in initial sync mode.
3102        network.handle().update_sync_state(SyncState::Syncing);
3103        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3104
3105        // Add a peer so propagation has a destination.
3106        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3107        tx_manager.peers.insert(peer_id, peer);
3108
3109        let tx = MockTransaction::eip1559();
3110        tx_manager
3111            .pool
3112            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3113            .await
3114            .expect("transaction should be accepted into the pool");
3115
3116        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3117
3118        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3119        assert!(peer.seen_transactions.contains(tx.get_hash()));
3120    }
3121
3122    #[tokio::test]
3123    async fn test_relaxed_filter_ignores_unknown_tx_types() {
3124        reth_tracing::init_test_tracing();
3125
3126        let transactions_manager_config = TransactionsManagerConfig::default();
3127
3128        let propagation_policy = TransactionPropagationKind::default();
3129        let announcement_policy = RelaxedEthAnnouncementFilter::default();
3130
3131        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3132
3133        let pool = testing_pool();
3134        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3135        let client = NoopProvider::default();
3136
3137        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3138            .listener_port(0)
3139            .disable_discovery()
3140            .build(client.clone());
3141
3142        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3143        let (to_tx_manager_tx, from_network_rx) =
3144            reth_metrics::common::mpsc::memory_bounded_channel::<
3145                NetworkTransactionEvent<EthNetworkPrimitives>,
3146            >(
3147                crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
3148                "test_tx_channel",
3149            );
3150        network_manager.set_transactions(to_tx_manager_tx);
3151        let network_handle = network_manager.handle().clone();
3152        let network_service_handle = tokio::spawn(network_manager);
3153
3154        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3155            network_handle.clone(),
3156            pool.clone(),
3157            from_network_rx,
3158            transactions_manager_config,
3159            policy_bundle,
3160        );
3161
3162        let peer_id = PeerId::random();
3163        let eth_version = EthVersion::Eth68;
3164        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3165        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3166
3167        let mut tx_factory = MockTransactionFactory::default();
3168
3169        let valid_known_tx = tx_factory.create_eip1559();
3170        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3171
3172        let known_tx_hash = *known_tx_signed.hash();
3173        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3174        let known_tx_size = known_tx_signed.encoded_length();
3175
3176        let unknown_tx_hash = B256::random();
3177        let unknown_tx_type_byte = 0xff_u8;
3178        let unknown_tx_size = 150;
3179
3180        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3181            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3182            sizes: vec![known_tx_size, unknown_tx_size],
3183            hashes: vec![known_tx_hash, unknown_tx_hash],
3184        });
3185
3186        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3187
3188        poll_fn(|cx| {
3189            let _ = tx_manager.poll_unpin(cx);
3190            Poll::Ready(())
3191        })
3192        .await;
3193
3194        let mut requested_hashes_in_getpooled = B256Set::default();
3195        let mut unexpected_request_received = false;
3196
3197        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3198            .await
3199        {
3200            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3201                let GetPooledTransactions(hashes) = request;
3202                for hash in hashes {
3203                    requested_hashes_in_getpooled.insert(hash);
3204                }
3205                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3206            }
3207            Ok(Some(other_request)) => {
3208                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3209                unexpected_request_received = true;
3210            }
3211            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3212            Err(_timeout_err) => {
3213                tracing::info!("Timeout: No GetPooledTransactions request received.")
3214            }
3215        }
3216
3217        assert!(
3218            requested_hashes_in_getpooled.contains(&known_tx_hash),
3219            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3220        );
3221        assert!(
3222            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3223            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3224        );
3225        assert!(
3226            !unexpected_request_received,
3227            "An unexpected P2P request was received by the mock peer."
3228        );
3229
3230        network_service_handle.abort();
3231    }
3232}