Skip to main content

reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4use rayon::iter::{IntoParallelIterator, ParallelIterator};
5use smallvec::SmallVec;
6
7/// Aggregation on configurable parameters for [`TransactionsManager`].
8pub mod config;
9/// Default and spec'd bounds.
10pub mod constants;
11/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
12pub mod fetcher;
13/// Defines the traits for transaction-related policies.
14pub mod policy;
15
16pub use self::constants::{
17    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
18    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
19};
20use config::AnnouncementAcceptance;
21pub use config::{
22    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
23    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
24};
25use policy::NetworkPolicies;
26
27pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
28
29use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
30use crate::{
31    budget::{
32        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
33        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
34    },
35    cache::LruCache,
36    duration_metered_exec, metered_poll_nested_stream_with_budget,
37    metrics::{AnnouncedTxTypesMetrics, TransactionsManagerMetrics},
38    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
39    NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{
42    map::{B256Map, B256Set, FbBuildHasher},
43    TxHash, B256,
44};
45use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
46use futures::{stream::FuturesUnordered, Future, StreamExt};
47use reth_eth_wire::{
48    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
49    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
50    NewPooledTransactionHashes66, NewPooledTransactionHashes68, NewPooledTransactionHashes72,
51    PooledTransactions, RequestTxHashes, Transactions, ValidAnnouncementData,
52};
53use reth_ethereum_primitives::{TransactionSigned, TxType};
54use reth_metrics::common::mpsc::MemoryBoundedReceiver;
55use reth_network_api::{
56    events::{PeerEvent, SessionInfo},
57    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
58};
59use reth_network_p2p::{
60    error::{RequestError, RequestResult},
61    sync::SyncStateProvider,
62};
63use reth_network_peers::PeerId;
64use reth_network_types::ReputationChangeKind;
65use reth_primitives_traits::{InMemorySize, SignedTransaction};
66use reth_tokio_util::EventStream;
67use reth_transaction_pool::{
68    error::{PoolError, PoolResult},
69    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
70    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
71};
72use std::{
73    collections::{hash_map::Entry, HashMap, HashSet},
74    pin::Pin,
75    sync::{
76        atomic::{AtomicUsize, Ordering},
77        Arc,
78    },
79    task::{Context, Poll},
80    time::{Duration, Instant},
81};
82use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
83use tokio_stream::wrappers::UnboundedReceiverStream;
84use tracing::{debug, trace};
85
/// The future for importing transactions into the pool.
///
/// This is a pinned, boxed, `Send + 'static` future. It resolves with a `Vec` that carries the
/// [`PoolResult`] of each transaction import.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
91
/// API to interact with the [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// For example, [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction
/// hashes known by a specific peer.
///
/// All interaction happens by sending a [`TransactionsCommand`] over the unbounded command
/// channel held by this handle.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`].
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
104
105impl<N: NetworkPrimitives> TransactionsHandle<N> {
106    fn send(&self, cmd: TransactionsCommand<N>) {
107        let _ = self.manager_tx.send(cmd);
108    }
109
110    /// Fetch the [`PeerRequestSender`] for the given peer.
111    async fn peer_handle(
112        &self,
113        peer_id: PeerId,
114    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
115        let (tx, rx) = oneshot::channel();
116        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
117        rx.await
118    }
119
120    /// Manually propagate the transaction that belongs to the hash.
121    pub fn propagate(&self, hash: TxHash) {
122        self.send(TransactionsCommand::PropagateHash(hash))
123    }
124
125    /// Manually propagate the transaction hash to a specific peer.
126    ///
127    /// Note: this only propagates if the pool contains the transaction.
128    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
129        self.propagate_hashes_to(Some(hash), peer)
130    }
131
132    /// Manually propagate the transaction hashes to a specific peer.
133    ///
134    /// Note: this only propagates the transactions that are known to the pool.
135    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
136        let hashes = hash.into_iter().collect::<Vec<_>>();
137        if hashes.is_empty() {
138            return
139        }
140        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
141    }
142
143    /// Request the active peer IDs from the [`TransactionsManager`].
144    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
145        let (tx, rx) = oneshot::channel();
146        self.send(TransactionsCommand::GetActivePeers(tx));
147        rx.await
148    }
149
150    /// Manually propagate full transaction hashes to a specific peer.
151    ///
152    /// Do nothing if transactions are empty.
153    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
154        if transactions.is_empty() {
155            return
156        }
157        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
158    }
159
160    /// Manually propagate the given transaction hashes to all peers.
161    ///
162    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
163    /// full.
164    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
165        if transactions.is_empty() {
166            return
167        }
168        self.send(TransactionsCommand::PropagateTransactions(transactions))
169    }
170
171    /// Manually propagate the given transactions to all peers.
172    ///
173    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
174    /// full.
175    pub fn broadcast_transactions(
176        &self,
177        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
178    ) {
179        let transactions =
180            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
181        if transactions.is_empty() {
182            return
183        }
184        self.send(TransactionsCommand::BroadcastTransactions(transactions))
185    }
186
187    /// Request the transaction hashes known by specific peers.
188    pub async fn get_transaction_hashes(
189        &self,
190        peers: Vec<PeerId>,
191    ) -> Result<HashMap<PeerId, B256Set>, RecvError> {
192        if peers.is_empty() {
193            return Ok(Default::default())
194        }
195        let (tx, rx) = oneshot::channel();
196        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
197        rx.await
198    }
199
200    /// Request the transaction hashes known by a specific peer.
201    pub async fn get_peer_transaction_hashes(&self, peer: PeerId) -> Result<B256Set, RecvError> {
202        let res = self.get_transaction_hashes(vec![peer]).await?;
203        Ok(res.into_values().next().unwrap_or_default())
204    }
205
206    /// Requests the transactions directly from the given peer.
207    ///
208    /// Returns `None` if the peer is not connected.
209    ///
210    /// **Note**: this returns the response from the peer as received.
211    pub async fn get_pooled_transactions_from(
212        &self,
213        peer_id: PeerId,
214        hashes: Vec<B256>,
215    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218        let (tx, rx) = oneshot::channel();
219        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220        peer.try_send(request).ok();
221
222        rx.await?.map(|res| Some(res.0))
223    }
224}
225
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as a background service.
/// [`TransactionsHandle`] can be used as a frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All currently pending transactions, keyed by transaction hash and mapped to the peers
    /// they are associated with.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the
    /// same transaction. An entry is removed again once the import of that hash has finished,
    /// whether it succeeded or failed.
    transactions_by_peers: B256Map<SmallVec<[PeerId; 1]>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    /// - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Bounded cache of hashes of transactions whose import failed because the transaction
    /// itself was bad.
    ///
    /// Announcements for these hashes are filtered out so they are not fetched again.
    bad_imports: LruCache<TxHash, FbBuildHasher<32>>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies for transaction propagation and announcement filtering.
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics.
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics.
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
345
346impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
347    /// Sets up a new instance.
348    ///
349    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
350    pub fn new(
351        network: NetworkHandle<N>,
352        pool: Pool,
353        from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
354        transactions_manager_config: TransactionsManagerConfig,
355    ) -> Self {
356        Self::with_policy(
357            network,
358            pool,
359            from_network,
360            transactions_manager_config,
361            NetworkPolicies::new(
362                TransactionPropagationKind::default(),
363                StrictEthAnnouncementFilter::default(),
364            ),
365        )
366    }
367}
368
369impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
370    /// Sets up a new instance with given the settings.
371    ///
372    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
373    pub fn with_policy(
374        network: NetworkHandle<N>,
375        pool: Pool,
376        from_network: MemoryBoundedReceiver<NetworkTransactionEvent<N>>,
377        transactions_manager_config: TransactionsManagerConfig,
378        policies: NetworkPolicies<N>,
379    ) -> Self {
380        let network_events = network.event_listener();
381
382        let (command_tx, command_rx) = mpsc::unbounded_channel();
383
384        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
385            &transactions_manager_config.transaction_fetcher_config,
386        );
387
388        // install a listener for new __pending__ transactions that are allowed to be propagated
389        // over the network
390        let pending = pool.pending_transactions_listener();
391        let pending_pool_imports_info =
392            PendingPoolImportsInfo::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS);
393        let metrics = TransactionsManagerMetrics::default();
394        metrics
395            .capacity_pending_pool_imports
396            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
397
398        Self {
399            pool,
400            network,
401            network_events,
402            transaction_fetcher,
403            transactions_by_peers: Default::default(),
404            pool_imports: Default::default(),
405            pending_pool_imports_info,
406            bad_imports: LruCache::with_hasher(DEFAULT_MAX_COUNT_BAD_IMPORTS, Default::default()),
407            peers: Default::default(),
408            command_tx,
409            command_rx: UnboundedReceiverStream::new(command_rx),
410            pending_transactions: pending,
411            transaction_events: from_network,
412            config: transactions_manager_config,
413            policies,
414            metrics,
415            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
416        }
417    }
418
419    /// Returns a new handle that can send commands to this type.
420    pub fn handle(&self) -> TransactionsHandle<N> {
421        TransactionsHandle { manager_tx: self.command_tx.clone() }
422    }
423
424    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
425    /// `false` if [`TransactionsManager`] is operating close to full capacity.
426    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
427        self.has_capacity_for_pending_pool_imports() &&
428            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
429    }
430
431    /// Returns `true` if [`TransactionsManager`] has capacity for more pending pool imports.
432    fn has_capacity_for_pending_pool_imports(&self) -> bool {
433        self.remaining_pool_import_capacity() > 0
434    }
435
436    /// Returns the remaining capacity for pending pool imports.
437    fn remaining_pool_import_capacity(&self) -> usize {
438        self.pending_pool_imports_info.max_pending_pool_imports.saturating_sub(
439            self.pending_pool_imports_info.pending_pool_imports.load(Ordering::Relaxed),
440        )
441    }
442
443    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
444        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
445        self.metrics.reported_bad_transactions.increment(1);
446    }
447
448    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
449        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
450        self.network.reputation_change(peer_id, kind);
451    }
452
453    fn report_already_seen(&self, peer_id: PeerId) {
454        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
455        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
456    }
457
458    /// Handles a closed peer session, removing the peer from transaction-local tracking state.
459    fn on_peer_session_closed(&mut self, peer_id: &PeerId) {
460        if let Some(mut peer) = self.peers.remove(peer_id) {
461            self.policies.propagation_policy_mut().on_session_closed(&mut peer);
462        }
463        self.transaction_fetcher.remove_peer(peer_id);
464    }
465
466    /// Clear the transaction
467    fn on_good_import(&mut self, hash: TxHash) {
468        self.transactions_by_peers.remove(&hash);
469    }
470
471    /// Handles a failed transaction import.
472    ///
473    /// Blob sidecar errors (e.g. invalid proof, missing sidecar) are penalized via
474    /// `report_peer_bad_transactions` but NOT cached in `bad_imports` — the transaction itself
475    /// may be valid when fetched from another peer with correct sidecar data.
476    ///
477    /// Other bad transactions are penalized and cached in `bad_imports` to avoid fetching or
478    /// importing them again.
479    ///
480    /// Errors that count as bad transactions are:
481    ///
482    /// - intrinsic gas too low
483    /// - exceeds gas limit
484    /// - gas uint overflow
485    /// - exceeds max init code size
486    /// - oversized data
487    /// - signer account has bytecode
488    /// - chain id mismatch
489    /// - old legacy chain id
490    /// - tx type not supported
491    ///
492    /// (and additionally for blobs txns...)
493    ///
494    /// - no blobs
495    /// - too many blobs
496    /// - invalid kzg proof
497    /// - kzg error
498    /// - not blob transaction (tx type mismatch)
499    /// - wrong versioned kzg commitment hash
500    fn on_bad_import(&mut self, err: PoolError) {
501        let peers = self.transactions_by_peers.remove(&err.hash);
502
503        if err.is_bad_blob_sidecar() {
504            // Blob sidecar errors: penalize but do NOT cache the hash as bad.
505            // The transaction may be valid — only the sidecar from this peer was wrong.
506            // Using regular penalties means repeated offenders still get disconnected.
507            if let Some(peers) = peers {
508                for peer_id in peers {
509                    self.report_peer_bad_transactions(peer_id);
510                }
511            }
512            return
513        }
514
515        // if we're _currently_ syncing, we ignore a bad transaction
516        if !err.is_bad_transaction() || self.network.is_syncing() {
517            return
518        }
519        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
520        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
521        if let Some(peers) = peers {
522            for peer_id in peers {
523                self.report_peer_bad_transactions(peer_id);
524            }
525        }
526        self.metrics.bad_imports.increment(1);
527        self.bad_imports.insert(err.hash);
528    }
529
530    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
531    ///
532    /// Returns `true` if a request was sent.
533    fn on_fetch_hashes_pending_fetch(&mut self) -> bool {
534        // try drain transaction hashes pending fetch
535        let info = &self.pending_pool_imports_info;
536        let max_pending_pool_imports = info.max_pending_pool_imports;
537        let has_capacity_wrt_pending_pool_imports =
538            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
539
540        self.transaction_fetcher
541            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports)
542    }
543
544    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
545        let kind = match req_err {
546            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
547            RequestError::Timeout => ReputationChangeKind::Timeout,
548            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
549                // peer is already disconnected
550                return
551            }
552            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
553        };
554        self.report_peer(peer_id, kind);
555    }
556
557    #[inline]
558    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
559        let metrics = &self.metrics;
560
561        let TxManagerPollDurations {
562            acc_network_events,
563            acc_pending_imports,
564            acc_tx_events,
565            acc_imported_txns,
566            acc_fetch_events,
567            acc_pending_fetch,
568            acc_cmds,
569        } = poll_durations;
570
571        // update metrics for whole poll function
572        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
573        // update metrics for nested expressions
574        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
575        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
576        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
577        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
578        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
579        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
580        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
581    }
582}
583
584impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
585    /// Processes a batch import results.
586    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
587        for res in batch_results {
588            match res {
589                Ok(AddedTransactionOutcome { hash, .. }) => {
590                    self.on_good_import(hash);
591                }
592                Err(err) => {
593                    self.on_bad_import(err);
594                }
595            }
596        }
597    }
598
599    /// Request handler for an incoming `NewPooledTransactionHashes`
600    fn on_new_pooled_transaction_hashes(
601        &mut self,
602        peer_id: PeerId,
603        msg: NewPooledTransactionHashes,
604    ) {
605        // If the node is initially syncing, ignore transactions
606        if self.network.is_initially_syncing() {
607            return
608        }
609        if self.network.tx_gossip_disabled() {
610            return
611        }
612
613        // get handle to peer's session, if the session is still active
614        let Some(peer) = self.peers.get_mut(&peer_id) else {
615            trace!(
616                peer_id = format!("{peer_id:#}"),
617                ?msg,
618                "discarding announcement from inactive peer"
619            );
620
621            return
622        };
623        let client = peer.client_version.clone();
624
625        // keep track of the transactions the peer knows
626        let mut count_txns_already_seen_by_peer = 0;
627        for tx in msg.iter_hashes().copied() {
628            if !peer.seen_transactions.insert(tx) {
629                count_txns_already_seen_by_peer += 1;
630            }
631        }
632        if count_txns_already_seen_by_peer > 0 {
633            // this may occur if transactions are sent or announced to a peer, at the same time as
634            // the peer sends/announces those hashes to us. this is because, marking
635            // txns as seen by a peer is done optimistically upon sending them to the
636            // peer.
637            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
638            self.metrics
639                .occurrences_hash_already_seen_by_peer
640                .increment(count_txns_already_seen_by_peer);
641
642            trace!(target: "net::tx",
643                %count_txns_already_seen_by_peer,
644                peer_id=format!("{peer_id:#}"),
645                ?client,
646                "Peer sent hashes that have already been marked as seen by peer"
647            );
648
649            self.report_already_seen(peer_id);
650        }
651
652        // 1. filter out spam
653        if msg.is_empty() {
654            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
655            return;
656        }
657
658        let original_len = msg.len();
659        let mut partially_valid_msg = msg.dedup();
660
661        if partially_valid_msg.len() != original_len {
662            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
663        }
664
665        // 2. filter out transactions pending import to pool
666        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
667
668        // 3. filter out invalid entries (spam)
669        //
670        // validates messages with respect to the given network, e.g. allowed tx types.
671        // done before the pool lookup since these are cheap in-memory checks that shrink
672        // the set before acquiring the pool lock.
673        //
674        let mut should_report_peer = false;
675        let mut tx_types_counter = TxTypesCounter::default();
676
677        let has_eth68_metadata = partially_valid_msg
678            .msg_version()
679            .expect("partially valid announcement should have a version")
680            .has_eth68_metadata();
681
682        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
683            let (ty_byte, size_val) = match *metadata_ref_mut {
684                Some((ty, size)) => {
685                    if !has_eth68_metadata {
686                        should_report_peer = true;
687                    }
688                    (ty, size)
689                }
690                None => {
691                    if has_eth68_metadata {
692                        should_report_peer = true;
693                        return false;
694                    }
695                    (0u8, 0)
696                }
697            };
698
699            if has_eth68_metadata && let Some((actual_ty_byte, _)) = *metadata_ref_mut {
700                match TxType::try_from(actual_ty_byte) {
701                    Ok(parsed_tx_type) => tx_types_counter.increase_by_tx_type(parsed_tx_type),
702                    Err(_) => tx_types_counter.increase_other(),
703                }
704            }
705
706            let decision = self
707                .policies
708                .announcement_filter()
709                .decide_on_announcement(ty_byte, tx_hash, size_val);
710
711            match decision {
712                AnnouncementAcceptance::Accept => true,
713                AnnouncementAcceptance::Ignore => false,
714                AnnouncementAcceptance::Reject { penalize_peer } => {
715                    if penalize_peer {
716                        should_report_peer = true;
717                    }
718                    false
719                }
720            }
721        });
722
723        if has_eth68_metadata {
724            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
725        }
726
727        if should_report_peer {
728            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
729        }
730
731        // 4. filter out known hashes
732        //
733        // known txns have already been successfully fetched or received over gossip.
734        //
735        // most hashes will be filtered out here since the mempool protocol is a gossip
736        // protocol, healthy peers will send many of the same hashes.
737        //
738        let hashes_count_pre_pool_filter = partially_valid_msg.len();
739        self.pool.retain_unknown(&mut partially_valid_msg);
740        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
741            let already_known_hashes_count =
742                hashes_count_pre_pool_filter - partially_valid_msg.len();
743            self.metrics
744                .occurrences_hashes_already_in_pool
745                .increment(already_known_hashes_count as u64);
746        }
747
748        if partially_valid_msg.is_empty() {
749            // nothing to request
750            return
751        }
752
753        let mut valid_announcement_data =
754            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
755
756        if valid_announcement_data.is_empty() {
757            // no valid announcement data
758            return
759        }
760
761        // 5. filter out already seen unknown hashes
762        //
763        // seen hashes are already in the tx fetcher, pending fetch.
764        //
765        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
766        // fetcher, hence they should be valid at this point.
767        let bad_imports = &self.bad_imports;
768        self.transaction_fetcher.filter_unseen_and_pending_hashes(
769            &mut valid_announcement_data,
770            |hash| bad_imports.contains(hash),
771            &peer_id,
772            &client,
773        );
774
775        if valid_announcement_data.is_empty() {
776            // nothing to request
777            return
778        }
779
780        trace!(target: "net::tx::propagation",
781            peer_id=format!("{peer_id:#}"),
782            hashes_len=valid_announcement_data.len(),
783            hashes=?valid_announcement_data.keys(),
784            msg_version=%valid_announcement_data.msg_version(),
785            client_version=%client,
786            "received previously unseen and pending hashes in announcement from peer"
787        );
788
789        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
790        // fallback
791        if !self.transaction_fetcher.is_idle(&peer_id) {
792            // load message version before announcement data is destructed in packing
793            let msg_version = valid_announcement_data.msg_version();
794            let (hashes, _version) = valid_announcement_data.into_request_hashes();
795
796            trace!(target: "net::tx",
797                peer_id=format!("{peer_id:#}"),
798                hashes=?*hashes,
799                %msg_version,
800                %client,
801                "buffering hashes announced by busy peer"
802            );
803
804            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
805
806            return
807        }
808
809        let mut hashes_to_request =
810            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
811        let surplus_hashes =
812            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
813
814        if !surplus_hashes.is_empty() {
815            trace!(target: "net::tx",
816                peer_id=format!("{peer_id:#}"),
817                surplus_hashes=?*surplus_hashes,
818                %client,
819                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
820            );
821
822            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
823        }
824
825        trace!(target: "net::tx",
826            peer_id=format!("{peer_id:#}"),
827            hashes=?*hashes_to_request,
828            %client,
829            "sending hashes in `GetPooledTransactions` request to peer's session"
830        );
831
832        // request the missing transactions
833        //
834        // get handle to peer's session again, at this point we know it exists
835        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
836        if let Some(failed_to_request_hashes) =
837            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
838        {
839            let conn_eth_version = peer.version;
840
841            trace!(target: "net::tx",
842                peer_id=format!("{peer_id:#}"),
843                failed_to_request_hashes=?*failed_to_request_hashes,
844                %conn_eth_version,
845                %client,
846                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
847            );
848            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
849        }
850    }
851}
852
853impl<Pool, N> TransactionsManager<Pool, N>
854where
855    Pool: TransactionPool + Unpin + 'static,
856    N: NetworkPrimitives<
857            BroadcastedTransaction: SignedTransaction,
858            PooledTransaction: SignedTransaction,
859        > + Unpin,
860    Pool::Transaction:
861        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
862{
863    /// Invoked when transactions in the local mempool are considered __pending__.
864    ///
865    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
866    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
867    /// messages. The Transactions message relays complete transaction objects and is typically
868    /// sent to a small, random fraction of connected peers.
869    ///
870    /// All other peers receive a notification of the transaction hash and can request the
871    /// complete transaction object if it is unknown to them. The dissemination of complete
872    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
873    /// and won't need to request it.
874    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
875        // We intentionally do not gate this on initial sync.
876        // During initial sync we skip importing tx announcements from peers in
877        // `on_new_pooled_transaction_hashes`, so transactions reaching this path are local.
878        if self.network.tx_gossip_disabled() {
879            return
880        }
881
882        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
883
884        self.propagate_all(hashes);
885    }
886
887    /// Propagate the full transactions to a specific peer.
888    ///
889    /// Returns the propagated transactions.
890    fn propagate_full_transactions_to_peer(
891        &mut self,
892        txs: Vec<TxHash>,
893        peer_id: PeerId,
894        propagation_mode: PropagationMode,
895    ) -> Option<PropagatedTransactions> {
896        let peer = self.peers.get_mut(&peer_id)?;
897        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
898        let mut propagated = PropagatedTransactions::default();
899
900        // filter all transactions unknown to the peer
901        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
902
903        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
904
905        if propagation_mode.is_forced() {
906            // skip cache check if forced
907            full_transactions.extend(to_propagate);
908        } else {
909            // Iterate through the transactions to propagate and fill the hashes and full
910            // transaction
911            for tx in to_propagate {
912                if !peer.seen_transactions.contains(tx.tx_hash()) {
913                    // Only include if the peer hasn't seen the transaction
914                    full_transactions.push(&tx);
915                }
916            }
917        }
918
919        if full_transactions.is_empty() {
920            // nothing to propagate
921            return None
922        }
923
924        let PropagateTransactions { pooled, full } = full_transactions.build();
925
926        // send hashes if any
927        if let Some(new_pooled_hashes) = pooled {
928            for hash in new_pooled_hashes.iter_hashes().copied() {
929                propagated.record(hash, PropagateKind::Hash(peer_id));
930                // mark transaction as seen by peer
931                peer.seen_transactions.insert(hash);
932            }
933
934            // send hashes of transactions
935            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
936        }
937
938        // send full transactions, if any
939        if let Some(new_full_transactions) = full {
940            for tx in &new_full_transactions {
941                propagated.record(*tx.tx_hash(), PropagateKind::Full(peer_id));
942                // mark transaction as seen by peer
943                peer.seen_transactions.insert(*tx.tx_hash());
944            }
945
946            // send full transactions
947            self.network.send_transactions(peer_id, new_full_transactions);
948        }
949
950        // Update propagated transactions metrics
951        self.metrics.propagated_transactions.increment(propagated.len() as u64);
952
953        Some(propagated)
954    }
955
956    /// Propagate the transaction hashes to the given peer
957    ///
958    /// Note: This will only send the hashes for transactions that exist in the pool.
959    fn propagate_hashes_to(
960        &mut self,
961        hashes: Vec<TxHash>,
962        peer_id: PeerId,
963        propagation_mode: PropagationMode,
964    ) {
965        trace!(target: "net::tx", "Start propagating transactions as hashes");
966
967        // This fetches a transactions from the pool, including the blob transactions, which are
968        // only ever sent as hashes.
969        let propagated = {
970            let Some(peer) = self.peers.get_mut(&peer_id) else {
971                // no such peer
972                return
973            };
974
975            let to_propagate =
976                self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx);
977
978            let mut propagated = PropagatedTransactions::default();
979
980            // check if transaction is known to peer
981            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
982
983            if propagation_mode.is_forced() {
984                hashes.extend(to_propagate)
985            } else {
986                for tx in to_propagate {
987                    if !peer.seen_transactions.contains(tx.tx_hash()) {
988                        // Include if the peer hasn't seen it
989                        hashes.push(&tx);
990                    }
991                }
992            }
993
994            let new_pooled_hashes = hashes.build();
995
996            if new_pooled_hashes.is_empty() {
997                // nothing to propagate
998                return
999            }
1000
1001            for hash in new_pooled_hashes.iter_hashes().copied() {
1002                propagated.record(hash, PropagateKind::Hash(peer_id));
1003                peer.seen_transactions.insert(hash);
1004            }
1005
1006            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
1007
1008            // send hashes of transactions
1009            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
1010
1011            // Update propagated transactions metrics
1012            self.metrics.propagated_transactions.increment(propagated.len() as u64);
1013
1014            propagated
1015        };
1016
1017        // notify pool so events get fired
1018        self.pool.on_propagated(propagated);
1019    }
1020
1021    /// Propagate the transactions to all connected peers either as full objects or hashes.
1022    ///
1023    /// The message for new pooled hashes depends on the negotiated version of the stream.
1024    /// See [`NewPooledTransactionHashes`]
1025    ///
1026    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1027    fn propagate_transactions(
1028        &mut self,
1029        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1030        propagation_mode: PropagationMode,
1031    ) -> PropagatedTransactions {
1032        let mut propagated = PropagatedTransactions::default();
1033        if self.network.tx_gossip_disabled() {
1034            return propagated
1035        }
1036
1037        // send full transactions to a set of the connected peers based on the configured mode
1038        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1039
1040        // Note: Assuming ~random~ order due to random state of the peers map hasher
1041        let mut num_full_peers = 0;
1042        for (peer_id, peer) in &mut self.peers {
1043            if !self.policies.propagation_policy().can_propagate(peer) {
1044                // skip peers we should not propagate to
1045                continue
1046            }
1047            // determine whether to send full tx objects or hashes.
1048            let mut builder = if num_full_peers < max_num_full {
1049                num_full_peers += 1;
1050                PropagateTransactionsBuilder::full(peer.version, to_propagate.len())
1051            } else {
1052                PropagateTransactionsBuilder::pooled(peer.version, to_propagate.len())
1053            };
1054
1055            // Transactions are optimistically marked as seen by the peer when included in the
1056            // message, see `PeerMetadata::seen_transactions`.
1057            if propagation_mode.is_forced() {
1058                for tx in &to_propagate {
1059                    peer.seen_transactions.insert(*tx.tx_hash());
1060                    builder.push(tx);
1061                }
1062            } else {
1063                // Iterate through the transactions to propagate and fill the hashes and full
1064                // transaction lists, before deciding whether or not to send full transactions to
1065                // the peer.
1066                for tx in &to_propagate {
1067                    // Only include the transaction if the peer hasn't seen it yet
1068                    if peer.seen_transactions.insert(*tx.tx_hash()) {
1069                        builder.push(tx);
1070                    }
1071                }
1072            }
1073
1074            if builder.is_empty() {
1075                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1076                continue
1077            }
1078
1079            let PropagateTransactions { pooled, full } = builder.build();
1080
1081            // send hashes if any
1082            if let Some(mut new_pooled_hashes) = pooled {
1083                // Unhappy path: too many hashes for a single message. This should not happen
1084                // during regular propagation, which is capped at the soft limit per batch, and
1085                // is only reachable via manual propagation commands with oversized batches.
1086                if new_pooled_hashes.len() >
1087                    SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE
1088                {
1089                    // hashes that exceed the limit are not sent, so they must not be tracked as
1090                    // seen by the peer
1091                    for hash in new_pooled_hashes
1092                        .iter_hashes()
1093                        .skip(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE)
1094                    {
1095                        peer.seen_transactions.remove(hash);
1096                    }
1097                    new_pooled_hashes.truncate(
1098                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1099                    );
1100                }
1101
1102                for hash in new_pooled_hashes.iter_hashes().copied() {
1103                    propagated.record(hash, PropagateKind::Hash(*peer_id));
1104                }
1105
1106                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1107
1108                // send hashes of transactions
1109                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1110            }
1111
1112            // send full transactions, if any
1113            if let Some(new_full_transactions) = full {
1114                for tx in &new_full_transactions {
1115                    propagated.record(*tx.tx_hash(), PropagateKind::Full(*peer_id));
1116                }
1117
1118                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1119
1120                // send full transactions
1121                self.network.send_transactions(*peer_id, new_full_transactions);
1122            }
1123        }
1124
1125        // Update propagated transactions metrics
1126        self.metrics.propagated_transactions.increment(propagated.len() as u64);
1127
1128        propagated
1129    }
1130
1131    /// Propagates the given transactions to the peers
1132    ///
1133    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1134    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1135    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1136        if self.peers.is_empty() {
1137            // nothing to propagate
1138            return
1139        }
1140        let propagated = self.propagate_transactions(
1141            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1142            PropagationMode::Basic,
1143        );
1144
1145        // notify pool so events get fired
1146        self.pool.on_propagated(propagated);
1147    }
1148
1149    /// Request handler for an incoming request for transactions
1150    fn on_get_pooled_transactions(
1151        &mut self,
1152        peer_id: PeerId,
1153        request: GetPooledTransactions,
1154        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1155    ) {
1156        // fast exit if gossip is disabled
1157        if self.network.tx_gossip_disabled() {
1158            let _ = response.send(Ok(PooledTransactions::default()));
1159            return
1160        }
1161        if let Some(peer) = self.peers.get_mut(&peer_id) {
1162            let transactions = self.pool.get_pooled_transaction_elements(
1163                request.0,
1164                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1165                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1166                ),
1167            );
1168            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1169
1170            // we sent a response at which point we assume that the peer is aware of the
1171            // transactions
1172            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1173
1174            let resp = PooledTransactions(transactions);
1175            let _ = response.send(Ok(resp));
1176        }
1177    }
1178
1179    /// Handles a command received from a detached [`TransactionsHandle`]
1180    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1181        match cmd {
1182            TransactionsCommand::PropagateHash(hash) => {
1183                self.on_new_pending_transactions(vec![hash])
1184            }
1185            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1186                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1187            }
1188            TransactionsCommand::GetActivePeers(tx) => {
1189                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1190                tx.send(peers).ok();
1191            }
1192            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1193                if let Some(propagated) =
1194                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1195                {
1196                    self.pool.on_propagated(propagated);
1197                }
1198            }
1199            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1200            TransactionsCommand::BroadcastTransactions(txs) => {
1201                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1202                self.pool.on_propagated(propagated);
1203            }
1204            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1205                let mut res = HashMap::with_capacity(peers.len());
1206                for peer_id in peers {
1207                    let hashes = self
1208                        .peers
1209                        .get(&peer_id)
1210                        .map(|peer| peer.seen_transactions.iter().copied().collect::<B256Set>())
1211                        .unwrap_or_default();
1212                    res.insert(peer_id, hashes);
1213                }
1214                tx.send(res).ok();
1215            }
1216            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1217                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1218                peer_request_sender.send(sender).ok();
1219            }
1220        }
1221    }
1222
1223    /// Handles session establishment and peer transactions initialization.
1224    ///
1225    /// This is invoked when a new session is established.
1226    fn handle_peer_session(
1227        &mut self,
1228        info: SessionInfo,
1229        messages: PeerRequestSender<PeerRequest<N>>,
1230    ) {
1231        let SessionInfo { peer_id, client_version, version, .. } = info;
1232
1233        // Insert a new peer into the peerset.
1234        let peer = PeerMetadata::<N>::new(
1235            messages,
1236            version,
1237            client_version,
1238            self.config.max_transactions_seen_by_peer_history,
1239            info.peer_kind,
1240        );
1241        let peer = match self.peers.entry(peer_id) {
1242            Entry::Occupied(mut entry) => {
1243                entry.insert(peer);
1244                entry.into_mut()
1245            }
1246            Entry::Vacant(entry) => entry.insert(peer),
1247        };
1248
1249        self.policies.propagation_policy_mut().on_session_established(peer);
1250
1251        // Send a `NewPooledTransactionHashes` to the peer with up to
1252        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1253        // transactions in the pool.
1254        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1255            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1256            return
1257        }
1258
1259        // Get transactions to broadcast
1260        let pooled_txs = self.pool.pooled_transactions_max(
1261            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1262        );
1263        if pooled_txs.is_empty() {
1264            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1265            return;
1266        }
1267
1268        // Build and send transaction hashes message
1269        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1270        for pooled_tx in pooled_txs {
1271            peer.seen_transactions.insert(*pooled_tx.hash());
1272            msg_builder.push_pooled(pooled_tx);
1273        }
1274
1275        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.len(), "Broadcasting transaction hashes");
1276        let msg = msg_builder.build();
1277        self.network.send_transactions_hashes(peer_id, msg);
1278    }
1279
1280    /// Handles a received event related to common network events.
1281    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1282        match event_result {
1283            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1284                self.on_peer_session_closed(&peer_id);
1285            }
1286            NetworkEvent::ActivePeerSession { info, messages } => {
1287                // process active peer session and broadcast available transaction from the pool
1288                self.handle_peer_session(info, messages);
1289            }
1290            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1291                let peer_id = info.peer_id;
1292                // get messages from existing peer
1293                let messages = match self.peers.get(&peer_id) {
1294                    Some(p) => p.request_tx.clone(),
1295                    None => {
1296                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1297                        return;
1298                    }
1299                };
1300                self.handle_peer_session(info, messages);
1301            }
1302            _ => {}
1303        }
1304    }
1305
1306    /// Returns true if the ingress policy allows processing messages from the given peer.
1307    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1308        if self.config.ingress_policy.allows_all() {
1309            return true;
1310        }
1311        let Some(peer) = self.peers.get(peer_id) else {
1312            return false;
1313        };
1314        self.config.ingress_policy.allows(peer.peer_kind())
1315    }
1316
1317    /// Handles dedicated transaction events related to the `eth` protocol.
1318    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1319        match event {
1320            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1321                if !self.accepts_incoming_from(&peer_id) {
1322                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1323                    return;
1324                }
1325
1326                // ensure we didn't receive any blob transactions as these are disallowed to be
1327                // broadcasted in full
1328
1329                let has_blob_txs = msg.has_eip4844();
1330
1331                let non_blob_txs = msg
1332                    .into_iter()
1333                    .map(N::PooledTransaction::try_from)
1334                    .filter_map(Result::ok)
1335                    .collect();
1336
1337                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1338
1339                if has_blob_txs {
1340                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1341                    self.report_peer_bad_transactions(peer_id);
1342                }
1343            }
1344            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1345                if !self.accepts_incoming_from(&peer_id) {
1346                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1347                    return;
1348                }
1349                self.on_new_pooled_transaction_hashes(peer_id, msg)
1350            }
1351            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1352                self.on_get_pooled_transactions(peer_id, request, response)
1353            }
1354            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1355                let _ = response.send(Some(self.handle()));
1356            }
1357        }
1358    }
1359
1360    /// Starts the import process for the given transactions.
1361    fn import_transactions(
1362        &mut self,
1363        peer_id: PeerId,
1364        transactions: PooledTransactions<N::PooledTransaction>,
1365        source: TransactionSource,
1366    ) {
1367        // If the node is pipeline syncing, ignore transactions
1368        if self.network.is_initially_syncing() {
1369            return
1370        }
1371        if self.network.tx_gossip_disabled() {
1372            return
1373        }
1374
1375        // Early return if we don't have capacity for any imports
1376        if !self.has_capacity_for_pending_pool_imports() {
1377            return
1378        }
1379
1380        let mut transactions = transactions.0;
1381
1382        // Truncate to remaining capacity early to bound work on all subsequent processing.
1383        // Well-behaved peers follow the 4096 soft limit, so oversized payloads are likely
1384        // malicious and we avoid wasting CPU on them.
1385        let capacity = self.remaining_pool_import_capacity();
1386        if transactions.len() > capacity {
1387            let skipped = transactions.len() - capacity;
1388            transactions.truncate(capacity);
1389            self.metrics
1390                .skipped_transactions_pending_pool_imports_at_capacity
1391                .increment(skipped as u64);
1392            trace!(target: "net::tx", skipped, capacity, "Truncated transactions batch to capacity");
1393        }
1394
1395        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1396        let client_version = peer.client_version.clone();
1397
1398        let start = Instant::now();
1399
1400        // mark the transactions as received
1401        self.transaction_fetcher
1402            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1403
1404        // track that the peer knows these transaction, but only if this is a new broadcast.
1405        // If we received the transactions as the response to our `GetPooledTransactions``
1406        // requests (based on received `NewPooledTransactionHashes`) then we already
1407        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1408        let mut num_already_seen_by_peer = 0;
1409        for tx in &transactions {
1410            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1411                num_already_seen_by_peer += 1;
1412            }
1413        }
1414
1415        // tracks the quality of the given transactions
1416        let mut has_bad_transactions = false;
1417
1418        // 1. Remove known, already-tracked, and invalid transactions first since these are
1419        // cheap in-memory checks against local maps
1420        transactions.retain(|tx| {
1421            if let Entry::Occupied(mut entry) = self.transactions_by_peers.entry(*tx.tx_hash()) {
1422                let peers = entry.get_mut();
1423                if !peers.contains(&peer_id) {
1424                    peers.push(peer_id);
1425                }
1426                return false
1427            }
1428            if self.bad_imports.contains(tx.tx_hash()) {
1429                trace!(target: "net::tx",
1430                    peer_id=format!("{peer_id:#}"),
1431                    hash=%tx.tx_hash(),
1432                    %client_version,
1433                    "received a known bad transaction from peer"
1434                );
1435                has_bad_transactions = true;
1436                return false;
1437            }
1438            true
1439        });
1440
1441        // 2. filter out txns already inserted into pool
1442        let txns_count_pre_pool_filter = transactions.len();
1443        self.pool.retain_unknown(&mut transactions);
1444        if txns_count_pre_pool_filter > transactions.len() {
1445            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1446            self.metrics
1447                .occurrences_transactions_already_in_pool
1448                .increment(already_known_txns_count as u64);
1449        }
1450
1451        let txs_len = transactions.len();
1452
1453        let new_txs = transactions
1454            .into_par_iter()
1455            .filter_map(|tx| match tx.try_into_recovered() {
1456                Ok(tx) => Some(Pool::Transaction::from_pooled(tx)),
1457                Err(badtx) => {
1458                    trace!(target: "net::tx",
1459                        peer_id=format!("{peer_id:#}"),
1460                        hash=%badtx.tx_hash(),
1461                        client_version=%client_version,
1462                        "failed ecrecovery for transaction"
1463                    );
1464                    None
1465                }
1466            })
1467            .collect::<Vec<_>>();
1468
1469        has_bad_transactions |= new_txs.len() != txs_len;
1470
1471        // Record the transactions as seen by the peer
1472        for tx in &new_txs {
1473            self.transactions_by_peers.insert(*tx.hash(), smallvec::smallvec![peer_id]);
1474        }
1475
1476        // 3. import new transactions as a batch to minimize lock contention on the underlying
1477        // pool
1478        if !new_txs.is_empty() {
1479            let pool = self.pool.clone();
1480            // update metrics
1481            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1482            metric_pending_pool_imports.increment(new_txs.len() as f64);
1483
1484            // update self-monitoring info
1485            self.pending_pool_imports_info
1486                .pending_pool_imports
1487                .fetch_add(new_txs.len(), Ordering::Relaxed);
1488            let tx_manager_info_pending_pool_imports =
1489                self.pending_pool_imports_info.pending_pool_imports.clone();
1490
1491            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1492            let import = Box::pin(async move {
1493                let added = new_txs.len();
1494                let res = pool.add_external_transactions(new_txs).await;
1495
1496                // update metrics
1497                metric_pending_pool_imports.decrement(added as f64);
1498                // update self-monitoring info
1499                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1500
1501                res
1502            });
1503
1504            self.pool_imports.push(import);
1505        }
1506
1507        if num_already_seen_by_peer > 0 {
1508            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1509            self.metrics
1510                .occurrences_of_transaction_already_seen_by_peer
1511                .increment(num_already_seen_by_peer);
1512            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=%client_version, "Peer sent already seen transactions");
1513        }
1514
1515        if has_bad_transactions {
1516            // peer sent us invalid transactions
1517            self.report_peer_bad_transactions(peer_id)
1518        }
1519
1520        if num_already_seen_by_peer > 0 {
1521            self.report_already_seen(peer_id);
1522        }
1523
1524        self.metrics.pool_import_prepare_duration.record(start.elapsed());
1525    }
1526
1527    /// Processes a [`FetchEvent`].
1528    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1529        match fetch_event {
1530            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1531                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1532                if report_peer {
1533                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1534                }
1535            }
1536            FetchEvent::FetchError { peer_id, error } => {
1537                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1538                self.on_request_error(peer_id, error);
1539            }
1540            FetchEvent::EmptyResponse { peer_id } => {
1541                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1542            }
1543        }
1544    }
1545}
1546
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    // Never produced: every return path of `poll` below yields `Poll::Pending`.
    type Output = ();

    /// Drives the manager by polling its event sources in a fixed order: network events,
    /// network transaction events, fetch events, pool imports, new pending transactions,
    /// hashes pending fetch and finally commands.
    ///
    /// Always returns [`Poll::Pending`]. If any source reports that it may have more work, the
    /// task wakes itself so that it is polled again right away.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event: NetworkTransactionEvent<N>| this.on_network_tx_event(event),
        );

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        //
        // Mutable because draining the hashes pending fetch further below can set it as well.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast message or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages are smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Advances new __pending__ transactions, transactions that were successfully inserted into
        // pending set in pool (are valid), and propagates them (inform peers which
        // transactions we have seen).
        //
        // This is polled after pool imports so transactions that became pending in this poll
        // iteration are propagated immediately, instead of waiting for the task to be woken
        // again.
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // we filled the entire buffer capacity and need to try again on the next poll
                    // immediately
                    true
                } else {
                    // try once more, because most likely the channel is now empty and the waker
                    // is registered if this is pending; if we received additional transactions,
                    // we poll again on the next iteration
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        // everything received by the (up to two) polls above is propagated as one batch
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() &&
                    this.on_fetch_hashes_pending_fetch()
                {
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // If any of the sources above signalled that it may still have work left, request an
        // immediate re-poll instead of waiting for an external wake-up. Note that the poll
        // metrics below are not updated on this early-return path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // No source asked for another pass; record the durations collected during this poll.
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1736
/// Selects which peers a transaction is propagated to.
///
/// The mode decides whether peers that already know a transaction are skipped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Regular mode (the default).
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Forced mode.
    ///
    /// A transaction is sent to every peer, no matter whether it was sent to or received from
    /// that peer before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for [`PropagationMode::Forced`] and `false` for
    /// [`PropagationMode::Basic`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1759
/// A transaction that's about to be propagated to multiple peers.
///
/// The transaction is kept behind an [`Arc`], so the builders in this module can share it
/// between messages by cloning the pointer rather than the transaction.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction as computed by the constructor (`length()` in `new`,
    /// `encoded_length()` in `pool_tx`). Counted against the full-broadcast size limit and
    /// reported as the size in hash announcements that carry sizes.
    size: usize,
    /// The transaction to propagate.
    transaction: Arc<T>,
}
1766
1767impl<T: SignedTransaction> PropagateTransaction<T> {
1768    /// Create a new instance from a transaction.
1769    pub fn new(transaction: T) -> Self {
1770        let size = transaction.length();
1771        Self { size, transaction: Arc::new(transaction) }
1772    }
1773
1774    /// Create a new instance from a pooled transaction
1775    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1776    where
1777        P: PoolTransaction<Consensus = T>,
1778    {
1779        let size = tx.encoded_length();
1780        let transaction = tx.transaction.clone_into_consensus();
1781        let transaction = Arc::new(transaction.into_inner());
1782        Self { size, transaction }
1783    }
1784
1785    fn tx_hash(&self) -> &TxHash {
1786        self.transaction.tx_hash()
1787    }
1788}
1789
/// Helper type to construct the appropriate message to send to the peer based on whether the peer
/// should receive them in full or as pooled
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// Only the hashes of the transactions are announced to the peer.
    Pooled(PooledTransactionsHashesBuilder),
    /// Transactions are sent in full; those that cannot be are collected as hashes by the
    /// wrapped [`FullTransactionsBuilder`].
    Full(FullTransactionsBuilder<T>),
}
1797
1798impl<T> PropagateTransactionsBuilder<T> {
1799    /// Create a builder for pooled transactions with capacity for the expected number of
1800    /// transactions.
1801    fn pooled(version: EthVersion, capacity: usize) -> Self {
1802        Self::Pooled(PooledTransactionsHashesBuilder::with_capacity(version, capacity))
1803    }
1804
1805    /// Create a builder that sends transactions in full and records transactions that don't fit,
1806    /// with capacity for the expected number of transactions.
1807    fn full(version: EthVersion, capacity: usize) -> Self {
1808        Self::Full(FullTransactionsBuilder::with_capacity(version, capacity))
1809    }
1810
1811    /// Returns true if no transactions are recorded.
1812    fn is_empty(&self) -> bool {
1813        match self {
1814            Self::Pooled(builder) => builder.is_empty(),
1815            Self::Full(builder) => builder.is_empty(),
1816        }
1817    }
1818
1819    /// Consumes the type and returns the built messages that should be sent to the peer.
1820    fn build(self) -> PropagateTransactions<T> {
1821        match self {
1822            Self::Pooled(pooled) => {
1823                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1824            }
1825            Self::Full(full) => full.build(),
1826        }
1827    }
1828}
1829
1830impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1831    /// Appends a transaction to the list.
1832    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1833        match self {
1834            Self::Pooled(builder) => builder.push(transaction),
1835            Self::Full(builder) => builder.push(transaction),
1836        }
1837    }
1838}
1839
/// Represents how the transactions should be sent to a peer if any.
///
/// Either part is `None` when there is nothing of that kind to send.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    full: Option<Vec<Arc<T>>>,
}
1847
/// Helper type for constructing the full transaction message that enforces the
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
/// broadcasted in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Accumulated size of all transactions collected in `transactions` so far. This is the
    /// running total that `push` compares against the soft limit, not the limit itself.
    total_size: usize,
    /// All transactions to be broadcasted.
    transactions: Vec<Arc<T>>,
    /// Hashes of transactions that are not sent in full: they either didn't fit into the
    /// broadcast message or aren't broadcastable in full.
    pooled: PooledTransactionsHashesBuilder,
}
1861
1862impl<T> FullTransactionsBuilder<T> {
1863    /// Create a builder for the negotiated version of the peer's session
1864    fn new(version: EthVersion) -> Self {
1865        Self {
1866            total_size: 0,
1867            pooled: PooledTransactionsHashesBuilder::new(version),
1868            transactions: vec![],
1869        }
1870    }
1871
1872    /// Create a builder with capacity for the expected number of full transactions.
1873    ///
1874    /// The overflow hashes builder remains lazily allocated since most transactions are expected
1875    /// to be broadcast in full.
1876    fn with_capacity(version: EthVersion, capacity: usize) -> Self {
1877        Self {
1878            total_size: 0,
1879            pooled: PooledTransactionsHashesBuilder::new(version),
1880            transactions: Vec::with_capacity(capacity),
1881        }
1882    }
1883
1884    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1885    fn is_empty(&self) -> bool {
1886        self.transactions.is_empty() && self.pooled.is_empty()
1887    }
1888
1889    /// Returns the messages that should be propagated to the peer.
1890    fn build(self) -> PropagateTransactions<T> {
1891        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1892        let full = Some(self.transactions).filter(|full| !full.is_empty());
1893        PropagateTransactions { pooled, full }
1894    }
1895}
1896
1897impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1898    /// Appends all transactions.
1899    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1900        for tx in txs {
1901            self.push(&tx)
1902        }
1903    }
1904
1905    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1906    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1907    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1908    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1909    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1910    ///
1911    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1912    /// to list of pooled transactions, (e.g. 4844 transactions).
1913    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1914    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1915        // Do not send full 4844 transaction hashes to peers.
1916        //
1917        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1918        //  Instead, those transactions are only announced using
1919        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1920        //  via `GetPooledTransactions`.
1921        //
1922        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1923        if !transaction.transaction.is_broadcastable_in_full() {
1924            self.pooled.push(transaction);
1925            return
1926        }
1927
1928        let new_size = self.total_size + transaction.size;
1929        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1930            self.total_size > 0
1931        {
1932            // transaction does not fit into the message
1933            self.pooled.push(transaction);
1934            return
1935        }
1936
1937        self.total_size = new_size;
1938        self.transactions.push(Arc::clone(&transaction.transaction));
1939    }
1940}
1941
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Announcement that is filled with transaction hashes only. Selected for
    /// `EthVersion::Eth66` and `EthVersion::Eth67`.
    Eth66(NewPooledTransactionHashes66),
    /// Announcement that is filled with the hash, size and type of each transaction. Selected
    /// for `EthVersion::Eth68` through `EthVersion::Eth71`.
    Eth68(NewPooledTransactionHashes68),
    /// Announcement that is filled with the hash, size and type of each transaction. Selected
    /// for `EthVersion::Eth72`.
    Eth72(NewPooledTransactionHashes72),
}
1950
1951// === impl PooledTransactionsHashesBuilder ===
1952
1953impl PooledTransactionsHashesBuilder {
1954    /// Push a transaction from the pool to the list.
1955    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1956        match self {
1957            Self::Eth66(msg) => msg.push(*pooled_tx.hash()),
1958            Self::Eth68(msg) => {
1959                msg.hashes.push(*pooled_tx.hash());
1960                msg.sizes.push(pooled_tx.encoded_length());
1961                msg.types.push(pooled_tx.transaction.ty());
1962            }
1963            Self::Eth72(msg) => {
1964                msg.hashes.push(*pooled_tx.hash());
1965                msg.sizes.push(pooled_tx.encoded_length());
1966                msg.types.push(pooled_tx.transaction.ty());
1967            }
1968        }
1969    }
1970
1971    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1972    fn is_empty(&self) -> bool {
1973        match self {
1974            Self::Eth66(hashes) => hashes.is_empty(),
1975            Self::Eth68(hashes) => hashes.is_empty(),
1976            Self::Eth72(hashes) => hashes.is_empty(),
1977        }
1978    }
1979
1980    /// Returns the number of transactions in the builder.
1981    fn len(&self) -> usize {
1982        match self {
1983            Self::Eth66(hashes) => hashes.len(),
1984            Self::Eth68(hashes) => hashes.len(),
1985            Self::Eth72(hashes) => hashes.len(),
1986        }
1987    }
1988
1989    /// Appends all hashes
1990    fn extend<T: SignedTransaction>(
1991        &mut self,
1992        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1993    ) {
1994        for tx in txs {
1995            self.push(&tx);
1996        }
1997    }
1998
1999    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
2000        match self {
2001            Self::Eth66(msg) => msg.push(*tx.tx_hash()),
2002            Self::Eth68(msg) => {
2003                msg.hashes.push(*tx.tx_hash());
2004                msg.sizes.push(tx.size);
2005                msg.types.push(tx.transaction.ty());
2006            }
2007            Self::Eth72(msg) => {
2008                msg.hashes.push(*tx.tx_hash());
2009                msg.sizes.push(tx.size);
2010                msg.types.push(tx.transaction.ty());
2011            }
2012        }
2013    }
2014
2015    /// Create a builder for the negotiated version of the peer's session
2016    fn new(version: EthVersion) -> Self {
2017        match version {
2018            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
2019            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
2020                Self::Eth68(Default::default())
2021            }
2022            EthVersion::Eth72 => Self::Eth72(Default::default()),
2023        }
2024    }
2025
2026    /// Create a builder for the negotiated version of the peer's session with capacity for the
2027    /// expected number of hashes.
2028    fn with_capacity(version: EthVersion, capacity: usize) -> Self {
2029        match version {
2030            EthVersion::Eth66 | EthVersion::Eth67 => {
2031                Self::Eth66(NewPooledTransactionHashes66::with_capacity(capacity))
2032            }
2033            EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71 => {
2034                Self::Eth68(NewPooledTransactionHashes68::with_capacity(capacity))
2035            }
2036            EthVersion::Eth72 => Self::Eth72(NewPooledTransactionHashes72::with_capacity(capacity)),
2037        }
2038    }
2039
2040    fn build(self) -> NewPooledTransactionHashes {
2041        match self {
2042            Self::Eth66(mut msg) => {
2043                msg.shrink_to_fit();
2044                msg.into()
2045            }
2046            Self::Eth68(mut msg) => {
2047                msg.shrink_to_fit();
2048                msg.into()
2049            }
2050            Self::Eth72(mut msg) => {
2051                msg.shrink_to_fit();
2052                msg.into()
2053            }
2054        }
2055    }
2056}
2057
/// Describes the way a batch of transactions reached us.
enum TransactionSource {
    /// The peer pushed the transactions to us in a [`Transactions`] broadcast message.
    Broadcast,
    /// The transactions are the response to a [`fetcher::GetPooledTxRequest`] that we issued.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions arrived as a broadcast, `false` if they are the
    /// response to one of our requests.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
2074
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
    /// the peer.
    ///
    /// The cache is bounded; its limit is the `max_transactions_seen_by_peer` value passed to
    /// the constructor.
    seen_transactions: LruCache<TxHash, FbBuildHasher<32>>,
    /// A communication channel directly to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The negotiated version of the session.
    version: EthVersion,
    /// The peer's client version.
    client_version: Arc<str>,
    /// The kind of peer.
    peer_kind: PeerKind,
}
2091
2092impl<N: NetworkPrimitives> PeerMetadata<N> {
2093    /// Returns a new instance of [`PeerMetadata`].
2094    pub fn new(
2095        request_tx: PeerRequestSender<PeerRequest<N>>,
2096        version: EthVersion,
2097        client_version: Arc<str>,
2098        max_transactions_seen_by_peer: u32,
2099        peer_kind: PeerKind,
2100    ) -> Self {
2101        Self {
2102            seen_transactions: LruCache::with_hasher(
2103                max_transactions_seen_by_peer,
2104                Default::default(),
2105            ),
2106            request_tx,
2107            version,
2108            client_version,
2109            peer_kind,
2110        }
2111    }
2112
2113    /// Returns a reference to the peer's request sender channel.
2114    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
2115        &self.request_tx
2116    }
2117
2118    /// Returns a mutable reference to the seen transactions LRU cache.
2119    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash, FbBuildHasher<32>> {
2120        &mut self.seen_transactions
2121    }
2122
2123    /// Returns the negotiated `EthVersion` of the session.
2124    pub const fn version(&self) -> EthVersion {
2125        self.version
2126    }
2127
2128    /// Returns a reference to the peer's client version string.
2129    pub fn client_version(&self) -> &str {
2130        &self.client_version
2131    }
2132
2133    /// Returns the peer's kind.
2134    pub const fn peer_kind(&self) -> PeerKind {
2135        self.peer_kind
2136    }
2137}
2138
/// Commands to send to the [`TransactionsManager`]
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    ///
    /// The result is delivered through the enclosed oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to look up.
        peers: Vec<PeerId>,
        /// Channel on which the hashes, keyed by peer, are sent back.
        tx: oneshot::Sender<HashMap<PeerId, B256Set>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose request sender is wanted.
        peer_id: PeerId,
        /// Channel on which the sender is returned; carries an `Option`, so the reply may be
        /// `None` (presumably when the peer is not known; confirm against the handler).
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2162
/// All events related to transactions emitted by the network.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The reply is delivered through the enclosed oneshot sender and is optional, i.e. it may
    /// be `None`.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2194
/// Import statistics tracked for the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions that are about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Upper bound on the number of transactions that may be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a [`PendingPoolImportsInfo`] whose counter starts at zero and whose limit is
    /// `max_pending_pool_imports`.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::default());
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the
    /// tolerated maximum passed in as `max_pending_pool_imports`.
    ///
    /// The comparison uses the argument, not the limit stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let currently_pending = self.pending_pool_imports.load(Ordering::Relaxed);
        currently_pending < max_pending_pool_imports
    }
}
2215
2216impl Default for PendingPoolImportsInfo {
2217    fn default() -> Self {
2218        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2219    }
2220}
2221
/// A set of accumulated [`Duration`]s, one per named stage; every field is zero by default.
///
/// NOTE(review): presumably filled in while polling the transactions manager, one field per
/// sub-step — confirm against the poll implementation.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulated time for network events.
    acc_network_events: Duration,
    /// Accumulated time for pending imports.
    acc_pending_imports: Duration,
    /// Accumulated time for transaction events.
    acc_tx_events: Duration,
    /// Accumulated time for imported transactions.
    acc_imported_txns: Duration,
    /// Accumulated time for fetch events.
    acc_fetch_events: Duration,
    /// Accumulated time for pending fetches.
    acc_pending_fetch: Duration,
    /// Accumulated time for commands.
    acc_cmds: Duration,
}
2232
2233impl<N: NetworkPrimitives> InMemorySize for NetworkTransactionEvent<N> {
2234    // `N::BroadcastedTransaction` and `N::PooledTransaction` already implement
2235    // `InMemorySize` via `SignedTransaction: InMemorySize`, so no extra bound is needed.
2236    fn size(&self) -> usize {
2237        match self {
2238            Self::IncomingTransactions { peer_id, msg } => {
2239                core::mem::size_of_val(peer_id) +
2240                    msg.0.iter().map(InMemorySize::size).sum::<usize>()
2241            }
2242            Self::IncomingPooledTransactionHashes { peer_id, msg } => {
2243                core::mem::size_of_val(peer_id) + msg.size()
2244            }
2245            Self::GetPooledTransactions { peer_id, request, response } => {
2246                core::mem::size_of_val(peer_id) +
2247                    request.0.len() * core::mem::size_of::<TxHash>() +
2248                    core::mem::size_of_val(response)
2249            }
2250            Self::GetTransactionsHandle(_) => 0,
2251        }
2252    }
2253}
2254
2255#[cfg(test)]
2256mod tests {
2257    use super::*;
2258    use crate::{
2259        test_utils::{
2260            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2261            Testnet,
2262        },
2263        transactions::config::RelaxedEthAnnouncementFilter,
2264        NetworkConfigBuilder, NetworkManager,
2265    };
2266    use alloy_consensus::{TxEip1559, TxLegacy};
2267    use alloy_eips::eip4844::BlobTransactionValidationError;
2268    use alloy_primitives::{hex, Signature, TxKind, B256, U256};
2269    use alloy_rlp::Decodable;
2270    use futures::FutureExt;
2271    use reth_chainspec::MIN_TRANSACTION_GAS;
2272    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2273    use reth_network_api::{NetworkInfo, PeerKind};
2274    use reth_network_p2p::{
2275        error::{RequestError, RequestResult},
2276        sync::{NetworkSyncUpdater, SyncState},
2277    };
2278    use reth_storage_api::noop::NoopProvider;
2279    use reth_tasks::Runtime;
2280    use reth_transaction_pool::{
2281        error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError},
2282        test_utils::{testing_pool, MockTransaction, MockTransactionFactory, TestPool},
2283    };
2284    use secp256k1::SecretKey;
2285    use std::{
2286        future::poll_fn,
2287        net::{IpAddr, Ipv4Addr, SocketAddr},
2288        str::FromStr,
2289    };
2290    use tracing::error;
2291
2292    #[tokio::test(flavor = "multi_thread")]
2293    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2294        reth_tracing::init_test_tracing();
2295        let net = Testnet::create(3).await;
2296
2297        let mut handles = net.handles();
2298        let handle0 = handles.next().unwrap();
2299        let handle1 = handles.next().unwrap();
2300
2301        drop(handles);
2302        let handle = net.spawn();
2303
2304        let listener0 = handle0.event_listener();
2305        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2306        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2307
2308        let client = NoopProvider::default();
2309        let pool = testing_pool();
2310        let config = NetworkConfigBuilder::eth(secret_key, Runtime::test())
2311            .disable_discovery()
2312            .listener_port(0)
2313            .build(client);
2314        let transactions_manager_config = config.transactions_manager_config.clone();
2315        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2316            .await
2317            .unwrap()
2318            .into_builder()
2319            .transactions(pool.clone(), transactions_manager_config)
2320            .split_with_handle();
2321
2322        tokio::task::spawn(network);
2323
2324        // go to syncing (pipeline sync)
2325        network_handle.update_sync_state(SyncState::Syncing);
2326        assert!(NetworkInfo::is_syncing(&network_handle));
2327        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2328
2329        // wait for all initiator connections
2330        let mut established = listener0.take(2);
2331        while let Some(ev) = established.next().await {
2332            match ev {
2333                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2334                    // to insert a new peer in transactions peerset
2335                    transactions
2336                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2337                }
2338                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2339                ev => {
2340                    error!("unexpected event {ev:?}")
2341                }
2342            }
2343        }
2344        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2345        let input = hex!(
2346            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2347        );
2348        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2349        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2350            peer_id: *handle1.peer_id(),
2351            msg: Transactions(vec![signed_tx.clone()]),
2352        });
2353        poll_fn(|cx| {
2354            let _ = transactions.poll_unpin(cx);
2355            Poll::Ready(())
2356        })
2357        .await;
2358        assert!(pool.is_empty());
2359        handle.terminate().await;
2360    }
2361
2362    #[tokio::test(flavor = "multi_thread")]
2363    async fn test_tx_broadcasts_through_two_syncs() {
2364        reth_tracing::init_test_tracing();
2365        let net = Testnet::create(3).await;
2366
2367        let mut handles = net.handles();
2368        let handle0 = handles.next().unwrap();
2369        let handle1 = handles.next().unwrap();
2370
2371        drop(handles);
2372        let handle = net.spawn();
2373
2374        let listener0 = handle0.event_listener();
2375        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2376        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2377
2378        let client = NoopProvider::default();
2379        let pool = testing_pool();
2380        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2381            .disable_discovery()
2382            .listener_port(0)
2383            .build(client);
2384        let transactions_manager_config = config.transactions_manager_config.clone();
2385        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2386            .await
2387            .unwrap()
2388            .into_builder()
2389            .transactions(pool.clone(), transactions_manager_config)
2390            .split_with_handle();
2391
2392        tokio::task::spawn(network);
2393
2394        // go to syncing (pipeline sync) to idle and then to syncing (live)
2395        network_handle.update_sync_state(SyncState::Syncing);
2396        assert!(NetworkInfo::is_syncing(&network_handle));
2397        network_handle.update_sync_state(SyncState::Idle);
2398        assert!(!NetworkInfo::is_syncing(&network_handle));
2399        network_handle.update_sync_state(SyncState::Syncing);
2400        assert!(NetworkInfo::is_syncing(&network_handle));
2401
2402        // wait for all initiator connections
2403        let mut established = listener0.take(2);
2404        while let Some(ev) = established.next().await {
2405            match ev {
2406                NetworkEvent::ActivePeerSession { .. } |
2407                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2408                    // to insert a new peer in transactions peerset
2409                    transactions.on_network_event(ev);
2410                }
2411                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2412                _ => {
2413                    error!("unexpected event {ev:?}")
2414                }
2415            }
2416        }
2417        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2418        let input = hex!(
2419            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2420        );
2421        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2422        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2423            peer_id: *handle1.peer_id(),
2424            msg: Transactions(vec![signed_tx.clone()]),
2425        });
2426        poll_fn(|cx| {
2427            let _ = transactions.poll_unpin(cx);
2428            Poll::Ready(())
2429        })
2430        .await;
2431        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2432        assert!(NetworkInfo::is_syncing(&network_handle));
2433        assert!(!pool.is_empty());
2434        handle.terminate().await;
2435    }
2436
2437    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2438    // event and is able to retrieve the corresponding transactions.
2439    #[tokio::test(flavor = "multi_thread")]
2440    async fn test_handle_incoming_transactions_hashes() {
2441        reth_tracing::init_test_tracing();
2442
2443        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2444        let client = NoopProvider::default();
2445
2446        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2447            // let OS choose port
2448            .listener_port(0)
2449            .disable_discovery()
2450            .build(client);
2451
2452        let pool = testing_pool();
2453
2454        let transactions_manager_config = config.transactions_manager_config.clone();
2455        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2456            .await
2457            .unwrap()
2458            .into_builder()
2459            .transactions(pool.clone(), transactions_manager_config)
2460            .split_with_handle();
2461
2462        let peer_id_1 = PeerId::new([1; 64]);
2463        let eth_version = EthVersion::Eth66;
2464
2465        let txs = vec![TransactionSigned::new_unhashed(
2466            Transaction::Legacy(TxLegacy {
2467                chain_id: Some(4),
2468                nonce: 15u64,
2469                gas_price: 2200000000,
2470                gas_limit: 34811,
2471                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2472                value: U256::from(1234u64),
2473                input: Default::default(),
2474            }),
2475            Signature::new(
2476                U256::from_str(
2477                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2478                )
2479                .unwrap(),
2480                U256::from_str(
2481                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2482                )
2483                .unwrap(),
2484                true,
2485            ),
2486        )];
2487
2488        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2489
2490        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2491        tx_manager.peers.insert(peer_id_1, peer_1);
2492
2493        assert!(pool.is_empty());
2494
2495        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2496            peer_id: peer_id_1,
2497            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2498                txs_hashes.clone(),
2499            )),
2500        });
2501
2502        // mock session of peer_1 receives request
2503        let req = to_mock_session_rx
2504            .recv()
2505            .await
2506            .expect("peer_1 session should receive request with buffered hashes");
2507        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2508        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2509
2510        let message: Vec<PooledTransactionVariant> = txs
2511            .into_iter()
2512            .map(|tx| {
2513                PooledTransactionVariant::try_from(tx)
2514                    .expect("Failed to convert MockTransaction to PooledTransaction")
2515            })
2516            .collect();
2517
2518        // return the transactions corresponding to the transaction hashes.
2519        response
2520            .send(Ok(PooledTransactions(message)))
2521            .expect("should send peer_1 response to tx manager");
2522
2523        // adance the transaction manager future
2524        poll_fn(|cx| {
2525            let _ = tx_manager.poll_unpin(cx);
2526            Poll::Ready(())
2527        })
2528        .await;
2529
2530        // ensure that the transactions corresponding to the transaction hashes have been
2531        // successfully retrieved and stored in the Pool.
2532        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2533    }
2534
2535    #[tokio::test(flavor = "multi_thread")]
2536    async fn test_handle_incoming_transactions() {
2537        reth_tracing::init_test_tracing();
2538        let net = Testnet::create(3).await;
2539
2540        let mut handles = net.handles();
2541        let handle0 = handles.next().unwrap();
2542        let handle1 = handles.next().unwrap();
2543
2544        drop(handles);
2545        let handle = net.spawn();
2546
2547        let listener0 = handle0.event_listener();
2548
2549        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2550        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2551
2552        let client = NoopProvider::default();
2553        let pool = testing_pool();
2554        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2555            .disable_discovery()
2556            .listener_port(0)
2557            .build(client);
2558        let transactions_manager_config = config.transactions_manager_config.clone();
2559        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2560            .await
2561            .unwrap()
2562            .into_builder()
2563            .transactions(pool.clone(), transactions_manager_config)
2564            .split_with_handle();
2565        tokio::task::spawn(network);
2566
2567        network_handle.update_sync_state(SyncState::Idle);
2568
2569        assert!(!NetworkInfo::is_syncing(&network_handle));
2570
2571        // wait for all initiator connections
2572        let mut established = listener0.take(2);
2573        while let Some(ev) = established.next().await {
2574            match ev {
2575                NetworkEvent::ActivePeerSession { .. } |
2576                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2577                    // to insert a new peer in transactions peerset
2578                    transactions.on_network_event(ev);
2579                }
2580                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2581                ev => {
2582                    error!("unexpected event {ev:?}")
2583                }
2584            }
2585        }
2586        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2587        let input = hex!(
2588            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2589        );
2590        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2591        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2592            peer_id: *handle1.peer_id(),
2593            msg: Transactions(vec![signed_tx.clone()]),
2594        });
2595        assert!(transactions
2596            .transactions_by_peers
2597            .get(signed_tx.tx_hash())
2598            .unwrap()
2599            .contains(handle1.peer_id()));
2600
2601        // advance the transaction manager future
2602        poll_fn(|cx| {
2603            let _ = transactions.poll_unpin(cx);
2604            Poll::Ready(())
2605        })
2606        .await;
2607
2608        assert!(!pool.is_empty());
2609        assert!(pool.get(signed_tx.tx_hash()).is_some());
2610        handle.terminate().await;
2611    }
2612
2613    #[tokio::test(flavor = "multi_thread")]
2614    async fn test_session_closed_cleans_transaction_peer_state() {
2615        let (mut tx_manager, _network) = new_tx_manager().await;
2616        let peer_id = PeerId::new([1; 64]);
2617        let fallback_peer = PeerId::new([2; 64]);
2618        let (peer, _) = new_mock_session(peer_id, EthVersion::Eth66);
2619        let hash_shared = B256::from_slice(&[1; 32]);
2620
2621        tx_manager.peers.insert(peer_id, peer);
2622        buffer_hash_to_tx_fetcher(
2623            &mut tx_manager.transaction_fetcher,
2624            hash_shared,
2625            peer_id,
2626            0,
2627            None,
2628        );
2629        buffer_hash_to_tx_fetcher(
2630            &mut tx_manager.transaction_fetcher,
2631            hash_shared,
2632            fallback_peer,
2633            0,
2634            None,
2635        );
2636        tx_manager.transaction_fetcher.active_peers.insert(peer_id, 1);
2637
2638        tx_manager.on_network_event(NetworkEvent::Peer(PeerEvent::SessionClosed {
2639            peer_id,
2640            reason: None,
2641        }));
2642
2643        // peer removed from peers map and active_peers
2644        assert!(!tx_manager.peers.contains_key(&peer_id));
2645        assert!(tx_manager.transaction_fetcher.active_peers.peek(&peer_id).is_none());
2646        // fallback peer is still available for the hash
2647        assert_eq!(
2648            tx_manager.transaction_fetcher.get_idle_peer_for(hash_shared),
2649            Some(&fallback_peer)
2650        );
2651    }
2652
2653    #[tokio::test(flavor = "multi_thread")]
2654    async fn test_bad_blob_sidecar_not_cached_as_bad_import() {
2655        let (mut tx_manager, _network) = new_tx_manager().await;
2656        let peer_id = PeerId::new([1; 64]);
2657        let hash = B256::from_slice(&[1; 32]);
2658
2659        tx_manager.network.update_sync_state(SyncState::Idle);
2660        tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2661
2662        let err = PoolError::new(
2663            hash,
2664            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::InvalidEip4844Blob(
2665                BlobTransactionValidationError::InvalidProof,
2666            )),
2667        );
2668
2669        tx_manager.on_bad_import(err);
2670
2671        assert!(!tx_manager.bad_imports.contains(&hash));
2672    }
2673
2674    #[tokio::test(flavor = "multi_thread")]
2675    async fn test_missing_blob_sidecar_not_cached_as_bad_import() {
2676        let (mut tx_manager, _network) = new_tx_manager().await;
2677        let peer_id = PeerId::new([1; 64]);
2678        let hash = B256::from_slice(&[3; 32]);
2679
2680        tx_manager.network.update_sync_state(SyncState::Idle);
2681        tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2682
2683        let err = PoolError::new(
2684            hash,
2685            InvalidPoolTransactionError::Eip4844(
2686                Eip4844PoolTransactionError::MissingEip4844BlobSidecar,
2687            ),
2688        );
2689
2690        tx_manager.on_bad_import(err);
2691
2692        assert!(!tx_manager.bad_imports.contains(&hash));
2693    }
2694
2695    #[tokio::test(flavor = "multi_thread")]
2696    async fn test_non_blob_sidecar_error_still_cached_as_bad_import() {
2697        let (mut tx_manager, _network) = new_tx_manager().await;
2698        let peer_id = PeerId::new([1; 64]);
2699        let hash = B256::from_slice(&[2; 32]);
2700
2701        tx_manager.network.update_sync_state(SyncState::Idle);
2702        tx_manager.transactions_by_peers.insert(hash, smallvec::smallvec![peer_id]);
2703
2704        let err = PoolError::new(
2705            hash,
2706            InvalidPoolTransactionError::Eip4844(Eip4844PoolTransactionError::NoEip4844Blobs),
2707        );
2708
2709        tx_manager.on_bad_import(err);
2710
2711        assert!(tx_manager.bad_imports.contains(&hash));
2712    }
2713
2714    #[tokio::test(flavor = "multi_thread")]
2715    async fn test_on_get_pooled_transactions_network() {
2716        reth_tracing::init_test_tracing();
2717        let net = Testnet::create(2).await;
2718
2719        let mut handles = net.handles();
2720        let handle0 = handles.next().unwrap();
2721        let handle1 = handles.next().unwrap();
2722
2723        drop(handles);
2724        let handle = net.spawn();
2725
2726        let listener0 = handle0.event_listener();
2727
2728        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2729        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2730
2731        let client = NoopProvider::default();
2732        let pool = testing_pool();
2733        let config = NetworkConfigBuilder::new(secret_key, Runtime::test())
2734            .disable_discovery()
2735            .listener_port(0)
2736            .build(client);
2737        let transactions_manager_config = config.transactions_manager_config.clone();
2738        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2739            .await
2740            .unwrap()
2741            .into_builder()
2742            .transactions(pool.clone(), transactions_manager_config)
2743            .split_with_handle();
2744        tokio::task::spawn(network);
2745
2746        network_handle.update_sync_state(SyncState::Idle);
2747
2748        assert!(!NetworkInfo::is_syncing(&network_handle));
2749
2750        // wait for all initiator connections
2751        let mut established = listener0.take(2);
2752        while let Some(ev) = established.next().await {
2753            match ev {
2754                NetworkEvent::ActivePeerSession { .. } |
2755                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2756                    transactions.on_network_event(ev);
2757                }
2758                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2759                ev => {
2760                    error!("unexpected event {ev:?}")
2761                }
2762            }
2763        }
2764        handle.terminate().await;
2765
2766        let tx = MockTransaction::eip1559();
2767        let _ = transactions
2768            .pool
2769            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2770            .await;
2771
2772        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2773
2774        let (send, receive) =
2775            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2776
2777        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2778            peer_id: *handle1.peer_id(),
2779            request,
2780            response: send,
2781        });
2782
2783        match receive.await.unwrap() {
2784            Ok(PooledTransactions(transactions)) => {
2785                assert_eq!(transactions.len(), 1);
2786            }
2787            Err(e) => {
2788                panic!("error: {e:?}");
2789            }
2790        }
2791    }
2792
2793    // Ensure that when the remote peer only returns part of the requested transactions, the
2794    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2795    // re-buffered.
2796    #[tokio::test]
2797    async fn test_partially_tx_response() {
2798        reth_tracing::init_test_tracing();
2799
2800        let mut tx_manager = new_tx_manager().await.0;
2801        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2802
2803        let peer_id_1 = PeerId::new([1; 64]);
2804        let eth_version = EthVersion::Eth66;
2805
2806        let txs = vec![
2807            TransactionSigned::new_unhashed(
2808                Transaction::Legacy(TxLegacy {
2809                    chain_id: Some(4),
2810                    nonce: 15u64,
2811                    gas_price: 2200000000,
2812                    gas_limit: 34811,
2813                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2814                    value: U256::from(1234u64),
2815                    input: Default::default(),
2816                }),
2817                Signature::new(
2818                    U256::from_str(
2819                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2820                    )
2821                    .unwrap(),
2822                    U256::from_str(
2823                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2824                    )
2825                    .unwrap(),
2826                    true,
2827                ),
2828            ),
2829            TransactionSigned::new_unhashed(
2830                Transaction::Eip1559(TxEip1559 {
2831                    chain_id: 4,
2832                    nonce: 26u64,
2833                    max_priority_fee_per_gas: 1500000000,
2834                    max_fee_per_gas: 1500000013,
2835                    gas_limit: MIN_TRANSACTION_GAS,
2836                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2837                    value: U256::from(3000000000000000000u64),
2838                    input: Default::default(),
2839                    access_list: Default::default(),
2840                }),
2841                Signature::new(
2842                    U256::from_str(
2843                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2844                    )
2845                    .unwrap(),
2846                    U256::from_str(
2847                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2848                    )
2849                    .unwrap(),
2850                    true,
2851                ),
2852            ),
2853        ];
2854
2855        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2856
2857        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2858        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2859        // fetch
2860        peer_1.seen_transactions.insert(txs_hashes[0]);
2861        peer_1.seen_transactions.insert(txs_hashes[1]);
2862        tx_manager.peers.insert(peer_id_1, peer_1);
2863
2864        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2865        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2866
2867        // peer_1 is idle
2868        assert!(tx_fetcher.is_idle(&peer_id_1));
2869        assert_eq!(tx_fetcher.active_peers.len(), 0);
2870
2871        // sends requests for buffered hashes to peer_1
2872        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2873
2874        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2875        // as long as request is in flight peer_1 is not idle
2876        assert!(!tx_fetcher.is_idle(&peer_id_1));
2877        assert_eq!(tx_fetcher.active_peers.len(), 1);
2878
2879        // mock session of peer_1 receives request
2880        let req = to_mock_session_rx
2881            .recv()
2882            .await
2883            .expect("peer_1 session should receive request with buffered hashes");
2884        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2885
2886        let message: Vec<PooledTransactionVariant> = txs
2887            .into_iter()
2888            .take(1)
2889            .map(|tx| {
2890                PooledTransactionVariant::try_from(tx)
2891                    .expect("Failed to convert MockTransaction to PooledTransaction")
2892            })
2893            .collect();
2894        // response partial request
2895        response
2896            .send(Ok(PooledTransactions(message)))
2897            .expect("should send peer_1 response to tx manager");
2898        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2899            unreachable!()
2900        };
2901
2902        // request has resolved, peer_1 is idle again
2903        assert!(tx_fetcher.is_idle(&peer_id));
2904        assert_eq!(tx_fetcher.active_peers.len(), 0);
2905        // failing peer_1's request buffers requested hashes for retry.
2906        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2907    }
2908
2909    #[tokio::test]
2910    async fn test_max_retries_tx_request() {
2911        reth_tracing::init_test_tracing();
2912
2913        let mut tx_manager = new_tx_manager().await.0;
2914        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2915
2916        let peer_id_1 = PeerId::new([1; 64]);
2917        let peer_id_2 = PeerId::new([2; 64]);
2918        let eth_version = EthVersion::Eth66;
2919        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2920
2921        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2922        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2923        // fetch
2924        peer_1.seen_transactions.insert(seen_hashes[0]);
2925        peer_1.seen_transactions.insert(seen_hashes[1]);
2926        tx_manager.peers.insert(peer_id_1, peer_1);
2927
2928        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2929        // for first retry in reverse order to make index 0 lru
2930        let retries = 1;
2931        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2932        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2933
2934        // peer_1 is idle
2935        assert!(tx_fetcher.is_idle(&peer_id_1));
2936        assert_eq!(tx_fetcher.active_peers.len(), 0);
2937
2938        // sends request for buffered hashes to peer_1
2939        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2940
2941        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2942
2943        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2944        // as long as request is in inflight peer_1 is not idle
2945        assert!(!tx_fetcher.is_idle(&peer_id_1));
2946        assert_eq!(tx_fetcher.active_peers.len(), 1);
2947
2948        // mock session of peer_1 receives request
2949        let req = to_mock_session_rx
2950            .recv()
2951            .await
2952            .expect("peer_1 session should receive request with buffered hashes");
2953        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2954        let GetPooledTransactions(hashes) = request;
2955
2956        let hashes = hashes.into_iter().collect::<B256Set>();
2957
2958        assert_eq!(hashes, seen_hashes.into_iter().collect::<B256Set>());
2959
2960        // fail request to peer_1
2961        response
2962            .send(Err(RequestError::BadResponse))
2963            .expect("should send peer_1 response to tx manager");
2964        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2965            unreachable!()
2966        };
2967
2968        // request has resolved, peer_1 is idle again
2969        assert!(tx_fetcher.is_idle(&peer_id));
2970        assert_eq!(tx_fetcher.active_peers.len(), 0);
2971        // failing peer_1's request buffers requested hashes for retry
2972        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2973
2974        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2975        tx_manager.peers.insert(peer_id_2, peer_2);
2976
2977        // peer_2 announces same hashes as peer_1
2978        let msg =
2979            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2980        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2981
2982        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2983
2984        // peer_2 should be in active_peers.
2985        assert_eq!(tx_fetcher.active_peers.len(), 1);
2986
2987        // since hashes are already seen, no changes to length of unknown hashes
2988        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2989        // but hashes are taken out of buffer and packed into request to peer_2
2990        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2991
2992        // mock session of peer_2 receives request
2993        let req = to_mock_session_rx
2994            .recv()
2995            .await
2996            .expect("peer_2 session should receive request with buffered hashes");
2997        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2998
2999        // report failed request to tx manager
3000        response
3001            .send(Err(RequestError::BadResponse))
3002            .expect("should send peer_2 response to tx manager");
3003        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
3004
3005        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
3006        // for retry
3007        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
3008        assert_eq!(tx_fetcher.active_peers.len(), 0);
3009    }
3010
3011    #[test]
3012    fn test_transaction_builder_empty() {
3013        let mut builder =
3014            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68, 0);
3015        assert!(builder.is_empty());
3016
3017        let mut factory = MockTransactionFactory::default();
3018        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3019        builder.push(&tx);
3020        assert!(!builder.is_empty());
3021
3022        let txs = builder.build();
3023        assert!(txs.full.is_none());
3024        let txs = txs.pooled.unwrap();
3025        assert_eq!(txs.len(), 1);
3026    }
3027
3028    #[test]
3029    fn test_transaction_builder_large() {
3030        let mut builder =
3031            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68, 0);
3032        assert!(builder.is_empty());
3033
3034        let mut factory = MockTransactionFactory::default();
3035        let mut tx = factory.create_eip1559();
3036        // create a transaction that still fits
3037        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
3038        let tx = Arc::new(tx);
3039        let tx = PropagateTransaction::pool_tx(tx);
3040        builder.push(&tx);
3041        assert!(!builder.is_empty());
3042
3043        let txs = builder.clone().build();
3044        assert!(txs.pooled.is_none());
3045        let txs = txs.full.unwrap();
3046        assert_eq!(txs.len(), 1);
3047
3048        builder.push(&tx);
3049
3050        let txs = builder.clone().build();
3051        let pooled = txs.pooled.unwrap();
3052        assert_eq!(pooled.len(), 1);
3053        let txs = txs.full.unwrap();
3054        assert_eq!(txs.len(), 1);
3055    }
3056
3057    #[test]
3058    fn test_transaction_builder_eip4844() {
3059        let mut builder =
3060            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68, 0);
3061        assert!(builder.is_empty());
3062
3063        let mut factory = MockTransactionFactory::default();
3064        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
3065        builder.push(&tx);
3066        assert!(!builder.is_empty());
3067
3068        let txs = builder.clone().build();
3069        assert!(txs.full.is_none());
3070        let txs = txs.pooled.unwrap();
3071        assert_eq!(txs.len(), 1);
3072
3073        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
3074        builder.push(&tx);
3075
3076        let txs = builder.clone().build();
3077        let pooled = txs.pooled.unwrap();
3078        assert_eq!(pooled.len(), 1);
3079        let txs = txs.full.unwrap();
3080        assert_eq!(txs.len(), 1);
3081    }
3082
3083    #[tokio::test]
3084    async fn test_propagate_full() {
3085        reth_tracing::init_test_tracing();
3086
3087        let (mut tx_manager, network) = new_tx_manager().await;
3088        let peer_id = PeerId::random();
3089
3090        // ensure not syncing
3091        network.handle().update_sync_state(SyncState::Idle);
3092
3093        // mock a peer
3094        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
3095
3096        let session_info = SessionInfo {
3097            peer_id,
3098            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
3099            client_version: Arc::from(""),
3100            capabilities: Arc::new(vec![].into()),
3101            status: Arc::new(Default::default()),
3102            version: EthVersion::Eth68,
3103            peer_kind: PeerKind::Basic,
3104        };
3105        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
3106        tx_manager
3107            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
3108        let mut propagate = vec![];
3109        let mut factory = MockTransactionFactory::default();
3110        let eip1559_tx = Arc::new(factory.create_eip1559());
3111        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
3112        let eip4844_tx = Arc::new(factory.create_eip4844());
3113        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
3114
3115        let propagated =
3116            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
3117        assert_eq!(propagated.len(), 2);
3118        let prop_txs = propagated.get(eip1559_tx.transaction.hash()).unwrap();
3119        assert_eq!(prop_txs.len(), 1);
3120        assert!(prop_txs[0].is_full());
3121
3122        let prop_txs = propagated.get(eip4844_tx.transaction.hash()).unwrap();
3123        assert_eq!(prop_txs.len(), 1);
3124        assert!(prop_txs[0].is_hash());
3125
3126        let peer = tx_manager.peers.get(&peer_id).unwrap();
3127        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3128        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
3129        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
3130
3131        // propagate again
3132        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
3133        assert!(propagated.is_empty());
3134    }
3135
3136    #[tokio::test]
3137    async fn test_truncated_hash_announcement_not_marked_seen() {
3138        reth_tracing::init_test_tracing();
3139
3140        let (mut tx_manager, network) = new_tx_manager().await;
3141        // all peers receive hash announcements only
3142        tx_manager.config.propagation_mode = TransactionPropagationMode::Max(0);
3143
3144        // ensure not syncing
3145        network.handle().update_sync_state(SyncState::Idle);
3146
3147        let peer_id = PeerId::random();
3148        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3149        tx_manager.peers.insert(peer_id, peer);
3150
3151        // one more transaction than fits into a single hashes broadcast message
3152        let mut factory = MockTransactionFactory::default();
3153        let txs = (0..=SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE)
3154            .map(|_| PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559())))
3155            .collect::<Vec<_>>();
3156        let last_sent = *txs[txs.len() - 2].tx_hash();
3157        let truncated = *txs[txs.len() - 1].tx_hash();
3158
3159        let propagated = tx_manager.propagate_transactions(txs, PropagationMode::Basic);
3160
3161        // the truncated hash was not sent, so it must not be tracked as seen by the peer
3162        assert!(propagated.get(&truncated).is_none());
3163        let peer = tx_manager.peers.get(&peer_id).unwrap();
3164        assert!(!peer.seen_transactions.contains(&truncated));
3165        assert!(peer.seen_transactions.contains(&last_sent));
3166    }
3167
3168    #[tokio::test]
3169    async fn test_propagate_pending_txs_while_initially_syncing() {
3170        reth_tracing::init_test_tracing();
3171
3172        let (mut tx_manager, network) = new_tx_manager().await;
3173        let peer_id = PeerId::random();
3174
3175        // Keep the node in initial sync mode.
3176        network.handle().update_sync_state(SyncState::Syncing);
3177        assert!(NetworkInfo::is_initially_syncing(&network.handle()));
3178
3179        // Add a peer so propagation has a destination.
3180        let (peer, _rx) = new_mock_session(peer_id, EthVersion::Eth68);
3181        tx_manager.peers.insert(peer_id, peer);
3182
3183        let tx = MockTransaction::eip1559();
3184        tx_manager
3185            .pool
3186            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
3187            .await
3188            .expect("transaction should be accepted into the pool");
3189
3190        tx_manager.on_new_pending_transactions(vec![*tx.get_hash()]);
3191
3192        let peer = tx_manager.peers.get(&peer_id).expect("peer should exist");
3193        assert!(peer.seen_transactions.contains(tx.get_hash()));
3194    }
3195
3196    #[tokio::test]
3197    async fn test_relaxed_filter_ignores_unknown_tx_types() {
3198        reth_tracing::init_test_tracing();
3199
3200        let transactions_manager_config = TransactionsManagerConfig::default();
3201
3202        let propagation_policy = TransactionPropagationKind::default();
3203        let announcement_policy = RelaxedEthAnnouncementFilter::default();
3204
3205        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
3206
3207        let pool = testing_pool();
3208        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
3209        let client = NoopProvider::default();
3210
3211        let network_config = NetworkConfigBuilder::new(secret_key, Runtime::test())
3212            .listener_port(0)
3213            .disable_discovery()
3214            .build(client.clone());
3215
3216        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
3217        let (to_tx_manager_tx, from_network_rx) =
3218            reth_metrics::common::mpsc::memory_bounded_channel::<
3219                NetworkTransactionEvent<EthNetworkPrimitives>,
3220            >(
3221                crate::transactions::constants::tx_manager::DEFAULT_TX_MANAGER_CHANNEL_MEMORY_LIMIT_BYTES,
3222                "test_tx_channel",
3223            );
3224        network_manager.set_transactions(to_tx_manager_tx);
3225        let network_handle = network_manager.handle().clone();
3226        let network_service_handle = tokio::spawn(network_manager);
3227
3228        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
3229            network_handle.clone(),
3230            pool.clone(),
3231            from_network_rx,
3232            transactions_manager_config,
3233            policy_bundle,
3234        );
3235
3236        let peer_id = PeerId::random();
3237        let eth_version = EthVersion::Eth68;
3238        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
3239        tx_manager.peers.insert(peer_id, mock_peer_metadata);
3240
3241        let mut tx_factory = MockTransactionFactory::default();
3242
3243        let valid_known_tx = tx_factory.create_eip1559();
3244        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
3245
3246        let known_tx_hash = *known_tx_signed.hash();
3247        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
3248        let known_tx_size = known_tx_signed.encoded_length();
3249
3250        let unknown_tx_hash = B256::random();
3251        let unknown_tx_type_byte = 0xff_u8;
3252        let unknown_tx_size = 150;
3253
3254        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
3255            types: vec![known_tx_type_byte, unknown_tx_type_byte],
3256            sizes: vec![known_tx_size, unknown_tx_size],
3257            hashes: vec![known_tx_hash, unknown_tx_hash],
3258        });
3259
3260        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
3261
3262        poll_fn(|cx| {
3263            let _ = tx_manager.poll_unpin(cx);
3264            Poll::Ready(())
3265        })
3266        .await;
3267
3268        let mut requested_hashes_in_getpooled = B256Set::default();
3269        let mut unexpected_request_received = false;
3270
3271        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
3272            .await
3273        {
3274            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
3275                let GetPooledTransactions(hashes) = request;
3276                for hash in hashes {
3277                    requested_hashes_in_getpooled.insert(hash);
3278                }
3279                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
3280            }
3281            Ok(Some(other_request)) => {
3282                tracing::error!(?other_request, "Received unexpected PeerRequest type");
3283                unexpected_request_received = true;
3284            }
3285            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
3286            Err(_timeout_err) => {
3287                tracing::info!("Timeout: No GetPooledTransactions request received.")
3288            }
3289        }
3290
3291        assert!(
3292            requested_hashes_in_getpooled.contains(&known_tx_hash),
3293            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
3294        );
3295        assert!(
3296            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
3297            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
3298        );
3299        assert!(
3300            !unexpected_request_received,
3301            "An unexpected P2P request was received by the mock peer."
3302        );
3303
3304        network_service_handle.abort();
3305    }
3306}